From cd07f73d5064de178a4b1f1477579b6d11bdbd60 Mon Sep 17 00:00:00 2001 From: Dewmal Date: Sun, 26 Jan 2025 00:41:08 +0530 Subject: [PATCH] finish tutorial --- docs/tutorials.md | 661 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 581 insertions(+), 80 deletions(-) diff --git a/docs/tutorials.md b/docs/tutorials.md index 984761e..d641a61 100644 --- a/docs/tutorials.md +++ b/docs/tutorials.md @@ -1,114 +1,615 @@ -# Ceylon Implementation Guide +## System Design Principles and Best Practices + +### Core Design Principles + +1. **Single Responsibility** + - Each agent should have one clear purpose + - Break complex behaviors into multiple specialized agents + ```python + # Good + class DataValidatorAgent(Worker): + async def validate(self, data): pass + + class DataProcessorAgent(Worker): + async def process(self, data): pass + + # Avoid + class DataAgent(Worker): + async def validate_and_process(self, data): pass + ``` + +2. **Message Immutability** + - Treat received messages as immutable + - Create new messages for modifications + ```python + @on(DataMessage) + async def handle_data(self, msg: DataMessage): + # Good + new_msg = DataMessage( + id=msg.id, + data=process(msg.data) + ) + + # Avoid + msg.data = process(msg.data) # Don't modify received messages + ``` + +3. **Fault Isolation** + - Contain failures within individual agents + - Implement circuit breakers for dependent services + ```python + class ResilientAgent(Worker): + async def execute_task(self, task): + with CircuitBreaker(failure_threshold=3): + try: + await self.process(task) + except Exception: + await self.handle_failure(task) + ``` + +### Mental Models for Agent Development + +1. **Think in Terms of Workflows** + ```python + class WorkflowAgent(Admin): + def __init__(self): + self.workflow = { + 'start': self.validate_input, + 'validate': self.process_data, + 'process': self.store_results, + 'store': self.notify_completion + } + ``` + +2. **Event-Driven Architecture** + ```python + class EventDrivenAgent(Worker): + @on(DataReceived) + async def handle_data(self, event): pass + + @on(ProcessingComplete) + async def handle_completion(self, event): pass + ``` + +3. **State Management** + ```python + class StateAwareAgent(Worker): + def __init__(self): + self.state_machine = { + 'idle': ['processing'], + 'processing': ['completed', 'failed'], + 'completed': ['idle'], + 'failed': ['idle'] + } + ``` + +### System Architecture Guidelines + +1. **Layered Communication** + ```python + class SystemArchitecture: + def __init__(self): + self.layers = { + 'presentation': WebInterface(), + 'coordination': CoordinatorAgent(), + 'processing': [WorkerAgent() for _ in range(3)], + 'storage': StorageAgent() + } + ``` + +2. **Service Discovery** + ```python + class ServiceRegistry(Admin): + async def register_service(self, service_type, agent_id): + self.services[service_type].append(agent_id) + + async def get_service(self, service_type): + return random.choice(self.services[service_type]) + ``` + +3. **Load Distribution** + ```python + class LoadAwareSystem: + def calculate_distribution(self, agents): + weights = [1/agent.load for agent in agents] + total = sum(weights) + return [w/total for w in weights] + ``` + +### Production Deployment Considerations + +1. **Monitoring Setup** + ```python + class ProductionAgent(Worker): + def __init__(self): + self.metrics = { + 'messages': Counter('messages_total'), + 'latency': Histogram('processing_latency'), + 'errors': Counter('error_total') + } + ``` + +2. **Configuration Management** + ```python + class ConfigurableAgent(Worker): + def __init__(self, config_path: str): + self.config = self.load_config(config_path) + self.validate_config() + self.apply_config() + ``` + +3. **Logging Strategy** + ```python + class LoggingSetup: + def configure_logging(self): + logger.add( + "app.log", + rotation="500 MB", + retention="7 days", + level="INFO" + ) + ``` + +### Performance Optimization Guidelines + +1. **Message Batching** + ```python + class BatchProcessor: + async def process_messages(self, messages): + if len(messages) > self.batch_size: + chunks = self.chunk_messages(messages) + return await asyncio.gather(*map(self.process_batch, chunks)) + ``` + +2. **Resource Pooling** + ```python + class ResourcePool: + def __init__(self, pool_size): + self.pool = asyncio.Queue(pool_size) + self.resources = set() + ``` + +3. **Memory Management** + ```python + class MemoryAware: + def __init__(self, max_cache_size): + self.cache = LRUCache(max_cache_size) + self.monitor_memory_usage() + ``` -## Quick Start Examples - -### Meeting Scheduler - -```python -from ceylon import Admin, Worker -from ceylon.base.support import on, on_run, on_connect +### Security Best Practices +1. **Message Authentication** + ```python + class SecureMessaging: + def authenticate_message(self, message): + return hmac.verify( + message.content, + message.signature, + self.secret_key + ) + ``` + +2. **Access Control** + ```python + class SecureAgent(Worker): + def authorize_action(self, agent_id, action): + return ( + self.verify_identity(agent_id) and + self.check_permissions(agent_id, action) + ) + ``` + +3. **Data Protection** + ```python + class DataProtection: + def protect_sensitive_data(self, data): + return { + k: mask_sensitive(v) + for k, v in data.items() + } + ``` + +Remember: The Ceylon framework is built around asynchronous communication and distributed processing. Always design your agents with these principles in mind, ensuring they can operate independently while maintaining system coherence through well-defined message patterns and workflows. +# Ceylon Framework Advanced Guide + +## Core Concepts + +### Agent Types +Ceylon uses a distributed architecture with two primary agent types working together to accomplish tasks. + +1. **Admin Agent (BaseAgent with PeerMode.ADMIN)** + - Central coordinator that manages the entire network + - Handles task distribution and result collection + - Acts as the primary decision maker in the system + - Only one admin agent is allowed per system + ```python + from ceylon import Admin + + class NetworkManager(Admin): + def __init__(self, name="admin", port=8888): + super().__init__(name=name, port=port) + ``` + +2. **Worker Agent (BaseAgent with PeerMode.CLIENT)** + - Performs specific tasks assigned by the admin + - Reports results back to the admin + - Can have specialized roles or capabilities + - Multiple workers can operate simultaneously + ```python + from ceylon import Worker + + class TaskWorker(Worker): + def __init__(self, name, admin_peer): + super().__init__(name=name, admin_peer=admin_peer) + ``` -class Scheduler(Admin): - def __init__(self, meeting): - super().__init__(name="scheduler", port=8000) - self.meeting = meeting +### Message Handling +Ceylon provides a robust message handling system for agent communication. + +1. **Event Decorators** + - Decorators simplify message and event handling + - Allow declarative definition of message processors + ```python + @on(MessageType) # Processes messages of specific types + @on_run() # Defines agent's main execution loop + @on_connect("*") # Handles new agent connections + ``` + +2. **Message Broadcasting** + - Two primary methods for sending messages + - Supports both broadcast and direct communication + ```python + await self.broadcast_message(data) # Sends to all connected agents + await self.send_message(peer_id, data) # Sends to specific agent + ``` + +3. **Data Serialization** + - Uses Python's pickle for data serialization + - Enables complex object transmission between agents + ```python + # Sending serialized data + await self.broadcast(pickle.dumps(data)) + + # Receiving and deserializing + data = pickle.loads(message) + ``` + +## Advanced Features + +### Agent Configuration +Agents can be extensively configured to suit different roles and requirements. - @on_connect("*") - async def handle_connection(self, topic: str, agent: AgentDetail): - await self.broadcast_message(AvailabilityRequest()) +```python +class CustomAgent(BaseAgent): + def __init__(self): + super().__init__( + name="agent_name", # Unique identifier + mode=PeerMode.CLIENT, # Operating mode + role="custom_role", # Agent's role in system + port=8000, # Network port + admin_peer="peer_id", # Admin connection + admin_ip="127.0.0.1", # Admin location + workspace_id="default", # Workspace grouping + buffer_size=1024 # Message buffer size + ) +``` +### Connection Management +Ceylon provides tools for monitoring and managing agent connections. -async def main(): - scheduler = Scheduler(meeting) - await scheduler.start_agent(b"", workers) +```python +class NetworkAdmin(Admin): + async def on_agent_connected(self, topic: str, agent: AgentDetail): + # Track connected agents and trigger actions + connected = await self.get_connected_agents() + if len(connected) == self.expected_count: + await self.start_processing() ``` -[Full Code](examples/meeting-sechdular.md) | [Interactive Demo](https://colab.research.google.com/drive/1C-E9BN992k5sZYeJWnVrsWA5_ryaaT8m) - -### Auction System +### Error Handling +Robust error handling ensures system reliability. ```python -class Auctioneer(Admin): - async def handle_bid(self, bid: Bid): - if bid.amount > self.highest_bid: - self.highest_bid = bid.amount - await self.broadcast_message(CurrentPrice(bid.amount)) +async def handle_message(self, agent_id: str, data: bytes, time: int): + try: + message = pickle.loads(data) + await self.process_message(message) + except Exception as e: + logger.error(f"Error processing message: {e}") + await self.handle_error(agent_id) ``` -[Full Code](examples/auction) | [Interactive Demo](https://colab.research.google.com/drive/12o76s4CyGvOpUaACDYIaYmJgJE1hC81Y) - ## Implementation Patterns -### Admin-Worker Pattern +### State Machine Pattern +Useful for agents that need to track and manage different operational states. ```python -# Admin node setup -admin = Admin(name="admin", port=8888) -admin_details = admin.details() - -# Worker node setup -worker = Worker(name="worker", - admin_peer=admin_details.id, - role="worker") - -await admin.start_agent(b"", [worker]) +from enum import Enum + +class AgentState(Enum): + IDLE = "idle" + PROCESSING = "processing" + COMPLETED = "completed" + +class StatefulAgent(Worker): + def __init__(self): + super().__init__() + self.state = AgentState.IDLE + + async def transition_state(self, new_state: AgentState): + self.state = new_state + await self.broadcast_state() ``` -### Message Handling +### Observer Pattern +Implements event monitoring and notification systems. ```python -@on(MessageType) -async def handle_message(self, message: bytes): - # Process message - response = process(message) - await self.broadcast_message(response) +class EventMonitor(Worker): + def __init__(self): + super().__init__() + self.observers = [] + + async def notify_observers(self, event): + for observer in self.observers: + await self.send_message(observer, event) ``` -### Event Processing +### Pipeline Pattern +Enables sequential processing of tasks through multiple stages. ```python -@on_connect("*") -async def handle_connect(self, topic: str, agent: AgentDetail): - # Handle new connection - await self.send_message(agent.id, welcome_message) +class PipelineAgent(Worker): + async def process_stage(self, data): + result = await self.current_stage(data) + if self.next_stage: + await self.send_to_next_stage(result) ``` -## Advanced Topics +## Best Practices + +1. **Message Design** + - Use dataclasses for structured message formats + - Include metadata for tracking and debugging + ```python + @dataclass + class Message: + id: str # Unique message identifier + type: str # Message classification + payload: Any # Actual message content + timestamp: float = field(default_factory=time.time) + ``` + +2. **Resource Management** + - Implement proper cleanup procedures + - Handle resource release systematically + ```python + async def cleanup(self): + try: + await self.stop_tasks() + await self.close_connections() + finally: + await self.broadcast_shutdown() + ``` + +3. **Monitoring and Logging** + - Use structured logging for better debugging + - Track agent activities and performance + ```python + from loguru import logger + + class MonitoredAgent(Worker): + async def on_message(self, agent_id: str, data: bytes, time: int): + logger.info(f"Message from {agent_id} at {time}") + await self.update_metrics() + ``` + +## Advanced Use Cases + +### Load Balancing +Distributes tasks evenly across available workers. -### Custom Agent Behaviors - -- Role-based permissions -- State management -- Error handling -- Resource cleanup - -### Scaling Considerations - -- Buffer sizing -- Connection pooling -- Load balancing -- Error recovery - -### Security Best Practices +```python +class LoadBalancer(Admin): + def __init__(self): + super().__init__() + self.worker_loads = {} + + async def assign_task(self, task): + # Find least loaded worker + worker = min(self.worker_loads.items(), key=lambda x: x[1])[0] + await self.send_message(worker, task) + self.worker_loads[worker] += 1 +``` -- TLS encryption -- Authentication -- Access control -- Audit logging +### Fault Tolerance +Implements retry mechanisms and error recovery. -## Additional Resources +```python +class FaultTolerantAgent(Worker): + async def execute_with_retry(self, task, max_retries=3): + for attempt in range(max_retries): + try: + return await self.execute_task(task) + except Exception as e: + if attempt == max_retries - 1: + raise + await asyncio.sleep(2 ** attempt) # Exponential backoff +``` -### Tutorials +### Dynamic Scaling +Adjusts system resources based on load. -- [Article Creation Workflow](https://medium.com/ceylonai/collaborative-ai-workflow-using-ceylon-framework-for-streamlined-article-creation-81bbd7ee7c01) -- [Meeting Scheduler Guide](https://medium.com/ceylonai/a-meeting-scheduler-with-ceylon-multi-agent-system-a7aa5a906f36) +```python +class ScalableSystem(Admin): + async def check_load(self): + if self.system_load > self.threshold: + await self.scale_up() + elif self.system_load < self.threshold_low: + await self.scale_down() +``` -### Example Projects +## Performance Optimization + +1. **Message Batching** + - Groups messages for efficient processing + - Reduces communication overhead + ```python + class BatchProcessor(Worker): + async def process_batch(self, messages: List[Message]): + tasks = [self.process_message(msg) for msg in messages] + return await asyncio.gather(*tasks) + ``` + +2. **Caching** + - Stores frequently used results + - Reduces computation overhead + ```python + from functools import lru_cache + + class CachedAgent(Worker): + @lru_cache(maxsize=1000) + def compute_result(self, input_data): + return expensive_computation(input_data) + ``` + +## Security Considerations + +1. **Message Validation** + - Ensures message integrity and authenticity + - Validates data before processing + ```python + from pydantic import BaseModel, validator + + class SecureMessage(BaseModel): + content: str + signature: str + + @validator('signature') + def verify_signature(cls, v, values): + if not verify_signature(values['content'], v): + raise ValueError('Invalid signature') + return v + ``` + +2. **Access Control** + - Implements agent authorization + - Controls message access + ```python + class SecureAgent(Worker): + async def on_message(self, agent_id: str, data: bytes, time: int): + if not self.is_authorized(agent_id): + logger.warning(f"Unauthorized message from {agent_id}") + return + ``` + +## Testing +Demonstrates proper testing setup for Ceylon agents. -- [Task Manager](examples/task_manager) -- [Time Scheduler](examples/time_scheduling) -- [Auction System](examples/auction) +```python +import pytest + +@pytest.mark.asyncio +async def test_agent_communication(): + admin = TestAdmin() + worker = TestWorker(admin_peer=admin.details().id) + + await admin.start_agent(b"", [worker]) + assert len(await admin.get_connected_agents()) == 1 +``` ---- -Copyright 2024-Present, Syigen Ltd. and Syigen Private Limited. \ No newline at end of file +## Integration Examples + +1. **HTTP Interface** + - Exposes agent functionality via REST API + - Enables external system integration + ```python + from fastapi import FastAPI + + app = FastAPI() + agent_system = None + + @app.post("/tasks") + async def create_task(task: Task): + return await agent_system.submit_task(task) + ``` + +2. **Database Integration** + - Persists agent data and results + - Provides data consistency + ```python + from sqlalchemy.ext.asyncio import AsyncSession + + class PersistentAgent(Worker): + def __init__(self, session: AsyncSession): + super().__init__() + self.session = session + + async def save_result(self, result): + self.session.add(result) + await self.session.commit() + ``` + +## Common Patterns for Specific Use Cases + +1. **Auction System** + - Implements bidding and auction mechanics + - Manages competitive resource allocation + ```python + class Auctioneer(Admin): + async def start_auction(self, item): + await self.broadcast_message(AuctionStart(item)) + + @on(Bid) + async def handle_bid(self, bid: Bid): + if self.is_highest_bid(bid): + await self.notify_new_highest_bid(bid) + ``` + +2. **Task Scheduler** + - Manages task timing and assignment + - Handles deadline-based scheduling + ```python + class Scheduler(Admin): + async def schedule_task(self, task, deadline): + worker = await self.find_available_worker() + await self.send_message(worker.id, TaskAssignment(task, deadline)) + ``` + +3. **Meeting Coordinator** + - Coordinates participant schedules + - Finds optimal meeting times + ```python + class MeetingCoordinator(Admin): + async def find_time_slot(self, participants, duration): + responses = await self.collect_availability(participants) + return self.find_common_slot(responses, duration) + ``` + +## Monitoring and Debugging + +1. **Metrics Collection** + - Tracks system performance metrics + - Enables system optimization + ```python + from prometheus_client import Counter, Gauge + + class MetricsAgent(Worker): + def __init__(self): + self.message_counter = Counter('messages_total', 'Total messages processed') + self.active_tasks = Gauge('active_tasks', 'Currently active tasks') + ``` + +2. **Distributed Tracing** + - Tracks message flow through system + - Helps diagnose performance issues + ```python + from opentelemetry import trace + + class TracedAgent(Worker): + async def on_message(self, agent_id: str, data: bytes, time: int): + with trace.get_tracer(__name__).start_as_current_span("process_message"): + await self.process_message(data) + ``` \ No newline at end of file