diff --git a/bindings/ceylon/ceylon/llm/agent.py b/bindings/ceylon/ceylon/llm/agent.py
index ab75351..fec2983 100644
--- a/bindings/ceylon/ceylon/llm/agent.py
+++ b/bindings/ceylon/ceylon/llm/agent.py
@@ -5,15 +5,14 @@
 import asyncio
 from dataclasses import dataclass, field
 from datetime import datetime
-from typing import Optional, Dict, Any
+from typing import Dict, Any
 
-from loguru import logger
 from pydantic import BaseModel
 
 from ceylon.llm.models import Model, ModelSettings, ModelMessage
 from ceylon.llm.models.support.messages import MessageRole, TextPart
-from ceylon.task.agent import TaskExecutionAgent
-from ceylon.task.data import TaskMessage, TaskStatus
+from ceylon.processor.agent import ProcessWorker
+from ceylon.processor.data import ProcessRequest
 
 
 @dataclass
@@ -22,6 +21,7 @@ class LLMResponse:
     metadata: Dict[str, Any] = field(default_factory=dict)
     timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
 
+
 class LLMConfig(BaseModel):
     system_prompt: str
     temperature: float = 0.7
@@ -34,25 +34,26 @@ class LLMConfig(BaseModel):
     class Config:
         arbitrary_types_allowed = True
 
-class LLMAgent(TaskExecutionAgent):
+
+class LLMAgent(ProcessWorker):
     """
     An agent that processes tasks using configurable LLM capabilities.
     Supports multiple LLM backends through the Model interface.
     """
+
     def __init__(
             self,
             name: str,
             llm_model: Model,
             config: LLMConfig,
-            worker_role: str = "llm_processor",
+            role: str = "llm_processor",
             max_concurrent_tasks: int = 3
     ):
         super().__init__(
             name=name,
-            worker_role=worker_role,
-            max_concurrent_tasks=max_concurrent_tasks
+            role=role
         )
-        self.llm_model = llm_model
+        self.llm_model: Model = llm_model
         self.config = config
         self.response_cache: Dict[str, LLMResponse] = {}
         self.processing_lock = asyncio.Lock()
@@ -65,169 +66,25 @@ def __init__(
             )
         )
 
-    async def execute_task(self, task: TaskMessage) -> None:
-        """
-        Execute an LLM task with retry logic and error handling
-        """
-        try:
-            logger.info(f"\n{'='*80}")
-            logger.info(f"Task: {task.name}")
-            logger.info(f"Description: {task.instructions}")
-            logger.info(f"{'='*80}\n")
-
-            async with self.processing_lock:
-                response = await self._execute_with_retry(task)
-
-            if response:
-                # Cache successful response
-                self.response_cache[task.id] = response
-
-                # Print the response
-                logger.info("\nGenerated Content:")
-                logger.info(f"{'-'*80}")
-                logger.info(response.content)
-                logger.info(f"{'-'*80}\n")
-
-                # Update task with completion info
-                task.completed = True
-                task.end_time = datetime.now().timestamp()
-
-                # Include response in task metadata
-                if not task.metadata:
-                    task.metadata = {}
-                task.metadata['llm_response'] = response.content
-                task.result = response.content
-                task.metadata['response_timestamp'] = response.timestamp
-                task.metadata.update(response.metadata)
-
-                logger.info(f"{self.name}: Completed task {task.id}")
-
-                # Remove from active tasks and broadcast completion
-                del self.active_tasks[task.id]
-                await self.broadcast_message(task)
-
-                # Request new task
-                await self.request_task("standard")
-            else:
-                raise Exception("Failed to get valid LLM response")
-
-        except Exception as e:
-            logger.error(f"Error executing LLM task {task.id}: {e}")
-            task.status = TaskStatus.FAILED
-            task.metadata = task.metadata or {}
-            task.metadata['error'] = str(e)
-            await self.broadcast_message(task)
-
-    async def _execute_with_retry(self, task: TaskMessage) -> Optional[LLMResponse]:
-        """
-        Execute LLM call with configured retry logic
-        """
-        last_error = None
-
-        for attempt in range(self.config.retry_attempts):
-            try:
-                response = await self._call_llm(task)
-
-                if response and response.content:
-                    if await self.validate_response(response, task):
-                        return response
-                    else:
-                        raise ValueError("Response validation failed")
-
-            except Exception as e:
-                last_error = e
-                logger.warning(f"Attempt {attempt + 1} failed: {e}")
-                if attempt < self.config.retry_attempts - 1:
-                    await asyncio.sleep(self.config.retry_delay * (attempt + 1))
-
-        if last_error:
-            raise last_error
-        return None
-
-    async def _call_llm(self, task: TaskMessage) -> LLMResponse:
-        """
-        Make the actual LLM API call using the configured model
-        """
-        try:
-            async with asyncio.timeout(self.config.timeout):
-                # Construct messages for the model
-                messages = [
-                    ModelMessage(
-                        role=MessageRole.SYSTEM,
-                        parts=[TextPart(text=self.config.system_prompt)]
-                    ),
-                    ModelMessage(
-                        role=MessageRole.USER,
-                        parts=[TextPart(text=self._format_task_prompt(task))]
-                    )
+    async def _processor(self, request: ProcessRequest, time: int):
+        message_list = [
+            ModelMessage(
+                role=MessageRole.SYSTEM,
+                parts=[
+                    TextPart(text=self.config.system_prompt)
                 ]
-
-                # Make the model request
-                response, usage = await self.llm_model.request(
-                    messages=messages,
-                    context=self.model_context
-                )
-
-                # Extract text from response parts
-                response_text = ""
-                for part in response.parts:
-                    if hasattr(part, 'text'):
-                        response_text += part.text
-
-                return LLMResponse(
-                    content=response_text,
-                    metadata={
-                        'task_id': task.id,
-                        'usage': usage.__dict__,
-                        'model_name': self.llm_model.model_name
-                    }
-                )
-
-        except asyncio.TimeoutError:
-            raise TimeoutError(f"LLM call timed out after {self.config.timeout}s")
-        except Exception as e:
-            raise Exception(f"LLM call failed: {str(e)}")
-
-    def _format_task_prompt(self, task: TaskMessage) -> str:
-        """
-        Format the task into a prompt for the LLM
-        """
-        prompt_parts = [
-            f"Task: {task.name}",
-            f"Description: {task.instructions}"
+            ),
+            ModelMessage(
+                role=MessageRole.USER,
+                parts=[
+                    TextPart(text=request.data)
+                ]
+            )
         ]
-
-        # Add any task-specific metadata to prompt
-        if task.metadata:
-            for key, value in task.metadata.items():
-                if key in ['type', 'topic', 'style', 'target_length']:
-                    prompt_parts.append(f"{key.title()}: {value}")
-
-        return "\n".join(prompt_parts)
-
-    async def validate_response(self, response: LLMResponse, task: TaskMessage) -> bool:
-        """
-        Validate LLM response format and content
-        Override this method to implement custom validation logic
-        """
-        if not response or not response.content:
-            return False
-
-        # Basic length validation
-        if task.metadata and 'target_length' in task.metadata:
-            target_length = task.metadata['target_length']
-            actual_length = len(response.content.split())
-            if actual_length < target_length * 0.5 or actual_length > target_length * 1.5:
-                logger.warning(f"Response length {actual_length} words outside target range of {target_length}")
-                return False
-
-        # Add custom validation logic here
-        return True
+        return await self.llm_model.request(message_list, self.model_context)
 
-    async def close(self) -> None:
-        """
-        Clean up resources when agent is stopped
-        """
+    async def stop(self) -> None:
         if self.llm_model:
             await self.llm_model.close()
-        await super().close()
\ No newline at end of file
+        await super().stop()
diff --git a/bindings/ceylon/ceylon/llm/task_pl.py b/bindings/ceylon/ceylon/llm/task_pl.py
deleted file mode 100644
index 27aeba1..0000000
--- a/bindings/ceylon/ceylon/llm/task_pl.py
+++ /dev/null
@@ -1,180 +0,0 @@
-from dataclasses import dataclass, field
-from typing import Any, Dict, Optional, List
-import asyncio
-import uuid
-
-from loguru import logger
-from ceylon import on, on_connect, Worker, AgentDetail
-
-from .base_playground import BasePlayGround
-from .manager import TaskManager, TaskMessage, TaskGroup, TaskStatus
-
-
-@dataclass
-class TaskRequest:
-    task_id: str
-    task_type: str
-    instructions: str
-    required_role: str
-    metadata: Dict[str, Any] = field(default_factory=dict)
-
-
-@dataclass
-class TaskResponse:
-    task_id: str
-    status: TaskStatus
-    result: Optional[Any] = None
-    error_message: Optional[str] = None
-    runtime_stats: Dict[str, Any] = field(default_factory=dict)
-
-
-@dataclass
-class TaskProgressUpdate:
-    task_id: str
-    progress: float  # 0.0 to 1.0
-    status: TaskStatus
-    message: Optional[str] = None
-
-
-class TaskWorker(Worker):
-    def __init__(self, name: str, role: str):
-        super().__init__(name=name, role=role)
-        self.active_task: Optional[TaskMessage] = None
-
-    @on(TaskRequest)
-    async def handle_task_request(self, request: TaskRequest, time: int):
-        try:
-            if self.active_task:
-                # Already processing a task
-                return
-
-            logger.info(f"Worker {self.name} received task: {request.task_id}")
-
-            # Process task (simulated work)
-            self.active_task = TaskMessage(
-                task_id=request.task_id,
-                name=f"Task-{request.task_id[:8]}",
-                instructions=request.instructions,
-                required_role=request.required_role
-            )
-
-            # Send progress updates
-            await self.broadcast_message(TaskProgressUpdate(
-                task_id=request.task_id,
-                progress=0.0,
-                status=TaskStatus.IN_PROGRESS,
-                message="Starting task"
-            ))
-
-            # Simulate work
-            await asyncio.sleep(2)
-
-            # Send completion
-            response = TaskResponse(
-                task_id=request.task_id,
-                status=TaskStatus.COMPLETED,
-                result={"processed": True},
-                runtime_stats={
-                    "duration": 2.0,
-                    "memory_used": "100MB"
-                }
-            )
-            await self.broadcast_message(response)
-            self.active_task = None
-
-        except Exception as e:
-            logger.error(f"Error processing task {request.task_id}: {e}")
-            await self.broadcast_message(TaskResponse(
-                task_id=request.task_id,
-                status=TaskStatus.FAILED,
-                error_message=str(e)
-            ))
-            self.active_task = None
-
-
-class TaskPlayGround(BasePlayGround):
-    def __init__(self, name="task_playground", port=8888):
-        super().__init__(name=name, port=port)
-        self.task_manager = TaskManager()
-        self.task_responses: Dict[str, TaskResponse] = {}
-        self.task_events: Dict[str, asyncio.Event] = {}
-        self.task_progress: Dict[str, float] = {}
-
-    @on(TaskResponse)
-    async def handle_task_response(self, response: TaskResponse, time: int):
-        """Handle task completion responses from workers"""
-        logger.info(f"Received task response for {response.task_id}: {response.status}")
-
-        self.task_responses[response.task_id] = response
-        if response.task_id in self.task_events:
-            self.task_events[response.task_id].set()
-
-        if response.status == TaskStatus.COMPLETED:
-            task = self.task_manager.tasks.get(response.task_id)
-            if task:
-                task.completed = True
-                all_completed = await self.task_manager.handle_task_completion(task)
-                if all_completed:
-                    logger.info("All tasks completed")
-                    await self.finish()
-
-    @on(TaskProgressUpdate)
-    async def handle_progress_update(self, update: TaskProgressUpdate, time: int):
-        """Handle task progress updates"""
-        self.task_progress[update.task_id] = update.progress
-        logger.debug(f"Task {update.task_id} progress: {update.progress:.1%}")
-
-    @on_connect("*")
-    async def handle_worker_connection(self, topic: str, agent: AgentDetail):
-        """Register new workers with the task manager"""
-        self.task_manager.register_worker(agent.name, agent.role)
-        await super().on_llm_agent_connected(topic, agent)
-
-    async def submit_task(self, task_type: str, instructions: str, role: str,
-                          metadata: Optional[Dict[str, Any]] = None) -> TaskResponse:
-        """Submit a task and wait for its completion"""
-        task_id = str(uuid.uuid4())
-        request = TaskRequest(
-            task_id=task_id,
-            task_type=task_type,
-            instructions=instructions,
-            required_role=role,
-            metadata=metadata or {}
-        )
-
-        # Setup completion event
-        self.task_events[task_id] = asyncio.Event()
-
-        # Send request
-        await self.broadcast_message(request)
-
-        try:
-            # Wait for completion
-            await asyncio.wait_for(self.task_events[task_id].wait(), timeout=30.0)
-            return self.task_responses[task_id]
-        except asyncio.TimeoutError:
-            return TaskResponse(
-                task_id=task_id,
-                status=TaskStatus.FAILED,
-                error_message="Task timed out"
-            )
-        finally:
-            # Cleanup
-            self.task_events.pop(task_id, None)
-
-    def get_task_progress(self, task_id: str) -> float:
-        """Get current progress for a task"""
-        return self.task_progress.get(task_id, 0.0)
-
-    async def close(self):
-        """Clean shutdown of playground"""
-        # Cancel any pending tasks
-        for task_id, event in self.task_events.items():
-            event.set()
-
-        # Clear state
-        self.task_responses.clear()
-        self.task_events.clear()
-        self.task_progress.clear()
-
-        await self.force_close()
diff --git a/bindings/ceylon/examples/llm/app.py b/bindings/ceylon/examples/llm/app.py
index b02adcb..655c0da 100644
--- a/bindings/ceylon/examples/llm/app.py
+++ b/bindings/ceylon/examples/llm/app.py
@@ -1,137 +1,14 @@
 import asyncio
-import uuid
-from datetime import datetime
-from typing import List, Dict
 
-import loguru
-
-from ceylon.llm.agent import LLMAgent, LLMConfig
+from ceylon.llm.agent import LLMConfig, LLMAgent
 from ceylon.llm.models.ollama import OllamaModel
-from ceylon.task import TaskPlayGround
-from ceylon.task.data import TaskMessage, TaskGroupGoal, GoalStatus
-from ceylon.task.manager import TaskManager
-
-
-def print_header(text: str):
-    print(f"\n{'='*80}")
-    print(f"{text.center(80)}")
-    print(f"{'='*80}")
-
-def print_task_results(completed_tasks: Dict, task_results: Dict):
-    print_header("Task Results")
-
-    # Group tasks by type
-    writing_tasks = {}
-    analysis_tasks = {}
-
-    for task_id, output in completed_tasks.items():
-        if output.metadata.get('type') == 'article_writing':
-            writing_tasks[task_id] = output
-        elif output.metadata.get('type') == 'text_analysis':
-            analysis_tasks[task_id] = output
-
-    # Print writing task results
-    print_header("Generated Articles")
-    for task_id, output in writing_tasks.items():
-        print(f"\nArticle: {output.metadata.get('topic', 'Untitled')}")
-        print(f"Status: {'✓ Complete' if output.completed else '✗ Failed'}")
-        if output.completed:
-            print(f"Duration: {output.end_time - output.start_time:.2f}s")
-            result = task_results.get(task_id)
-            if result:
-                print("\nContent:")
-                print("-" * 40)
-                print(result)
-                print("-" * 40)
-        else:
-            print(f"Error: {output.error}")
-
-    # Print analysis task results
-    print_header("Text Analysis Results")
-    for task_id, output in analysis_tasks.items():
-        print(f"\nAnalysis Task: {output.name}")
-        print(f"Status: {'✓ Complete' if output.completed else '✗ Failed'}")
-        if output.completed:
-            print(f"Duration: {output.end_time - output.start_time:.2f}s")
-            result = task_results.get(task_id)
-            if result:
-                print("\nAnalysis:")
-                print("-" * 40)
-                print(result)
-                print("-" * 40)
-        else:
-            print(f"Error: {output.error}")
-
-def create_writing_tasks(topics: List[str]) -> List[TaskMessage]:
-    """Create content generation tasks for given topics"""
-    tasks = []
-    for topic in topics:
-        task_id = str(uuid.uuid4())
-        tasks.append(TaskMessage(
-            task_id=task_id,
-            name=f"Write Article: {topic}",
-            instructions=(
-                f"Write a comprehensive article about {topic}. "
-                f"Include key concepts, recent developments, and future implications."
-            ),
-            duration=3,
-            required_role="content_writer",
-            metadata={
-                "type": "article_writing",
-                "topic": topic,
-                "target_length": 500,
-                "style": "informative"
-            }
-        ))
-    return tasks
-
-def create_analysis_tasks(texts: List[str]) -> List[TaskMessage]:
-    """Create text analysis tasks"""
-    tasks = []
-    for i, text in enumerate(texts):
-        task_id = str(uuid.uuid4())
-        tasks.append(TaskMessage(
-            task_id=task_id,
-            name=f"Analyze Text {i + 1}",
-            instructions=(
-                "Perform a detailed analysis of the provided text. "
-                "Include key themes, sentiment, and main points."
-            ),
-            duration=2,
-            required_role="content_writer",
-            metadata={
-                "type": "text_analysis",
-                "content": text,
-                "analysis_aspects": ["themes", "sentiment", "key_points"]
-            }
-        ))
-    return tasks
-
-def create_success_rate_checker(required_rate: float):
-    """Create a goal checker that verifies task success rate"""
-    def check_completion(task_groups: dict, completed_tasks: dict) -> bool:
-        if not completed_tasks:
-            return False
-
-        successful_tasks = sum(
-            1 for task in completed_tasks.values()
-            if task.completed and task.status != GoalStatus.FAILED
-        )
-        total_tasks = len(completed_tasks)
-        success_rate = successful_tasks / total_tasks
+from ceylon.processor.agent import ProcessWorker, ProcessRequest, ProcessResponse
+from ceylon.processor.playground import ProcessPlayGround
 
-        print(f"\nProgress Update:")
-        print(f"Completed Tasks: {successful_tasks}/{total_tasks}")
-        print(f"Success Rate: {success_rate:.1%} (Target: {required_rate:.1%})")
 
-        return success_rate >= required_rate
-
-    return check_completion
 
 async def main():
-    print_header("Ceylon LLM Agent Demo")
-    print("\nInitializing system...")
-
-    # Initialize Ollama model
+    # Create playground and worker
+    playground = ProcessPlayGround()
     llm_model = OllamaModel(
         model_name="llama3.2",
         base_url="http://localhost:11434"
@@ -140,136 +17,32 @@ async def main():
     # Configure LLM agent
     llm_config = LLMConfig(
         system_prompt=(
-            "You are an expert content writer and analyst capable of producing "
-            "well-researched articles and performing detailed content analysis. "
-            "Always provide clear, accurate, and structured responses."
+            "You are an expert content writer specializing in technology topics. "
+            "Provide clear, informative, and engaging responses."
         ),
         temperature=0.7,
-        max_tokens=2000,
-        retry_attempts=2,
-        retry_delay=1.0,
-        timeout=30.0
+        max_tokens=1000,
+        retry_attempts=1
     )
 
-    # Create LLM agents
-    llm_agents = [
-        LLMAgent(
-            name=f"writer_{i}",
-            llm_model=llm_model,
-            config=llm_config,
-            worker_role="content_writer",
-            max_concurrent_tasks=2
-        )
-        for i in range(2)
-    ]
-
-    # Initialize playground
-    playground = TaskPlayGround(name="content_generation")
-
-    # Define topics and create task groups
-    topics = [
-        "Quantum Computing Applications",
-        "Sustainable Energy Solutions",
-        "Future of Remote Work",
-        "Space Exploration Progress"
-    ]
-
-    analysis_texts = [
-        "Recent advancements in AI have transformed healthcare diagnosis...",
-        "Renewable energy adoption has increased significantly globally...",
-        "The shift to remote work has fundamentally changed workplace dynamics..."
-    ]
-
-    writing_group = TaskManager.create_task_group(
-        name="Article Writing",
-        description="Generate informative articles on various topics",
-        subtasks=create_writing_tasks(topics),
-        goal=TaskGroupGoal(
-            name="Writing Quality Goal",
-            description="Achieve 80% success rate in content generation",
-            check_condition=create_success_rate_checker(0.8),
-            success_message="Successfully completed article writing tasks!",
-            failure_message="Failed to achieve target success rate in writing."
-        ),
-        priority=1
+    # Create LLM agent
+    llm_agent = LLMAgent(
+        name="writer_1",
+        llm_model=llm_model,
+        config=llm_config,
+        role="writer"
     )
 
-    analysis_group = TaskManager.create_task_group(
-        name="Content Analysis",
-        description="Analyze various texts for insights",
-        subtasks=create_analysis_tasks(analysis_texts),
-        goal=TaskGroupGoal(
-            name="Analysis Quality Goal",
-            description="Achieve 90% success rate in text analysis",
-            check_condition=create_success_rate_checker(0.9),
-            success_message="Successfully completed text analysis tasks!",
-            failure_message="Failed to achieve target success rate in analysis."
-        ),
-        priority=2
-    )
-
-    try:
-        print_header("Starting Task Processing")
-        start_time = datetime.now()
-        print(f"Start Time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
-
-        # Run the playground
-        async with playground.play(workers=llm_agents) as active_playground:
-            # Assign task groups
-            await active_playground.assign_task_groups([
-                writing_group,
-                analysis_group
-            ])
-
-            # Monitor progress
-            while True:
-                await asyncio.sleep(2)
-
-                # Check completion status
-                writing_done = (
-                    writing_group.id in active_playground.task_manager.completed_groups
-                    or (writing_group.goal and
-                        writing_group.goal.status == GoalStatus.ACHIEVED)
-                )
-
-                analysis_done = (
-                    analysis_group.id in active_playground.task_manager.completed_groups
-                    or (analysis_group.goal and
-                        analysis_group.goal.status == GoalStatus.ACHIEVED)
-                )
-
-                # Print status
-                print(f"\nStatus Update ({datetime.now().strftime('%H:%M:%S')})")
-                print(f"Article Writing: {writing_group.status.value}")
-                print(f"Text Analysis: {analysis_group.status.value}")
-
-                if writing_done and analysis_done:
-                    print_header("All Tasks Completed!")
-                    break
-
-            # Get completed tasks and results
-            completed_tasks = active_playground.get_completed_tasks()
-            task_results = active_playground.get_task_results()
-
-            # Print comprehensive results
-            print_task_results(completed_tasks, task_results)
-
-            # Print execution time
-            end_time = datetime.now()
-            duration = (end_time - start_time).total_seconds()
-            print(f"\nTotal Execution Time: {duration:.2f} seconds")
-
-            # Print final statistics
-            print_header("Final Statistics")
-            await active_playground.print_all_statistics()
+    # Start the system
+    async with playground.play(workers=[llm_agent]) as active_playground:
+        active_playground: ProcessPlayGround = active_playground
+        # Send some test requests
+        response: ProcessResponse = await active_playground.process_request(ProcessRequest(
+            task_type="writer",
+            data="Write a blog post about AI in 2023."
+        ))
+        print(f"Response received: {response.result}")
 
-    finally:
-        # Cleanup
-        await llm_model.close()
-        print("\nSystem shutdown completed.")
 
 if __name__ == "__main__":
-    print("\nStarting Ceylon LLM Agent Demo...")
-    print("Make sure Ollama is running at http://localhost:11434\n")
-    loguru.logger.remove()
-    asyncio.run(main())
\ No newline at end of file
+    asyncio.run(main())