From 6afbe816853fbd26b087456587a7d4f026e628b4 Mon Sep 17 00:00:00 2001 From: Dewmal Date: Sun, 26 Jan 2025 07:38:23 +0530 Subject: [PATCH] improve documents --- docs/{best_practice.md => best-practices.md} | 0 docs/core-concepts.md | 313 ++++++++++ docs/examples/auction.md | 251 ++++++++ docs/examples/connect-through-network.md | 177 ++++-- docs/examples/meeting-sechdular.md | 366 ------------ docs/examples/single-item-auction.md | 196 ------ docs/examples/task-manager.md | 565 ++++++++++-------- docs/examples/time-scheduling.md | 436 ++++++++++++++ docs/{getting_start.md => getting-started.md} | 0 docs/{technolgy.md => technology.md} | 0 mkdocs.yml | 20 +- 11 files changed, 1456 insertions(+), 868 deletions(-) rename docs/{best_practice.md => best-practices.md} (100%) create mode 100644 docs/core-concepts.md create mode 100644 docs/examples/auction.md delete mode 100644 docs/examples/meeting-sechdular.md delete mode 100644 docs/examples/single-item-auction.md create mode 100644 docs/examples/time-scheduling.md rename docs/{getting_start.md => getting-started.md} (100%) rename docs/{technolgy.md => technology.md} (100%) diff --git a/docs/best_practice.md b/docs/best-practices.md similarity index 100% rename from docs/best_practice.md rename to docs/best-practices.md diff --git a/docs/core-concepts.md b/docs/core-concepts.md new file mode 100644 index 0000000..26ab4ad --- /dev/null +++ b/docs/core-concepts.md @@ -0,0 +1,313 @@ +# Ceylon Framework Core Concepts Guide + +## Agent System Architecture + +### Admin Agent +The central coordinator in a Ceylon system. + +```python +from ceylon import Admin + +class CoordinatorAdmin(Admin): + def __init__(self, name="coordinator", port=8888): + super().__init__(name=name, port=port) +``` + +Key characteristics: +- One admin per system +- Manages worker connections +- Coordinates task distribution +- Handles system-wide state + +### Worker Agent +Performs specific tasks within the system. + +```python +from ceylon import Worker + +class TaskWorker(Worker): + def __init__(self, name: str, role: str): + super().__init__(name=name, role=role) +``` + +Key characteristics: +- Multiple workers per system +- Specialized task execution +- Reports to admin agent +- Independent operation + +## Message System + +### Message Types +```python +from dataclasses import dataclass +from typing import Any + +@dataclass(frozen=True) +class Message: + id: str + content: Any + timestamp: float +``` + +Core message principles: +- Immutable data structures +- Type-safe communication +- Metadata inclusion +- Serializable format + +### Message Handlers +```python +from ceylon import on, on_run, on_connect + +class MessageHandling: + @on(MessageType) + async def handle_specific(self, msg: MessageType): + # Handle specific message type + pass + + @on_run() + async def handle_run(self, inputs: bytes): + # Main execution loop + pass + + @on_connect("*") + async def handle_connection(self, topic: str, agent: AgentDetail): + # Connection handling + pass +``` + +## Communication Patterns + +### Direct Communication +```python +class DirectCommunication(Worker): + async def send_to_peer(self, peer_id: str, data: Any): + await self.send_message(peer_id, data) +``` + +### Broadcast Communication +```python +class BroadcastCommunication(Admin): + async def notify_all(self, data: Any): + await self.broadcast_message(data) +``` + +## State Management + +### Agent State +```python +from enum import Enum + +class AgentState(Enum): + IDLE = "idle" + PROCESSING = "processing" + ERROR = "error" + +class StateManagement(Worker): + def __init__(self): + super().__init__() + self.state = AgentState.IDLE +``` + +### State Transitions +```python +class StatefulAgent(Worker): + async def transition_state(self, new_state: AgentState): + old_state = self.state + self.state = new_state + await self.notify_state_change(old_state, new_state) +``` + +## Event Processing + +### Event Handling +```python +class EventProcessor(Worker): + @on(Event) + async def process_event(self, event: Event): + if self.can_handle(event): + await self.handle_event(event) + else: + await self.forward_event(event) +``` + +### Event Flow +```python +class EventFlow(Admin): + async def manage_event_flow(self, event: Event): + # Preprocessing + processed_event = await self.preprocess(event) + + # Distribution + await self.distribute_event(processed_event) + + # Monitoring + await self.monitor_event_processing(processed_event) +``` + +## Resource Management + +### Resource Lifecycle +```python +class ResourceManager(Worker): + async def __aenter__(self): + await self.acquire_resources() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.release_resources() +``` + +### Resource Pooling +```python +class ResourcePool: + def __init__(self, size: int): + self.pool = asyncio.Queue(size) + self.in_use = set() + + async def acquire(self): + resource = await self.pool.get() + self.in_use.add(resource) + return resource + + async def release(self, resource): + self.in_use.remove(resource) + await self.pool.put(resource) +``` + +## Error Handling + +### Basic Error Handling +```python +class ErrorHandler(Worker): + async def safe_execute(self, task): + try: + return await self.execute_task(task) + except Exception as e: + await self.handle_error(task, e) + raise +``` + +### Retry Mechanism +```python +class RetryMechanism(Worker): + async def with_retry(self, operation, max_retries=3): + for attempt in range(max_retries): + try: + return await operation() + except Exception as e: + if attempt == max_retries - 1: + raise + await asyncio.sleep(2 ** attempt) +``` + +## System Integration + +### External Service Integration +```python +class ServiceIntegrator(Worker): + async def interact_with_service(self, service_request): + # Convert to external format + external_request = self.convert_request(service_request) + + # Make external call + response = await self.call_service(external_request) + + # Convert response back + return self.convert_response(response) +``` + +### Data Flow +```python +class DataFlowManager(Admin): + async def manage_data_flow(self, data): + # Input validation + validated_data = await self.validate(data) + + # Processing + processed_data = await self.process(validated_data) + + # Distribution + await self.distribute(processed_data) +``` + +## Core Utilities + +### Message Conversion +```python +class MessageConverter: + @staticmethod + def to_bytes(message: Any) -> bytes: + return pickle.dumps(message) + + @staticmethod + def from_bytes(data: bytes) -> Any: + return pickle.loads(data) +``` + +### Agent Identification +```python +class AgentIdentification: + @staticmethod + def create_agent_id(name: str, role: str) -> str: + return f"{name}_{role}_{uuid.uuid4()}" +``` + +## System Lifecycle + +### Initialization +```python +async def initialize_system(): + # Create admin + admin = AdminAgent(port=8888) + + # Create workers + workers = [ + WorkerAgent(f"worker_{i}") + for i in range(3) + ] + + # Start system + await admin.start_agent(b"", workers) +``` + +### Shutdown +```python +async def shutdown_system(admin: Admin, workers: List[Worker]): + # Stop workers + for worker in workers: + await worker.stop() + + # Stop admin + await admin.stop() +``` + +## Key Concepts Summary + +1. **Agent Hierarchy** + - Admin agents coordinate + - Worker agents execute + - Clear responsibility separation + +2. **Message-Based Communication** + - Type-safe messages + - Asynchronous processing + - Event-driven architecture + +3. **State Management** + - Clear state definitions + - Controlled transitions + - State monitoring + +4. **Resource Handling** + - Proper initialization + - Clean cleanup + - Resource pooling + +5. **Error Management** + - Graceful error handling + - Retry mechanisms + - Error reporting + +These core concepts provide the foundation for building robust distributed systems with Ceylon. \ No newline at end of file diff --git a/docs/examples/auction.md b/docs/examples/auction.md new file mode 100644 index 0000000..20c90a5 --- /dev/null +++ b/docs/examples/auction.md @@ -0,0 +1,251 @@ +# Distributed Auction System + +This tutorial demonstrates building a robust, secure distributed auction system using the Ceylon multi-agent framework. The system implements a scalable architecture with an auctioneer agent orchestrating the auction process and multiple authenticated bidder agents participating in real-time auctions. + +## System Overview + +The auction system implements: +- Multiple concurrent auction support with isolation +- Secure bid validation and authentication +- Configurable auction rules (reserve price, minimum increment) +- Fault-tolerant communication with automatic recovery +- Scalable bid processing with rate limiting + +## Core Components + +### Data Models + +```python +@dataclass +class Item: + name: str + starting_price: float + reserve_price: Optional[float] = None + min_increment: float = 1.0 + auction_duration: timedelta = timedelta(minutes=5) + +@dataclass +class Bid: + bidder: str + amount: float + timestamp: float + signature: bytes # For bid verification + auction_id: str + +@dataclass +class AuctionStart: + item: Item + auction_id: str + start_time: float + +@dataclass +class AuctionResult: + winner: str + winning_bid: float + auction_id: str + end_time: float + +@dataclass +class AuctionEnd: + auction_id: str + reason: str # "completed", "reserve_not_met", "cancelled" +``` + +### Auctioneer Agent + +The auctioneer manages multiple concurrent auctions: + +```python +class Auctioneer(BaseAgent): + def __init__(self, name="auctioneer", port=8888): + super().__init__( + name=name, + mode=PeerMode.ADMIN, + role="auctioneer", + port=port + ) + self.active_auctions: Dict[str, AuctionState] = {} + self.bid_queue = asyncio.Queue(maxsize=1000) + self.bidder_registry: Dict[str, BidderInfo] = {} +``` + +Key features: +- Asynchronous bid processing with rate limiting +- Automatic auction timeouts and cleanup +- Bid verification and signature validation +- Bidder reputation tracking +- Comprehensive error handling and recovery + +### Bidder Agent + +Each bidder implements secure auction participation: + +```python +class Bidder(BaseAgent): + def __init__(self, name: str, budget: float, strategy: BiddingStrategy): + super().__init__( + name=name, + mode=PeerMode.CLIENT, + role="bidder" + ) + self.budget = budget + self.strategy = strategy + self.active_bids: Dict[str, float] = {} + self.key_pair = generate_keypair() # For bid signing +``` + +Key features: +- Configurable bidding strategies +- Bid signing and verification +- Budget enforcement and tracking +- Automatic bid updates on price changes +- Graceful handling of connection issues + +## Advanced Bidding Strategies + +```python +class SmartBidder(Bidder): + async def calculate_bid(self, item: Item, auction_state: AuctionState) -> float: + market_value = await self.estimate_market_value(item) + competition = len(auction_state.active_bidders) + time_remaining = auction_state.end_time - time.time() + + if time_remaining < 60: # Last minute strategy + return min(self.budget, market_value * 1.1) + + return min( + self.budget, + market_value * (0.8 + (0.4 * (1 - time_remaining/auction_state.duration))) + ) +``` + +## Error Handling and Recovery + +```python +class AuctionManager: + async def handle_disconnection(self, bidder_id: str): + affected_auctions = self.find_affected_auctions(bidder_id) + for auction in affected_auctions: + if auction.status == "pending_payment": + await self.revert_auction(auction.id) + else: + await self.pause_auction(auction.id) + await self.notify_participants(auction.id, "auction_paused") + + async def recover_auction_state(self, auction_id: str): + state = await self.load_backup_state(auction_id) + await self.verify_bid_sequence(state.bids) + await self.resume_auction(state) +``` + +## Running the System + +1. Initialize auction system with configuration: +```python +config = AuctionConfig( + min_bidders=3, + bid_timeout=30, + max_concurrent_auctions=10, + bid_verification_required=True +) +auctioneer = Auctioneer(config) +``` + +2. Create authenticated bidders: +```python +bidders = [ + SmartBidder("Alice", 1500.0, MarketValueStrategy()), + SmartBidder("Bob", 1200.0, AggressiveStrategy()), + SmartBidder("Charlie", 2000.0, ConservativeStrategy()) +] +``` + +3. Start the system with monitoring: +```python +async with AuctionMonitor() as monitor: + await auctioneer.start_agent(b"", bidders) + await monitor.track_metrics() +``` + +## System Architecture + +```mermaid +sequenceDiagram + participant A as Auctioneer + participant B1 as Bidder1 + participant B2 as Bidder2 + participant B3 as Bidder3 + participant M as Monitor + + Note over A,M: Initialization Phase + A->>M: Register System + B1->>A: Connect(auth_token) + A->>B1: Connection Confirmed + B2->>A: Connect(auth_token) + A->>B2: Connection Confirmed + B3->>A: Connect(auth_token) + A->>B3: Connection Confirmed + + Note over A,M: Auction Start + A->>M: Start Monitoring + A->>B1: AuctionStart(item, rules) + A->>B2: AuctionStart(item, rules) + A->>B3: AuctionStart(item, rules) + + Note over A,M: Active Bidding + B1-->>A: SignedBid(amount, timestamp) + A-->>M: Log Bid + A->>B1: BidAcknowledged + B2-->>A: SignedBid(amount, timestamp) + A-->>M: Log Bid + A->>B2: BidAcknowledged + B3-->>A: SignedBid(amount, timestamp) + A-->>M: Log Bid + A->>B3: BidAcknowledged + + Note over A,M: Auction Close + A->>M: Finalize Metrics + A->>B1: AuctionResult(winner, amount) + A->>B2: AuctionResult(winner, amount) + A->>B3: AuctionResult(winner, amount) + A->>M: Store Results +``` + +## Performance Monitoring + +The system includes built-in monitoring for: +- Bid processing latency +- Message queue depths +- Bidder connection health +- Auction completion rates +- System resource utilization + +## Security Considerations + +1. Bid Authentication +- All bids are cryptographically signed +- Timestamps prevent replay attacks +- Rate limiting prevents DoS attacks + +2. Data Integrity +- Auction state is regularly checkpointed +- Bid history is immutable and verifiable +- Transaction logs for audit trails + +3. Access Control +- Bidder authentication required +- Role-based permissions +- Resource usage quotas + +## Scalability Features + +1. Horizontal Scaling +- Multiple auctioneer instances +- Load-balanced bidder connections +- Distributed state management + +2. Performance Optimization +- Asynchronous bid processing +- Efficient state management +- Connection pooling +- Message batching \ No newline at end of file diff --git a/docs/examples/connect-through-network.md b/docs/examples/connect-through-network.md index a7a98b1..93093bc 100644 --- a/docs/examples/connect-through-network.md +++ b/docs/examples/connect-through-network.md @@ -5,41 +5,71 @@ ### 1. Server (server.py) ```python -from ceylon import Admin +from ceylon import Admin, enable_log +from loguru import logger -app = Admin(name="admin", port=8888, role="admin") +enable_log("INFO") +app = Admin( + name="admin", + port=8888, + role="admin", + workspace_id="default" +) @app.on_run() async def run_worker(inputs: bytes): + logger.info(f"Worker started - {app.details().name}") while True: - await app.broadcast_message("Hello World from Server") + try: + message = {"type": "heartbeat", "source": app.details().name} + await app.broadcast_message(message) + await asyncio.sleep(1) # Prevent CPU spinning + except Exception as e: + logger.error(f"Error in broadcast: {e}") ``` -The server broadcasts messages continuously to all connected workers. +The server implements a heartbeat system and proper error handling. ### 2. Worker (worker_agent.py) ```python -from ceylon import Worker - -worker = Worker(name="worker", port=8888, role="worker") - - -@worker.on(str) -async def on_message(agent_id: str, data: str, time: int): - print(f"Received message from {agent_id}: {data} at {time}") +from ceylon import Worker, AgentDetail, enable_log +from loguru import logger + +enable_log("INFO") +worker = Worker( + name="worker", + role="worker", + workspace_id="default" +) + + +@worker.on(dict) +async def on_message(agent: AgentDetail, data: dict, time: int): + try: + logger.info(f"Message from {agent.name}: {data}") + if data.get("type") == "heartbeat": + # Handle heartbeat + pass + except Exception as e: + logger.error(f"Error processing message: {e}") + + +@worker.on_connect("*") +async def on_connect(topic: str, agent: AgentDetail): + logger.info(f"Connected to {agent.name} on {topic}") ``` -The worker listens for and processes messages from the server. +Workers implement proper message handling and connection events. ### 3. Configuration (.ceylon_network) -``` +```ini WORKSPACE_ID=default WORKSPACE_IP=127.0.0.1 -WORKSPACE_PEER=12D3KooWMrqMLuYL3vExw7qaBJRzjN43kkkZwqSxUog7oaQCmnFE WORKSPACE_PORT=8888 +WORKSPACE_BUFFER_SIZE=1024 ``` ## Setup Instructions @@ -48,43 +78,102 @@ WORKSPACE_PORT=8888 ```bash python server.py ``` - - Server creates .ceylon_network file with connection details - - WORKSPACE_PEER is auto-generated unique identifier + - Creates .ceylon_network with auto-generated configuration + - Initializes message handlers and event system 2. Start Worker(s): ```bash python worker_agent.py ``` - - Worker reads .ceylon_network file - - Connects to server using WORKSPACE_PEER + - Connects using configuration from .ceylon_network + - Establishes bidirectional communication ## Remote Connection Setup -1. Copy .ceylon_network to remote machine -2. Update WORKSPACE_IP if needed -3. Run worker_agent.py on remote machine - -## Network Configuration - -- Default port: 8888 -- Local IP: 127.0.0.1 -- For remote connections: - - Update WORKSPACE_IP to server's IP - - Ensure port 8888 is accessible - -## Common Issues +1. Configure Network: + - Open required ports on firewall + - Set up secure network tunneling if needed + - Update WORKSPACE_IP to server's external IP -1. Connection Failures - - Verify .ceylon_network file exists - - Check WORKSPACE_IP and port accessibility - - Ensure WORKSPACE_PEER matches server +2. Security Considerations: + - Use environment-specific configuration files + - Implement proper access controls + - Monitor connections and traffic -2. Network Constraints - - Configure firewalls to allow port 8888 - - Use correct IP for non-local connections - -## Security Notes +## Network Configuration -- WORKSPACE_PEER acts as unique identifier -- Keep .ceylon_network secure for controlled access -- Update configuration for production environments \ No newline at end of file +- Default settings in static_val.py: + - Port: DEFAULT_WORKSPACE_PORT (8888) + - Buffer: DEFAULT_WORKSPACE_BUFFER_SIZE (100) + - Workspace: DEFAULT_WORKSPACE_ID + +## Common Issues & Solutions + +1. Connection Issues: + - Verify network connectivity + - Check configuration file paths + - Enable DEBUG logging for troubleshooting + - Validate port accessibility + +2. System Performance: + - Monitor buffer sizes + - Implement rate limiting if needed + - Use appropriate logging levels + +## Production Guidelines + +1. Security: + - Use secure configuration management + - Implement authentication + - Enable network encryption + - Regular security audits + +2. Monitoring: + - Implement health checks + - Set up logging aggregation + - Monitor system metrics + - Establish alerting + +## System Interaction Flow + +```mermaid +sequenceDiagram + participant Admin + participant Network + participant Worker1 + participant Worker2 + + Note over Admin: Start server + Admin->>Network: Create .ceylon_network + + Note over Worker1,Worker2: Workers start + Worker1->>Network: Read config + Worker2->>Network: Read config + + Worker1->>Admin: Connect + activate Admin + Admin-->>Worker1: Connection accepted + Admin->>Worker1: on_agent_connected event + deactivate Admin + + Worker2->>Admin: Connect + activate Admin + Admin-->>Worker2: Connection accepted + Admin->>Worker2: on_agent_connected event + deactivate Admin + + loop Heartbeat + Admin->>Worker1: Broadcast message + Admin->>Worker2: Broadcast message + Worker1-->>Admin: Process message + Worker2-->>Admin: Process message + end + + Note over Worker1: Disconnect + Worker1->>Admin: Stop connection + Admin->>Worker2: Update connected agents + + Note over Admin: Shutdown + Admin->>Worker2: Broadcast shutdown + Admin->>Network: Cleanup +``` \ No newline at end of file diff --git a/docs/examples/meeting-sechdular.md b/docs/examples/meeting-sechdular.md deleted file mode 100644 index 95e0889..0000000 --- a/docs/examples/meeting-sechdular.md +++ /dev/null @@ -1,366 +0,0 @@ -# Meeting Scheduler - - -## Introduction - -This tutorial will show you how to create a distributed meeting scheduler using Python with Ceylon framework. The scheduler finds optimal meeting times by coordinating between multiple participants using an agent-based approach. - -## Step 1: Set Up the Environment - -First, install the required dependencies: - -```bash -pip install ceylon pydantic -``` - -## Step 2: Define the Data Models - -We'll use Pydantic for data validation and serialization. Here are our core data models: - -```python -from pydantic import BaseModel -from pydantic.dataclasses import dataclass -from typing import List, Any - -# Input model for the scheduler -class RunnerInput(BaseModel): - request: Any - - class Config: - arbitrary_types_allowed = True - -@dataclass(repr=True) -class Meeting: - name: str # Meeting name - date: str # Meeting date - duration: int # Duration in hours - minimum_participants: int # Minimum required participants - -@dataclass(repr=True) -class TimeSlot: - date: str # Date of the slot - start_time: int # Start hour (0-23) - end_time: int # End hour (0-23) - - @property - def duration(self): - return self.end_time - self.start_time - - def is_greater_than(self, other): - return self.end_time > other.end_time - -@dataclass(repr=True) -class AvailabilityRequest: - time_slot: TimeSlot - -@dataclass(repr=True) -class AvailabilityResponse: - owner: str # Participant name - time_slot: TimeSlot - accepted: bool # Availability status -``` - -## Step 3: Implement the Participant Agent - -The Participant class represents each meeting attendee: - -```python -class Participant(Worker): - name: str - available_times: List[TimeSlot] - - def __init__(self, name, available_times): - self.name = name - self.available_times = available_times - super().__init__(name=name, workspace_id=workspace_id, - admin_peer=admin_peer, admin_port=admin_port) - - async def on_message(self, agent_id: str, data: bytes, time: int): - data = pickle.loads(data) - if type(data) == AvailabilityRequest: - data: AvailabilityRequest = data - # Check availability and respond - is_available = any(self.is_overlap(slot, data.time_slot, - data.time_slot.duration) - for slot in self.available_times) - - response = AvailabilityResponse( - owner=self.details().name, - time_slot=data.time_slot, - accepted=is_available - ) - await self.broadcast(pickle.dumps(response)) - - @staticmethod - def is_overlap(slot1: TimeSlot, slot2: TimeSlot, duration: int) -> bool: - latest_start = max(slot1.start_time, slot2.start_time) - earliest_end = min(slot1.end_time, slot2.end_time) - return earliest_end - latest_start >= duration -``` - -## Step 4: Implement the Coordinator - -The Coordinator manages the scheduling process: - -```python -class Coordinator(Admin): - meeting: Meeting = None - agreed_slots = {} - next_time_slot = None - - def __init__(self): - super().__init__(name=workspace_id, port=admin_port) - - async def run(self, inputs: bytes): - input: RunnerInput = pickle.loads(inputs) - self.meeting = input.request - print("Meeting Schedule request: ", self.meeting) - - async def on_agent_connected(self, topic: str, agent_id: str): - if self.next_time_slot is None and self.meeting is not None: - self.next_time_slot = TimeSlot( - self.meeting.date, 0, self.meeting.duration - ) - await self.broadcast(pickle.dumps( - AvailabilityRequest(time_slot=self.next_time_slot) - )) - - async def on_message(self, agent_id: str, data: bytes, time: int): - data = pickle.loads(data) - if type(data) == AvailabilityResponse: - await self.handle_availability_response(data) - - async def handle_availability_response(self, data: AvailabilityResponse): - if data.accepted: - time_slot_key = f"{data.time_slot}" - print(f"{data.owner} accepts {data.time_slot}") - - # Track acceptances and check if we have enough participants - if time_slot_key in self.agreed_slots: - slots = self.agreed_slots[time_slot_key] - if data.owner not in slots: - slots.append(data.owner) - self.agreed_slots[time_slot_key] = slots - if len(slots) >= self.meeting.minimum_participants: - print(f"Meeting {slots} participants agreed on {data.time_slot}") - await self.stop() - return - else: - self.agreed_slots[time_slot_key] = [data.owner] - - # Try next time slot - current_time_slot = data.time_slot - next_time_slot = TimeSlot( - self.meeting.date, - current_time_slot.start_time + 1, - current_time_slot.start_time + 1 + self.meeting.duration - ) - - if next_time_slot.is_greater_than(self.next_time_slot): - self.next_time_slot = next_time_slot - await self.broadcast(pickle.dumps( - AvailabilityRequest(time_slot=self.next_time_slot) - )) -``` - -## Step 5: Run the Scheduler - -Here's how to set up and run the scheduler: - -```python -async def main(): - # Create participants with their available times - participants = [ - Participant("Alice", [ - TimeSlot("2024-07-21", 9, 12), - TimeSlot("2024-07-21", 14, 18) - ]), - Participant("Bob", [ - TimeSlot("2024-07-21", 10, 13), - TimeSlot("2024-07-21", 15, 17) - ]), - Participant("Charlie", [ - TimeSlot("2024-07-21", 11, 14), - TimeSlot("2024-07-21", 16, 18) - ]), - Participant("David", [ - TimeSlot("2024-07-21", 11, 14), - TimeSlot("2024-07-21", 16, 18) - ]), - Participant("Kevin", [ - TimeSlot("2024-07-21", 10, 13), - TimeSlot("2024-07-21", 15, 17) - ]) - ] - - # Create and run coordinator - coordinator = Coordinator() - meeting = Meeting( - name="Meeting 1", - duration=2, - date="2024-07-21", - minimum_participants=3 - ) - - await coordinator.arun_admin( - inputs=pickle.dumps(RunnerInput(request=meeting)), - workers=participants - ) - -if __name__ == '__main__': - asyncio.run(main()) -``` - -## How It Works - -1. The scheduler uses a distributed agent-based architecture where each participant is an independent agent. -2. The Coordinator initiates the scheduling process by sending availability requests for time slots. -3. Each Participant agent checks their availability and responds to requests. -4. The Coordinator tracks responses and finds a time slot that works for the minimum required participants. -5. The system uses efficient overlap detection to check time slot compatibility. - -## Key Improvements from Basic Version - -1. Uses Pydantic for robust data validation -2. Implements proper serialization/deserialization -3. Adds duration-aware time slot overlap detection -4. Supports multiple participants beyond the minimum required -5. Includes better error handling and type safety -6. Uses asynchronous communication for better performance - -## Potential Enhancements - -1. Add persistence layer for storing scheduling history -2. Implement priority-based scheduling -3. Add support for recurring meetings -4. Implement calendar integration (Google Calendar, Outlook) -5. Add conflict resolution for competing meeting requests -6. Implement notification system for scheduled meetings -7. Add support for different time zones -8. Create a REST API interface for web/mobile clients - ---- - -## Multiple case scenarios - -### All participants are available at a given time -````mermaid -sequenceDiagram - participant C as Coordinator - participant A as Alice - participant B as Bob - participant D as Charlie - - Note over C,D: All participants connect to coordinator - - C->>A: AvailabilityRequest(slot=9:00) - C->>B: AvailabilityRequest(slot=9:00) - C->>D: AvailabilityRequest(slot=9:00) - - A->>C: AvailabilityResponse(accepted=true) - B->>C: AvailabilityResponse(accepted=true) - D->>C: AvailabilityResponse(accepted=true) - - Note over C: Minimum participants reached
(3 acceptances for 9:00 slot) - - C->>A: Meeting Confirmed - C->>B: Meeting Confirmed - C->>D: Meeting Confirmed - - Note over C: Coordinator stops
scheduling process -```` -### Someone is unavailable at a given time -````mermaid -sequenceDiagram - participant C as Coordinator - participant A as Alice - participant B as Bob - participant D as Charlie - - Note over C,D: Initial connection phase - - C->>A: AvailabilityRequest(slot=9:00) - C->>B: AvailabilityRequest(slot=9:00) - C->>D: AvailabilityRequest(slot=9:00) - - A->>C: AvailabilityResponse(accepted=true) - B->>C: AvailabilityResponse(accepted=false) - D->>C: AvailabilityResponse(accepted=true) - - Note over C: Not enough acceptances,
try next slot - - C->>A: AvailabilityRequest(slot=10:00) - C->>B: AvailabilityRequest(slot=10:00) - C->>D: AvailabilityRequest(slot=10:00) - - A->>C: AvailabilityResponse(accepted=true) - B->>C: AvailabilityResponse(accepted=true) - D->>C: AvailabilityResponse(accepted=true) - - Note over C: Minimum participants reached
(3 acceptances for 10:00 slot) - - C->>A: Meeting Confirmed - C->>B: Meeting Confirmed - C->>D: Meeting Confirmed - - Note over C: Coordinator stops
scheduling process -```` - -### Advance - -````mermaid -sequenceDiagram - participant M as Main - participant C as Coordinator - participant A as Alice - participant B as Bob - participant Ch as Charlie - - M->>C: start_agent(Meeting) - M->>A: Initialize - M->>B: Initialize - M->>Ch: Initialize - - A-->>C: Connect - B-->>C: Connect - Ch-->>C: Connect - - Note over C: Create TimeSlot
(start=8:00) - - par Broadcast Availability Request - C->>A: AvailabilityRequest(slot=8:00) - C->>B: AvailabilityRequest(slot=8:00) - C->>Ch: AvailabilityRequest(slot=8:00) - end - - Note over A: Check is_overlap() - Note over B: Check is_overlap() - Note over Ch: Check is_overlap() - - A->>C: AvailabilityResponse(accepted=false) - B->>C: AvailabilityResponse(accepted=false) - Ch->>C: AvailabilityResponse(accepted=false) - - Note over C: Calculate next slot
(start=9:00) - - par Broadcast Next Slot - C->>A: AvailabilityRequest(slot=9:00) - C->>B: AvailabilityRequest(slot=9:00) - C->>Ch: AvailabilityRequest(slot=9:00) - end - - A->>C: AvailabilityResponse(accepted=true) - B->>C: AvailabilityResponse(accepted=true) - Ch->>C: AvailabilityResponse(accepted=true) - - Note over C: Minimum participants reached - - C-->>A: Stop - C-->>B: Stop - C-->>Ch: Stop - C-->>M: Meeting Scheduled -```` - -## License -Copyright 2024-Present, Syigen Ltd. and Syigen Private Limited. All rights reserved. -Licensed under the Apache License, Version 2.0 (See LICENSE or http://www.apache.org/licenses/LICENSE-2.0). \ No newline at end of file diff --git a/docs/examples/single-item-auction.md b/docs/examples/single-item-auction.md deleted file mode 100644 index bf816ed..0000000 --- a/docs/examples/single-item-auction.md +++ /dev/null @@ -1,196 +0,0 @@ -# Distributed Auction System - -This tutorial demonstrates how to build a distributed auction system using the Ceylon multi-agent framework. The system consists of an auctioneer agent managing the auction process and multiple bidder agents competing for items. - -## System Overview - -The auction system implements: -- Single-item auctions with multiple bidders -- Automatic bid placement based on budget constraints -- Real-time auction status updates -- Distributed communication between auctioneer and bidders - -## Core Components - -### Data Models - -```python -@dataclass -class Item: - name: str - starting_price: float - -@dataclass -class Bid: - bidder: str - amount: float - -@dataclass -class AuctionStart: - item: Item - -@dataclass -class AuctionResult: - winner: str - winning_bid: float - -@dataclass -class AuctionEnd: - pass -``` - -### Auctioneer Agent - -The auctioneer manages the auction process: - -```python -class Auctioneer(BaseAgent): - def __init__(self, item: Item, expected_bidders: int, name="auctioneer", port=8888): - super().__init__( - name=name, - mode=PeerMode.ADMIN, - role="auctioneer", - port=port - ) - self.item = item - self.expected_bidders = expected_bidders - self.bids: List[Bid] = [] - self.auction_ended = False -``` - -Key methods: -- `handle_connection`: Monitors bidder connections and starts auction when all bidders join -- `handle_bid`: Processes incoming bids -- `end_auction`: Determines winner and broadcasts results - -### Bidder Agent - -Each bidder participates in the auction: - -```python -class Bidder(BaseAgent): - def __init__(self, name: str, budget: float, - workspace_id=DEFAULT_WORKSPACE_ID, - admin_peer="", - admin_port=8888): - super().__init__( - name=name, - mode=PeerMode.CLIENT, - role="bidder" - ) - self.budget = budget - self.has_bid = False -``` - -Key methods: -- `handle_auction_start`: Places bid when auction begins -- `handle_auction_result`: Processes auction results -- `handle_auction_end`: Acknowledges auction completion - -## Bidding Strategy - -Bidders use a simple random strategy: -```python -random_multiplier = random.randint(100, 1000) / 100 -bid_amount = min(self.budget, auction_start.item.starting_price * random_multiplier) -``` - -## Running the System - -1. Create auction item and auctioneer: -```python -item = Item("Rare Painting", 1000.0) -auctioneer = Auctioneer(item, expected_bidders=3, port=8455) -admin_details = auctioneer.details() -``` - -2. Create bidders: -```python -bidders = [ - Bidder("Alice", 1500.0, admin_peer=admin_details.id), - Bidder("Bob", 1200.0, admin_peer=admin_details.id), - Bidder("Charlie", 2000.0, admin_peer=admin_details.id) -] -``` - -3. Start the system: -```python -await auctioneer.start_agent(b"", bidders) -``` - -## Sample Output - -``` -ceylon version: 0.22.1 -visit https://ceylon.ai for more information -2025-01-26 00:04:41.323 | INFO | __main__::161 - Initializing auction system... -2025-01-26 00:04:41.327 | INFO | __main__:main:155 - Starting auction system... -2025-01-26 00:04:41.327 | INFO | ceylon.base.uni_agent:start_agent:76 - Starting auctioneer agent in ADMIN mode -2025-01-26 00:04:41.389 | INFO | __main__:handle_run:91 - Auctioneer started - auctioneer -2025-01-26 00:04:41.389 | INFO | __main__:handle_run:138 - Bidder started - Alice -2025-01-26 00:04:41.389 | INFO | __main__:handle_run:138 - Bidder started - Bob -2025-01-26 00:04:41.389 | INFO | __main__:handle_run:138 - Bidder started - Charlie -2025-01-26 00:04:41.389 | INFO | __main__:handle_run:138 - Bidder started - Jon -2025-01-26 00:04:41.548 | INFO | __main__:handle_connection:50 - Bidder Alice connected with auctioneer. 0/3 bidders connected. -2025-01-26 00:04:41.548 | INFO | __main__:handle_connection:57 - Waiting for more bidders to connect... -2025-01-26 00:04:41.549 | INFO | __main__:handle_connection:50 - Bidder Bob connected with auctioneer. 1/3 bidders connected. -2025-01-26 00:04:41.549 | INFO | __main__:handle_connection:57 - Waiting for more bidders to connect... -2025-01-26 00:04:41.589 | INFO | __main__:handle_connection:50 - Bidder Jon connected with auctioneer. 2/3 bidders connected. -2025-01-26 00:04:41.589 | INFO | __main__:handle_connection:57 - Waiting for more bidders to connect... -2025-01-26 00:04:41.591 | INFO | __main__:handle_connection:50 - Bidder Charlie connected with auctioneer. 3/3 bidders connected. -2025-01-26 00:04:41.591 | INFO | __main__:handle_connection:54 - All bidders connected. Starting the auction. -2025-01-26 00:04:41.591 | INFO | __main__:start_auction:60 - Starting auction for Rare Painting with starting price $1000.0 -2025-01-26 00:04:41.654 | INFO | __main__:handle_auction_start:122 - Bob placed bid: $1200.00 -2025-01-26 00:04:41.654 | INFO | __main__:handle_auction_start:122 - Alice placed bid: $1500.00 -2025-01-26 00:04:41.665 | INFO | __main__:handle_auction_start:122 - Jon placed bid: $2800.00 -2025-01-26 00:04:41.669 | INFO | __main__:handle_auction_start:122 - Charlie placed bid: $2000.00 -2025-01-26 00:04:41.685 | INFO | __main__:handle_bid:70 - Received bid from Jon for $2800.00 -2025-01-26 00:04:41.687 | INFO | __main__:handle_bid:70 - Received bid from Charlie for $2000.00 -2025-01-26 00:04:41.695 | INFO | __main__:handle_bid:70 - Received bid from Bob for $1200.00 -2025-01-26 00:04:41.695 | INFO | __main__:end_auction:84 - Auction ended. Winner: Jon, Winning Bid: $2800.00 -``` - -## Customization Options - -- Modify bidding strategy by adjusting the random multiplier range -- Add minimum bid increments -- Implement multiple auction rounds -- Add timeout mechanisms for bidder responses -- Implement different auction types (Dutch, English, etc.) - -## Sequence Diagram - -````mermaid -sequenceDiagram - participant A as Auctioneer - participant B1 as Bidder1 - participant B2 as Bidder2 - participant B3 as Bidder3 - - Note over A,B3: Connection Phase - B1->>A: Connect - A->>B1: Connection Confirmed - B2->>A: Connect - A->>B2: Connection Confirmed - B3->>A: Connect - A->>B3: Connection Confirmed - - Note over A,B3: Auction Start - A->>B1: AuctionStart(item) - A->>B2: AuctionStart(item) - A->>B3: AuctionStart(item) - - Note over A,B3: Bidding Phase - B1-->>A: Bid(amount) - B2-->>A: Bid(amount) - B3-->>A: Bid(amount) - - Note over A,B3: Auction End - A->>B1: AuctionResult(winner, amount) - A->>B2: AuctionResult(winner, amount) - A->>B3: AuctionResult(winner, amount) - - A->>B1: AuctionEnd - A->>B2: AuctionEnd - A->>B3: AuctionEnd -```` \ No newline at end of file diff --git a/docs/examples/task-manager.md b/docs/examples/task-manager.md index c3462f4..cd08af2 100644 --- a/docs/examples/task-manager.md +++ b/docs/examples/task-manager.md @@ -1,47 +1,56 @@ -# Task Manager +# Distributed Task Management System -## Introduction -This tutorial walks through building a distributed task management system using Ceylon. The system assigns tasks to workers based on skill levels and monitors completion success. +## System Architecture -## Prerequisites -- Python 3.7+ -- Ceylon framework -- Basic understanding of async programming +### Core Components +1. Task Manager (Admin Node) + - Task distribution controller + - Worker state management + - Result aggregation -## Part 1: Data Models +2. Worker Nodes + - Task execution + - Progress reporting + - Skill-based routing + +3. Message Protocol + - Task assignments + - Status updates + - Completion reports + +## Implementation Details + +### 1. Data Models -### Task Definition ```python +from dataclasses import dataclass +from typing import List, Optional + @dataclass class Task: id: int description: str difficulty: int -``` + + def validate(self) -> bool: + return 1 <= self.difficulty <= 10 -Tasks have three key attributes: -- `id`: Unique identifier -- `description`: Task details -- `difficulty`: Required skill level (1-10) - -### Message Types -```python @dataclass class TaskAssignment: task: Task + assigned_at: int # Unix timestamp + timeout: Optional[int] = None @dataclass class TaskResult: task_id: int worker: str success: bool + execution_time: float + error_message: Optional[str] = None ``` -These classes handle: -- Task assignments to workers -- Results reporting back to manager - -## Part 2: Worker Implementation +### 2. Enhanced Worker Implementation ```python class WorkerAgent(BaseAgent): @@ -51,47 +60,63 @@ class WorkerAgent(BaseAgent): self.name = name self.skill_level = skill_level self.has_task = False + self.current_task: Optional[Task] = None + self.task_history: List[TaskResult] = [] + super().__init__( name=name, workspace_id=workspace_id, admin_peer=admin_peer, mode=PeerMode.CLIENT ) -``` - -Key worker features: -1. Skill level determines task success probability -2. Track task assignment status -3. Connect to task manager via admin_peer - -### Task Handling -```python -@on(TaskAssignment) -async def handle_task(self, data: TaskAssignment, time: int, agent: AgentDetail): - if self.has_task: - return - - self.has_task = True - await asyncio.sleep(data.task.difficulty) # Simulate work - success = self.skill_level >= data.task.difficulty - - result = TaskResult( - task_id=data.task.id, - worker=self.name, - success=success - ) - await self.broadcast(pickle.dumps(result)) + @on(TaskAssignment) + async def handle_task(self, data: TaskAssignment, + time: int, agent: AgentDetail): + try: + if self.has_task: + logger.warning(f"Worker {self.name} already has task") + return + + self.has_task = True + self.current_task = data.task + start_time = time.time() + + # Simulate work with proper error handling + try: + await asyncio.sleep(data.task.difficulty) + success = self.skill_level >= data.task.difficulty + except asyncio.CancelledError: + success = False + error_msg = "Task cancelled" + + execution_time = time.time() - start_time + + result = TaskResult( + task_id=data.task.id, + worker=self.name, + success=success, + execution_time=execution_time, + error_message=error_msg if not success else None + ) + + self.task_history.append(result) + await self.broadcast(pickle.dumps(result)) + + except Exception as e: + logger.error(f"Error in task handling: {e}") + # Send failure result + finally: + self.has_task = False + self.current_task = None + + @on_connect("*") + async def handle_connection(self, topic: str, + agent: AgentDetail): + logger.info(f"Connected to {agent.name} on {topic}") ``` -This method: - -1. Checks if worker is available -2. Simulates work duration based on difficulty -3. Determines success based on skill level -4. Reports result back to manager - -## Part 3: Task Manager Implementation +### 3. Task Manager Implementation ```python class TaskManager(BaseAgent): @@ -107,236 +132,254 @@ class TaskManager(BaseAgent): self.expected_workers = expected_workers self.task_results = [] self.tasks_assigned = False + self.worker_stats = {} + self.task_timeouts = {} + + async def assign_tasks(self): + if self.tasks_assigned: + return + + self.tasks_assigned = True + connected_workers = await self.get_connected_agents() + + # Intelligent task distribution + worker_tasks = self._match_tasks_to_workers( + self.tasks, connected_workers) + + for worker, task in worker_tasks: + assignment = TaskAssignment( + task=task, + assigned_at=int(time.time()), + timeout=task.difficulty * 2 + ) + await self.broadcast(pickle.dumps(assignment)) + + # Set timeout handler + self.task_timeouts[task.id] = asyncio.create_task( + self._handle_timeout(task.id, assignment.timeout) + ) + + async def _handle_timeout(self, task_id: int, timeout: int): + await asyncio.sleep(timeout) + if task_id not in self.completed_tasks: + logger.warning(f"Task {task_id} timed out") + # Implement timeout handling + + def _match_tasks_to_workers(self, tasks, workers): + # Intelligent matching algorithm + matches = [] + sorted_tasks = sorted(tasks, + key=lambda t: t.difficulty, + reverse=True) + sorted_workers = sorted(workers, + key=lambda w: self._get_worker_score(w)) + + for task, worker in zip(sorted_tasks, sorted_workers): + matches.append((worker, task)) + return matches + + def _get_worker_score(self, worker): + stats = self.worker_stats.get(worker.name, {}) + success_rate = stats.get('success_rate', 0.5) + completed_tasks = stats.get('completed_tasks', 0) + return success_rate * 0.7 + completed_tasks * 0.3 + + @on(TaskResult) + async def handle_result(self, data: TaskResult, + time: int, agent: AgentDetail): + # Cancel timeout handler + if data.task_id in self.task_timeouts: + self.task_timeouts[data.task_id].cancel() + + self.task_results.append(data) + self._update_worker_stats(data) + + if len(self.task_results) == len(self.tasks): + await self._generate_final_report() + await self.end_task_management() + + def _update_worker_stats(self, result: TaskResult): + if result.worker not in self.worker_stats: + self.worker_stats[result.worker] = { + 'completed_tasks': 0, + 'successful_tasks': 0, + 'total_time': 0 + } + + stats = self.worker_stats[result.worker] + stats['completed_tasks'] += 1 + if result.success: + stats['successful_tasks'] += 1 + stats['total_time'] += result.execution_time + stats['success_rate'] = (stats['successful_tasks'] / + stats['completed_tasks']) ``` -Manager responsibilities: +## System Flow Diagrams -1. Track available tasks -2. Monitor connected workers -3. Collect and process results - -### Connection Handling -```python -@on_connect("*") -async def handle_connection(self, topic: str, agent: AgentDetail): - connected_count = len(await self.get_connected_agents()) - if connected_count == self.expected_workers and not self.tasks_assigned: - await self.assign_tasks() +### 1. Normal Operation Flow +```mermaid +sequenceDiagram + participant TM as TaskManager + participant W1 as Worker(skill=5) + participant W2 as Worker(skill=8) + + W1->>TM: Connect + W2->>TM: Connect + + Note over TM: All workers connected + + par Task Assignment + TM->>W1: TaskAssignment(id=1, difficulty=4) + TM->>W2: TaskAssignment(id=2, difficulty=7) + end + + Note over W1: Processing (4s) + Note over W2: Processing (7s) + + W1->>TM: TaskResult(success=true) + W2->>TM: TaskResult(success=true) + + Note over TM: Generate Report + + TM->>W1: Shutdown + TM->>W2: Shutdown ``` -Starts task assignment when all workers connect. - -### Task Assignment -```python -async def assign_tasks(self): - if self.tasks_assigned: - return - - self.tasks_assigned = True - connected_workers = await self.get_connected_agents() - for task, worker in zip(self.tasks, connected_workers): - await self.broadcast(pickle.dumps(TaskAssignment(task=task))) +### 2. Error Handling Flow +```mermaid +sequenceDiagram + participant TM as TaskManager + participant W1 as Worker(skill=3) + participant W2 as Worker(skill=7) + + W1->>TM: Connect + W2->>TM: Connect + + TM->>W1: TaskAssignment(difficulty=6) + TM->>W2: TaskAssignment(difficulty=4) + + Note over W1: Task too difficult + Note over W2: Processing + + W1->>TM: TaskResult(success=false) + W2->>TM: TaskResult(success=true) + + Note over TM: Update worker stats + Note over TM: Worker1 success_rate = 0% + Note over TM: Worker2 success_rate = 100% ``` -Distribution logic: +### 3. Timeout Handling +```mermaid +sequenceDiagram + participant TM as TaskManager + participant W1 as Worker + + W1->>TM: Connect + + TM->>W1: TaskAssignment(timeout=10s) + + Note over TM: Start timeout timer + + Note over W1: Task processing stuck + + Note over TM: Timeout reached + TM->>W1: Cancel task + + Note over TM: Mark task as failed + Note over TM: Update worker stats +``` -1. Checks if tasks already assigned -2. Gets connected worker list -3. Pairs tasks with workers -4. Broadcasts assignments +## Best Practices and Error Handling -### Result Processing +1. Task Validation ```python -@on(TaskResult) -async def handle_result(self, data: TaskResult, time: int, agent: AgentDetail): - self.task_results.append(data) - if len(self.task_results) == len(self.tasks): - print("All tasks completed") - for result in self.task_results: - print(f"Task {result.task_id} assigned to {result.worker} - " - f"{'Success' if result.success else 'Failure'}") - await self.end_task_management() +def validate_task(task: Task) -> bool: + return (1 <= task.difficulty <= 10 and + task.description.strip() and + task.id > 0) ``` -Tracks completion and calculates success rate. - -## Part 4: System Setup - +2. Worker Health Monitoring ```python -async def main(): - # Define tasks - tasks = [ - Task(id=1, description="Simple calculation", difficulty=2), - Task(id=2, description="Data analysis", difficulty=5), - Task(id=3, description="ML model training", difficulty=8), - ] - - # Create manager - task_manager = TaskManager(tasks, expected_workers=3) - admin_details = task_manager.details() - - # Create workers with varying skills - workers = [ - WorkerAgent("Junior", skill_level=3, admin_peer=admin_details.id), - WorkerAgent("Intermediate", skill_level=6, admin_peer=admin_details.id), - WorkerAgent("Senior", skill_level=9, admin_peer=admin_details.id), - ] - - # Start system - await task_manager.start_agent(b"", workers) +async def check_worker_health(worker: WorkerAgent): + while True: + try: + await worker.ping() + await asyncio.sleep(30) + except ConnectionError: + logger.error(f"Worker {worker.name} disconnected") ``` -## Running the System - -1. Create task list with varying difficulties -2. Initialize task manager -3. Create workers with appropriate skill levels -4. Launch system with manager and workers - -## Example Output -``` -All tasks completed -Task 1 assigned to Junior - Success -Task 2 assigned to Intermediate - Success -Task 3 assigned to Senior - Success -Success rate: 100.00% +3. Task Recovery +```python +async def recover_failed_task(task: Task, + result: TaskResult): + if result.success: + return + + available_workers = [w for w in workers + if not w.has_task] + if not available_workers: + logger.error("No workers available for recovery") + return + + # Select best worker for recovery + worker = max(available_workers, + key=lambda w: w.skill_level) + await reassign_task(task, worker) ``` -## Customization Options +## Performance Optimization -### Priority-based Tasks +1. Task Batching ```python -@dataclass -class PriorityTask(Task): - priority: int - - def get_processing_time(self): - return self.difficulty * (1/self.priority) +async def batch_assign_tasks(tasks: List[Task], + batch_size: int = 3): + for batch in chunks(tasks, batch_size): + await asyncio.gather( + *[assign_task(t) for t in batch] + ) ``` -### Specialized Workers +2. Worker Scaling ```python -class SpecializedWorker(WorkerAgent): - def __init__(self, name, skill_level, specialties): - super().__init__(name, skill_level) - self.specialties = specialties - - async def handle_task(self, data: TaskAssignment): - specialty_bonus = 2 if data.task.type in self.specialties else 0 - success = (self.skill_level + specialty_bonus) >= data.task.difficulty - # Rest of implementation... +async def scale_workers(current_load: float, + target_load: float = 0.7): + if current_load > target_load: + await spawn_new_worker() + elif current_load < target_load * 0.5: + await remove_idle_worker() ``` -## Best Practices - -1. Task Design - - Set appropriate difficulty levels - - Balance task distribution - - Consider task dependencies - -2. Worker Configuration - - Match skill levels to task range - - Provide adequate worker count - - Consider specializations - -3. Error Handling - - Handle worker disconnections - - Implement task timeouts - - Plan for task failures - -## Troubleshooting - -Common issues and solutions: -1. Workers not connecting - - Check admin_peer ID - - Verify network configuration - - Ensure port availability - -2. Task assignment failures - - Verify task format - - Check worker availability - - Monitor connection status - -3. Performance issues - - Adjust task difficulty - - Balance worker load - - Monitor system resources - -## Use Cases - -### All are well - -````mermaid -sequenceDiagram - participant M as Main - participant TM as TaskManager - participant JW as Junior Worker - participant IW as Intermediate Worker - participant SW as Senior Worker - - M->>TM: Initialize(tasks=[T1,T2,T3]) - M->>JW: Create(skill=3) - M->>IW: Create(skill=6) - M->>SW: Create(skill=9) - - JW-->>TM: Connect - IW-->>TM: Connect - SW-->>TM: Connect - - Note over TM: All workers connected
Begin task assignment +## Monitoring and Metrics - par Assign Tasks - TM->>JW: TaskAssignment(T1, difficulty=2) - TM->>IW: TaskAssignment(T2, difficulty=5) - TM->>SW: TaskAssignment(T3, difficulty=8) - end - - Note over JW: Process T1
skill(3) >= difficulty(2) - Note over IW: Process T2
skill(6) >= difficulty(5) - Note over SW: Process T3
skill(9) >= difficulty(8) - - JW->>TM: TaskResult(T1, success=true) - IW->>TM: TaskResult(T2, success=true) - SW->>TM: TaskResult(T3, success=true) - - Note over TM: All tasks completed
Calculate success rate - - TM-->>JW: Stop - TM-->>IW: Stop - TM-->>SW: Stop - TM-->>M: System Shutdown -```` - -### With some Failures - -````mermaid -sequenceDiagram - participant TM as TaskManager - participant JW as Junior Worker
(skill=3) - participant IW as Intermediate Worker
(skill=6) - participant SW as Senior Worker
(skill=9) - - JW-->>TM: Connect - IW-->>TM: Connect - SW-->>TM: Connect - - Note over TM: Workers connected
Begin assignment - - TM->>JW: TaskAssignment(T3, difficulty=8) - TM->>IW: TaskAssignment(T2, difficulty=5) - TM->>SW: TaskAssignment(T1, difficulty=2) - - Note over JW: Process T3
FAIL: skill(3) < difficulty(8) - Note over IW: Process T2
SUCCESS: skill(6) >= difficulty(5) - Note over SW: Process T1
SUCCESS: skill(9) >= difficulty(2) - - JW->>TM: TaskResult(T3, success=false) - IW->>TM: TaskResult(T2, success=true) - SW->>TM: TaskResult(T1, success=true) - - Note over TM: Tasks completed
Success rate: 66.7% +1. System Metrics +```python +class SystemMetrics: + def __init__(self): + self.task_completion_times = [] + self.worker_utilization = {} + self.error_counts = defaultdict(int) + + async def collect_metrics(self): + while True: + self.update_metrics() + await asyncio.sleep(60) +``` - TM-->>JW: Stop - TM-->>IW: Stop - TM-->>SW: Stop -```` \ No newline at end of file +2. Performance Reporting +```python +def generate_performance_report(): + avg_completion_time = sum(completion_times) / len(completion_times) + worker_success_rates = { + w: stats['success_rate'] + for w, stats in worker_stats.items() + } + return PerformanceReport( + avg_completion_time=avg_completion_time, + worker_stats=worker_success_rates, + error_rates=error_counts + ) +``` \ No newline at end of file diff --git a/docs/examples/time-scheduling.md b/docs/examples/time-scheduling.md new file mode 100644 index 0000000..6c0960d --- /dev/null +++ b/docs/examples/time-scheduling.md @@ -0,0 +1,436 @@ +# Distributed Meeting Scheduler + +## System Overview + +This tutorial demonstrates building a distributed meeting scheduling system using Ceylon's agent-based architecture. The system efficiently coordinates meeting times across multiple participants while handling real-world scheduling complexities. + +### What We'll Build + +A distributed system that: +1. Coordinates meeting schedules across multiple participants +2. Finds optimal meeting times based on availability +3. Handles scheduling conflicts and constraints +4. Manages concurrent scheduling requests +5. Provides real-time responses and updates + +### Key Features + +- **Distributed Processing**: Each participant runs as an independent agent +- **Automated Negotiation**: System automatically finds suitable time slots +- **Conflict Resolution**: Handles overlapping meetings and time conflicts +- **Scalable Architecture**: Easily add or remove participants +- **Fault Tolerance**: Handles participant disconnections and failures + +### Architecture Diagram +```mermaid +graph TD + A[Scheduler Agent] --> B[Time Slot Manager] + A --> C[Conflict Resolver] + A --> D[Response Aggregator] + + B --> E[Availability Checker] + C --> E + + P1[Participant 1] --> A + P2[Participant 2] --> A + P3[Participant 3] --> A + + subgraph "Each Participant" + CAL[Calendar Manager] + AH[Availability Handler] + COM[Communication Module] + end +``` + +### System Components + +1. **Scheduler Agent** + - Coordinates the scheduling process + - Manages participant responses + - Implements scheduling algorithms + +2. **Participant Agents** + - Manage individual availability + - Handle meeting requests + - Track scheduled meetings + +3. **Communication Protocol** + - Availability requests/responses + - Meeting confirmations + - Schedule updates + +4. **Business Logic** + - Time slot validation + - Conflict detection + - Priority handling + +## Core Components + +### Data Models +```python +@dataclass +class TimeSlot: + date: str + start_time: int # 24-hour format (0-23) + end_time: int + + def __hash__(self): + return hash((self.date, self.start_time, self.end_time)) + + def validate(self) -> bool: + return (0 <= self.start_time < 24 and + 0 <= self.end_time <= 24 and + self.start_time < self.end_time) + +@dataclass +class Meeting: + name: str + date: str + duration: int + minimum_participants: int + priority: int = 1 + + def validate(self) -> bool: + return (self.duration > 0 and + self.minimum_participants > 0 and + self.priority > 0) + +@dataclass +class SchedulingResult: + meeting: Meeting + time_slot: TimeSlot + participants: List[str] + status: str + error_message: Optional[str] = None +``` + +### Enhanced Participant Implementation +```python +class Participant(Worker): + def __init__(self, name: str, available_times: List[TimeSlot], + admin_peer: str): + self.name = name + self.available_times = self._validate_times(available_times) + self.scheduled_meetings: Dict[str, TimeSlot] = {} + super().__init__(name=name, admin_peer=admin_peer) + + def _validate_times(self, times: List[TimeSlot]) -> List[TimeSlot]: + return [t for t in times if t.validate()] + + @on(AvailabilityRequest) + async def handle_request(self, data: AvailabilityRequest, + time: int, agent: AgentDetail): + try: + is_available = self._check_availability( + data.time_slot, data.time_slot.duration) + + response = AvailabilityResponse( + participant=self.name, + time_slot=data.time_slot, + available=is_available + ) + await self.broadcast_message(response) + + except Exception as e: + logger.error(f"Error checking availability: {e}") + + def _check_availability(self, slot: TimeSlot, + duration: int) -> bool: + # Check conflicts with scheduled meetings + if any(self._has_conflict(slot, scheduled_slot) + for scheduled_slot in self.scheduled_meetings.values()): + return False + + # Check if slot fits in available times + return any(self._fits_in_slot(slot, available_slot, duration) + for available_slot in self.available_times) + + @staticmethod + def _has_conflict(slot1: TimeSlot, slot2: TimeSlot) -> bool: + return (slot1.date == slot2.date and + slot1.start_time < slot2.end_time and + slot2.start_time < slot1.end_time) + + @staticmethod + def _fits_in_slot(slot: TimeSlot, available: TimeSlot, + duration: int) -> bool: + if slot.date != available.date: + return False + latest_start = max(slot.start_time, available.start_time) + earliest_end = min(slot.end_time, available.end_time) + return earliest_end - latest_start >= duration +``` + +### Enhanced Scheduler Implementation +```python +class Scheduler(Admin): + def __init__(self, meeting: Meeting): + super().__init__(name="scheduler") + self.meeting = meeting + self.agreed_slots: Dict[TimeSlot, List[str]] = {} + self.current_slot: Optional[TimeSlot] = None + self.max_attempts = 10 + self.attempt_count = 0 + + @on_connect("*") + async def handle_connection(self, topic: str, agent: AgentDetail): + if not self.current_slot: + self.current_slot = TimeSlot( + self.meeting.date, + 8, # Start at 8 AM + 8 + self.meeting.duration + ) + await self._try_schedule() + + async def _try_schedule(self): + if self.attempt_count >= self.max_attempts: + await self._handle_scheduling_failure() + return + + self.attempt_count += 1 + await self.broadcast_message( + AvailabilityRequest(time_slot=self.current_slot) + ) + + @on(AvailabilityResponse) + async def handle_response(self, data: AvailabilityResponse, + time: int, agent: AgentDetail): + if not data.available: + await self._try_next_slot() + return + + slot_key = f"{data.time_slot.date}_{data.time_slot.start_time}" + if slot_key not in self.agreed_slots: + self.agreed_slots[slot_key] = [] + + if data.participant not in self.agreed_slots[slot_key]: + self.agreed_slots[slot_key].append(data.participant) + + if len(self.agreed_slots[slot_key]) >= self.meeting.minimum_participants: + await self._finalize_meeting(slot_key) + + async def _try_next_slot(self): + next_slot = TimeSlot( + self.meeting.date, + self.current_slot.start_time + 1, + self.current_slot.start_time + 1 + self.meeting.duration + ) + + if next_slot.end_time > 17: # Don't schedule after 5 PM + await self._handle_scheduling_failure() + return + + self.current_slot = next_slot + await self._try_schedule() + + async def _finalize_meeting(self, slot_key): + participants = self.agreed_slots[slot_key] + print(f"Meeting scheduled at {slot_key}") + print(f"Participants: {', '.join(participants)}") + await self.stop() + + async def _handle_scheduling_failure(self): + print("Failed to find suitable time slot") + await self.stop() +``` + +## Scheduling Scenarios + +### 1. Successful First Attempt +```mermaid +sequenceDiagram + participant S as Scheduler + participant A as Alice + participant B as Bob + participant C as Charlie + + Note over S: Meeting(duration=1, min_participants=2) + + A->>S: Connect + B->>S: Connect + C->>S: Connect + + S->>+A: AvailabilityRequest(8:00-9:00) + S->>+B: AvailabilityRequest(8:00-9:00) + S->>+C: AvailabilityRequest(8:00-9:00) + + A-->>-S: Response(available=true) + B-->>-S: Response(available=true) + C-->>-S: Response(available=false) + + Note over S: Minimum participants reached + + S->>A: Meeting Confirmed + S->>B: Meeting Confirmed + S->>C: Meeting Confirmed +``` + +### 2. Multiple Attempts +```mermaid +sequenceDiagram + participant S as Scheduler + participant A as Alice + participant B as Bob + + Note over S: Initial slot: 8:00-9:00 + + S->>A: AvailabilityRequest(8:00) + S->>B: AvailabilityRequest(8:00) + + A-->>S: Response(false) + B-->>S: Response(false) + + Note over S: Try next slot: 9:00-10:00 + + S->>A: AvailabilityRequest(9:00) + S->>B: AvailabilityRequest(9:00) + + A-->>S: Response(true) + B-->>S: Response(true) + + Note over S: Success at second attempt + + S->>A: Meeting Confirmed(9:00) + S->>B: Meeting Confirmed(9:00) +``` + +### 3. Scheduling Failure +```mermaid +sequenceDiagram + participant S as Scheduler + participant A as Alice + participant B as Bob + + Note over S: max_attempts = 3 + + loop 3 times + S->>A: AvailabilityRequest + S->>B: AvailabilityRequest + A-->>S: Response(false) + B-->>S: Response(false) + Note over S: Try next slot + end + + Note over S: Max attempts reached + + S->>A: Scheduling Failed + S->>B: Scheduling Failed +``` + +### 4. Conflict Resolution +```mermaid +sequenceDiagram + participant S as Scheduler + participant P1 as Participant1 + participant P2 as Participant2 + + Note over P1,P2: Both have existing meeting 10:00-11:00 + + S->>P1: AvailabilityRequest(10:00) + S->>P2: AvailabilityRequest(10:00) + + Note over P1: Check conflicts + Note over P2: Check conflicts + + P1-->>S: Response(false) + P2-->>S: Response(false) + + Note over S: Adjust time slot + + S->>P1: AvailabilityRequest(11:00) + S->>P2: AvailabilityRequest(11:00) + + P1-->>S: Response(true) + P2-->>S: Response(true) +``` + +## Error Handling and Recovery + +1. Participant Disconnection: +```python +@on_disconnect +async def handle_disconnection(self, agent: AgentDetail): + # Remove from current slot agreements + for participants in self.agreed_slots.values(): + if agent.name in participants: + participants.remove(agent.name) + + # Retry current slot if minimum participants lost + if self.current_slot: + await self._try_schedule() +``` + +2. Invalid Time Slots: +```python +def validate_time_slot(slot: TimeSlot) -> bool: + if not slot.validate(): + return False + + # Business hours check (8 AM - 5 PM) + if not (8 <= slot.start_time <= 17 and + 8 <= slot.end_time <= 17): + return False + + # Weekend check + date_obj = datetime.strptime(slot.date, "%Y-%m-%d") + if date_obj.weekday() >= 5: + return False + + return True +``` + +3. Scheduling Timeout: +```python +async def schedule_with_timeout(self, timeout: int = 300): + try: + async with asyncio.timeout(timeout): + await self._try_schedule() + except asyncio.TimeoutError: + await self._handle_scheduling_failure() +``` + +## System Extensions + +1. Priority-based Scheduling: +```python +class PriorityScheduler(Scheduler): + def __init__(self, meetings: List[Meeting]): + self.meetings = sorted( + meetings, + key=lambda m: m.priority, + reverse=True + ) +``` + +2. Recurring Meetings: +```python +@dataclass +class RecurringMeeting(Meeting): + frequency: str # "daily", "weekly", "monthly" + end_date: str + + def generate_instances(self) -> List[Meeting]: + instances = [] + current = datetime.strptime(self.date, "%Y-%m-%d") + end = datetime.strptime(self.end_date, "%Y-%m-%d") + + while current <= end: + instances.append(Meeting( + name=self.name, + date=current.strftime("%Y-%m-%d"), + duration=self.duration, + minimum_participants=self.minimum_participants + )) + + if self.frequency == "daily": + current += timedelta(days=1) + elif self.frequency == "weekly": + current += timedelta(weeks=1) + elif self.frequency == "monthly": + current = current.replace( + month=current.month % 12 + 1, + year=current.year + current.month // 12 + ) + + return instances +``` \ No newline at end of file diff --git a/docs/getting_start.md b/docs/getting-started.md similarity index 100% rename from docs/getting_start.md rename to docs/getting-started.md diff --git a/docs/technolgy.md b/docs/technology.md similarity index 100% rename from docs/technolgy.md rename to docs/technology.md diff --git a/mkdocs.yml b/mkdocs.yml index dc5af5d..303fc31 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -62,6 +62,7 @@ use_directory_urls: false plugins: - search - tags + - offline markdown_extensions: - tables @@ -85,4 +86,21 @@ markdown_extensions: custom_fences: - name: mermaid class: mermaid - format: !!python/name:pymdownx.superfences.fence_code_format \ No newline at end of file + format: !!python/name:pymdownx.superfences.fence_code_format + +nav: + - Home: index.md + - Getting started: + - Installation: getting-started.md + - Quickstart: quickstart.md + - Technology: technology.md + - Core Concepts: core-concepts.md + - How to: + - Overview: tutorials.md + - Best Practices: best-practices.md + - Examples: + - Time Scheduling: examples/time-scheduling.md + - Single Item Auction: examples/auction.md + - Task Manager: examples/task-manager.md + - Distributed Agents: examples/connect-through-network.md +