Skip to content

Commit

Permalink
docs: Enhance documentation for SingleThreadedAgentRuntime with usage…
Browse files Browse the repository at this point in the history
… examples and clarifications; undeprecate process_next (#5230)

Resolves #5046
  • Loading branch information
ekzhu authored Jan 28, 2025
1 parent b29d0bd commit 10996bc
Showing 1 changed file with 98 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
else:
from ._queue import Queue, QueueShutDown # type: ignore

from typing_extensions import deprecated

from ._agent import Agent
from ._agent_id import AgentId
Expand Down Expand Up @@ -147,6 +146,103 @@ def _warn_if_none(value: Any, handler_name: str) -> None:


class SingleThreadedAgentRuntime(AgentRuntime):
"""A single-threaded agent runtime that processes all messages using a single asyncio queue.
Messages are delivered in the order they are received, and the runtime processes
each message in a separate asyncio task concurrently.
.. note::
This runtime is suitable for development and standalone applications.
It is not suitable for high-throughput or high-concurrency scenarios.
Args:
intervention_handlers (List[InterventionHandler], optional): A list of intervention
handlers that can intercept messages before they are sent or published. Defaults to None.
tracer_provider (TracerProvider, optional): The tracer provider to use for tracing. Defaults to None.
Examples:
A simple example of creating a runtime, registering an agent, sending a message and stopping the runtime:
.. code-block:: python
import asyncio
from dataclasses import dataclass
from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler
@dataclass
class MyMessage:
content: str
class MyAgent(RoutedAgent):
@message_handler
async def handle_my_message(self, message: MyMessage, ctx: MessageContext) -> None:
print(f"Received message: {message.content}")
async def main() -> None:
# Create a runtime and register the agent
runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My agent"))
# Start the runtime, send a message and stop the runtime
runtime.start()
await runtime.send_message(MyMessage("Hello, world!"), recipient=AgentId("my_agent", "default"))
await runtime.stop()
asyncio.run(main())
An example of creating a runtime, registering an agent, publishing a message and stopping the runtime:
.. code-block:: python
import asyncio
from dataclasses import dataclass
from autogen_core import (
DefaultTopicId,
MessageContext,
RoutedAgent,
SingleThreadedAgentRuntime,
default_subscription,
message_handler,
)
@dataclass
class MyMessage:
content: str
# The agent is subscribed to the default topic.
@default_subscription
class MyAgent(RoutedAgent):
@message_handler
async def handle_my_message(self, message: MyMessage, ctx: MessageContext) -> None:
print(f"Received message: {message.content}")
async def main() -> None:
# Create a runtime and register the agent
runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My agent"))
# Start the runtime.
runtime.start()
# Publish a message to the default topic that the agent is subscribed to.
await runtime.publish_message(MyMessage("Hello, world!"), DefaultTopicId())
# Wait for the message to be processed and then stop the runtime.
await runtime.stop_when_idle()
asyncio.run(main())
"""

def __init__(
self,
*,
Expand Down Expand Up @@ -455,8 +551,8 @@ async def _process_response(self, message_envelope: ResponseMessageEnvelope) ->
message_envelope.future.set_result(message_envelope.message)
self._message_queue.task_done()

@deprecated("Manually stepping the runtime processing is deprecated. Use start() instead.")
async def process_next(self) -> None:
"""Process the next message in the queue."""
await self._process_next()

async def _process_next(self) -> None:
Expand Down

0 comments on commit 10996bc

Please sign in to comment.