diff --git a/docs/best_practice.md b/docs/best_practice.md new file mode 100644 index 0000000..a751a2f --- /dev/null +++ b/docs/best_practice.md @@ -0,0 +1,362 @@ +# Best Practices Guide + +## Core Design Principles + +### 1. Single Responsibility + +- Each agent should handle one primary function +- Break complex behaviors into specialized agents +- Keep message handlers focused and specific + +```python +# Good +class DataValidator(Worker): + async def validate(self, data): pass + + +class DataProcessor(Worker): + async def process(self, data): pass + + +# Avoid +class DataHandler(Worker): + async def validate_and_process(self, data): pass +``` + +### 2. Message Immutability + +- Define messages using dataclasses +- Never modify received messages +- Create new instances for changes + +```python +@dataclass(frozen=True) # Enforces immutability +class TaskMessage: + id: str + data: Any + timestamp: float = field(default_factory=time.time) +``` + +### 3. Event-Driven Architecture + +- Use decorators for message handling +- Implement asynchronous communication +- Handle events independently + +```python +class EventDrivenAgent(Worker): + @on(TaskMessage) + async def handle_task(self, msg: TaskMessage): + await self.process_task(msg) + + @on_connect("*") + async def handle_connection(self, topic: str, agent: AgentDetail): + await self.initialize_connection(agent) +``` + +## Architecture Patterns + +### 1. Layered Communication + +```python +class SystemArchitecture: + def __init__(self): + self.layers = { + 'coordination': AdminAgent(), + 'processing': [WorkerAgent() for _ in range(3)], + 'storage': StorageAgent() + } +``` + +### 2. State Management + +```python +class StatefulAgent(Worker): + def __init__(self): + self.state = AgentState.IDLE + self._transitions = { + AgentState.IDLE: [AgentState.PROCESSING], + AgentState.PROCESSING: [AgentState.COMPLETED, AgentState.ERROR] + } + + async def transition(self, new_state: AgentState): + if new_state in self._transitions[self.state]: + self.state = new_state +``` + +### 3. Resource Management + +```python +class ResourceAwareAgent(Worker): + async def __aenter__(self): + await self.initialize_resources() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.cleanup_resources() +``` + +## Error Handling and Resilience + +### 1. Graceful Error Recovery + +```python +class ResilientAgent(Worker): + async def execute_with_retry(self, task, max_retries=3): + for attempt in range(max_retries): + try: + return await self.process(task) + except Exception as e: + if attempt == max_retries - 1: + await self.handle_failure(task, e) + await asyncio.sleep(2 ** attempt) +``` + +### 2. Circuit Breaking + +```python +class CircuitBreaker: + def __init__(self, failure_threshold=5, reset_timeout=60): + self.failures = 0 + self.threshold = failure_threshold + self.reset_timeout = reset_timeout + self.state = 'closed' + + async def call(self, func, *args): + if self.state == 'open': + raise CircuitBreakerOpen() + + try: + result = await func(*args) + self.failures = 0 + return result + except Exception: + self.failures += 1 + if self.failures >= self.threshold: + self.state = 'open' + asyncio.create_task(self.reset_timer()) + raise +``` + +## Performance Optimization + +### 1. Message Batching + +```python +class BatchProcessor(Worker): + def __init__(self, batch_size=100): + self.batch = [] + self.batch_size = batch_size + + async def process(self, item): + self.batch.append(item) + if len(self.batch) >= self.batch_size: + await self.process_batch(self.batch) + self.batch = [] +``` + +### 2. Resource Pooling + +```python +class ResourcePool: + def __init__(self, pool_size): + self.pool = asyncio.Queue(pool_size) + self.semaphore = asyncio.Semaphore(pool_size) + + async def acquire(self): + async with self.semaphore: + return await self.pool.get() + + async def release(self, resource): + await self.pool.put(resource) +``` + +## Security Best Practices + +### 1. Message Authentication + +```python +class SecureAgent(Worker): + def authenticate_message(self, message, signature): + return hmac.verify( + message.content, + signature, + self.secret_key + ) +``` + +### 2. Access Control + +```python +class SecureWorker(Worker): + async def on_message(self, agent: AgentDetail, data: bytes, time: int): + if not self.authorize_peer(agent.id): + logger.warning(f"Unauthorized message from {agent.id}") + return + await self.process_message(data) +``` + +## Monitoring and Observability + +### 1. Structured Logging + +```python +class ObservableAgent(Worker): + async def log_event(self, event_type, **kwargs): + logger.info( + f"{event_type}", + agent_id=self.id, + timestamp=time.time(), + **kwargs + ) +``` + +### 2. Metrics Collection + +```python +class MetricsCollector: + def __init__(self): + self.metrics = { + 'messages_processed': Counter(), + 'processing_time': Histogram(), + 'error_rate': Gauge() + } + + async def record(self, metric, value): + self.metrics[metric].record(value) +``` + +## Implementation Guidelines + +### 1. Message Design + +- Include metadata for tracking +- Add validation methods +- Use clear naming conventions + +```python +@dataclass +class Message: + id: str = field(default_factory=uuid.uuid4) + timestamp: float = field(default_factory=time.time) + payload: Any + metadata: Dict = field(default_factory=dict) + + def validate(self) -> bool: + return bool(self.payload) +``` + +### 2. Communication Patterns + +- Use broadcast for system-wide messages +- Direct messages for point-to-point +- Topic-based for selective communication + +```python +class CommunicationPatterns: + async def broadcast_update(self, update): + await self.broadcast_message(update) + + async def direct_message(self, peer_id, message): + await self.send_message(peer_id, message) + + async def topic_message(self, topic, message): + await self.publish(topic, message) +``` + +### 3. State Transitions + +- Define clear state machines +- Validate transitions +- Log state changes + +```python +class WorkflowAgent(Worker): + async def transition_state(self, new_state): + if new_state not in self.valid_transitions[self.current_state]: + raise InvalidTransition(f"{self.current_state} -> {new_state}") + + self.current_state = new_state + await self.log_event("state_change", new_state=new_state) +``` + +## Common Pitfalls + +1. Race Conditions + +- Use synchronization primitives +- Implement proper locking +- Handle concurrent access + +2. Memory Leaks + +- Clean up resources properly +- Implement context managers +- Monitor memory usage + +3. Message Overflow + +- Implement backpressure +- Use flow control +- Handle queue limits + +4. Error Propagation + +- Define error boundaries +- Implement recovery strategies +- Log error contexts + +## Best Practices Checklist + +### Design + +- [ ] Single responsibility per agent +- [ ] Clear message contracts +- [ ] Proper state management +- [ ] Error handling strategy + +### Implementation + +- [ ] Immutable messages +- [ ] Resource cleanup +- [ ] Proper logging +- [ ] Security measures + +### Operation + +- [ ] Monitoring setup +- [ ] Performance metrics +- [ ] Error tracking +- [ ] Resource monitoring + +## Deployment Considerations + +### Configuration + +```python +class ConfigurableAgent(Worker): + def __init__(self, config_path: str): + self.config = self.load_config(config_path) + self.validate_config() +``` + +### Resource Limits + +```python +class ResourceLimits: + def __init__(self): + self.max_connections = int(os.getenv('MAX_CONNECTIONS', 100)) + self.message_timeout = int(os.getenv('MESSAGE_TIMEOUT', 30)) +``` + +### Health Checks + +```python +class HealthCheck(Worker): + async def check_health(self): + return { + 'status': 'healthy', + 'connections': len(self.connections), + 'message_rate': self.message_counter.rate() + } +``` \ No newline at end of file diff --git a/docs/tutorials.md b/docs/tutorials.md index d641a61..b4d8cce 100644 --- a/docs/tutorials.md +++ b/docs/tutorials.md @@ -1,615 +1,355 @@ -## System Design Principles and Best Practices - -### Core Design Principles - -1. **Single Responsibility** - - Each agent should have one clear purpose - - Break complex behaviors into multiple specialized agents - ```python - # Good - class DataValidatorAgent(Worker): - async def validate(self, data): pass - - class DataProcessorAgent(Worker): - async def process(self, data): pass - - # Avoid - class DataAgent(Worker): - async def validate_and_process(self, data): pass - ``` - -2. **Message Immutability** - - Treat received messages as immutable - - Create new messages for modifications - ```python - @on(DataMessage) - async def handle_data(self, msg: DataMessage): - # Good - new_msg = DataMessage( - id=msg.id, - data=process(msg.data) - ) - - # Avoid - msg.data = process(msg.data) # Don't modify received messages - ``` - -3. **Fault Isolation** - - Contain failures within individual agents - - Implement circuit breakers for dependent services - ```python - class ResilientAgent(Worker): - async def execute_task(self, task): - with CircuitBreaker(failure_threshold=3): - try: - await self.process(task) - except Exception: - await self.handle_failure(task) - ``` - -### Mental Models for Agent Development - -1. **Think in Terms of Workflows** - ```python - class WorkflowAgent(Admin): - def __init__(self): - self.workflow = { - 'start': self.validate_input, - 'validate': self.process_data, - 'process': self.store_results, - 'store': self.notify_completion - } - ``` - -2. **Event-Driven Architecture** - ```python - class EventDrivenAgent(Worker): - @on(DataReceived) - async def handle_data(self, event): pass - - @on(ProcessingComplete) - async def handle_completion(self, event): pass - ``` - -3. **State Management** - ```python - class StateAwareAgent(Worker): - def __init__(self): - self.state_machine = { - 'idle': ['processing'], - 'processing': ['completed', 'failed'], - 'completed': ['idle'], - 'failed': ['idle'] - } - ``` - -### System Architecture Guidelines - -1. **Layered Communication** - ```python - class SystemArchitecture: - def __init__(self): - self.layers = { - 'presentation': WebInterface(), - 'coordination': CoordinatorAgent(), - 'processing': [WorkerAgent() for _ in range(3)], - 'storage': StorageAgent() - } - ``` - -2. **Service Discovery** - ```python - class ServiceRegistry(Admin): - async def register_service(self, service_type, agent_id): - self.services[service_type].append(agent_id) - - async def get_service(self, service_type): - return random.choice(self.services[service_type]) - ``` - -3. **Load Distribution** - ```python - class LoadAwareSystem: - def calculate_distribution(self, agents): - weights = [1/agent.load for agent in agents] - total = sum(weights) - return [w/total for w in weights] - ``` - -### Production Deployment Considerations - -1. **Monitoring Setup** - ```python - class ProductionAgent(Worker): - def __init__(self): - self.metrics = { - 'messages': Counter('messages_total'), - 'latency': Histogram('processing_latency'), - 'errors': Counter('error_total') - } - ``` - -2. **Configuration Management** - ```python - class ConfigurableAgent(Worker): - def __init__(self, config_path: str): - self.config = self.load_config(config_path) - self.validate_config() - self.apply_config() - ``` - -3. **Logging Strategy** - ```python - class LoggingSetup: - def configure_logging(self): - logger.add( - "app.log", - rotation="500 MB", - retention="7 days", - level="INFO" - ) - ``` - -### Performance Optimization Guidelines - -1. **Message Batching** - ```python - class BatchProcessor: - async def process_messages(self, messages): - if len(messages) > self.batch_size: - chunks = self.chunk_messages(messages) - return await asyncio.gather(*map(self.process_batch, chunks)) - ``` - -2. **Resource Pooling** - ```python - class ResourcePool: - def __init__(self, pool_size): - self.pool = asyncio.Queue(pool_size) - self.resources = set() - ``` - -3. **Memory Management** - ```python - class MemoryAware: - def __init__(self, max_cache_size): - self.cache = LRUCache(max_cache_size) - self.monitor_memory_usage() - ``` - -### Security Best Practices - -1. **Message Authentication** - ```python - class SecureMessaging: - def authenticate_message(self, message): - return hmac.verify( - message.content, - message.signature, - self.secret_key - ) - ``` - -2. **Access Control** - ```python - class SecureAgent(Worker): - def authorize_action(self, agent_id, action): - return ( - self.verify_identity(agent_id) and - self.check_permissions(agent_id, action) - ) - ``` - -3. **Data Protection** - ```python - class DataProtection: - def protect_sensitive_data(self, data): - return { - k: mask_sensitive(v) - for k, v in data.items() - } - ``` - -Remember: The Ceylon framework is built around asynchronous communication and distributed processing. Always design your agents with these principles in mind, ensuring they can operate independently while maintaining system coherence through well-defined message patterns and workflows. -# Ceylon Framework Advanced Guide - -## Core Concepts - -### Agent Types -Ceylon uses a distributed architecture with two primary agent types working together to accomplish tasks. - -1. **Admin Agent (BaseAgent with PeerMode.ADMIN)** - - Central coordinator that manages the entire network - - Handles task distribution and result collection - - Acts as the primary decision maker in the system - - Only one admin agent is allowed per system - ```python - from ceylon import Admin - - class NetworkManager(Admin): - def __init__(self, name="admin", port=8888): - super().__init__(name=name, port=port) - ``` - -2. **Worker Agent (BaseAgent with PeerMode.CLIENT)** - - Performs specific tasks assigned by the admin - - Reports results back to the admin - - Can have specialized roles or capabilities - - Multiple workers can operate simultaneously - ```python - from ceylon import Worker - - class TaskWorker(Worker): - def __init__(self, name, admin_peer): - super().__init__(name=name, admin_peer=admin_peer) - ``` - -### Message Handling -Ceylon provides a robust message handling system for agent communication. - -1. **Event Decorators** - - Decorators simplify message and event handling - - Allow declarative definition of message processors - ```python - @on(MessageType) # Processes messages of specific types - @on_run() # Defines agent's main execution loop - @on_connect("*") # Handles new agent connections - ``` - -2. **Message Broadcasting** - - Two primary methods for sending messages - - Supports both broadcast and direct communication - ```python - await self.broadcast_message(data) # Sends to all connected agents - await self.send_message(peer_id, data) # Sends to specific agent - ``` - -3. **Data Serialization** - - Uses Python's pickle for data serialization - - Enables complex object transmission between agents - ```python - # Sending serialized data - await self.broadcast(pickle.dumps(data)) - - # Receiving and deserializing - data = pickle.loads(message) - ``` - -## Advanced Features - -### Agent Configuration -Agents can be extensively configured to suit different roles and requirements. +# Tutorials +## Introduction + +Ceylon is a distributed framework for building multi-agent systems. This guide covers core concepts, best practices, and API usage. + +## Core Components + +### 1. Agent Types + +#### Admin Agent ```python -class CustomAgent(BaseAgent): - def __init__(self): - super().__init__( - name="agent_name", # Unique identifier - mode=PeerMode.CLIENT, # Operating mode - role="custom_role", # Agent's role in system - port=8000, # Network port - admin_peer="peer_id", # Admin connection - admin_ip="127.0.0.1", # Admin location - workspace_id="default", # Workspace grouping - buffer_size=1024 # Message buffer size - ) +from ceylon import Admin + +class CoordinatorAgent(Admin): + def __init__(self, name="coordinator", port=8888): + super().__init__(name=name, port=port) + + async def on_agent_connected(self, topic: str, agent: AgentDetail): + # Handle new agent connections + pass ``` -### Connection Management -Ceylon provides tools for monitoring and managing agent connections. +- Central coordinator for the system +- Manages worker connections +- Handles task distribution +- One admin per system +#### Worker Agent ```python -class NetworkAdmin(Admin): - async def on_agent_connected(self, topic: str, agent: AgentDetail): - # Track connected agents and trigger actions - connected = await self.get_connected_agents() - if len(connected) == self.expected_count: - await self.start_processing() +from ceylon import Worker + +class TaskWorker(Worker): + def __init__(self, name: str): + super().__init__(name=name, role="worker") + + async def on_message(self, agent: AgentDetail, data: bytes, time: int): + # Process received messages + pass ``` -### Error Handling -Robust error handling ensures system reliability. +- Performs specific tasks +- Reports to admin agent +- Multiple workers can run simultaneously + +### 2. Message Handling +#### Event Decorators ```python -async def handle_message(self, agent_id: str, data: bytes, time: int): - try: - message = pickle.loads(data) - await self.process_message(message) - except Exception as e: - logger.error(f"Error processing message: {e}") - await self.handle_error(agent_id) +from ceylon import on, on_run, on_connect + +class CustomAgent(Worker): + @on(MessageType) + async def handle_message(self, msg: MessageType, time: int, agent: AgentDetail): + # Process specific message type + pass + + @on_run() + async def handle_run(self, inputs: bytes): + # Main execution loop + pass + + @on_connect("*") + async def handle_connection(self, topic: str, agent: AgentDetail): + # Handle new connections + pass +``` + +#### Message Types +Define message types using dataclasses: +```python +from dataclasses import dataclass + +@dataclass +class TaskMessage: + id: int + data: str + priority: int = 1 + +@dataclass +class ResultMessage: + task_id: int + result: str +``` + +## Best Practices + +### 1. Message Design +```python +@dataclass +class Message: + # Include metadata + id: str + timestamp: float + + # Add validation + def validate(self) -> bool: + return bool(self.id and self.timestamp) ``` -## Implementation Patterns +- Use dataclasses for message structure +- Include metadata for tracking +- Add validation methods -### State Machine Pattern -Useful for agents that need to track and manage different operational states. +### 2. Error Handling +```python +class ResilientWorker(Worker): + async def process_task(self, task): + try: + result = await self.execute_task(task) + await self.send_result(result) + except Exception as e: + logger.error(f"Task failed: {e}") + await self.handle_failure(task) +``` +- Catch and log exceptions +- Implement retry mechanisms +- Handle cleanup properly + +### 3. State Management ```python from enum import Enum class AgentState(Enum): IDLE = "idle" PROCESSING = "processing" - COMPLETED = "completed" + ERROR = "error" class StatefulAgent(Worker): def __init__(self): super().__init__() self.state = AgentState.IDLE - async def transition_state(self, new_state: AgentState): + async def transition(self, new_state: AgentState): + old_state = self.state self.state = new_state - await self.broadcast_state() + logger.info(f"State transition: {old_state} -> {new_state}") +``` + +### 4. Resource Management +```python +class ResourceAgent(Worker): + def __init__(self): + super().__init__() + self.resources = {} + + async def cleanup(self): + try: + # Release resources + for resource in self.resources.values(): + await resource.close() + finally: + await self.broadcast_shutdown() ``` -### Observer Pattern -Implements event monitoring and notification systems. +## Common Patterns +### 1. Task Distribution ```python -class EventMonitor(Worker): +class TaskDistributor(Admin): def __init__(self): super().__init__() - self.observers = [] + self.worker_loads = {} - async def notify_observers(self, event): - for observer in self.observers: - await self.send_message(observer, event) + async def assign_task(self, task): + available_workers = [w for w in self.worker_loads.items() + if w[1] < self.max_load] + if not available_workers: + raise NoAvailableWorkersError() + + worker = min(available_workers, key=lambda x: x[1])[0] + await self.send_message(worker, task) + self.worker_loads[worker] += 1 ``` -### Pipeline Pattern -Enables sequential processing of tasks through multiple stages. +### 2. Event Processing +```python +class EventProcessor(Worker): + def __init__(self): + super().__init__() + self.handlers = { + 'data': self.handle_data, + 'control': self.handle_control, + 'status': self.handle_status + } + + async def on_message(self, agent: AgentDetail, data: bytes, time: int): + message = pickle.loads(data) + handler = self.handlers.get(message.type) + if handler: + await handler(message) +``` +### 3. Pipeline Processing ```python -class PipelineAgent(Worker): - async def process_stage(self, data): - result = await self.current_stage(data) +class PipelineStage(Worker): + def __init__(self, next_stage_id: str = None): + super().__init__() + self.next_stage = next_stage_id + + async def process(self, data): + result = await self.transform(data) if self.next_stage: - await self.send_to_next_stage(result) + await self.send_message(self.next_stage, result) + return result ``` -## Best Practices +## Practical Examples -1. **Message Design** - - Use dataclasses for structured message formats - - Include metadata for tracking and debugging - ```python - @dataclass - class Message: - id: str # Unique message identifier - type: str # Message classification - payload: Any # Actual message content - timestamp: float = field(default_factory=time.time) - ``` - -2. **Resource Management** - - Implement proper cleanup procedures - - Handle resource release systematically - ```python - async def cleanup(self): - try: - await self.stop_tasks() - await self.close_connections() - finally: - await self.broadcast_shutdown() - ``` - -3. **Monitoring and Logging** - - Use structured logging for better debugging - - Track agent activities and performance - ```python - from loguru import logger - - class MonitoredAgent(Worker): - async def on_message(self, agent_id: str, data: bytes, time: int): - logger.info(f"Message from {agent_id} at {time}") - await self.update_metrics() - ``` - -## Advanced Use Cases - -### Load Balancing -Distributes tasks evenly across available workers. +### 1. Auction System +```python +@dataclass +class Bid: + bidder: str + amount: float +class AuctionManager(Admin): + def __init__(self, item, min_price): + super().__init__() + self.item = item + self.min_price = min_price + self.bids = [] + + @on(Bid) + async def handle_bid(self, bid: Bid, time: int, agent: AgentDetail): + if bid.amount >= self.min_price: + self.bids.append(bid) + await self.broadcast_new_bid(bid) +``` + +### 2. Task Scheduler ```python -class LoadBalancer(Admin): +@dataclass +class ScheduledTask: + id: str + execute_at: float + data: Any + +class Scheduler(Admin): def __init__(self): super().__init__() - self.worker_loads = {} + self.task_queue = [] - async def assign_task(self, task): - # Find least loaded worker - worker = min(self.worker_loads.items(), key=lambda x: x[1])[0] - await self.send_message(worker, task) - self.worker_loads[worker] += 1 + async def schedule_task(self, task: ScheduledTask): + heapq.heappush(self.task_queue, (task.execute_at, task)) + await self.check_queue() ``` -### Fault Tolerance -Implements retry mechanisms and error recovery. +## Performance Optimization +### 1. Message Batching ```python -class FaultTolerantAgent(Worker): - async def execute_with_retry(self, task, max_retries=3): - for attempt in range(max_retries): - try: - return await self.execute_task(task) - except Exception as e: - if attempt == max_retries - 1: - raise - await asyncio.sleep(2 ** attempt) # Exponential backoff +class BatchProcessor(Worker): + def __init__(self, batch_size=100): + super().__init__() + self.batch_size = batch_size + self.batch = [] + + async def add_to_batch(self, item): + self.batch.append(item) + if len(self.batch) >= self.batch_size: + await self.process_batch() ``` -### Dynamic Scaling -Adjusts system resources based on load. - +### 2. Caching ```python -class ScalableSystem(Admin): - async def check_load(self): - if self.system_load > self.threshold: - await self.scale_up() - elif self.system_load < self.threshold_low: - await self.scale_down() +from functools import lru_cache + +class CachedWorker(Worker): + def __init__(self): + super().__init__() + self.cache = {} + + @lru_cache(maxsize=1000) + def compute_result(self, input_data): + return expensive_computation(input_data) ``` -## Performance Optimization +## Logging and Monitoring -1. **Message Batching** - - Groups messages for efficient processing - - Reduces communication overhead - ```python - class BatchProcessor(Worker): - async def process_batch(self, messages: List[Message]): - tasks = [self.process_message(msg) for msg in messages] - return await asyncio.gather(*tasks) - ``` - -2. **Caching** - - Stores frequently used results - - Reduces computation overhead - ```python - from functools import lru_cache - - class CachedAgent(Worker): - @lru_cache(maxsize=1000) - def compute_result(self, input_data): - return expensive_computation(input_data) - ``` +### 1. Structured Logging +```python +from loguru import logger + +class LoggedAgent(Worker): + async def on_message(self, agent: AgentDetail, data: bytes, time: int): + logger.info(f"Message received", + agent_id=agent.id, + message_size=len(data), + timestamp=time) +``` -## Security Considerations +### 2. Metrics Collection +```python +class MetricsAgent(Worker): + def __init__(self): + super().__init__() + self.metrics = { + 'messages_processed': 0, + 'errors': 0, + 'processing_time': [] + } + + async def record_metric(self, name, value): + self.metrics[name] = value + await self.report_metrics() +``` -1. **Message Validation** - - Ensures message integrity and authenticity - - Validates data before processing - ```python - from pydantic import BaseModel, validator - - class SecureMessage(BaseModel): - content: str - signature: str - - @validator('signature') - def verify_signature(cls, v, values): - if not verify_signature(values['content'], v): - raise ValueError('Invalid signature') - return v - ``` - -2. **Access Control** - - Implements agent authorization - - Controls message access - ```python - class SecureAgent(Worker): - async def on_message(self, agent_id: str, data: bytes, time: int): - if not self.is_authorized(agent_id): - logger.warning(f"Unauthorized message from {agent_id}") - return - ``` - -## Testing -Demonstrates proper testing setup for Ceylon agents. +## Security Considerations +### Message Validation ```python -import pytest +class SecureAgent(Worker): + def validate_message(self, message): + return ( + self.verify_signature(message) and + self.check_permissions(message.sender) + ) +``` -@pytest.mark.asyncio -async def test_agent_communication(): - admin = TestAdmin() - worker = TestWorker(admin_peer=admin.details().id) - - await admin.start_agent(b"", [worker]) - assert len(await admin.get_connected_agents()) == 1 +### Access Control +```python +class AuthenticatedAgent(Worker): + def __init__(self): + super().__init__() + self.authorized_peers = set() + + async def on_message(self, agent: AgentDetail, data: bytes, time: int): + if agent.id not in self.authorized_peers: + logger.warning(f"Unauthorized message from {agent.id}") + return ``` -## Integration Examples - -1. **HTTP Interface** - - Exposes agent functionality via REST API - - Enables external system integration - ```python - from fastapi import FastAPI - - app = FastAPI() - agent_system = None - - @app.post("/tasks") - async def create_task(task: Task): - return await agent_system.submit_task(task) - ``` - -2. **Database Integration** - - Persists agent data and results - - Provides data consistency - ```python - from sqlalchemy.ext.asyncio import AsyncSession - - class PersistentAgent(Worker): - def __init__(self, session: AsyncSession): - super().__init__() - self.session = session - - async def save_result(self, result): - self.session.add(result) - await self.session.commit() - ``` - -## Common Patterns for Specific Use Cases - -1. **Auction System** - - Implements bidding and auction mechanics - - Manages competitive resource allocation - ```python - class Auctioneer(Admin): - async def start_auction(self, item): - await self.broadcast_message(AuctionStart(item)) - - @on(Bid) - async def handle_bid(self, bid: Bid): - if self.is_highest_bid(bid): - await self.notify_new_highest_bid(bid) - ``` - -2. **Task Scheduler** - - Manages task timing and assignment - - Handles deadline-based scheduling - ```python - class Scheduler(Admin): - async def schedule_task(self, task, deadline): - worker = await self.find_available_worker() - await self.send_message(worker.id, TaskAssignment(task, deadline)) - ``` - -3. **Meeting Coordinator** - - Coordinates participant schedules - - Finds optimal meeting times - ```python - class MeetingCoordinator(Admin): - async def find_time_slot(self, participants, duration): - responses = await self.collect_availability(participants) - return self.find_common_slot(responses, duration) - ``` - -## Monitoring and Debugging - -1. **Metrics Collection** - - Tracks system performance metrics - - Enables system optimization - ```python - from prometheus_client import Counter, Gauge - - class MetricsAgent(Worker): - def __init__(self): - self.message_counter = Counter('messages_total', 'Total messages processed') - self.active_tasks = Gauge('active_tasks', 'Currently active tasks') - ``` - -2. **Distributed Tracing** - - Tracks message flow through system - - Helps diagnose performance issues - ```python - from opentelemetry import trace - - class TracedAgent(Worker): - async def on_message(self, agent_id: str, data: bytes, time: int): - with trace.get_tracer(__name__).start_as_current_span("process_message"): - await self.process_message(data) - ``` \ No newline at end of file +## Deployment Tips + +1. Use environment variables for configuration +2. Implement proper shutdown handlers +3. Monitor system resources +4. Set up logging aggregation +5. Implement health checks + +## Common Pitfalls to Avoid + +1. Modifying received messages +2. Blocking operations in message handlers +3. Missing error handling +4. Inadequate logging +5. Poor resource cleanup + +## Additional Resources + +- Ceylon Documentation: [https://docs.ceylon.ai](https://docs.ceylon.ai) +- GitHub Repository: [https://github.com/ceylon-ai/ceylon](https://github.com/ceylon-ai/ceylon) +- API Reference: [https://docs.ceylon.ai/api](https://docs.ceylon.ai/api) \ No newline at end of file