diff --git a/bindings/ceylon/ceylon/base/playground.py b/bindings/ceylon/ceylon/base/playground.py index 4c3ca83..9312682 100644 --- a/bindings/ceylon/ceylon/base/playground.py +++ b/bindings/ceylon/ceylon/base/playground.py @@ -114,6 +114,8 @@ async def play(self, workers: Optional[List[BaseAgent]] = None): logger.warning("Received Ctrl+C, initiating force close...") await self.force_close() except Exception as e: + import traceback + traceback.print_exc() logger.error(f"Error in playground: {e}") await self.force_close() finally: diff --git a/bindings/ceylon/ceylon/llm/agent.py b/bindings/ceylon/ceylon/llm/agent.py index fa4d5a0..ab75351 100644 --- a/bindings/ceylon/ceylon/llm/agent.py +++ b/bindings/ceylon/ceylon/llm/agent.py @@ -80,7 +80,7 @@ async def execute_task(self, task: TaskMessage) -> None: if response: # Cache successful response - self.response_cache[task.task_id] = response + self.response_cache[task.id] = response # Print the response logger.info("\nGenerated Content:") @@ -100,10 +100,10 @@ async def execute_task(self, task: TaskMessage) -> None: task.metadata['response_timestamp'] = response.timestamp task.metadata.update(response.metadata) - logger.info(f"{self.name}: Completed task {task.task_id}") + logger.info(f"{self.name}: Completed task {task.id}") # Remove from active tasks and broadcast completion - del self.active_tasks[task.task_id] + del self.active_tasks[task.id] await self.broadcast_message(task) # Request new task @@ -112,7 +112,7 @@ async def execute_task(self, task: TaskMessage) -> None: raise Exception("Failed to get valid LLM response") except Exception as e: - logger.error(f"Error executing LLM task {task.task_id}: {e}") + logger.error(f"Error executing LLM task {task.id}: {e}") task.status = TaskStatus.FAILED task.metadata = task.metadata or {} task.metadata['error'] = str(e) @@ -177,7 +177,7 @@ async def _call_llm(self, task: TaskMessage) -> LLMResponse: return LLMResponse( content=response_text, metadata={ - 'task_id': task.task_id, + 'task_id': task.id, 'usage': usage.__dict__, 'model_name': self.llm_model.model_name } diff --git a/bindings/ceylon/ceylon/llm/task_pl.py b/bindings/ceylon/ceylon/llm/task_pl.py new file mode 100644 index 0000000..27aeba1 --- /dev/null +++ b/bindings/ceylon/ceylon/llm/task_pl.py @@ -0,0 +1,180 @@ +from dataclasses import dataclass, field +from typing import Any, Dict, Optional, List +import asyncio +import uuid + +from loguru import logger +from ceylon import on, on_connect, Worker, AgentDetail + +from .base_playground import BasePlayGround +from .manager import TaskManager, TaskMessage, TaskGroup, TaskStatus + + +@dataclass +class TaskRequest: + task_id: str + task_type: str + instructions: str + required_role: str + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class TaskResponse: + task_id: str + status: TaskStatus + result: Optional[Any] = None + error_message: Optional[str] = None + runtime_stats: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class TaskProgressUpdate: + task_id: str + progress: float # 0.0 to 1.0 + status: TaskStatus + message: Optional[str] = None + + +class TaskWorker(Worker): + def __init__(self, name: str, role: str): + super().__init__(name=name, role=role) + self.active_task: Optional[TaskMessage] = None + + @on(TaskRequest) + async def handle_task_request(self, request: TaskRequest, time: int): + try: + if self.active_task: + # Already processing a task + return + + logger.info(f"Worker {self.name} received task: {request.task_id}") + + # Process task (simulated work) + self.active_task = TaskMessage( + task_id=request.task_id, + name=f"Task-{request.task_id[:8]}", + instructions=request.instructions, + required_role=request.required_role + ) + + # Send progress updates + await self.broadcast_message(TaskProgressUpdate( + task_id=request.task_id, + progress=0.0, + status=TaskStatus.IN_PROGRESS, + message="Starting task" + )) + + # Simulate work + await asyncio.sleep(2) + + # Send completion + response = TaskResponse( + task_id=request.task_id, + status=TaskStatus.COMPLETED, + result={"processed": True}, + runtime_stats={ + "duration": 2.0, + "memory_used": "100MB" + } + ) + await self.broadcast_message(response) + self.active_task = None + + except Exception as e: + logger.error(f"Error processing task {request.task_id}: {e}") + await self.broadcast_message(TaskResponse( + task_id=request.task_id, + status=TaskStatus.FAILED, + error_message=str(e) + )) + self.active_task = None + + +class TaskPlayGround(BasePlayGround): + def __init__(self, name="task_playground", port=8888): + super().__init__(name=name, port=port) + self.task_manager = TaskManager() + self.task_responses: Dict[str, TaskResponse] = {} + self.task_events: Dict[str, asyncio.Event] = {} + self.task_progress: Dict[str, float] = {} + + @on(TaskResponse) + async def handle_task_response(self, response: TaskResponse, time: int): + """Handle task completion responses from workers""" + logger.info(f"Received task response for {response.task_id}: {response.status}") + + self.task_responses[response.task_id] = response + if response.task_id in self.task_events: + self.task_events[response.task_id].set() + + if response.status == TaskStatus.COMPLETED: + task = self.task_manager.tasks.get(response.task_id) + if task: + task.completed = True + all_completed = await self.task_manager.handle_task_completion(task) + if all_completed: + logger.info("All tasks completed") + await self.finish() + + @on(TaskProgressUpdate) + async def handle_progress_update(self, update: TaskProgressUpdate, time: int): + """Handle task progress updates""" + self.task_progress[update.task_id] = update.progress + logger.debug(f"Task {update.task_id} progress: {update.progress:.1%}") + + @on_connect("*") + async def handle_worker_connection(self, topic: str, agent: AgentDetail): + """Register new workers with the task manager""" + self.task_manager.register_worker(agent.name, agent.role) + await super().on_llm_agent_connected(topic, agent) + + async def submit_task(self, task_type: str, instructions: str, role: str, + metadata: Optional[Dict[str, Any]] = None) -> TaskResponse: + """Submit a task and wait for its completion""" + task_id = str(uuid.uuid4()) + request = TaskRequest( + task_id=task_id, + task_type=task_type, + instructions=instructions, + required_role=role, + metadata=metadata or {} + ) + + # Setup completion event + self.task_events[task_id] = asyncio.Event() + + # Send request + await self.broadcast_message(request) + + try: + # Wait for completion + await asyncio.wait_for(self.task_events[task_id].wait(), timeout=30.0) + return self.task_responses[task_id] + except asyncio.TimeoutError: + return TaskResponse( + task_id=task_id, + status=TaskStatus.FAILED, + error_message="Task timed out" + ) + finally: + # Cleanup + self.task_events.pop(task_id, None) + + def get_task_progress(self, task_id: str) -> float: + """Get current progress for a task""" + return self.task_progress.get(task_id, 0.0) + + async def close(self): + """Clean shutdown of playground""" + # Cancel any pending tasks + for task_id, event in self.task_events.items(): + event.set() + + # Clear state + self.task_responses.clear() + self.task_events.clear() + self.task_progress.clear() + + await self.force_close() diff --git a/bindings/ceylon/ceylon/task/data.py b/bindings/ceylon/ceylon/task/data.py index 03025b5..e17833a 100644 --- a/bindings/ceylon/ceylon/task/data.py +++ b/bindings/ceylon/ceylon/task/data.py @@ -2,12 +2,9 @@ # Licensed under the Apache License, Version 2.0 (See LICENSE.md or http://www.apache.org/licenses/LICENSE-2.0). # # -import uuid -from dataclasses import dataclass, field +from dataclasses import dataclass from enum import Enum -from typing import Optional, Set, List, Dict, Callable, Any - -from loguru import logger +from typing import Any, Dict, Optional class TaskStatus(Enum): @@ -17,85 +14,18 @@ class TaskStatus(Enum): FAILED = "failed" -@dataclass -class TaskMessage: - task_id: str - name: str - instructions: str - required_role: str=None - parent_id: Optional[str] = None - group_id: Optional[str] = None # Reference to TaskGroup - subtask_ids: Set[str] = field(default_factory=set) - assigned_to: Optional[str] = None - completed: bool = False - start_time: Optional[float] = None - end_time: Optional[float] = None - max_concurrent: int = 3 - status: TaskStatus = TaskStatus.PENDING - metadata: Optional[Dict[str, Any]] = None - result: Optional[Any] = None - - @dataclass class TaskRequest: - requester: str - role: str - task_type: str - priority: int = 1 - - -@dataclass -class TaskStatusUpdate: task_id: str - status: TaskStatus - message: Optional[str] = None - group_id: Optional[str] = None - - -class GoalStatus(Enum): - NOT_STARTED = "not_started" - IN_PROGRESS = "in_progress" - ACHIEVED = "achieved" - FAILED = "failed" - - -@dataclass -class TaskGroupGoal: - name: str - description: str - check_condition: Callable[[Dict[str, 'TaskGroup'], Dict[str, TaskMessage]], bool] - success_message: str - failure_message: str - status: GoalStatus = GoalStatus.NOT_STARTED + task_type: str + data: Any + metadata: Dict[str, Any] = None @dataclass -class TaskGroup: +class TaskResponse: task_id: str - name: str - description: str - subtasks: List[TaskMessage] - goal: Optional[TaskGroupGoal] = None - id: str = field(default_factory=lambda: str(uuid.uuid4())) - dependencies: Dict[str, List[str]] = field(default_factory=dict) - depends_on: List[str] = field(default_factory=list) - required_by: List[str] = field(default_factory=list) - status: TaskStatus = TaskStatus.PENDING - priority: int = 1 - - def check_goal(self, task_groups: Dict[str, 'TaskGroup'], completed_tasks: Dict[str, TaskMessage]) -> bool: - """Check if the group's goal has been achieved""" - print(f"Checking goal for group {self.name} {self.goal} ") - if not self.goal: - return False - - if self.goal.status == GoalStatus.ACHIEVED: - return True - - if self.goal.check_condition(task_groups, completed_tasks): - self.goal.status = GoalStatus.ACHIEVED - logger.debug(f"\nGoal Achieved for group {self.name}: {self.goal.name}") - logger.debug(self.goal.success_message) - return True - - return False + result: Any + status: str # 'success' or 'error' + error_message: Optional[str] = None + metadata: Dict[str, Any] = None diff --git a/bindings/ceylon/ceylon/task/manager.py b/bindings/ceylon/ceylon/task/manager.py index 0478453..3db78bd 100644 --- a/bindings/ceylon/ceylon/task/manager.py +++ b/bindings/ceylon/ceylon/task/manager.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from typing import Any, Callable, Dict, List, Set, Coroutine from dataclasses import dataclass, field from enum import Enum @@ -23,7 +25,7 @@ class TaskResult: @dataclass class Task: name: str - process: Callable[..., Coroutine] # Updated to expect a coroutine + processor: Callable[..., Coroutine] | str = None # Updated to expect a coroutine input_data: Dict[str, Any] = field(default_factory=dict) id: str = field(default_factory=lambda: str(uuid.uuid4())) dependencies: Set[str] = field(default_factory=set) @@ -42,33 +44,27 @@ def __init__(self): def add_task( self, - name: str, - process: Callable[..., Coroutine], - input_data: Dict[str, Any], - dependencies: Set[str] = None + task: Task, ) -> str: """ Add a new task to the task manager. Args: - name: Name of the task - process: Async callable that will process the task - input_data: Dictionary of input data for the task - dependencies: Set of task IDs that this task depends on + task (Task): The task to be added. Returns: str: ID of the created task """ - task_id = str(uuid.uuid4()) - task = Task( - id=task_id, - name=name, - process=process, - input_data=input_data, - dependencies=dependencies or set() - ) - self.tasks[task_id] = task - return task_id + # task_id = str(uuid.uuid4()) + # task = Task( + # id=task_id, + # name=name, + # processor=process, + # input_data=input_data, + # dependencies=dependencies or set() + # ) + self.tasks[task.id] = task + return task.id def get_task(self, task_id: str) -> Task: """Get task by ID.""" @@ -99,6 +95,8 @@ async def execute_task(self, task: Task) -> TaskResult: # Gather dependency outputs if needed dep_outputs = {} for dep_id in task.dependencies: + if dep_id not in self.tasks: + raise ValueError(f"Dependency {dep_id} not found") dep_task = self.tasks[dep_id] if dep_task.result and dep_task.result.success: dep_outputs[dep_id] = dep_task.result.output @@ -110,12 +108,14 @@ async def execute_task(self, task: Task) -> TaskResult: } # Execute the task process asynchronously - output = await task.process(execution_data) + output = await task.processor(execution_data) result = TaskResult(success=True, output=output) task.status = TaskStatus.COMPLETED except Exception as e: + import traceback + traceback.print_exc() self.logger.error(f"Task failed: {task.name} ({task.id}). Error: {str(e)}") result = TaskResult(success=False, error=str(e)) task.status = TaskStatus.FAILED @@ -170,23 +170,28 @@ async def aggregate_results(input_data): return {'total': sum(d['processed'] for d in dep_outputs.values())} # Add tasks - task1_id = manager.add_task( + task1_id = manager.add_task(Task( name="Process Data 1", - process=process_data, + processor=process_data, input_data={'data': 5} ) + ) task2_id = manager.add_task( - name="Process Data 2", - process=process_data, - input_data={'data': 10} + Task( + name="Process Data 2", + processor=process_data, + input_data={'data': 10} + ) ) task3_id = manager.add_task( - name="Aggregate Results", - process=aggregate_results, - input_data={}, - dependencies={task1_id, task2_id} + Task( + name="Aggregate Results", + processor=aggregate_results, + input_data={}, + dependencies={task1_id, task2_id} + ) ) # Execute all tasks @@ -203,4 +208,4 @@ async def aggregate_results(input_data): if __name__ == "__main__": logging.basicConfig(level=logging.INFO) - asyncio.run(example_usage()) \ No newline at end of file + asyncio.run(example_usage()) diff --git a/bindings/ceylon/ceylon/task/playground.py b/bindings/ceylon/ceylon/task/playground.py index 0add9ac..c232e55 100644 --- a/bindings/ceylon/ceylon/task/playground.py +++ b/bindings/ceylon/ceylon/task/playground.py @@ -1,124 +1,163 @@ # Copyright 2024-Present, Syigen Ltd. and Syigen Private Limited. All rights reserved. # Licensed under the Apache License, Version 2.0 (See LICENSE.md or http://www.apache.org/licenses/LICENSE-2.0). -# # import asyncio +from typing import Dict, List, Any from loguru import logger -from ceylon import on, on_connect -from ceylon.base.playground import BasePlayGround, TaskOutput -from ceylon.task.data import TaskMessage, TaskRequest, TaskStatusUpdate, TaskStatus -from .manager import TaskManager +from ceylon import on +from ceylon.processor.agent import ProcessRequest, ProcessResponse, ProcessState +from ceylon.processor.playground import ProcessPlayGround +from ceylon.task.manager import TaskManager, TaskResult, TaskStatus, Task -class TaskPlayGround(BasePlayGround): - def __init__(self, name="task_manager", port=8888): + +class TaskProcessingPlayground(ProcessPlayGround): + """ + Extended playground that combines ProcessPlayGround capabilities with TaskManager + for structured task processing and dependency management. + """ + + def __init__(self, name="task_processor", port=8888): super().__init__(name=name, port=port) self.task_manager = TaskManager() - - @on(TaskMessage) - async def handle_task_completion(self, task: TaskMessage, time: int): - if task.completed and task.task_id in self.task_manager.tasks: - # Record task completion - logger.info(f"Task {task.task_id} completed by {task.assigned_to}") - - # Create task output record - task_output = TaskOutput( - task_id=task.task_id, - name=task.name, - completed=True, - start_time=task.start_time, - end_time=task.end_time, - metadata=task.metadata if task.metadata else {} + self.task_process_map: Dict[str, str] = {} # Maps task IDs to process request IDs + self.pending_tasks: Dict[str, asyncio.Event] = {} + self.task_responses: Dict[str, TaskResult] = {} + + async def add_and_execute_task(self, + task: Task, + wait_for_completion: bool = True) -> TaskResult: + """ + Add a task and execute it through the processor system. + + Args: + task (Task): The task to be added and executed. + wait_for_completion (bool): Whether to wait for the task to complete. + + Returns: + TaskResult: Result of the task execution + :param task: + """ + if type(task.processor) == str: + required_role = task.processor + else: + raise ValueError("Processor must be a string (role)") + + async def process_executor(input_data: Dict) -> Any: + + if task.dependencies: + dependency_data = {} + for dependency_id in task.dependencies: + dependency_data[dependency_id] = self.task_responses[dependency_id] + + else: + dependency_data = None + + # Create and send process request + process_request = ProcessRequest( + task_type=required_role, + data=input_data.get('data'), + dependency_data=dependency_data ) - self.add_completed_task(task.task_id, task_output) - # Store task result if present - if hasattr(task, 'result'): - self.add_task_result(task.task_id, task.result) - - self._all_tasks_completed_events[task.task_id].set() - # Broadcast status update - await self.broadcast_message(TaskStatusUpdate( - task_id=task.task_id, - status=TaskStatus.COMPLETED, - group_id=task.group_id - )) - - # Check if all groups are completed - all_completed = await self.task_manager.handle_task_completion(task) - if all_completed: - logger.info("\nAll groups completed and goals achieved!") - await self.finish() - return - - # Activate dependent groups - new_assignments = await self.task_manager.activate_ready_groups() - for assignment in new_assignments: - await self.broadcast_message(assignment) - - @on(TaskStatusUpdate) - async def handle_task_status_update(self, update: TaskStatusUpdate, time: int): - if update.status == TaskStatus.FAILED and update.task_id in self.task_manager.tasks: - task = self.task_manager.tasks[update.task_id] - # Record failed task - task_output = TaskOutput( - task_id=task.task_id, - name=task.name, - completed=False, - start_time=task.start_time, - end_time=task.end_time, - error=update.message + + # Store mapping + self.task_process_map[task.id] = process_request.id + + # Send request and wait for response + response = await self.process_request(process_request) + + if response.status == ProcessState.SUCCESS: + return response.result + else: + raise Exception(response.error_message or "Task processing failed") + + task.processor = process_executor + + if wait_for_completion: + # Create completion event + self.pending_tasks[task.id] = asyncio.Event() + + # Execute task + result = await self._execute_task(task) + self.task_responses[task.id] = result + self.task_manager.add_task(task) + + # Clean up + self.pending_tasks.pop(task.id, None) + return result + else: + # Start execution without waiting + asyncio.create_task(self._execute_task(task)) + return None + + async def _execute_task(self, task: Task) -> TaskResult: + """Execute a single task and handle its result.""" + try: + # Execute the task + result = await asyncio.create_task( + self.task_manager.execute_task(task) ) - self.add_completed_task(task.task_id, task_output) - - @on(TaskRequest) - async def handle_task_request(self, request: TaskRequest, time: int): - templates = self.task_manager.task_templates - if request.role not in templates: - logger.warning(f"No task templates for role {request.role}") - return - - template = templates[request.role].get(request.task_type) - if not template: - logger.warning(f"No template for task type {request.task_type}") - return - - new_task = template() - new_task.assigned_to = request.requester - self.task_manager.tasks[new_task.task_id] = new_task - self.task_manager.worker_task_counts[request.requester] += 1 - - await self.broadcast_message(new_task) - logger.info(f"Task Manager: Created and assigned new task {new_task.task_id} to {request.requester}") - - @on_connect("*") - async def handle_worker_connection(self, topic: str, agent: "AgentDetail"): - self.task_manager.register_worker(agent.name, agent.role) - - async def assign_task_groups(self, groups): - """Initialize and start processing multiple task groups""" - assignments = await self.task_manager.assign_task_groups(groups) - for assignment in assignments: - await self.broadcast_message(assignment) - self._all_tasks_completed_events[assignment.task_id] = asyncio.Event() - - async def print_all_statistics(self): - """Print statistics for all task groups""" - await self.task_manager.print_all_statistics() - - # Print task completion statistics - completed_tasks = self.get_completed_tasks() - successful = sum(1 for t in completed_tasks.values() if t.completed) - failed = sum(1 for t in completed_tasks.values() if not t.completed) - - logger.info("\nTask Completion Statistics:") - logger.info(f"Total Tasks: {len(completed_tasks)}") - logger.info(f"Successful: {successful}") - logger.info(f"Failed: {failed}") - - # Print task results if available - task_results = self.get_task_results() - if task_results: - logger.info("\nTask Results Summary:") - for task_id, result in task_results.items(): - logger.info(f"Task {task_id}: {result}") + + # Set completion event if it exists + if task.id in self.pending_tasks: + self.pending_tasks[task.id].set() + + return result + + except Exception as e: + logger.error(f"Error executing task {task.name}: {e}") + return TaskResult(success=False, error=str(e)) + + async def execute_task_group(self, tasks: List[Task]) -> Dict[str, TaskResult]: + """Execute a group of tasks respecting dependencies.""" + results = {} + pending_tasks = set() + + for task in tasks: + event = asyncio.Event() + self.pending_tasks[task.id] = event + pending_tasks.add(task.id) + + # Start task execution + asyncio.create_task(self._execute_task(task)) + + # Wait for all tasks to complete + while pending_tasks: + completed = [] + for task_id in pending_tasks: + if self.pending_tasks[task_id].is_set(): + completed.append(task_id) + task = self.task_manager.get_task(task_id) + results[task_id] = task.result + self.pending_tasks.pop(task_id) + + for task_id in completed: + pending_tasks.remove(task_id) + + await asyncio.sleep(0.1) + + return results + + @on(ProcessResponse) + async def handle_process_response(self, response: ProcessResponse, time: int): + """Handle process responses and update task status.""" + await super().handle_process_response(response, time) + + # Find corresponding task + task_id = next( + (tid for tid, pid in self.task_process_map.items() + if pid == response.request_id), + None + ) + + if task_id: + task = self.task_manager.get_task(task_id) + if task: + if response.status == ProcessState.SUCCESS: + task.status = TaskStatus.COMPLETED + elif response.status == ProcessState.ERROR: + task.status = TaskStatus.FAILED + + # Clean up mapping + self.task_process_map.pop(task_id, None) diff --git a/bindings/ceylon/examples/auction/auction_playground.py b/bindings/ceylon/examples/auction/auction_playground.py index bf8c014..19c3433 100644 --- a/bindings/ceylon/examples/auction/auction_playground.py +++ b/bindings/ceylon/examples/auction/auction_playground.py @@ -59,7 +59,7 @@ async def execute_task(self, task: TaskMessage) -> None: amount=bid_amount, item_id=task.metadata['item_id'] ) - self.bids[task.task_id] = bid + self.bids[task.id] = bid # Update task with bid information task.completed = True diff --git a/bindings/ceylon/examples/llm/app.py b/bindings/ceylon/examples/llm/app.py index f0bb91f..b02adcb 100644 --- a/bindings/ceylon/examples/llm/app.py +++ b/bindings/ceylon/examples/llm/app.py @@ -227,13 +227,13 @@ async def main(): # Check completion status writing_done = ( - writing_group.task_id in active_playground.task_manager.completed_groups + writing_group.id in active_playground.task_manager.completed_groups or (writing_group.goal and writing_group.goal.status == GoalStatus.ACHIEVED) ) analysis_done = ( - analysis_group.task_id in active_playground.task_manager.completed_groups + analysis_group.id in active_playground.task_manager.completed_groups or (analysis_group.goal and analysis_group.goal.status == GoalStatus.ACHIEVED) ) diff --git a/bindings/ceylon/examples/llm/llm_hello.py b/bindings/ceylon/examples/llm/llm_hello.py index c2e143f..b069107 100644 --- a/bindings/ceylon/examples/llm/llm_hello.py +++ b/bindings/ceylon/examples/llm/llm_hello.py @@ -1,118 +1,32 @@ import asyncio -from dataclasses import dataclass -from typing import Optional, Any -from ceylon import Worker, on -from ceylon.base.playground import BasePlayGround +from ceylon.processor.agent import ProcessWorker, ProcessRequest, ProcessResponse +from ceylon.processor.playground import ProcessPlayGround -@dataclass -class TaskRequest: - task_id: str - task_type: str - data: Any - - -@dataclass -class TaskResponse: - task_id: str - result: Any - status: str # 'success' or 'error' - error_message: Optional[str] = None - - -class MessagePlayground(BasePlayGround): - def __init__(self, name="message_playground", port=8888): - super().__init__(name=name, port=port) - self.responses = {} - self.response_events = {} - - @on(TaskResponse) - async def handle_response(self, response: TaskResponse, time: int): - # Store the response and trigger the event - self.responses[response.task_id] = response - if response.task_id in self.response_events: - self.response_events[response.task_id].set() - - # Forward response externally (you can modify this part) - print(f"Received response: {response}") - - async def send_task_request(self, task_type: str, data: Any) -> TaskResponse or None: - # Create a unique task ID - task_id = f"task_{len(self.responses)}" - - # Create an event for this task - self.response_events[task_id] = asyncio.Event() - - # Create and send the request - request = TaskRequest(task_id=task_id, task_type=task_type, data=data) - await self.broadcast_message(request) - - # Wait for response - try: - await asyncio.wait_for(self.response_events[task_id].wait(), timeout=30.0) - return self.responses[task_id] - except asyncio.TimeoutError: - return TaskResponse( - task_id=task_id, - result=None, - status='error', - error_message='Request timed out' - ) - finally: - # Cleanup - self.response_events.pop(task_id, None) - +class MessageProcessor(ProcessWorker): + async def _processor(self, request: ProcessRequest, time: int): + if request.task_type == "uppercase": + if isinstance(request.data, str): + result = request.data.upper() + return result + else: + raise Exception("Input must be a string") + else: + raise Exception(f"Unknown task type: {request.task_type}") -class MessageWorker(Worker): def __init__(self, name: str): super().__init__(name=name, role="processor") - @on(TaskRequest) - async def handle_request(self, request: TaskRequest, time: int): - try: - # Process the request (example implementation) - if request.task_type == "uppercase": - if isinstance(request.data, str): - result = request.data.upper() - status = "success" - error_message = None - else: - result = None - status = "error" - error_message = "Input must be a string" - else: - result = None - status = "error" - error_message = f"Unknown task type: {request.task_type}" - - # Send response - response = TaskResponse( - task_id=request.task_id, - result=result, - status=status, - error_message=error_message - ) - await self.broadcast_message(response) - - except Exception as e: - # Handle errors - error_response = TaskResponse( - task_id=request.task_id, - result=None, - status="error", - error_message=str(e) - ) - await self.broadcast_message(error_response) - async def main(): # Create playground and worker - playground = MessagePlayground() - worker = MessageWorker("worker1") + playground = ProcessPlayGround() + worker = MessageProcessor("worker1") # Start the system async with playground.play(workers=[worker]) as active_playground: + active_playground: ProcessPlayGround = active_playground # Send some test requests test_cases = [ ("uppercase", "hello world"), @@ -122,10 +36,14 @@ async def main(): for task_type, data in test_cases: print(f"\nSending request - Type: {task_type}, Data: {data}") - response = await active_playground.send_task_request(task_type, data) - print(f"Response received: {response}") + response: ProcessResponse = await active_playground.process_request(ProcessRequest( + task_type=task_type, + data=data + )) + print(f"Response received: {response.result}") await asyncio.sleep(1) # Small delay between requests await active_playground.finish() + if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) diff --git a/bindings/ceylon/examples/llm/llm_task_hello.py b/bindings/ceylon/examples/llm/llm_task_hello.py new file mode 100644 index 0000000..517039f --- /dev/null +++ b/bindings/ceylon/examples/llm/llm_task_hello.py @@ -0,0 +1,108 @@ +import asyncio +from typing import Dict, Any + +from loguru import logger + +from ceylon.task import TaskPlayGround +from ceylon.task.agent import TaskAgent +from ceylon.task.data import TaskRequest, TaskStatus + + +class SimpleLLMAgent(TaskAgent): + """Simple LLM agent that simulates text generation and processing""" + + def _setup_supported_tasks(self) -> None: + self.supported_tasks = { + "generate": self._generate_text, + "summarize": self._summarize_text, + "answer": self._answer_question + } + + async def process_task(self, request: TaskRequest) -> Any: + handler = self.supported_tasks[request.task_type] + return await handler(request.data) + + async def _generate_text(self, prompt: str) -> Dict[str, Any]: + """Simulate text generation""" + await asyncio.sleep(2) # Simulate API call + return { + "text": f"Generated response for: {prompt}", + "tokens": len(prompt) * 2, + "model": "simple-llm" + } + + async def _summarize_text(self, text: str) -> Dict[str, Any]: + """Simulate text summarization""" + await asyncio.sleep(1.5) # Simulate processing + return { + "summary": f"Summary of {len(text)} chars: {text[:50]}...", + "original_length": len(text), + "summary_ratio": 0.3 + } + + async def _answer_question(self, question: str) -> Dict[str, Any]: + """Simulate question answering""" + await asyncio.sleep(1) # Simulate thinking + return { + "answer": f"The answer to '{question}' is: This is a simulated response", + "confidence": 0.85, + "sources": ["simulated knowledge base"] + } + + +async def run_llm_example(): + # Create playground and LLM agent + playground = TaskPlayGround() + llm_agent = SimpleLLMAgent("llm_assistant", "text_processor") + + try: + async with playground.play(workers=[llm_agent]) as active_playground: + # Test cases + test_cases = [ + ("generate", "Write a short story about Python programming"), + ("summarize", "This is a long text that needs to be summarized. " * 5), + ("answer", "What is the meaning of life?"), + ("unknown", "This should fail"), # Unknown task type + ] + + for task_type, data in test_cases: + logger.info(f"\nSubmitting {task_type} task:") + logger.info(f"Input: {data[:100]}...") + + # Submit task with metadata + response = await active_playground.submit_task( + task_type=task_type, + instructions=data, + role="text_processor", + metadata={ + "priority": "high", + "source": "example_script" + } + ) + + # Handle response + if response: + if response.status == TaskStatus.COMPLETED: + logger.info(f"Task completed successfully!") + logger.info(f"Result: {response.result}") + if response.runtime_stats: + logger.info(f"Stats: {response.runtime_stats}") + else: + logger.warning(f"Task failed: {response.error_message}") + + # Small delay between tasks + await asyncio.sleep(1) + + await active_playground.finish() + + except KeyboardInterrupt: + logger.warning("Execution interrupted by user") + except Exception as e: + logger.error(f"Error during execution: {e}") + finally: + logger.info("Example completed") + + +if __name__ == "__main__": + # Run the example + asyncio.run(run_llm_example()) \ No newline at end of file diff --git a/bindings/ceylon/examples/llm/simple_app.py b/bindings/ceylon/examples/llm/simple_app.py index 0c2a4ef..476196c 100644 --- a/bindings/ceylon/examples/llm/simple_app.py +++ b/bindings/ceylon/examples/llm/simple_app.py @@ -72,13 +72,13 @@ async def main(): await active_playground.assign_task_groups([task_group]) # Wait for completion # Get and display results - completed_task = (await active_playground.wait_and_get_completed_tasks())[task.task_id] + completed_task = (await active_playground.wait_and_get_completed_tasks())[task.id] if completed_task.completed: print("\nTask Completed Successfully!") print(f"Duration: {completed_task.end_time - completed_task.start_time:.2f}s") print("\nGenerated Content:") print("=" * 80) - task_result = active_playground.get_task_results().get(task.task_id) + task_result = active_playground.get_task_results().get(task.id) if task_result: print(task_result) print("=" * 80) diff --git a/bindings/ceylon/examples/task/playground-app.md b/bindings/ceylon/examples/task/playground-app.md index 7279d00..4fc3c60 100644 --- a/bindings/ceylon/examples/task/playground-app.md +++ b/bindings/ceylon/examples/task/playground-app.md @@ -156,19 +156,19 @@ processing_group = TaskManager.create_task_group( ### 3. Monitor Progress ```python -async def monitor_progress(self, active_playground: TaskPlayGround, - processing_group: TaskGroup) -> None: +async def monitor_progress(self, active_playground: TaskPlayGround, + processing_group: TaskGroup) -> None: while True: await asyncio.sleep(1) - current_group = active_playground.task_manager.task_groups[processing_group.task_id] + current_group = active_playground.task_manager.task_groups[processing_group.id] # Print status updates logger.info(f"Group Status: {current_group.status}") if current_group.goal: logger.info(f"Goal Status: {current_group.goal.status}") - if (current_group.goal and - current_group.goal.status == GoalStatus.ACHIEVED): + if (current_group.goal and + current_group.goal.status == GoalStatus.ACHIEVED): logger.info("Goal achieved! System can stop while tasks continue.") break ``` diff --git a/bindings/ceylon/examples/task/playground_app.py b/bindings/ceylon/examples/task/playground_app.py index 36d6ff2..3b1f893 100644 --- a/bindings/ceylon/examples/task/playground_app.py +++ b/bindings/ceylon/examples/task/playground_app.py @@ -71,7 +71,7 @@ async def monitor_progress(self, active_playground: TaskPlayGround, """Monitor task group progress until goal is achieved""" while True: await asyncio.sleep(1) - current_group = active_playground.task_manager.task_groups[processing_group.task_id] + current_group = active_playground.task_manager.task_groups[processing_group.id] # Print status updates logger.info(f"Group Status: {current_group.status}") diff --git a/bindings/ceylon/examples/task/task-app-tutorial.md b/bindings/ceylon/examples/task/task-app-tutorial.md index 5d6f807..23f9550 100644 --- a/bindings/ceylon/examples/task/task-app-tutorial.md +++ b/bindings/ceylon/examples/task/task-app-tutorial.md @@ -202,7 +202,7 @@ async with playground.play(workers=workers) as active_playground: # Monitor progress while True: await asyncio.sleep(1) - current_group = active_playground.task_manager.task_groups[processing_group.task_id] + current_group = active_playground.task_manager.task_groups[processing_group.id] print(f"\nGroup Status: {current_group.status}") if current_group.goal: diff --git a/bindings/ceylon/examples/task_manager/README.md b/bindings/ceylon/examples/task_manager/README.md index 5c55ba6..0984975 100644 --- a/bindings/ceylon/examples/task_manager/README.md +++ b/bindings/ceylon/examples/task_manager/README.md @@ -142,6 +142,7 @@ Distribution logic: 4. Broadcasts assignments ### Result Processing + ```python @on(TaskResult) async def handle_result(self, data: TaskResult, time: int, agent: AgentDetail): @@ -149,7 +150,7 @@ async def handle_result(self, data: TaskResult, time: int, agent: AgentDetail): if len(self.task_results) == len(self.tasks): print("All tasks completed") for result in self.task_results: - print(f"Task {result.task_id} assigned to {result.worker} - " + print(f"Task {result.id} assigned to {result.worker} - " f"{'Success' if result.success else 'Failure'}") await self.end_task_management() ``` diff --git a/bindings/ceylon/examples/task_manager/task_manager.py b/bindings/ceylon/examples/task_manager/task_manager.py index ccc6c36..2c6228c 100644 --- a/bindings/ceylon/examples/task_manager/task_manager.py +++ b/bindings/ceylon/examples/task_manager/task_manager.py @@ -94,7 +94,7 @@ async def handle_result(self, data: TaskResult, time: int, agent: AgentDetail): if len(self.task_results) == len(self.tasks): print("All tasks completed") for result in self.task_results: - print(f"Task {result.task_id} assigned to {result.worker} - {'Success' if result.success else 'Failure'}") + print(f"Task {result.id} assigned to {result.worker} - {'Success' if result.success else 'Failure'}") await self.end_task_management() async def assign_tasks(self): diff --git a/docs/examples/playground/task-manager.md b/docs/examples/playground/task-manager.md index 7279d00..744b0c7 100644 --- a/docs/examples/playground/task-manager.md +++ b/docs/examples/playground/task-manager.md @@ -156,21 +156,21 @@ processing_group = TaskManager.create_task_group( ### 3. Monitor Progress ```python -async def monitor_progress(self, active_playground: TaskPlayGround, - processing_group: TaskGroup) -> None: - while True: - await asyncio.sleep(1) - current_group = active_playground.task_manager.task_groups[processing_group.task_id] - - # Print status updates - logger.info(f"Group Status: {current_group.status}") - if current_group.goal: - logger.info(f"Goal Status: {current_group.goal.status}") - - if (current_group.goal and - current_group.goal.status == GoalStatus.ACHIEVED): - logger.info("Goal achieved! System can stop while tasks continue.") - break +async def monitor_progress(self, active_playground: TaskPlayGround, + processing_group: TaskGroup) -> None: + while True: + await asyncio.sleep(1) + current_group = active_playground.task_manager.task_groups[processing_group.id] + + # Print status updates + logger.info(f"Group Status: {current_group.status}") + if current_group.goal: + logger.info(f"Goal Status: {current_group.goal.status}") + + if (current_group.goal and + current_group.goal.status == GoalStatus.ACHIEVED): + logger.info("Goal achieved! System can stop while tasks continue.") + break ``` ### 4. Run the System diff --git a/docs/examples/task-manager.md b/docs/examples/task-manager.md index 5307835..a9e8158 100644 --- a/docs/examples/task-manager.md +++ b/docs/examples/task-manager.md @@ -120,98 +120,98 @@ class WorkerAgent(BaseAgent): ```python class TaskManager(BaseAgent): - def __init__(self, tasks: List[Task], expected_workers: int, - name="task_manager", port=8000): - super().__init__( - name=name, - port=port, - mode=PeerMode.ADMIN, - role="task_manager" - ) - self.tasks = tasks - self.expected_workers = expected_workers - self.task_results = [] - self.tasks_assigned = False - self.worker_stats = {} - self.task_timeouts = {} - - async def assign_tasks(self): - if self.tasks_assigned: - return - - self.tasks_assigned = True - connected_workers = await self.get_connected_agents() - - # Intelligent task distribution - worker_tasks = self._match_tasks_to_workers( - self.tasks, connected_workers) - - for worker, task in worker_tasks: - assignment = TaskAssignment( - task=task, - assigned_at=int(time.time()), - timeout=task.difficulty * 2 - ) - await self.broadcast(pickle.dumps(assignment)) - - # Set timeout handler - self.task_timeouts[task.id] = asyncio.create_task( - self._handle_timeout(task.id, assignment.timeout) - ) - - async def _handle_timeout(self, task_id: int, timeout: int): - await asyncio.sleep(timeout) - if task_id not in self.completed_tasks: - logger.warning(f"Task {task_id} timed out") - # Implement timeout handling - - def _match_tasks_to_workers(self, tasks, workers): - # Intelligent matching algorithm - matches = [] - sorted_tasks = sorted(tasks, + def __init__(self, tasks: List[Task], expected_workers: int, + name="task_manager", port=8000): + super().__init__( + name=name, + port=port, + mode=PeerMode.ADMIN, + role="task_manager" + ) + self.tasks = tasks + self.expected_workers = expected_workers + self.task_results = [] + self.tasks_assigned = False + self.worker_stats = {} + self.task_timeouts = {} + + async def assign_tasks(self): + if self.tasks_assigned: + return + + self.tasks_assigned = True + connected_workers = await self.get_connected_agents() + + # Intelligent task distribution + worker_tasks = self._match_tasks_to_workers( + self.tasks, connected_workers) + + for worker, task in worker_tasks: + assignment = TaskAssignment( + task=task, + assigned_at=int(time.time()), + timeout=task.difficulty * 2 + ) + await self.broadcast(pickle.dumps(assignment)) + + # Set timeout handler + self.task_timeouts[task.id] = asyncio.create_task( + self._handle_timeout(task.id, assignment.timeout) + ) + + async def _handle_timeout(self, task_id: int, timeout: int): + await asyncio.sleep(timeout) + if task_id not in self.completed_tasks: + logger.warning(f"Task {task_id} timed out") + # Implement timeout handling + + def _match_tasks_to_workers(self, tasks, workers): + # Intelligent matching algorithm + matches = [] + sorted_tasks = sorted(tasks, key=lambda t: t.difficulty, reverse=True) - sorted_workers = sorted(workers, + sorted_workers = sorted(workers, key=lambda w: self._get_worker_score(w)) - - for task, worker in zip(sorted_tasks, sorted_workers): - matches.append((worker, task)) - return matches - - def _get_worker_score(self, worker): - stats = self.worker_stats.get(worker.name, {}) - success_rate = stats.get('success_rate', 0.5) - completed_tasks = stats.get('completed_tasks', 0) - return success_rate * 0.7 + completed_tasks * 0.3 - - @on(TaskResult) - async def handle_result(self, data: TaskResult, - time: int, agent: AgentDetail): - # Cancel timeout handler - if data.task_id in self.task_timeouts: - self.task_timeouts[data.task_id].cancel() - - self.task_results.append(data) - self._update_worker_stats(data) - - if len(self.task_results) == len(self.tasks): - await self._generate_final_report() - await self.end_task_management() - - def _update_worker_stats(self, result: TaskResult): - if result.worker not in self.worker_stats: - self.worker_stats[result.worker] = { - 'completed_tasks': 0, - 'successful_tasks': 0, - 'total_time': 0 - } - - stats = self.worker_stats[result.worker] - stats['completed_tasks'] += 1 - if result.success: - stats['successful_tasks'] += 1 - stats['total_time'] += result.execution_time - stats['success_rate'] = (stats['successful_tasks'] / + + for task, worker in zip(sorted_tasks, sorted_workers): + matches.append((worker, task)) + return matches + + def _get_worker_score(self, worker): + stats = self.worker_stats.get(worker.name, {}) + success_rate = stats.get('success_rate', 0.5) + completed_tasks = stats.get('completed_tasks', 0) + return success_rate * 0.7 + completed_tasks * 0.3 + + @on(TaskResult) + async def handle_result(self, data: TaskResult, + time: int, agent: AgentDetail): + # Cancel timeout handler + if data.id in self.task_timeouts: + self.task_timeouts[data.id].cancel() + + self.task_results.append(data) + self._update_worker_stats(data) + + if len(self.task_results) == len(self.tasks): + await self._generate_final_report() + await self.end_task_management() + + def _update_worker_stats(self, result: TaskResult): + if result.worker not in self.worker_stats: + self.worker_stats[result.worker] = { + 'completed_tasks': 0, + 'successful_tasks': 0, + 'total_time': 0 + } + + stats = self.worker_stats[result.worker] + stats['completed_tasks'] += 1 + if result.success: + stats['successful_tasks'] += 1 + stats['total_time'] += result.execution_time + stats['success_rate'] = (stats['successful_tasks'] / stats['completed_tasks']) ```