Skip to content

Commit

Permalink
fix agent stack closing issue
Browse files Browse the repository at this point in the history
  • Loading branch information
dewmal committed Jan 22, 2025
1 parent 969841d commit bb48d07
Showing 1 changed file with 40 additions and 30 deletions.
70 changes: 40 additions & 30 deletions libs/ceylon-core/src/workspace/uniffied_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ pub struct UnifiedAgent {
pub shutdown_recv: Arc<Mutex<mpsc::UnboundedReceiver<String>>>,

_connected_agents: Arc<RwLock<HashMap<String, AgentDetail>>>,

_cancel_token: CancellationToken,
}

impl UnifiedAgent {
Expand Down Expand Up @@ -184,6 +186,8 @@ impl UnifiedAgent {
shutdown_recv: Arc::new(Mutex::new(shutdown_recv)),

_connected_agents: Arc::new(RwLock::new(HashMap::new())),

_cancel_token: CancellationToken::new(),
}
}

Expand Down Expand Up @@ -235,8 +239,7 @@ impl UnifiedAgent {
.build()
.unwrap();

let cancel_token = CancellationToken::new();
let cancel_token_clone = cancel_token.clone();
let cancel_token = self._cancel_token.clone();

// Get all task handlers
let mut self_agent_handlers = self
Expand All @@ -258,16 +261,45 @@ impl UnifiedAgent {

let all_tasks = join_all(self_agent_handlers);

let cancel_token_clone = cancel_token.clone();
let shutdown_recv = self.shutdown_recv.clone();
let admin_id = self._peer_id.clone();
let task_shutdown = runtime.spawn(async move {
let mut shutdown_recv_lock = shutdown_recv.lock().await;
loop {
select! {
_ = cancel_token_clone.cancelled() => {
debug!("Shutdown handler shutting down");
break;
}
msg = shutdown_recv_lock.recv() => {
if let Some(raw_data) = msg {
if raw_data == admin_id {
debug!("Received shutdown signal");
cancel_token_clone.cancel();
break;
}
}
}
}
}
});

// Use select to handle either ctrl-c or task completion
runtime
.spawn(async move {
select! {

_ = task_shutdown => {
debug!("Shutdown handler completed");
}

_ = all_tasks => {
debug!("All agent tasks completed normally");
}
_ = signal::ctrl_c() => {
debug!("Received ctrl-c, initiating shutdown");
cancel_token_clone.cancel();
cancel_token .cancel();
// Wait for tasks to complete after cancellation
// all_tasks.await;
}
Expand Down Expand Up @@ -367,12 +399,13 @@ impl UnifiedAgent {
loop {
select! {
_ = cancel_token_clone.cancelled() => {
debug!("Peer listener shutting down");
debug!("Peer listener select shutting down");
break;
}
event = peer_listener.recv() => {
if let Some(node_message) = event {
if cancel_token_clone.is_cancelled() {
debug!( "Peer listener shutting down");
break;
}
// debug!( "Node Message: {:?}", node_message);
Expand Down Expand Up @@ -493,7 +526,7 @@ impl UnifiedAgent {
processor.lock().await.run(inputs).await;
loop {
if cancel_token_clone.is_cancelled() {
info!("Processor shutting down");
debug!("Processor shutting down");
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Expand Down Expand Up @@ -521,29 +554,6 @@ impl UnifiedAgent {
});

// Spawn shutdown handler
let cancel_token_clone = cancel_token.clone();
let shutdown_recv = self.shutdown_recv.clone();
let admin_id = self._peer_id.clone();
let task_shutdown = handle.spawn(async move {
let mut shutdown_recv_lock = shutdown_recv.lock().await;
loop {
select! {
_ = cancel_token_clone.cancelled() => {
debug!("Shutdown handler shutting down");
break;
}
msg = shutdown_recv_lock.recv() => {
if let Some(raw_data) = msg {
if raw_data == admin_id {
info!("Received shutdown signal");
cancel_token_clone.cancel();
break;
}
}
}
}
}
});

let cancel_token_clone = cancel_token.clone();
let run_holder_process = handle.spawn(async move {
Expand All @@ -559,7 +569,6 @@ impl UnifiedAgent {
task_peer_listener,
task_processor,
task_broadcast,
task_shutdown,
run_holder_process,
]
}
Expand All @@ -574,7 +583,8 @@ impl UnifiedAgent {

pub async fn stop(&self) {
debug!("Agent {} stop called", self._config.name);
self.shutdown_send.send(self._peer_id.clone()).unwrap();
self._cancel_token.cancel();
// self.shutdown_send.send(self._peer_id.clone()).unwrap();
self.cleanup().await;
}
}

0 comments on commit bb48d07

Please sign in to comment.