Skip to content

Commit

Permalink
fix agent introduction issue
Browse files Browse the repository at this point in the history
  • Loading branch information
dewmal committed Jan 22, 2025
1 parent 7404dae commit 969841d
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 20 deletions.
2 changes: 1 addition & 1 deletion bindings/ceylon/ceylon/base/uni_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async def broadcast_message(self, message: Any) -> None:
if not isinstance(message, bytes):
message = pickle.dumps(message)
await self.broadcast(message)
logger.debug(f"Broadcast message sent: {message}")
# logger.debug(f"Broadcast message sent: {message}")
except Exception as e:
logger.error(f"Error broadcasting message: {e}")

Expand Down
4 changes: 2 additions & 2 deletions bindings/ceylon/examples/auction/single_item_auction.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async def start_auction(self):
await self.broadcast_message(start_msg)

async def on_message(self, agent_id: str, data: bytes, time: int):
logger.debug(f"Received message from {agent_id}: {data}")
# logger.debug(f"Received message from {agent_id}: {data}")
if self.auction_ended:
return

Expand Down Expand Up @@ -130,7 +130,7 @@ def __init__(self, name: str, budget: float,
async def on_message(self, agent_id: str, data: bytes, time: int):
try:
message = pickle.loads(data)
logger.debug(f"Received message from {agent_id}: {message}")
# logger.debug(f"Received message from {agent_id}: {message}")

if isinstance(message, AuctionStart) and not self.has_bid:
if self.budget > message.item.starting_price:
Expand Down
2 changes: 1 addition & 1 deletion bindings/ceylon/examples/simple_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ceylon.base.uni_agent import BaseAgent
from ceylon.ceylon import PeerMode

# enable_log("INFO")
enable_log("INFO")


class AdminAgent(BaseAgent):
Expand Down
11 changes: 11 additions & 0 deletions libs/ceylon-core/src/workspace/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ pub enum AgentMessage {
name: String,
topic: String,
},
AgentRegistrationAck {
id: String,
status: bool,
},
}

impl AgentMessage {
Expand Down Expand Up @@ -77,4 +81,11 @@ impl AgentMessage {
topic,
}
}

pub fn create_registration_ack_message(peer: String, status: bool) -> Self {
AgentMessage::AgentRegistrationAck {
id: peer,
status,
}
}
}
69 changes: 53 additions & 16 deletions libs/ceylon-core/src/workspace/uniffied_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,14 @@ impl UnifiedAgent {
}
});

let registration_intro_send_cancel_token = CancellationToken::new();

let on_message = self._on_message.clone();
let on_event = self._on_event.clone();
let peer_id = self._peer_id.clone();
let cancel_token_clone = cancel_token.clone();

let agent_details = self.details().clone();
let my_self_details = self.details().clone();
// Handle peer events
let task_peer_listener = handle.spawn(async move {
let mut is_call_agent_on_connect_list: HashMap<String, bool> = HashMap::new();
Expand Down Expand Up @@ -401,19 +403,35 @@ impl UnifiedAgent {
}
}
AgentMessage::AgentIntroduction { id, name, role, topic } => {
info!( "Agent introduction {:?}", id);
debug!( "Agent introduction {:?}", id);
let peer_id = id.clone();
let id_key = id.clone();
let agent_detail = AgentDetail{
let _ag = AgentDetail{
name,
id,
role
};
on_event.lock().await.on_agent_connected(
topic,
agent_detail.clone()
topic.clone(),
_ag.clone()
).await;
worker_details.write().await.insert(id_key, agent_detail);
worker_details.write().await.insert(id_key, _ag.clone());
if config.mode == PeerMode::Admin {
let agent_intro_message = AgentMessage::create_registration_ack_message(
peer_id.clone(),
true,
);
peer_emitter_clone.send(
(my_self_details.id.clone(),agent_intro_message.to_bytes(),None)
).await.unwrap();
}
debug!( "{:?} Worker details: {:#?}", my_self_details.clone().id, worker_details.read().await);
}
AgentMessage::AgentRegistrationAck { id,status } => {
debug!( "Agent registration ack: {:#?}", status);
if (status){
registration_intro_send_cancel_token.cancel();
}
}
_ => {}
}
Expand All @@ -425,17 +443,36 @@ impl UnifiedAgent {
peer_id,
topic,
}=>{
// if worker_details.read().await.get(&peer_id).is_none() {
if worker_details.read().await.get(&peer_id).is_none() {
let agent_intro_message = AgentMessage::create_introduction_message(
peer_id.clone(),
agent_details.clone().name,
agent_details.clone().role,
my_self_details.clone().id,
my_self_details.clone().name,
my_self_details.clone().role,
topic.clone(),
);
peer_emitter_clone.send(
(agent_details.id.clone(),agent_intro_message.to_bytes(),None)
).await.unwrap();
// }
let _cancel_token = registration_intro_send_cancel_token.clone();
let _emitter = peer_emitter_clone.clone();
let _id = my_self_details.id.clone();


if config.mode == PeerMode::Admin {
_emitter.send(
(_id.clone(),agent_intro_message.to_bytes(),None)
).await.unwrap();
}else{
tokio::spawn(async move {
loop{
if _cancel_token.is_cancelled() {
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
_emitter.send(
(_id.clone(),agent_intro_message.to_bytes(),None)
).await.unwrap();
}
});
}
}
}
_ => {
debug!("Admin Received Event {:?}", event);
Expand All @@ -456,7 +493,7 @@ impl UnifiedAgent {
processor.lock().await.run(inputs).await;
loop {
if cancel_token_clone.is_cancelled() {
debug!("Processor shutting down");
info!("Processor shutting down");
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Expand Down Expand Up @@ -498,7 +535,7 @@ impl UnifiedAgent {
msg = shutdown_recv_lock.recv() => {
if let Some(raw_data) = msg {
if raw_data == admin_id {
debug!("Received shutdown signal");
info!("Received shutdown signal");
cancel_token_clone.cancel();
break;
}
Expand Down

0 comments on commit 969841d

Please sign in to comment.