Skip to content

Commit

Permalink
Update task manager example with docs
Browse files Browse the repository at this point in the history
  • Loading branch information
dewmal committed Feb 6, 2025
1 parent 83b58ce commit 44df3d9
Show file tree
Hide file tree
Showing 3 changed files with 414 additions and 269 deletions.
166 changes: 166 additions & 0 deletions bindings/ceylon/examples/task_manager/advance_playground.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# Copyright 2024-Present, Syigen Ltd. and Syigen Private Limited. All rights reserved.
# Licensed under the Apache License, Version 2.0 (See LICENSE.md or http://www.apache.org/licenses/LICENSE-2.0).
#
import asyncio
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional

from loguru import logger

# Task States
class TaskState(Enum):
    """Lifecycle states a Task moves through while being processed."""
    PENDING = "pending"      # created but not yet started
    RUNNING = "running"      # currently being executed by a worker
    COMPLETED = "completed"  # finished successfully
    FAILED = "failed"        # finished with an error

# Data Models
@dataclass
class TaskResult:
    """Outcome of a single task execution.

    success is the only required field; output carries the worker's
    return value on success, error carries a message on failure.
    """
    success: bool
    # Worker's return value; None when the task failed.
    output: Any = None
    # Human-readable failure description; None when the task succeeded.
    error: Optional[str] = None

@dataclass
class Task:
    """A unit of work routed to a worker by its processor role.

    Attributes:
        name: Human-readable task name.
        processor: Role identifier of the worker that should handle this task.
        input_data: Payload handed to the worker (shape depends on the role).
        id: Unique task identifier, auto-generated per instance.
        dependencies: Ids of tasks that must complete before this one runs.
        state: Current lifecycle state.
        result: Populated once the task has finished.
    """
    name: str
    processor: str  # Role identifier for the worker
    input_data: Dict[str, Any]
    # default_factory is essential here: a plain `str(uuid.uuid4())` default
    # is evaluated ONCE at class-definition time, so every Task created
    # without an explicit id would share the same id, breaking dependency
    # resolution and result bookkeeping.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Fresh set per instance; avoids both the shared-mutable-default trap
    # and the need for a None sentinel in the common case.
    dependencies: set[str] = field(default_factory=set)
    state: TaskState = TaskState.PENDING
    result: Optional[TaskResult] = None

    def __post_init__(self):
        # Backward compatibility: callers may still pass dependencies=None
        # explicitly to mean "no dependencies".
        if self.dependencies is None:
            self.dependencies = set()

# Task Processing Worker
from ceylon.processor.agent import ProcessWorker
from ceylon.processor.data import ProcessRequest, ProcessResponse, ProcessState

class TaskWorker(ProcessWorker):
    """Worker that processes requests according to its role.

    Dispatches on self.role: "math" performs arithmetic, "text"
    upper-cases strings, and any other role falls back to stringifying
    the payload.
    """

    def __init__(self, name: str, role: str):
        super().__init__(name=name, role=role)

    async def _processor(self, request: ProcessRequest, time: int) -> tuple[Any, dict]:
        """Process one request and return (result, metadata).

        Args:
            request: Incoming request whose .data is the task payload.
            time: Timestamp/tick supplied by the framework, echoed in metadata.

        Returns:
            Tuple of the computed result and a metadata dict identifying
            this worker and the processing time.

        Raises:
            Exception: Wrapping any underlying processing error.
        """
        try:
            # Route by role; unknown roles fall back to process_default.
            if self.role == "math":
                result = await self.process_math(request.data)
            elif self.role == "text":
                result = await self.process_text(request.data)
            else:
                result = await self.process_default(request.data)

            return result, {"worker": self.name, "time": time}
        except Exception as e:
            # Chain with `from e` so the original traceback is preserved
            # instead of being discarded by the re-raise.
            raise Exception(f"Processing error: {str(e)}") from e

    async def process_math(self, data: Dict[str, Any]) -> Any:
        """Perform the arithmetic operation described by `data`.

        Expects data["operation"] in {"sum", "multiply"} and
        data["numbers"] as a list (defaults to []).

        Raises:
            ValueError: If the operation is not recognized.
        """
        operation = data.get("operation")
        numbers = data.get("numbers", [])

        if operation == "sum":
            return sum(numbers)
        elif operation == "multiply":
            result = 1
            for num in numbers:
                result *= num
            return result  # empty list yields the multiplicative identity, 1
        else:
            raise ValueError(f"Unknown math operation: {operation}")

    async def process_text(self, data: str) -> str:
        """Return the text upper-cased."""
        return data.upper()

    async def process_default(self, data: Any) -> Any:
        """Fallback for unknown roles: return the payload as a string."""
        return str(data)

# Task Processing Playground
from ceylon.task.playground import TaskProcessingPlayground

class CustomTaskPlayground(TaskProcessingPlayground):
    """Playground that executes a batch of tasks in dependency order."""

    def __init__(self, name: str = "custom_playground", port: int = 8888):
        super().__init__(name=name, port=port)

    async def execute_task_sequence(self, tasks: list[Task]) -> Dict[str, TaskResult]:
        """Execute `tasks` respecting dependencies, parallelizing where possible.

        Tasks are run in "waves": each wave contains every not-yet-finished
        task whose dependencies have all completed, and a wave's tasks run
        concurrently via asyncio.gather.

        Args:
            tasks: Tasks to run. Each task's dependencies must reference
                ids of other tasks within this list.

        Returns:
            Mapping from task id to that task's TaskResult.

        Raises:
            Exception: If no task can make progress — caused either by a
                dependency cycle or by a dependency id not present in `tasks`.
        """
        results = {}

        # Register every task up front so dependencies can be resolved.
        for task in tasks:
            self.task_manager.add_task(task)

        while len(results) < len(tasks):
            # A task is ready when it hasn't run yet and all of its
            # dependencies already have results.
            ready_tasks = [
                task for task in tasks
                if task.id not in results
                and all(dep in results for dep in task.dependencies)
            ]

            if not ready_tasks:
                # Work remains but nothing is runnable: either a cycle or a
                # dependency pointing outside `tasks`. Name the blocked tasks
                # so the failure is diagnosable.
                blocked = [t.name for t in tasks if t.id not in results]
                raise Exception(
                    f"Circular dependency detected (blocked tasks: {blocked})"
                )

            # Execute the current wave in parallel.
            task_results = await asyncio.gather(*[
                self.add_and_execute_task(task, wait_for_completion=True)
                for task in ready_tasks
            ])

            for task, result in zip(ready_tasks, task_results):
                results[task.id] = result

        return results

# Example usage
async def main():
# Create playground and workers
playground = CustomTaskPlayground()
math_worker = TaskWorker("math_worker", "math")
text_worker = TaskWorker("text_worker", "text")

async with playground.play(workers=[math_worker, text_worker]) as pg:
# Create tasks
task1 = Task(
name="Calculate Sum",
processor="math",
input_data={
"operation": "sum",
"numbers": [1, 2, 3, 4, 5]
}
)

task2 = Task(
name="Process Text",
processor="text",
input_data="hello world"
)

task3 = Task(
name="Final Calculation",
processor="math",
input_data={
"operation": "multiply",
"numbers": [10]
},
dependencies={task1.id}
)

# Execute task sequence
results = await pg.execute_task_sequence([task1, task2, task3])

# Print results
for task_id, result in results.items():
task = pg.task_manager.get_task(task_id)
print(f"\nTask: {task.name}")
print(f"Result: {result.output}")

await pg.finish()

if __name__ == "__main__":
asyncio.run(main())
40 changes: 35 additions & 5 deletions bindings/ceylon/examples/task_manager/using-playground.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,36 @@ This tutorial demonstrates how to build a distributed task processing system usi
- Contains input data and processing instructions
- Can have dependencies on other tasks

## Message Passing

````mermaid
sequenceDiagram
participant TM as TaskManager
participant W1 as Junior(skill=3)
participant W2 as Intermediate(skill=6)
participant W3 as Senior(skill=9)
Note over TM,W3: Connection Phase
W1->>TM: Connect
W2->>TM: Connect
W3->>TM: Connect
Note over TM,W3: Task Assignment Phase
TM->>W1: TaskAssignment(Task 1, difficulty=2)
TM->>W2: TaskAssignment(Task 2, difficulty=5)
TM->>W3: TaskAssignment(Task 3, difficulty=8)
Note over TM,W3: Task Processing Phase
par Process Tasks
W1-->>TM: TaskResult(id=1, success=true)
W2-->>TM: TaskResult(id=2, success=true)
W3-->>TM: TaskResult(id=3, success=true)
end
Note over TM: Calculate Success Rate
Note over TM: End Task Management
````
The diagram illustrates a distributed task management system where a central TaskManager coordinates with multiple worker agents. Each worker has a different skill level (3, 6, and 9) and can handle tasks of varying difficulty (2, 5, and 8). The workflow begins with workers connecting to the TaskManager, followed by task assignments based on availability. Workers process their assigned tasks in parallel, with success determined by whether their skill level exceeds the task's difficulty. Once all tasks are complete, the TaskManager calculates the overall success rate before shutting down.
## Implementation Guide

### 1. Define Your Task Structure
Expand Down Expand Up @@ -154,29 +184,29 @@ async def create_pipeline(playground):
processor="prep_worker",
input_data={'raw_data': data}
)

# Processing task depending on prep
process_task = Task(
name="Data Processing",
processor="process_worker",
dependencies={prep_task.id}
)

# Final aggregation
aggregate_task = Task(
name="Result Aggregation",
processor="aggregator",
dependencies={process_task.id}
)

# Execute pipeline
tasks = [prep_task, process_task, aggregate_task]
results = []

for task in tasks:
result = await playground.add_and_execute_task(task)
results.append(result)

return results
```

Expand Down
Loading

0 comments on commit 44df3d9

Please sign in to comment.