Skip to content

Commit

Permalink
modify task operator to run parallel tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
dewmal committed Aug 25, 2024
1 parent cd5644a commit 273dbe8
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 87 deletions.
37 changes: 21 additions & 16 deletions bindings/ceylon/ceylon/task/task_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,34 @@ async def run_tasks(self):
return
for task in self.tasks:
self.results[task.id] = []
sub_task = task.get_next_subtask()
if sub_task is None:
sub_tasks = task.get_next_subtasks()
if len(sub_tasks) == 0:
continue
subtask_name, subtask_ = sub_task
if subtask_.executor is None:
assigned_agent = await self.get_task_executor(subtask_)
subtask_ = task.update_subtask_executor(subtask_name, assigned_agent)
logger.debug(f"Assigned agent {subtask_.executor} to subtask {subtask_name}")
await self.broadcast_data(
TaskAssignment(task=subtask_, assigned_agent=subtask_.executor))
for sub_task in sub_tasks:
if sub_task is None:
continue
subtask_name, subtask_ = sub_task
logger.info(f"Assigning agent to subtask {subtask_name}")
if subtask_.executor is None:
assigned_agent = await self.get_task_executor(subtask_)
subtask_ = task.update_subtask_executor(subtask_name, assigned_agent)
await self.broadcast_data(
TaskAssignment(task=subtask_, assigned_agent=subtask_.executor))
logger.info(f"Assigned agent {subtask_.executor} to subtask {subtask_name}")

@on_message(type=SubTaskResult)
async def on_task_result(self, result: SubTaskResult):
logger.info(f"Received task result: {result}")
if result.status == TaskResultStatus.COMPLETED:
for idx, task in enumerate(self.tasks):
sub_task = task.get_next_subtask()
print(result.task_id, sub_task[1].id, result.task_id == sub_task[1].id)
if sub_task is None or result.task_id != sub_task[1].id:
continue
if result.task_id == sub_task[1].id:
task.update_subtask_status(sub_task[1].name, result.result)
break
sub_tasks = task.get_next_subtasks()
for sub_task in sub_tasks:
print(result.task_id, sub_task[1].id, result.task_id == sub_task[1].id)
if sub_task is None or result.task_id != sub_task[1].id:
continue
if result.task_id == sub_task[1].id:
task.update_subtask_status(sub_task[1].name, result.result)
break

# Task is completed
for task in self.tasks:
Expand Down
109 changes: 69 additions & 40 deletions bindings/ceylon/ceylon/task/task_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from loguru import logger
from pydantic import BaseModel
from pydantic import Field
import asyncio


class SubTask(BaseModel):
Expand Down Expand Up @@ -54,6 +53,15 @@ class TaskDeliverable(BaseModel):
def __str__(self):
return f"TaskDeliverable: {self.deliverable} - Key Features: {self.key_features} - Considerations: {self.considerations} - Objective: {self.objective}"

@staticmethod
def default(description: str) -> 'TaskDeliverable':
return TaskDeliverable(
objective="Complete the assigned task",
deliverable=description,
key_features=["Basic functionality"],
considerations=["Meet minimum requirements"]
)


class Task(BaseModel):
id: str = Field(default_factory=lambda: str(uuid4()))
Expand Down Expand Up @@ -94,12 +102,14 @@ def _create_dependency_graph(self) -> nx.DiGraph:
def validate_sub_tasks(self) -> bool:
subtask_names = set(self.subtasks.keys())

# Check if all dependencies are present
for subtask in self.subtasks.values():
if not subtask.depends_on.issubset(subtask_names):
missing_deps = subtask.depends_on - subtask_names
logger.info(f"Subtask '{subtask.name}' has missing dependencies: {missing_deps}")
return False

# Check for circular dependencies
try:
self._validate_dependencies()
except ValueError as e:
Expand All @@ -112,13 +122,20 @@ def get_execution_order(self) -> List[str]:
graph = self._create_dependency_graph()
return list(nx.topological_sort(graph))

def get_ready_subtasks(self) -> List[Tuple[str, SubTask]]:
ready_subtasks = []
# def get_next_subtask(self) -> Optional[Tuple[str, SubTask]]:
# for subtask_name in self.execution_order:
# subtask = self.subtasks[subtask_name]
# if all(self.subtasks[dep].completed for dep in subtask.depends_on):
# return (subtask_name, subtask)
# return None

def get_next_subtasks(self) -> List[Tuple[str, SubTask]]:
subtasks = []
for subtask_name in self.execution_order:
subtask = self.subtasks[subtask_name]
if not subtask.completed and all(self.subtasks[dep].completed for dep in subtask.depends_on):
ready_subtasks.append((subtask_name, subtask))
return ready_subtasks
if all(self.subtasks[dep].completed for dep in subtask.depends_on):
subtasks.append((subtask_name, subtask))
return subtasks

def update_subtask_status(self, subtask_name: str, result: str):
if subtask_name not in self.subtasks:
Expand All @@ -128,6 +145,9 @@ def update_subtask_status(self, subtask_name: str, result: str):
if result is not None:
subtask.complete(result)

if subtask_name in self.execution_order:
self.execution_order.remove(subtask_name)

def update_subtask_executor(self, subtask_name: str, executor: str) -> SubTask:
if subtask_name not in self.subtasks:
raise ValueError(f"Subtask {subtask_name} not found")
Expand Down Expand Up @@ -194,35 +214,46 @@ class TaskResult(BaseModel):
final_answer: str


async def execute_subtask(subtask_name: str, subtask: SubTask) -> str:
print(f"Executing: {subtask}")
# Simulate subtask execution with a delay
await asyncio.sleep(1)
return f"Success: {subtask_name}"


async def execute_task(task: Task) -> None:
while not task.is_completed():
ready_subtasks = task.get_ready_subtasks()
print(f"Ready subtasks: {ready_subtasks}")

if not ready_subtasks:
await asyncio.sleep(0.1)
continue

# Execute ready subtasks in parallel
subtask_coroutines = [execute_subtask(name, subtask) for name, subtask in ready_subtasks]
results = await asyncio.gather(*subtask_coroutines)

# Update task status
for (subtask_name, _), result in zip(ready_subtasks, results):
task.update_subtask_status(subtask_name, result)
print(f"Completed: {subtask_name}")

print("All subtasks completed successfully!")
if __name__ == "__main__":
def execute_task(task: Task) -> None:
execution_step = 0
while True:
# Get the next subtask
next_subtasks: List[tuple[str, SubTask]] = task.get_next_subtasks()
print(f"Step {execution_step}: {next_subtasks}")
if next_subtasks is None or len(next_subtasks) == 0:
break
for next_subtask in next_subtasks:
# If there are no more subtasks, break the loop
if next_subtask is None:
break

subtask_name, subtask = next_subtask
print(f"Executing: {subtask}")

# Here you would actually execute the subtask
# For this example, we'll simulate execution with a simple print statement
print(f"Simulating execution of {subtask_name}")

# Simulate a result (in a real scenario, this would be the outcome of the subtask execution)
result = "Success"

# Update the subtask status
task.update_subtask_status(subtask_name, result)

# Check if the entire task is completed
if task.is_completed():
print("All subtasks completed successfully!")
break

# Final check to see if all subtasks were completed
if task.is_completed():
print("Task execution completed successfully!")
else:
print("Task execution incomplete. Some subtasks may have failed.")
execution_step += 1


if __name__ == "__main__":
# Create a task with initial subtasks
web_app = Task.create_task("Build Web App", "Create a simple web application",
subtasks=[
Expand All @@ -233,11 +264,8 @@ async def execute_task(task: Task) -> None:
SubTask(name="testing", description="Perform unit and integration tests",
depends_on={"backend", "frontend"},
required_specialty="Knowledge about testing tools"),
SubTask(name="qa_test_cases", description="Perform unit and integration tests",
depends_on={"backend", "frontend"},
required_specialty="Knowledge about testing tools"),
SubTask(name="frontend", description="Develop the frontend UI",
depends_on={"setup", "backend"},
depends_on={"setup", "database"},
required_specialty="Knowledge about frontend tools"),
SubTask(name="backend", description="Develop the backend API",
depends_on={"setup", "database"},
Expand All @@ -249,23 +277,24 @@ async def execute_task(task: Task) -> None:
depends_on={"deployment"},
required_specialty="Knowledge about delivery tools"),
SubTask(name="qa", description="Perform quality assurance",
depends_on={"testing", "qa_test_cases"},
depends_on={"testing"},
required_specialty="Knowledge about testing tools")
])

# Execute the task
print("Execution order:", [web_app.subtasks[task_id].name for task_id in web_app.get_execution_order()])

if web_app.validate_sub_tasks():
print("Subtasks are valid")

print("\nExecuting task:")
asyncio.run(execute_task(task=web_app))
execute_task(task=web_app)

print("\nFinal task status:")
print(web_app)
else:
print("Subtasks are invalid")

# Serialization example
# Serialization example
print("\nSerialized Task:")
print(web_app.model_dump_json(indent=2))
57 changes: 26 additions & 31 deletions bindings/ceylon/tests/tasks/manage_tasks-agents.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from langchain_community.chat_models import ChatOllama

from ceylon.llm import LLMTaskAgent, LLMTaskManager
from ceylon.llm import LLMTaskCoordinator, LLMTaskOperator
from ceylon.task import Task, SubTask
from ceylon.task.task_operation import TaskDeliverable

# Example usage
if __name__ == "__main__":
Expand Down Expand Up @@ -32,68 +33,62 @@
required_specialty="Knowledge about testing tools")

])

web_app.task_deliverable = TaskDeliverable.default(web_app.description)
tasks = [
web_app
]

llm = ChatOllama(model="llama3.1:latest", temperature=0)
# Create specialized agents
agents = [
LLMTaskAgent(
LLMTaskOperator(
name="backend",
role="Backend Developer",
context="Knowledge about backend tools",
skills=["Python", "Java", "Node.js"], # Example skills
tools=["Django", "Spring Boot", "Express.js"], # Example tools
llm=llm
),

LLMTaskAgent(
LLMTaskOperator(
name="frontend",
role="Frontend Developer",
context="Knowledge about frontend tools",
skills=["HTML", "CSS", "JavaScript", "React"], # Example skills
tools=["React", "Angular", "Vue.js"], # Example tools
llm=llm
),

LLMTaskAgent(
LLMTaskOperator(
name="database",
role="Database Administrator",
context="Knowledge about database management tools",
skills=["SQL", "NoSQL", "Database Design"], # Example skills
tools=["MySQL", "MongoDB", "PostgreSQL"], # Example tools
llm=llm
),

LLMTaskAgent(
#
LLMTaskOperator(
name="deployment",
role="Deployment Manager",
context="Knowledge about deployment tools and CI tools",
skills=["CI/CD", "Docker", "Kubernetes"], # Example skills
tools=["Jenkins", "Docker", "Kubernetes"], # Example tools
llm=llm
),

LLMTaskAgent(
name="qa",
role="Quality Assurance Engineer",
context="Knowledge about testing tools",
skills=["Automated Testing", "Manual Testing", "Test Case Design"], # Example skills
tools=["Selenium", "JUnit", "TestNG"], # Example tools
llm=llm
),

LLMTaskAgent(
name="delivery",
role="Delivery Manager",
context="Knowledge about delivery tools",
skills=["Release Management", "Continuous Delivery"], # Example skills
tools=["Jira", "Confluence", "GitLab CI"], # Example tools
llm=llm
)
#
# LLMTaskOperator(
# name="qa",
# role="Quality Assurance Engineer",
# context="Knowledge about testing tools",
# skills=["Automated Testing", "Manual Testing", "Test Case Design"], # Example skills
# llm=llm
# ),
#
# LLMTaskOperator(
# name="delivery",
# role="Delivery Manager",
# context="Knowledge about delivery tools",
# skills=["Release Management", "Continuous Delivery"], # Example skills
# llm=llm
# )

]
task_manager = LLMTaskManager(tasks, agents, tool_llm=llm)
task_manager.run_admin(inputs=b"", workers=agents)
task_manager = LLMTaskCoordinator(tasks, agents, llm=llm)
task_manager.do()

0 comments on commit 273dbe8

Please sign in to comment.