diff --git a/docs/best_practice.md b/docs/best-practices.md
similarity index 100%
rename from docs/best_practice.md
rename to docs/best-practices.md
diff --git a/docs/core-concepts.md b/docs/core-concepts.md
new file mode 100644
index 0000000..26ab4ad
--- /dev/null
+++ b/docs/core-concepts.md
@@ -0,0 +1,313 @@
+# Ceylon Framework Core Concepts Guide
+
+## Agent System Architecture
+
+### Admin Agent
+The central coordinator in a Ceylon system.
+
+```python
+from ceylon import Admin
+
+class CoordinatorAdmin(Admin):
+ def __init__(self, name="coordinator", port=8888):
+ super().__init__(name=name, port=port)
+```
+
+Key characteristics:
+- One admin per system
+- Manages worker connections
+- Coordinates task distribution
+- Handles system-wide state
+
+### Worker Agent
+Performs specific tasks within the system.
+
+```python
+from ceylon import Worker
+
+class TaskWorker(Worker):
+ def __init__(self, name: str, role: str):
+ super().__init__(name=name, role=role)
+```
+
+Key characteristics:
+- Multiple workers per system
+- Specialized task execution
+- Reports to admin agent
+- Independent operation
+
+## Message System
+
+### Message Types
+```python
+from dataclasses import dataclass
+from typing import Any
+
+@dataclass(frozen=True)
+class Message:
+ id: str
+ content: Any
+ timestamp: float
+```
+
+Core message principles:
+- Immutable data structures
+- Type-safe communication
+- Metadata inclusion
+- Serializable format
+
+### Message Handlers
+```python
+from ceylon import on, on_run, on_connect
+
+class MessageHandling:
+ @on(MessageType)
+ async def handle_specific(self, msg: MessageType):
+ # Handle specific message type
+ pass
+
+ @on_run()
+ async def handle_run(self, inputs: bytes):
+ # Main execution loop
+ pass
+
+ @on_connect("*")
+ async def handle_connection(self, topic: str, agent: AgentDetail):
+ # Connection handling
+ pass
+```
+
+## Communication Patterns
+
+### Direct Communication
+```python
+class DirectCommunication(Worker):
+ async def send_to_peer(self, peer_id: str, data: Any):
+ await self.send_message(peer_id, data)
+```
+
+### Broadcast Communication
+```python
+class BroadcastCommunication(Admin):
+ async def notify_all(self, data: Any):
+ await self.broadcast_message(data)
+```
+
+## State Management
+
+### Agent State
+```python
+from enum import Enum
+
+class AgentState(Enum):
+ IDLE = "idle"
+ PROCESSING = "processing"
+ ERROR = "error"
+
+class StateManagement(Worker):
+ def __init__(self):
+ super().__init__()
+ self.state = AgentState.IDLE
+```
+
+### State Transitions
+```python
+class StatefulAgent(Worker):
+ async def transition_state(self, new_state: AgentState):
+ old_state = self.state
+ self.state = new_state
+ await self.notify_state_change(old_state, new_state)
+```
+
+## Event Processing
+
+### Event Handling
+```python
+class EventProcessor(Worker):
+ @on(Event)
+ async def process_event(self, event: Event):
+ if self.can_handle(event):
+ await self.handle_event(event)
+ else:
+ await self.forward_event(event)
+```
+
+### Event Flow
+```python
+class EventFlow(Admin):
+ async def manage_event_flow(self, event: Event):
+ # Preprocessing
+ processed_event = await self.preprocess(event)
+
+ # Distribution
+ await self.distribute_event(processed_event)
+
+ # Monitoring
+ await self.monitor_event_processing(processed_event)
+```
+
+## Resource Management
+
+### Resource Lifecycle
+```python
+class ResourceManager(Worker):
+ async def __aenter__(self):
+ await self.acquire_resources()
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ await self.release_resources()
+```
+
+### Resource Pooling
+```python
+class ResourcePool:
+ def __init__(self, size: int):
+ self.pool = asyncio.Queue(size)
+ self.in_use = set()
+
+ async def acquire(self):
+ resource = await self.pool.get()
+ self.in_use.add(resource)
+ return resource
+
+ async def release(self, resource):
+ self.in_use.remove(resource)
+ await self.pool.put(resource)
+```
+
+## Error Handling
+
+### Basic Error Handling
+```python
+class ErrorHandler(Worker):
+ async def safe_execute(self, task):
+ try:
+ return await self.execute_task(task)
+ except Exception as e:
+ await self.handle_error(task, e)
+ raise
+```
+
+### Retry Mechanism
+```python
+class RetryMechanism(Worker):
+ async def with_retry(self, operation, max_retries=3):
+ for attempt in range(max_retries):
+ try:
+ return await operation()
+ except Exception as e:
+ if attempt == max_retries - 1:
+ raise
+ await asyncio.sleep(2 ** attempt)
+```
+
+## System Integration
+
+### External Service Integration
+```python
+class ServiceIntegrator(Worker):
+ async def interact_with_service(self, service_request):
+ # Convert to external format
+ external_request = self.convert_request(service_request)
+
+ # Make external call
+ response = await self.call_service(external_request)
+
+ # Convert response back
+ return self.convert_response(response)
+```
+
+### Data Flow
+```python
+class DataFlowManager(Admin):
+ async def manage_data_flow(self, data):
+ # Input validation
+ validated_data = await self.validate(data)
+
+ # Processing
+ processed_data = await self.process(validated_data)
+
+ # Distribution
+ await self.distribute(processed_data)
+```
+
+## Core Utilities
+
+### Message Conversion
+```python
+class MessageConverter:
+ @staticmethod
+ def to_bytes(message: Any) -> bytes:
+ return pickle.dumps(message)
+
+ @staticmethod
+ def from_bytes(data: bytes) -> Any:
+ return pickle.loads(data)
+```
+
+### Agent Identification
+```python
+class AgentIdentification:
+ @staticmethod
+ def create_agent_id(name: str, role: str) -> str:
+ return f"{name}_{role}_{uuid.uuid4()}"
+```
+
+## System Lifecycle
+
+### Initialization
+```python
+async def initialize_system():
+ # Create admin
+ admin = AdminAgent(port=8888)
+
+ # Create workers
+ workers = [
+ WorkerAgent(f"worker_{i}")
+ for i in range(3)
+ ]
+
+ # Start system
+ await admin.start_agent(b"", workers)
+```
+
+### Shutdown
+```python
+async def shutdown_system(admin: Admin, workers: List[Worker]):
+ # Stop workers
+ for worker in workers:
+ await worker.stop()
+
+ # Stop admin
+ await admin.stop()
+```
+
+## Key Concepts Summary
+
+1. **Agent Hierarchy**
+ - Admin agents coordinate
+ - Worker agents execute
+ - Clear responsibility separation
+
+2. **Message-Based Communication**
+ - Type-safe messages
+ - Asynchronous processing
+ - Event-driven architecture
+
+3. **State Management**
+ - Clear state definitions
+ - Controlled transitions
+ - State monitoring
+
+4. **Resource Handling**
+ - Proper initialization
+ - Clean cleanup
+ - Resource pooling
+
+5. **Error Management**
+ - Graceful error handling
+ - Retry mechanisms
+ - Error reporting
+
+These core concepts provide the foundation for building robust distributed systems with Ceylon.
\ No newline at end of file
diff --git a/docs/examples/auction.md b/docs/examples/auction.md
new file mode 100644
index 0000000..20c90a5
--- /dev/null
+++ b/docs/examples/auction.md
@@ -0,0 +1,251 @@
+# Distributed Auction System
+
+This tutorial demonstrates building a robust, secure distributed auction system using the Ceylon multi-agent framework. The system implements a scalable architecture with an auctioneer agent orchestrating the auction process and multiple authenticated bidder agents participating in real-time auctions.
+
+## System Overview
+
+The auction system implements:
+- Multiple concurrent auction support with isolation
+- Secure bid validation and authentication
+- Configurable auction rules (reserve price, minimum increment)
+- Fault-tolerant communication with automatic recovery
+- Scalable bid processing with rate limiting
+
+## Core Components
+
+### Data Models
+
+```python
+@dataclass
+class Item:
+ name: str
+ starting_price: float
+ reserve_price: Optional[float] = None
+ min_increment: float = 1.0
+ auction_duration: timedelta = timedelta(minutes=5)
+
+@dataclass
+class Bid:
+ bidder: str
+ amount: float
+ timestamp: float
+ signature: bytes # For bid verification
+ auction_id: str
+
+@dataclass
+class AuctionStart:
+ item: Item
+ auction_id: str
+ start_time: float
+
+@dataclass
+class AuctionResult:
+ winner: str
+ winning_bid: float
+ auction_id: str
+ end_time: float
+
+@dataclass
+class AuctionEnd:
+ auction_id: str
+ reason: str # "completed", "reserve_not_met", "cancelled"
+```
+
+### Auctioneer Agent
+
+The auctioneer manages multiple concurrent auctions:
+
+```python
+class Auctioneer(BaseAgent):
+ def __init__(self, name="auctioneer", port=8888):
+ super().__init__(
+ name=name,
+ mode=PeerMode.ADMIN,
+ role="auctioneer",
+ port=port
+ )
+ self.active_auctions: Dict[str, AuctionState] = {}
+ self.bid_queue = asyncio.Queue(maxsize=1000)
+ self.bidder_registry: Dict[str, BidderInfo] = {}
+```
+
+Key features:
+- Asynchronous bid processing with rate limiting
+- Automatic auction timeouts and cleanup
+- Bid verification and signature validation
+- Bidder reputation tracking
+- Comprehensive error handling and recovery
+
+### Bidder Agent
+
+Each bidder implements secure auction participation:
+
+```python
+class Bidder(BaseAgent):
+ def __init__(self, name: str, budget: float, strategy: BiddingStrategy):
+ super().__init__(
+ name=name,
+ mode=PeerMode.CLIENT,
+ role="bidder"
+ )
+ self.budget = budget
+ self.strategy = strategy
+ self.active_bids: Dict[str, float] = {}
+ self.key_pair = generate_keypair() # For bid signing
+```
+
+Key features:
+- Configurable bidding strategies
+- Bid signing and verification
+- Budget enforcement and tracking
+- Automatic bid updates on price changes
+- Graceful handling of connection issues
+
+## Advanced Bidding Strategies
+
+```python
+class SmartBidder(Bidder):
+ async def calculate_bid(self, item: Item, auction_state: AuctionState) -> float:
+ market_value = await self.estimate_market_value(item)
+ competition = len(auction_state.active_bidders)
+ time_remaining = auction_state.end_time - time.time()
+
+ if time_remaining < 60: # Last minute strategy
+ return min(self.budget, market_value * 1.1)
+
+ return min(
+ self.budget,
+ market_value * (0.8 + (0.4 * (1 - time_remaining/auction_state.duration)))
+ )
+```
+
+## Error Handling and Recovery
+
+```python
+class AuctionManager:
+ async def handle_disconnection(self, bidder_id: str):
+ affected_auctions = self.find_affected_auctions(bidder_id)
+ for auction in affected_auctions:
+ if auction.status == "pending_payment":
+ await self.revert_auction(auction.id)
+ else:
+ await self.pause_auction(auction.id)
+ await self.notify_participants(auction.id, "auction_paused")
+
+ async def recover_auction_state(self, auction_id: str):
+ state = await self.load_backup_state(auction_id)
+ await self.verify_bid_sequence(state.bids)
+ await self.resume_auction(state)
+```
+
+## Running the System
+
+1. Initialize auction system with configuration:
+```python
+config = AuctionConfig(
+ min_bidders=3,
+ bid_timeout=30,
+ max_concurrent_auctions=10,
+ bid_verification_required=True
+)
+auctioneer = Auctioneer(config)
+```
+
+2. Create authenticated bidders:
+```python
+bidders = [
+ SmartBidder("Alice", 1500.0, MarketValueStrategy()),
+ SmartBidder("Bob", 1200.0, AggressiveStrategy()),
+ SmartBidder("Charlie", 2000.0, ConservativeStrategy())
+]
+```
+
+3. Start the system with monitoring:
+```python
+async with AuctionMonitor() as monitor:
+ await auctioneer.start_agent(b"", bidders)
+ await monitor.track_metrics()
+```
+
+## System Architecture
+
+```mermaid
+sequenceDiagram
+ participant A as Auctioneer
+ participant B1 as Bidder1
+ participant B2 as Bidder2
+ participant B3 as Bidder3
+ participant M as Monitor
+
+ Note over A,M: Initialization Phase
+ A->>M: Register System
+ B1->>A: Connect(auth_token)
+ A->>B1: Connection Confirmed
+ B2->>A: Connect(auth_token)
+ A->>B2: Connection Confirmed
+ B3->>A: Connect(auth_token)
+ A->>B3: Connection Confirmed
+
+ Note over A,M: Auction Start
+ A->>M: Start Monitoring
+ A->>B1: AuctionStart(item, rules)
+ A->>B2: AuctionStart(item, rules)
+ A->>B3: AuctionStart(item, rules)
+
+ Note over A,M: Active Bidding
+ B1-->>A: SignedBid(amount, timestamp)
+ A-->>M: Log Bid
+ A->>B1: BidAcknowledged
+ B2-->>A: SignedBid(amount, timestamp)
+ A-->>M: Log Bid
+ A->>B2: BidAcknowledged
+ B3-->>A: SignedBid(amount, timestamp)
+ A-->>M: Log Bid
+ A->>B3: BidAcknowledged
+
+ Note over A,M: Auction Close
+ A->>M: Finalize Metrics
+ A->>B1: AuctionResult(winner, amount)
+ A->>B2: AuctionResult(winner, amount)
+ A->>B3: AuctionResult(winner, amount)
+ A->>M: Store Results
+```
+
+## Performance Monitoring
+
+The system includes built-in monitoring for:
+- Bid processing latency
+- Message queue depths
+- Bidder connection health
+- Auction completion rates
+- System resource utilization
+
+## Security Considerations
+
+1. Bid Authentication
+- All bids are cryptographically signed
+- Timestamps prevent replay attacks
+- Rate limiting prevents DoS attacks
+
+2. Data Integrity
+- Auction state is regularly checkpointed
+- Bid history is immutable and verifiable
+- Transaction logs for audit trails
+
+3. Access Control
+- Bidder authentication required
+- Role-based permissions
+- Resource usage quotas
+
+## Scalability Features
+
+1. Horizontal Scaling
+- Multiple auctioneer instances
+- Load-balanced bidder connections
+- Distributed state management
+
+2. Performance Optimization
+- Asynchronous bid processing
+- Efficient state management
+- Connection pooling
+- Message batching
\ No newline at end of file
diff --git a/docs/examples/connect-through-network.md b/docs/examples/connect-through-network.md
index a7a98b1..93093bc 100644
--- a/docs/examples/connect-through-network.md
+++ b/docs/examples/connect-through-network.md
@@ -5,41 +5,71 @@
### 1. Server (server.py)
```python
-from ceylon import Admin
+from ceylon import Admin, enable_log
+from loguru import logger
-app = Admin(name="admin", port=8888, role="admin")
+enable_log("INFO")
+app = Admin(
+ name="admin",
+ port=8888,
+ role="admin",
+ workspace_id="default"
+)
@app.on_run()
async def run_worker(inputs: bytes):
+ logger.info(f"Worker started - {app.details().name}")
while True:
- await app.broadcast_message("Hello World from Server")
+ try:
+ message = {"type": "heartbeat", "source": app.details().name}
+ await app.broadcast_message(message)
+ await asyncio.sleep(1) # Prevent CPU spinning
+ except Exception as e:
+ logger.error(f"Error in broadcast: {e}")
```
-The server broadcasts messages continuously to all connected workers.
+The server implements a heartbeat system and proper error handling.
### 2. Worker (worker_agent.py)
```python
-from ceylon import Worker
-
-worker = Worker(name="worker", port=8888, role="worker")
-
-
-@worker.on(str)
-async def on_message(agent_id: str, data: str, time: int):
- print(f"Received message from {agent_id}: {data} at {time}")
+from ceylon import Worker, AgentDetail, enable_log
+from loguru import logger
+
+enable_log("INFO")
+worker = Worker(
+ name="worker",
+ role="worker",
+ workspace_id="default"
+)
+
+
+@worker.on(dict)
+async def on_message(agent: AgentDetail, data: dict, time: int):
+ try:
+ logger.info(f"Message from {agent.name}: {data}")
+ if data.get("type") == "heartbeat":
+ # Handle heartbeat
+ pass
+ except Exception as e:
+ logger.error(f"Error processing message: {e}")
+
+
+@worker.on_connect("*")
+async def on_connect(topic: str, agent: AgentDetail):
+ logger.info(f"Connected to {agent.name} on {topic}")
```
-The worker listens for and processes messages from the server.
+Workers implement proper message handling and connection events.
### 3. Configuration (.ceylon_network)
-```
+```ini
WORKSPACE_ID=default
WORKSPACE_IP=127.0.0.1
-WORKSPACE_PEER=12D3KooWMrqMLuYL3vExw7qaBJRzjN43kkkZwqSxUog7oaQCmnFE
WORKSPACE_PORT=8888
+WORKSPACE_BUFFER_SIZE=1024
```
## Setup Instructions
@@ -48,43 +78,102 @@ WORKSPACE_PORT=8888
```bash
python server.py
```
- - Server creates .ceylon_network file with connection details
- - WORKSPACE_PEER is auto-generated unique identifier
+ - Creates .ceylon_network with auto-generated configuration
+ - Initializes message handlers and event system
2. Start Worker(s):
```bash
python worker_agent.py
```
- - Worker reads .ceylon_network file
- - Connects to server using WORKSPACE_PEER
+ - Connects using configuration from .ceylon_network
+ - Establishes bidirectional communication
## Remote Connection Setup
-1. Copy .ceylon_network to remote machine
-2. Update WORKSPACE_IP if needed
-3. Run worker_agent.py on remote machine
-
-## Network Configuration
-
-- Default port: 8888
-- Local IP: 127.0.0.1
-- For remote connections:
- - Update WORKSPACE_IP to server's IP
- - Ensure port 8888 is accessible
-
-## Common Issues
+1. Configure Network:
+ - Open required ports on firewall
+ - Set up secure network tunneling if needed
+ - Update WORKSPACE_IP to server's external IP
-1. Connection Failures
- - Verify .ceylon_network file exists
- - Check WORKSPACE_IP and port accessibility
- - Ensure WORKSPACE_PEER matches server
+2. Security Considerations:
+ - Use environment-specific configuration files
+ - Implement proper access controls
+ - Monitor connections and traffic
-2. Network Constraints
- - Configure firewalls to allow port 8888
- - Use correct IP for non-local connections
-
-## Security Notes
+## Network Configuration
-- WORKSPACE_PEER acts as unique identifier
-- Keep .ceylon_network secure for controlled access
-- Update configuration for production environments
\ No newline at end of file
+- Default settings in static_val.py:
+ - Port: DEFAULT_WORKSPACE_PORT (8888)
+ - Buffer: DEFAULT_WORKSPACE_BUFFER_SIZE (100)
+ - Workspace: DEFAULT_WORKSPACE_ID
+
+## Common Issues & Solutions
+
+1. Connection Issues:
+ - Verify network connectivity
+ - Check configuration file paths
+ - Enable DEBUG logging for troubleshooting
+ - Validate port accessibility
+
+2. System Performance:
+ - Monitor buffer sizes
+ - Implement rate limiting if needed
+ - Use appropriate logging levels
+
+## Production Guidelines
+
+1. Security:
+ - Use secure configuration management
+ - Implement authentication
+ - Enable network encryption
+ - Regular security audits
+
+2. Monitoring:
+ - Implement health checks
+ - Set up logging aggregation
+ - Monitor system metrics
+ - Establish alerting
+
+## System Interaction Flow
+
+```mermaid
+sequenceDiagram
+ participant Admin
+ participant Network
+ participant Worker1
+ participant Worker2
+
+ Note over Admin: Start server
+ Admin->>Network: Create .ceylon_network
+
+ Note over Worker1,Worker2: Workers start
+ Worker1->>Network: Read config
+ Worker2->>Network: Read config
+
+ Worker1->>Admin: Connect
+ activate Admin
+ Admin-->>Worker1: Connection accepted
+ Admin->>Worker1: on_agent_connected event
+ deactivate Admin
+
+ Worker2->>Admin: Connect
+ activate Admin
+ Admin-->>Worker2: Connection accepted
+ Admin->>Worker2: on_agent_connected event
+ deactivate Admin
+
+ loop Heartbeat
+ Admin->>Worker1: Broadcast message
+ Admin->>Worker2: Broadcast message
+ Worker1-->>Admin: Process message
+ Worker2-->>Admin: Process message
+ end
+
+ Note over Worker1: Disconnect
+ Worker1->>Admin: Stop connection
+ Admin->>Worker2: Update connected agents
+
+ Note over Admin: Shutdown
+ Admin->>Worker2: Broadcast shutdown
+ Admin->>Network: Cleanup
+```
\ No newline at end of file
diff --git a/docs/examples/meeting-sechdular.md b/docs/examples/meeting-sechdular.md
deleted file mode 100644
index 95e0889..0000000
--- a/docs/examples/meeting-sechdular.md
+++ /dev/null
@@ -1,366 +0,0 @@
-# Meeting Scheduler
-
-
-## Introduction
-
-This tutorial will show you how to create a distributed meeting scheduler using Python with Ceylon framework. The scheduler finds optimal meeting times by coordinating between multiple participants using an agent-based approach.
-
-## Step 1: Set Up the Environment
-
-First, install the required dependencies:
-
-```bash
-pip install ceylon pydantic
-```
-
-## Step 2: Define the Data Models
-
-We'll use Pydantic for data validation and serialization. Here are our core data models:
-
-```python
-from pydantic import BaseModel
-from pydantic.dataclasses import dataclass
-from typing import List, Any
-
-# Input model for the scheduler
-class RunnerInput(BaseModel):
- request: Any
-
- class Config:
- arbitrary_types_allowed = True
-
-@dataclass(repr=True)
-class Meeting:
- name: str # Meeting name
- date: str # Meeting date
- duration: int # Duration in hours
- minimum_participants: int # Minimum required participants
-
-@dataclass(repr=True)
-class TimeSlot:
- date: str # Date of the slot
- start_time: int # Start hour (0-23)
- end_time: int # End hour (0-23)
-
- @property
- def duration(self):
- return self.end_time - self.start_time
-
- def is_greater_than(self, other):
- return self.end_time > other.end_time
-
-@dataclass(repr=True)
-class AvailabilityRequest:
- time_slot: TimeSlot
-
-@dataclass(repr=True)
-class AvailabilityResponse:
- owner: str # Participant name
- time_slot: TimeSlot
- accepted: bool # Availability status
-```
-
-## Step 3: Implement the Participant Agent
-
-The Participant class represents each meeting attendee:
-
-```python
-class Participant(Worker):
- name: str
- available_times: List[TimeSlot]
-
- def __init__(self, name, available_times):
- self.name = name
- self.available_times = available_times
- super().__init__(name=name, workspace_id=workspace_id,
- admin_peer=admin_peer, admin_port=admin_port)
-
- async def on_message(self, agent_id: str, data: bytes, time: int):
- data = pickle.loads(data)
- if type(data) == AvailabilityRequest:
- data: AvailabilityRequest = data
- # Check availability and respond
- is_available = any(self.is_overlap(slot, data.time_slot,
- data.time_slot.duration)
- for slot in self.available_times)
-
- response = AvailabilityResponse(
- owner=self.details().name,
- time_slot=data.time_slot,
- accepted=is_available
- )
- await self.broadcast(pickle.dumps(response))
-
- @staticmethod
- def is_overlap(slot1: TimeSlot, slot2: TimeSlot, duration: int) -> bool:
- latest_start = max(slot1.start_time, slot2.start_time)
- earliest_end = min(slot1.end_time, slot2.end_time)
- return earliest_end - latest_start >= duration
-```
-
-## Step 4: Implement the Coordinator
-
-The Coordinator manages the scheduling process:
-
-```python
-class Coordinator(Admin):
- meeting: Meeting = None
- agreed_slots = {}
- next_time_slot = None
-
- def __init__(self):
- super().__init__(name=workspace_id, port=admin_port)
-
- async def run(self, inputs: bytes):
- input: RunnerInput = pickle.loads(inputs)
- self.meeting = input.request
- print("Meeting Schedule request: ", self.meeting)
-
- async def on_agent_connected(self, topic: str, agent_id: str):
- if self.next_time_slot is None and self.meeting is not None:
- self.next_time_slot = TimeSlot(
- self.meeting.date, 0, self.meeting.duration
- )
- await self.broadcast(pickle.dumps(
- AvailabilityRequest(time_slot=self.next_time_slot)
- ))
-
- async def on_message(self, agent_id: str, data: bytes, time: int):
- data = pickle.loads(data)
- if type(data) == AvailabilityResponse:
- await self.handle_availability_response(data)
-
- async def handle_availability_response(self, data: AvailabilityResponse):
- if data.accepted:
- time_slot_key = f"{data.time_slot}"
- print(f"{data.owner} accepts {data.time_slot}")
-
- # Track acceptances and check if we have enough participants
- if time_slot_key in self.agreed_slots:
- slots = self.agreed_slots[time_slot_key]
- if data.owner not in slots:
- slots.append(data.owner)
- self.agreed_slots[time_slot_key] = slots
- if len(slots) >= self.meeting.minimum_participants:
- print(f"Meeting {slots} participants agreed on {data.time_slot}")
- await self.stop()
- return
- else:
- self.agreed_slots[time_slot_key] = [data.owner]
-
- # Try next time slot
- current_time_slot = data.time_slot
- next_time_slot = TimeSlot(
- self.meeting.date,
- current_time_slot.start_time + 1,
- current_time_slot.start_time + 1 + self.meeting.duration
- )
-
- if next_time_slot.is_greater_than(self.next_time_slot):
- self.next_time_slot = next_time_slot
- await self.broadcast(pickle.dumps(
- AvailabilityRequest(time_slot=self.next_time_slot)
- ))
-```
-
-## Step 5: Run the Scheduler
-
-Here's how to set up and run the scheduler:
-
-```python
-async def main():
- # Create participants with their available times
- participants = [
- Participant("Alice", [
- TimeSlot("2024-07-21", 9, 12),
- TimeSlot("2024-07-21", 14, 18)
- ]),
- Participant("Bob", [
- TimeSlot("2024-07-21", 10, 13),
- TimeSlot("2024-07-21", 15, 17)
- ]),
- Participant("Charlie", [
- TimeSlot("2024-07-21", 11, 14),
- TimeSlot("2024-07-21", 16, 18)
- ]),
- Participant("David", [
- TimeSlot("2024-07-21", 11, 14),
- TimeSlot("2024-07-21", 16, 18)
- ]),
- Participant("Kevin", [
- TimeSlot("2024-07-21", 10, 13),
- TimeSlot("2024-07-21", 15, 17)
- ])
- ]
-
- # Create and run coordinator
- coordinator = Coordinator()
- meeting = Meeting(
- name="Meeting 1",
- duration=2,
- date="2024-07-21",
- minimum_participants=3
- )
-
- await coordinator.arun_admin(
- inputs=pickle.dumps(RunnerInput(request=meeting)),
- workers=participants
- )
-
-if __name__ == '__main__':
- asyncio.run(main())
-```
-
-## How It Works
-
-1. The scheduler uses a distributed agent-based architecture where each participant is an independent agent.
-2. The Coordinator initiates the scheduling process by sending availability requests for time slots.
-3. Each Participant agent checks their availability and responds to requests.
-4. The Coordinator tracks responses and finds a time slot that works for the minimum required participants.
-5. The system uses efficient overlap detection to check time slot compatibility.
-
-## Key Improvements from Basic Version
-
-1. Uses Pydantic for robust data validation
-2. Implements proper serialization/deserialization
-3. Adds duration-aware time slot overlap detection
-4. Supports multiple participants beyond the minimum required
-5. Includes better error handling and type safety
-6. Uses asynchronous communication for better performance
-
-## Potential Enhancements
-
-1. Add persistence layer for storing scheduling history
-2. Implement priority-based scheduling
-3. Add support for recurring meetings
-4. Implement calendar integration (Google Calendar, Outlook)
-5. Add conflict resolution for competing meeting requests
-6. Implement notification system for scheduled meetings
-7. Add support for different time zones
-8. Create a REST API interface for web/mobile clients
-
----
-
-## Multiple case scenarios
-
-### All participants are available at a given time
-````mermaid
-sequenceDiagram
- participant C as Coordinator
- participant A as Alice
- participant B as Bob
- participant D as Charlie
-
- Note over C,D: All participants connect to coordinator
-
- C->>A: AvailabilityRequest(slot=9:00)
- C->>B: AvailabilityRequest(slot=9:00)
- C->>D: AvailabilityRequest(slot=9:00)
-
- A->>C: AvailabilityResponse(accepted=true)
- B->>C: AvailabilityResponse(accepted=true)
- D->>C: AvailabilityResponse(accepted=true)
-
- Note over C: Minimum participants reached
(3 acceptances for 9:00 slot)
-
- C->>A: Meeting Confirmed
- C->>B: Meeting Confirmed
- C->>D: Meeting Confirmed
-
- Note over C: Coordinator stops
scheduling process
-````
-### Someone is unavailable at a given time
-````mermaid
-sequenceDiagram
- participant C as Coordinator
- participant A as Alice
- participant B as Bob
- participant D as Charlie
-
- Note over C,D: Initial connection phase
-
- C->>A: AvailabilityRequest(slot=9:00)
- C->>B: AvailabilityRequest(slot=9:00)
- C->>D: AvailabilityRequest(slot=9:00)
-
- A->>C: AvailabilityResponse(accepted=true)
- B->>C: AvailabilityResponse(accepted=false)
- D->>C: AvailabilityResponse(accepted=true)
-
- Note over C: Not enough acceptances,
try next slot
-
- C->>A: AvailabilityRequest(slot=10:00)
- C->>B: AvailabilityRequest(slot=10:00)
- C->>D: AvailabilityRequest(slot=10:00)
-
- A->>C: AvailabilityResponse(accepted=true)
- B->>C: AvailabilityResponse(accepted=true)
- D->>C: AvailabilityResponse(accepted=true)
-
- Note over C: Minimum participants reached
(3 acceptances for 10:00 slot)
-
- C->>A: Meeting Confirmed
- C->>B: Meeting Confirmed
- C->>D: Meeting Confirmed
-
- Note over C: Coordinator stops
scheduling process
-````
-
-### Advance
-
-````mermaid
-sequenceDiagram
- participant M as Main
- participant C as Coordinator
- participant A as Alice
- participant B as Bob
- participant Ch as Charlie
-
- M->>C: start_agent(Meeting)
- M->>A: Initialize
- M->>B: Initialize
- M->>Ch: Initialize
-
- A-->>C: Connect
- B-->>C: Connect
- Ch-->>C: Connect
-
- Note over C: Create TimeSlot
(start=8:00)
-
- par Broadcast Availability Request
- C->>A: AvailabilityRequest(slot=8:00)
- C->>B: AvailabilityRequest(slot=8:00)
- C->>Ch: AvailabilityRequest(slot=8:00)
- end
-
- Note over A: Check is_overlap()
- Note over B: Check is_overlap()
- Note over Ch: Check is_overlap()
-
- A->>C: AvailabilityResponse(accepted=false)
- B->>C: AvailabilityResponse(accepted=false)
- Ch->>C: AvailabilityResponse(accepted=false)
-
- Note over C: Calculate next slot
(start=9:00)
-
- par Broadcast Next Slot
- C->>A: AvailabilityRequest(slot=9:00)
- C->>B: AvailabilityRequest(slot=9:00)
- C->>Ch: AvailabilityRequest(slot=9:00)
- end
-
- A->>C: AvailabilityResponse(accepted=true)
- B->>C: AvailabilityResponse(accepted=true)
- Ch->>C: AvailabilityResponse(accepted=true)
-
- Note over C: Minimum participants reached
-
- C-->>A: Stop
- C-->>B: Stop
- C-->>Ch: Stop
- C-->>M: Meeting Scheduled
-````
-
-## License
-Copyright 2024-Present, Syigen Ltd. and Syigen Private Limited. All rights reserved.
-Licensed under the Apache License, Version 2.0 (See LICENSE or http://www.apache.org/licenses/LICENSE-2.0).
\ No newline at end of file
diff --git a/docs/examples/single-item-auction.md b/docs/examples/single-item-auction.md
deleted file mode 100644
index bf816ed..0000000
--- a/docs/examples/single-item-auction.md
+++ /dev/null
@@ -1,196 +0,0 @@
-# Distributed Auction System
-
-This tutorial demonstrates how to build a distributed auction system using the Ceylon multi-agent framework. The system consists of an auctioneer agent managing the auction process and multiple bidder agents competing for items.
-
-## System Overview
-
-The auction system implements:
-- Single-item auctions with multiple bidders
-- Automatic bid placement based on budget constraints
-- Real-time auction status updates
-- Distributed communication between auctioneer and bidders
-
-## Core Components
-
-### Data Models
-
-```python
-@dataclass
-class Item:
- name: str
- starting_price: float
-
-@dataclass
-class Bid:
- bidder: str
- amount: float
-
-@dataclass
-class AuctionStart:
- item: Item
-
-@dataclass
-class AuctionResult:
- winner: str
- winning_bid: float
-
-@dataclass
-class AuctionEnd:
- pass
-```
-
-### Auctioneer Agent
-
-The auctioneer manages the auction process:
-
-```python
-class Auctioneer(BaseAgent):
- def __init__(self, item: Item, expected_bidders: int, name="auctioneer", port=8888):
- super().__init__(
- name=name,
- mode=PeerMode.ADMIN,
- role="auctioneer",
- port=port
- )
- self.item = item
- self.expected_bidders = expected_bidders
- self.bids: List[Bid] = []
- self.auction_ended = False
-```
-
-Key methods:
-- `handle_connection`: Monitors bidder connections and starts auction when all bidders join
-- `handle_bid`: Processes incoming bids
-- `end_auction`: Determines winner and broadcasts results
-
-### Bidder Agent
-
-Each bidder participates in the auction:
-
-```python
-class Bidder(BaseAgent):
- def __init__(self, name: str, budget: float,
- workspace_id=DEFAULT_WORKSPACE_ID,
- admin_peer="",
- admin_port=8888):
- super().__init__(
- name=name,
- mode=PeerMode.CLIENT,
- role="bidder"
- )
- self.budget = budget
- self.has_bid = False
-```
-
-Key methods:
-- `handle_auction_start`: Places bid when auction begins
-- `handle_auction_result`: Processes auction results
-- `handle_auction_end`: Acknowledges auction completion
-
-## Bidding Strategy
-
-Bidders use a simple random strategy:
-```python
-random_multiplier = random.randint(100, 1000) / 100
-bid_amount = min(self.budget, auction_start.item.starting_price * random_multiplier)
-```
-
-## Running the System
-
-1. Create auction item and auctioneer:
-```python
-item = Item("Rare Painting", 1000.0)
-auctioneer = Auctioneer(item, expected_bidders=3, port=8455)
-admin_details = auctioneer.details()
-```
-
-2. Create bidders:
-```python
-bidders = [
- Bidder("Alice", 1500.0, admin_peer=admin_details.id),
- Bidder("Bob", 1200.0, admin_peer=admin_details.id),
- Bidder("Charlie", 2000.0, admin_peer=admin_details.id)
-]
-```
-
-3. Start the system:
-```python
-await auctioneer.start_agent(b"", bidders)
-```
-
-## Sample Output
-
-```
-ceylon version: 0.22.1
-visit https://ceylon.ai for more information
-2025-01-26 00:04:41.323 | INFO | __main__::161 - Initializing auction system...
-2025-01-26 00:04:41.327 | INFO | __main__:main:155 - Starting auction system...
-2025-01-26 00:04:41.327 | INFO | ceylon.base.uni_agent:start_agent:76 - Starting auctioneer agent in ADMIN mode
-2025-01-26 00:04:41.389 | INFO | __main__:handle_run:91 - Auctioneer started - auctioneer
-2025-01-26 00:04:41.389 | INFO | __main__:handle_run:138 - Bidder started - Alice
-2025-01-26 00:04:41.389 | INFO | __main__:handle_run:138 - Bidder started - Bob
-2025-01-26 00:04:41.389 | INFO | __main__:handle_run:138 - Bidder started - Charlie
-2025-01-26 00:04:41.389 | INFO | __main__:handle_run:138 - Bidder started - Jon
-2025-01-26 00:04:41.548 | INFO | __main__:handle_connection:50 - Bidder Alice connected with auctioneer. 0/3 bidders connected.
-2025-01-26 00:04:41.548 | INFO | __main__:handle_connection:57 - Waiting for more bidders to connect...
-2025-01-26 00:04:41.549 | INFO | __main__:handle_connection:50 - Bidder Bob connected with auctioneer. 1/3 bidders connected.
-2025-01-26 00:04:41.549 | INFO | __main__:handle_connection:57 - Waiting for more bidders to connect...
-2025-01-26 00:04:41.589 | INFO | __main__:handle_connection:50 - Bidder Jon connected with auctioneer. 2/3 bidders connected.
-2025-01-26 00:04:41.589 | INFO | __main__:handle_connection:57 - Waiting for more bidders to connect...
-2025-01-26 00:04:41.591 | INFO | __main__:handle_connection:50 - Bidder Charlie connected with auctioneer. 3/3 bidders connected.
-2025-01-26 00:04:41.591 | INFO | __main__:handle_connection:54 - All bidders connected. Starting the auction.
-2025-01-26 00:04:41.591 | INFO | __main__:start_auction:60 - Starting auction for Rare Painting with starting price $1000.0
-2025-01-26 00:04:41.654 | INFO | __main__:handle_auction_start:122 - Bob placed bid: $1200.00
-2025-01-26 00:04:41.654 | INFO | __main__:handle_auction_start:122 - Alice placed bid: $1500.00
-2025-01-26 00:04:41.665 | INFO | __main__:handle_auction_start:122 - Jon placed bid: $2800.00
-2025-01-26 00:04:41.669 | INFO | __main__:handle_auction_start:122 - Charlie placed bid: $2000.00
-2025-01-26 00:04:41.685 | INFO | __main__:handle_bid:70 - Received bid from Jon for $2800.00
-2025-01-26 00:04:41.687 | INFO | __main__:handle_bid:70 - Received bid from Charlie for $2000.00
-2025-01-26 00:04:41.695 | INFO | __main__:handle_bid:70 - Received bid from Bob for $1200.00
-2025-01-26 00:04:41.695 | INFO | __main__:end_auction:84 - Auction ended. Winner: Jon, Winning Bid: $2800.00
-```
-
-## Customization Options
-
-- Modify bidding strategy by adjusting the random multiplier range
-- Add minimum bid increments
-- Implement multiple auction rounds
-- Add timeout mechanisms for bidder responses
-- Implement different auction types (Dutch, English, etc.)
-
-## Sequence Diagram
-
-````mermaid
-sequenceDiagram
- participant A as Auctioneer
- participant B1 as Bidder1
- participant B2 as Bidder2
- participant B3 as Bidder3
-
- Note over A,B3: Connection Phase
- B1->>A: Connect
- A->>B1: Connection Confirmed
- B2->>A: Connect
- A->>B2: Connection Confirmed
- B3->>A: Connect
- A->>B3: Connection Confirmed
-
- Note over A,B3: Auction Start
- A->>B1: AuctionStart(item)
- A->>B2: AuctionStart(item)
- A->>B3: AuctionStart(item)
-
- Note over A,B3: Bidding Phase
- B1-->>A: Bid(amount)
- B2-->>A: Bid(amount)
- B3-->>A: Bid(amount)
-
- Note over A,B3: Auction End
- A->>B1: AuctionResult(winner, amount)
- A->>B2: AuctionResult(winner, amount)
- A->>B3: AuctionResult(winner, amount)
-
- A->>B1: AuctionEnd
- A->>B2: AuctionEnd
- A->>B3: AuctionEnd
-````
\ No newline at end of file
diff --git a/docs/examples/task-manager.md b/docs/examples/task-manager.md
index c3462f4..cd08af2 100644
--- a/docs/examples/task-manager.md
+++ b/docs/examples/task-manager.md
@@ -1,47 +1,56 @@
-# Task Manager
+# Distributed Task Management System
-## Introduction
-This tutorial walks through building a distributed task management system using Ceylon. The system assigns tasks to workers based on skill levels and monitors completion success.
+## System Architecture
-## Prerequisites
-- Python 3.7+
-- Ceylon framework
-- Basic understanding of async programming
+### Core Components
+1. Task Manager (Admin Node)
+ - Task distribution controller
+ - Worker state management
+ - Result aggregation
-## Part 1: Data Models
+2. Worker Nodes
+ - Task execution
+ - Progress reporting
+ - Skill-based routing
+
+3. Message Protocol
+ - Task assignments
+ - Status updates
+ - Completion reports
+
+## Implementation Details
+
+### 1. Data Models
-### Task Definition
```python
+from dataclasses import dataclass
+from typing import List, Optional
+
@dataclass
class Task:
id: int
description: str
difficulty: int
-```
+
+ def validate(self) -> bool:
+ return 1 <= self.difficulty <= 10
-Tasks have three key attributes:
-- `id`: Unique identifier
-- `description`: Task details
-- `difficulty`: Required skill level (1-10)
-
-### Message Types
-```python
@dataclass
class TaskAssignment:
task: Task
+ assigned_at: int # Unix timestamp
+ timeout: Optional[int] = None
@dataclass
class TaskResult:
task_id: int
worker: str
success: bool
+ execution_time: float
+ error_message: Optional[str] = None
```
-These classes handle:
-- Task assignments to workers
-- Results reporting back to manager
-
-## Part 2: Worker Implementation
+### 2. Enhanced Worker Implementation
```python
class WorkerAgent(BaseAgent):
@@ -51,47 +60,63 @@ class WorkerAgent(BaseAgent):
self.name = name
self.skill_level = skill_level
self.has_task = False
+ self.current_task: Optional[Task] = None
+ self.task_history: List[TaskResult] = []
+
super().__init__(
name=name,
workspace_id=workspace_id,
admin_peer=admin_peer,
mode=PeerMode.CLIENT
)
-```
-
-Key worker features:
-1. Skill level determines task success probability
-2. Track task assignment status
-3. Connect to task manager via admin_peer
-
-### Task Handling
-```python
-@on(TaskAssignment)
-async def handle_task(self, data: TaskAssignment, time: int, agent: AgentDetail):
- if self.has_task:
- return
-
- self.has_task = True
- await asyncio.sleep(data.task.difficulty) # Simulate work
- success = self.skill_level >= data.task.difficulty
-
- result = TaskResult(
- task_id=data.task.id,
- worker=self.name,
- success=success
- )
- await self.broadcast(pickle.dumps(result))
+ @on(TaskAssignment)
+ async def handle_task(self, data: TaskAssignment,
+ time: int, agent: AgentDetail):
+ try:
+ if self.has_task:
+ logger.warning(f"Worker {self.name} already has task")
+ return
+
+ self.has_task = True
+ self.current_task = data.task
+ start_time = time.time()
+
+ # Simulate work with proper error handling
+ try:
+ await asyncio.sleep(data.task.difficulty)
+ success = self.skill_level >= data.task.difficulty
+ except asyncio.CancelledError:
+ success = False
+ error_msg = "Task cancelled"
+
+ execution_time = time.time() - start_time
+
+ result = TaskResult(
+ task_id=data.task.id,
+ worker=self.name,
+ success=success,
+ execution_time=execution_time,
+ error_message=error_msg if not success else None
+ )
+
+ self.task_history.append(result)
+ await self.broadcast(pickle.dumps(result))
+
+ except Exception as e:
+ logger.error(f"Error in task handling: {e}")
+ # Send failure result
+ finally:
+ self.has_task = False
+ self.current_task = None
+
+ @on_connect("*")
+ async def handle_connection(self, topic: str,
+ agent: AgentDetail):
+ logger.info(f"Connected to {agent.name} on {topic}")
```
-This method:
-
-1. Checks if worker is available
-2. Simulates work duration based on difficulty
-3. Determines success based on skill level
-4. Reports result back to manager
-
-## Part 3: Task Manager Implementation
+### 3. Task Manager Implementation
```python
class TaskManager(BaseAgent):
@@ -107,236 +132,254 @@ class TaskManager(BaseAgent):
self.expected_workers = expected_workers
self.task_results = []
self.tasks_assigned = False
+ self.worker_stats = {}
+ self.task_timeouts = {}
+
+ async def assign_tasks(self):
+ if self.tasks_assigned:
+ return
+
+ self.tasks_assigned = True
+ connected_workers = await self.get_connected_agents()
+
+ # Intelligent task distribution
+ worker_tasks = self._match_tasks_to_workers(
+ self.tasks, connected_workers)
+
+ for worker, task in worker_tasks:
+ assignment = TaskAssignment(
+ task=task,
+ assigned_at=int(time.time()),
+ timeout=task.difficulty * 2
+ )
+ await self.broadcast(pickle.dumps(assignment))
+
+ # Set timeout handler
+ self.task_timeouts[task.id] = asyncio.create_task(
+ self._handle_timeout(task.id, assignment.timeout)
+ )
+
+ async def _handle_timeout(self, task_id: int, timeout: int):
+ await asyncio.sleep(timeout)
+ if task_id not in self.completed_tasks:
+ logger.warning(f"Task {task_id} timed out")
+ # Implement timeout handling
+
+ def _match_tasks_to_workers(self, tasks, workers):
+ # Intelligent matching algorithm
+ matches = []
+ sorted_tasks = sorted(tasks,
+ key=lambda t: t.difficulty,
+ reverse=True)
+ sorted_workers = sorted(workers,
+ key=lambda w: self._get_worker_score(w))
+
+ for task, worker in zip(sorted_tasks, sorted_workers):
+ matches.append((worker, task))
+ return matches
+
+ def _get_worker_score(self, worker):
+ stats = self.worker_stats.get(worker.name, {})
+ success_rate = stats.get('success_rate', 0.5)
+ completed_tasks = stats.get('completed_tasks', 0)
+ return success_rate * 0.7 + completed_tasks * 0.3
+
+ @on(TaskResult)
+ async def handle_result(self, data: TaskResult,
+ time: int, agent: AgentDetail):
+ # Cancel timeout handler
+ if data.task_id in self.task_timeouts:
+ self.task_timeouts[data.task_id].cancel()
+
+ self.task_results.append(data)
+ self._update_worker_stats(data)
+
+ if len(self.task_results) == len(self.tasks):
+ await self._generate_final_report()
+ await self.end_task_management()
+
+ def _update_worker_stats(self, result: TaskResult):
+ if result.worker not in self.worker_stats:
+ self.worker_stats[result.worker] = {
+ 'completed_tasks': 0,
+ 'successful_tasks': 0,
+ 'total_time': 0
+ }
+
+ stats = self.worker_stats[result.worker]
+ stats['completed_tasks'] += 1
+ if result.success:
+ stats['successful_tasks'] += 1
+ stats['total_time'] += result.execution_time
+ stats['success_rate'] = (stats['successful_tasks'] /
+ stats['completed_tasks'])
```
-Manager responsibilities:
+## System Flow Diagrams
-1. Track available tasks
-2. Monitor connected workers
-3. Collect and process results
-
-### Connection Handling
-```python
-@on_connect("*")
-async def handle_connection(self, topic: str, agent: AgentDetail):
- connected_count = len(await self.get_connected_agents())
- if connected_count == self.expected_workers and not self.tasks_assigned:
- await self.assign_tasks()
+### 1. Normal Operation Flow
+```mermaid
+sequenceDiagram
+ participant TM as TaskManager
+ participant W1 as Worker(skill=5)
+ participant W2 as Worker(skill=8)
+
+ W1->>TM: Connect
+ W2->>TM: Connect
+
+ Note over TM: All workers connected
+
+ par Task Assignment
+ TM->>W1: TaskAssignment(id=1, difficulty=4)
+ TM->>W2: TaskAssignment(id=2, difficulty=7)
+ end
+
+ Note over W1: Processing (4s)
+ Note over W2: Processing (7s)
+
+ W1->>TM: TaskResult(success=true)
+ W2->>TM: TaskResult(success=true)
+
+ Note over TM: Generate Report
+
+ TM->>W1: Shutdown
+ TM->>W2: Shutdown
```
-Starts task assignment when all workers connect.
-
-### Task Assignment
-```python
-async def assign_tasks(self):
- if self.tasks_assigned:
- return
-
- self.tasks_assigned = True
- connected_workers = await self.get_connected_agents()
- for task, worker in zip(self.tasks, connected_workers):
- await self.broadcast(pickle.dumps(TaskAssignment(task=task)))
+### 2. Error Handling Flow
+```mermaid
+sequenceDiagram
+ participant TM as TaskManager
+ participant W1 as Worker(skill=3)
+ participant W2 as Worker(skill=7)
+
+ W1->>TM: Connect
+ W2->>TM: Connect
+
+ TM->>W1: TaskAssignment(difficulty=6)
+ TM->>W2: TaskAssignment(difficulty=4)
+
+ Note over W1: Task too difficult
+ Note over W2: Processing
+
+ W1->>TM: TaskResult(success=false)
+ W2->>TM: TaskResult(success=true)
+
+ Note over TM: Update worker stats
+ Note over TM: Worker1 success_rate = 0%
+ Note over TM: Worker2 success_rate = 100%
```
-Distribution logic:
+### 3. Timeout Handling
+```mermaid
+sequenceDiagram
+ participant TM as TaskManager
+ participant W1 as Worker
+
+ W1->>TM: Connect
+
+ TM->>W1: TaskAssignment(timeout=10s)
+
+ Note over TM: Start timeout timer
+
+ Note over W1: Task processing stuck
+
+ Note over TM: Timeout reached
+ TM->>W1: Cancel task
+
+ Note over TM: Mark task as failed
+ Note over TM: Update worker stats
+```
-1. Checks if tasks already assigned
-2. Gets connected worker list
-3. Pairs tasks with workers
-4. Broadcasts assignments
+## Best Practices and Error Handling
-### Result Processing
+1. Task Validation
```python
-@on(TaskResult)
-async def handle_result(self, data: TaskResult, time: int, agent: AgentDetail):
- self.task_results.append(data)
- if len(self.task_results) == len(self.tasks):
- print("All tasks completed")
- for result in self.task_results:
- print(f"Task {result.task_id} assigned to {result.worker} - "
- f"{'Success' if result.success else 'Failure'}")
- await self.end_task_management()
+def validate_task(task: Task) -> bool:
+ return (1 <= task.difficulty <= 10 and
+ task.description.strip() and
+ task.id > 0)
```
-Tracks completion and calculates success rate.
-
-## Part 4: System Setup
-
+2. Worker Health Monitoring
```python
-async def main():
- # Define tasks
- tasks = [
- Task(id=1, description="Simple calculation", difficulty=2),
- Task(id=2, description="Data analysis", difficulty=5),
- Task(id=3, description="ML model training", difficulty=8),
- ]
-
- # Create manager
- task_manager = TaskManager(tasks, expected_workers=3)
- admin_details = task_manager.details()
-
- # Create workers with varying skills
- workers = [
- WorkerAgent("Junior", skill_level=3, admin_peer=admin_details.id),
- WorkerAgent("Intermediate", skill_level=6, admin_peer=admin_details.id),
- WorkerAgent("Senior", skill_level=9, admin_peer=admin_details.id),
- ]
-
- # Start system
- await task_manager.start_agent(b"", workers)
+async def check_worker_health(worker: WorkerAgent):
+ while True:
+ try:
+ await worker.ping()
+ await asyncio.sleep(30)
+ except ConnectionError:
+ logger.error(f"Worker {worker.name} disconnected")
```
-## Running the System
-
-1. Create task list with varying difficulties
-2. Initialize task manager
-3. Create workers with appropriate skill levels
-4. Launch system with manager and workers
-
-## Example Output
-```
-All tasks completed
-Task 1 assigned to Junior - Success
-Task 2 assigned to Intermediate - Success
-Task 3 assigned to Senior - Success
-Success rate: 100.00%
+3. Task Recovery
+```python
+async def recover_failed_task(task: Task,
+ result: TaskResult):
+ if result.success:
+ return
+
+ available_workers = [w for w in workers
+ if not w.has_task]
+ if not available_workers:
+ logger.error("No workers available for recovery")
+ return
+
+ # Select best worker for recovery
+ worker = max(available_workers,
+ key=lambda w: w.skill_level)
+ await reassign_task(task, worker)
```
-## Customization Options
+## Performance Optimization
-### Priority-based Tasks
+1. Task Batching
```python
-@dataclass
-class PriorityTask(Task):
- priority: int
-
- def get_processing_time(self):
- return self.difficulty * (1/self.priority)
+async def batch_assign_tasks(tasks: List[Task],
+ batch_size: int = 3):
+ for batch in chunks(tasks, batch_size):
+ await asyncio.gather(
+ *[assign_task(t) for t in batch]
+ )
```
-### Specialized Workers
+2. Worker Scaling
```python
-class SpecializedWorker(WorkerAgent):
- def __init__(self, name, skill_level, specialties):
- super().__init__(name, skill_level)
- self.specialties = specialties
-
- async def handle_task(self, data: TaskAssignment):
- specialty_bonus = 2 if data.task.type in self.specialties else 0
- success = (self.skill_level + specialty_bonus) >= data.task.difficulty
- # Rest of implementation...
+async def scale_workers(current_load: float,
+ target_load: float = 0.7):
+ if current_load > target_load:
+ await spawn_new_worker()
+ elif current_load < target_load * 0.5:
+ await remove_idle_worker()
```
-## Best Practices
-
-1. Task Design
- - Set appropriate difficulty levels
- - Balance task distribution
- - Consider task dependencies
-
-2. Worker Configuration
- - Match skill levels to task range
- - Provide adequate worker count
- - Consider specializations
-
-3. Error Handling
- - Handle worker disconnections
- - Implement task timeouts
- - Plan for task failures
-
-## Troubleshooting
-
-Common issues and solutions:
-1. Workers not connecting
- - Check admin_peer ID
- - Verify network configuration
- - Ensure port availability
-
-2. Task assignment failures
- - Verify task format
- - Check worker availability
- - Monitor connection status
-
-3. Performance issues
- - Adjust task difficulty
- - Balance worker load
- - Monitor system resources
-
-## Use Cases
-
-### All are well
-
-````mermaid
-sequenceDiagram
- participant M as Main
- participant TM as TaskManager
- participant JW as Junior Worker
- participant IW as Intermediate Worker
- participant SW as Senior Worker
-
- M->>TM: Initialize(tasks=[T1,T2,T3])
- M->>JW: Create(skill=3)
- M->>IW: Create(skill=6)
- M->>SW: Create(skill=9)
-
- JW-->>TM: Connect
- IW-->>TM: Connect
- SW-->>TM: Connect
-
- Note over TM: All workers connected
Begin task assignment
+## Monitoring and Metrics
- par Assign Tasks
- TM->>JW: TaskAssignment(T1, difficulty=2)
- TM->>IW: TaskAssignment(T2, difficulty=5)
- TM->>SW: TaskAssignment(T3, difficulty=8)
- end
-
- Note over JW: Process T1
skill(3) >= difficulty(2)
- Note over IW: Process T2
skill(6) >= difficulty(5)
- Note over SW: Process T3
skill(9) >= difficulty(8)
-
- JW->>TM: TaskResult(T1, success=true)
- IW->>TM: TaskResult(T2, success=true)
- SW->>TM: TaskResult(T3, success=true)
-
- Note over TM: All tasks completed
Calculate success rate
-
- TM-->>JW: Stop
- TM-->>IW: Stop
- TM-->>SW: Stop
- TM-->>M: System Shutdown
-````
-
-### With some Failures
-
-````mermaid
-sequenceDiagram
- participant TM as TaskManager
- participant JW as Junior Worker
(skill=3)
- participant IW as Intermediate Worker
(skill=6)
- participant SW as Senior Worker
(skill=9)
-
- JW-->>TM: Connect
- IW-->>TM: Connect
- SW-->>TM: Connect
-
- Note over TM: Workers connected
Begin assignment
-
- TM->>JW: TaskAssignment(T3, difficulty=8)
- TM->>IW: TaskAssignment(T2, difficulty=5)
- TM->>SW: TaskAssignment(T1, difficulty=2)
-
- Note over JW: Process T3
FAIL: skill(3) < difficulty(8)
- Note over IW: Process T2
SUCCESS: skill(6) >= difficulty(5)
- Note over SW: Process T1
SUCCESS: skill(9) >= difficulty(2)
-
- JW->>TM: TaskResult(T3, success=false)
- IW->>TM: TaskResult(T2, success=true)
- SW->>TM: TaskResult(T1, success=true)
-
- Note over TM: Tasks completed
Success rate: 66.7%
+1. System Metrics
+```python
+class SystemMetrics:
+ def __init__(self):
+ self.task_completion_times = []
+ self.worker_utilization = {}
+ self.error_counts = defaultdict(int)
+
+ async def collect_metrics(self):
+ while True:
+ self.update_metrics()
+ await asyncio.sleep(60)
+```
- TM-->>JW: Stop
- TM-->>IW: Stop
- TM-->>SW: Stop
-````
\ No newline at end of file
+2. Performance Reporting
+```python
+def generate_performance_report():
+ avg_completion_time = sum(completion_times) / len(completion_times)
+ worker_success_rates = {
+ w: stats['success_rate']
+ for w, stats in worker_stats.items()
+ }
+ return PerformanceReport(
+ avg_completion_time=avg_completion_time,
+ worker_stats=worker_success_rates,
+ error_rates=error_counts
+ )
+```
\ No newline at end of file
diff --git a/docs/examples/time-scheduling.md b/docs/examples/time-scheduling.md
new file mode 100644
index 0000000..6c0960d
--- /dev/null
+++ b/docs/examples/time-scheduling.md
@@ -0,0 +1,436 @@
+# Distributed Meeting Scheduler
+
+## System Overview
+
+This tutorial demonstrates building a distributed meeting scheduling system using Ceylon's agent-based architecture. The system efficiently coordinates meeting times across multiple participants while handling real-world scheduling complexities.
+
+### What We'll Build
+
+A distributed system that:
+1. Coordinates meeting schedules across multiple participants
+2. Finds optimal meeting times based on availability
+3. Handles scheduling conflicts and constraints
+4. Manages concurrent scheduling requests
+5. Provides real-time responses and updates
+
+### Key Features
+
+- **Distributed Processing**: Each participant runs as an independent agent
+- **Automated Negotiation**: System automatically finds suitable time slots
+- **Conflict Resolution**: Handles overlapping meetings and time conflicts
+- **Scalable Architecture**: Easily add or remove participants
+- **Fault Tolerance**: Handles participant disconnections and failures
+
+### Architecture Diagram
+```mermaid
+graph TD
+ A[Scheduler Agent] --> B[Time Slot Manager]
+ A --> C[Conflict Resolver]
+ A --> D[Response Aggregator]
+
+ B --> E[Availability Checker]
+ C --> E
+
+ P1[Participant 1] --> A
+ P2[Participant 2] --> A
+ P3[Participant 3] --> A
+
+ subgraph "Each Participant"
+ CAL[Calendar Manager]
+ AH[Availability Handler]
+ COM[Communication Module]
+ end
+```
+
+### System Components
+
+1. **Scheduler Agent**
+ - Coordinates the scheduling process
+ - Manages participant responses
+ - Implements scheduling algorithms
+
+2. **Participant Agents**
+ - Manage individual availability
+ - Handle meeting requests
+ - Track scheduled meetings
+
+3. **Communication Protocol**
+ - Availability requests/responses
+ - Meeting confirmations
+ - Schedule updates
+
+4. **Business Logic**
+ - Time slot validation
+ - Conflict detection
+ - Priority handling
+
+## Core Components
+
+### Data Models
+```python
+@dataclass
+class TimeSlot:
+ date: str
+ start_time: int # 24-hour format (0-23)
+ end_time: int
+
+ def __hash__(self):
+ return hash((self.date, self.start_time, self.end_time))
+
+ def validate(self) -> bool:
+ return (0 <= self.start_time < 24 and
+ 0 <= self.end_time <= 24 and
+ self.start_time < self.end_time)
+
+@dataclass
+class Meeting:
+ name: str
+ date: str
+ duration: int
+ minimum_participants: int
+ priority: int = 1
+
+ def validate(self) -> bool:
+ return (self.duration > 0 and
+ self.minimum_participants > 0 and
+ self.priority > 0)
+
+@dataclass
+class SchedulingResult:
+ meeting: Meeting
+ time_slot: TimeSlot
+ participants: List[str]
+ status: str
+ error_message: Optional[str] = None
+```
+
+### Enhanced Participant Implementation
+```python
+class Participant(Worker):
+ def __init__(self, name: str, available_times: List[TimeSlot],
+ admin_peer: str):
+ self.name = name
+ self.available_times = self._validate_times(available_times)
+ self.scheduled_meetings: Dict[str, TimeSlot] = {}
+ super().__init__(name=name, admin_peer=admin_peer)
+
+ def _validate_times(self, times: List[TimeSlot]) -> List[TimeSlot]:
+ return [t for t in times if t.validate()]
+
+ @on(AvailabilityRequest)
+ async def handle_request(self, data: AvailabilityRequest,
+ time: int, agent: AgentDetail):
+ try:
+ is_available = self._check_availability(
+ data.time_slot, data.time_slot.duration)
+
+ response = AvailabilityResponse(
+ participant=self.name,
+ time_slot=data.time_slot,
+ available=is_available
+ )
+ await self.broadcast_message(response)
+
+ except Exception as e:
+ logger.error(f"Error checking availability: {e}")
+
+ def _check_availability(self, slot: TimeSlot,
+ duration: int) -> bool:
+ # Check conflicts with scheduled meetings
+ if any(self._has_conflict(slot, scheduled_slot)
+ for scheduled_slot in self.scheduled_meetings.values()):
+ return False
+
+ # Check if slot fits in available times
+ return any(self._fits_in_slot(slot, available_slot, duration)
+ for available_slot in self.available_times)
+
+ @staticmethod
+ def _has_conflict(slot1: TimeSlot, slot2: TimeSlot) -> bool:
+ return (slot1.date == slot2.date and
+ slot1.start_time < slot2.end_time and
+ slot2.start_time < slot1.end_time)
+
+ @staticmethod
+ def _fits_in_slot(slot: TimeSlot, available: TimeSlot,
+ duration: int) -> bool:
+ if slot.date != available.date:
+ return False
+ latest_start = max(slot.start_time, available.start_time)
+ earliest_end = min(slot.end_time, available.end_time)
+ return earliest_end - latest_start >= duration
+```
+
+### Enhanced Scheduler Implementation
+```python
+class Scheduler(Admin):
+ def __init__(self, meeting: Meeting):
+ super().__init__(name="scheduler")
+ self.meeting = meeting
+ self.agreed_slots: Dict[TimeSlot, List[str]] = {}
+ self.current_slot: Optional[TimeSlot] = None
+ self.max_attempts = 10
+ self.attempt_count = 0
+
+ @on_connect("*")
+ async def handle_connection(self, topic: str, agent: AgentDetail):
+ if not self.current_slot:
+ self.current_slot = TimeSlot(
+ self.meeting.date,
+ 8, # Start at 8 AM
+ 8 + self.meeting.duration
+ )
+ await self._try_schedule()
+
+ async def _try_schedule(self):
+ if self.attempt_count >= self.max_attempts:
+ await self._handle_scheduling_failure()
+ return
+
+ self.attempt_count += 1
+ await self.broadcast_message(
+ AvailabilityRequest(time_slot=self.current_slot)
+ )
+
+ @on(AvailabilityResponse)
+ async def handle_response(self, data: AvailabilityResponse,
+ time: int, agent: AgentDetail):
+ if not data.available:
+ await self._try_next_slot()
+ return
+
+ slot_key = f"{data.time_slot.date}_{data.time_slot.start_time}"
+ if slot_key not in self.agreed_slots:
+ self.agreed_slots[slot_key] = []
+
+ if data.participant not in self.agreed_slots[slot_key]:
+ self.agreed_slots[slot_key].append(data.participant)
+
+ if len(self.agreed_slots[slot_key]) >= self.meeting.minimum_participants:
+ await self._finalize_meeting(slot_key)
+
+ async def _try_next_slot(self):
+ next_slot = TimeSlot(
+ self.meeting.date,
+ self.current_slot.start_time + 1,
+ self.current_slot.start_time + 1 + self.meeting.duration
+ )
+
+ if next_slot.end_time > 17: # Don't schedule after 5 PM
+ await self._handle_scheduling_failure()
+ return
+
+ self.current_slot = next_slot
+ await self._try_schedule()
+
+ async def _finalize_meeting(self, slot_key):
+ participants = self.agreed_slots[slot_key]
+ print(f"Meeting scheduled at {slot_key}")
+ print(f"Participants: {', '.join(participants)}")
+ await self.stop()
+
+ async def _handle_scheduling_failure(self):
+ print("Failed to find suitable time slot")
+ await self.stop()
+```
+
+## Scheduling Scenarios
+
+### 1. Successful First Attempt
+```mermaid
+sequenceDiagram
+ participant S as Scheduler
+ participant A as Alice
+ participant B as Bob
+ participant C as Charlie
+
+ Note over S: Meeting(duration=1, min_participants=2)
+
+ A->>S: Connect
+ B->>S: Connect
+ C->>S: Connect
+
+ S->>+A: AvailabilityRequest(8:00-9:00)
+ S->>+B: AvailabilityRequest(8:00-9:00)
+ S->>+C: AvailabilityRequest(8:00-9:00)
+
+ A-->>-S: Response(available=true)
+ B-->>-S: Response(available=true)
+ C-->>-S: Response(available=false)
+
+ Note over S: Minimum participants reached
+
+ S->>A: Meeting Confirmed
+ S->>B: Meeting Confirmed
+ S->>C: Meeting Confirmed
+```
+
+### 2. Multiple Attempts
+```mermaid
+sequenceDiagram
+ participant S as Scheduler
+ participant A as Alice
+ participant B as Bob
+
+ Note over S: Initial slot: 8:00-9:00
+
+ S->>A: AvailabilityRequest(8:00)
+ S->>B: AvailabilityRequest(8:00)
+
+ A-->>S: Response(false)
+ B-->>S: Response(false)
+
+ Note over S: Try next slot: 9:00-10:00
+
+ S->>A: AvailabilityRequest(9:00)
+ S->>B: AvailabilityRequest(9:00)
+
+ A-->>S: Response(true)
+ B-->>S: Response(true)
+
+ Note over S: Success at second attempt
+
+ S->>A: Meeting Confirmed(9:00)
+ S->>B: Meeting Confirmed(9:00)
+```
+
+### 3. Scheduling Failure
+```mermaid
+sequenceDiagram
+ participant S as Scheduler
+ participant A as Alice
+ participant B as Bob
+
+ Note over S: max_attempts = 3
+
+ loop 3 times
+ S->>A: AvailabilityRequest
+ S->>B: AvailabilityRequest
+ A-->>S: Response(false)
+ B-->>S: Response(false)
+ Note over S: Try next slot
+ end
+
+ Note over S: Max attempts reached
+
+ S->>A: Scheduling Failed
+ S->>B: Scheduling Failed
+```
+
+### 4. Conflict Resolution
+```mermaid
+sequenceDiagram
+ participant S as Scheduler
+ participant P1 as Participant1
+ participant P2 as Participant2
+
+ Note over P1,P2: Both have existing meeting 10:00-11:00
+
+ S->>P1: AvailabilityRequest(10:00)
+ S->>P2: AvailabilityRequest(10:00)
+
+ Note over P1: Check conflicts
+ Note over P2: Check conflicts
+
+ P1-->>S: Response(false)
+ P2-->>S: Response(false)
+
+ Note over S: Adjust time slot
+
+ S->>P1: AvailabilityRequest(11:00)
+ S->>P2: AvailabilityRequest(11:00)
+
+ P1-->>S: Response(true)
+ P2-->>S: Response(true)
+```
+
+## Error Handling and Recovery
+
+1. Participant Disconnection:
+```python
+@on_disconnect
+async def handle_disconnection(self, agent: AgentDetail):
+ # Remove from current slot agreements
+ for participants in self.agreed_slots.values():
+ if agent.name in participants:
+ participants.remove(agent.name)
+
+ # Retry current slot if minimum participants lost
+ if self.current_slot:
+ await self._try_schedule()
+```
+
+2. Invalid Time Slots:
+```python
+def validate_time_slot(slot: TimeSlot) -> bool:
+ if not slot.validate():
+ return False
+
+ # Business hours check (8 AM - 5 PM)
+ if not (8 <= slot.start_time <= 17 and
+ 8 <= slot.end_time <= 17):
+ return False
+
+ # Weekend check
+ date_obj = datetime.strptime(slot.date, "%Y-%m-%d")
+ if date_obj.weekday() >= 5:
+ return False
+
+ return True
+```
+
+3. Scheduling Timeout:
+```python
+async def schedule_with_timeout(self, timeout: int = 300):
+ try:
+ async with asyncio.timeout(timeout):
+ await self._try_schedule()
+ except asyncio.TimeoutError:
+ await self._handle_scheduling_failure()
+```
+
+## System Extensions
+
+1. Priority-based Scheduling:
+```python
+class PriorityScheduler(Scheduler):
+ def __init__(self, meetings: List[Meeting]):
+ self.meetings = sorted(
+ meetings,
+ key=lambda m: m.priority,
+ reverse=True
+ )
+```
+
+2. Recurring Meetings:
+```python
+@dataclass
+class RecurringMeeting(Meeting):
+ frequency: str # "daily", "weekly", "monthly"
+ end_date: str
+
+ def generate_instances(self) -> List[Meeting]:
+ instances = []
+ current = datetime.strptime(self.date, "%Y-%m-%d")
+ end = datetime.strptime(self.end_date, "%Y-%m-%d")
+
+ while current <= end:
+ instances.append(Meeting(
+ name=self.name,
+ date=current.strftime("%Y-%m-%d"),
+ duration=self.duration,
+ minimum_participants=self.minimum_participants
+ ))
+
+ if self.frequency == "daily":
+ current += timedelta(days=1)
+ elif self.frequency == "weekly":
+ current += timedelta(weeks=1)
+ elif self.frequency == "monthly":
+ current = current.replace(
+ month=current.month % 12 + 1,
+ year=current.year + current.month // 12
+ )
+
+ return instances
+```
\ No newline at end of file
diff --git a/docs/getting_start.md b/docs/getting-started.md
similarity index 100%
rename from docs/getting_start.md
rename to docs/getting-started.md
diff --git a/docs/technolgy.md b/docs/technology.md
similarity index 100%
rename from docs/technolgy.md
rename to docs/technology.md
diff --git a/mkdocs.yml b/mkdocs.yml
index dc5af5d..303fc31 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -62,6 +62,7 @@ use_directory_urls: false
plugins:
- search
- tags
+ - offline
markdown_extensions:
- tables
@@ -85,4 +86,21 @@ markdown_extensions:
custom_fences:
- name: mermaid
class: mermaid
- format: !!python/name:pymdownx.superfences.fence_code_format
\ No newline at end of file
+ format: !!python/name:pymdownx.superfences.fence_code_format
+
+nav:
+ - Home: index.md
+ - Getting started:
+ - Installation: getting-started.md
+ - Quickstart: quickstart.md
+ - Technology: technology.md
+ - Core Concepts: core-concepts.md
+ - How to:
+ - Overview: tutorials.md
+ - Best Practices: best-practices.md
+ - Examples:
+ - Time Scheduling: examples/time-scheduling.md
+ - Single Item Auction: examples/auction.md
+ - Task Manager: examples/task-manager.md
+ - Distributed Agents: examples/connect-through-network.md
+