Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: Enhance documentation for SingleThreadedAgentRuntime with usage examples and clarifications; undeprecate process_next #5230

Merged
merged 4 commits into from
Jan 28, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
else:
from ._queue import Queue, QueueShutDown # type: ignore

from typing_extensions import deprecated

from ._agent import Agent
from ._agent_id import AgentId
Expand Down Expand Up @@ -147,6 +146,103 @@ def _warn_if_none(value: Any, handler_name: str) -> None:


class SingleThreadedAgentRuntime(AgentRuntime):
"""A single-threaded agent runtime that processes all messages using a single asyncio queue.
Messages are delivered in the order they are received, and the runtime processes
each message in a separate asyncio task concurrently.

.. note::

This runtime is suitable for development and standalone applications.
It is not suitable for high-throughput or high-concurrency scenarios.

Args:
intervention_handlers (List[InterventionHandler], optional): A list of intervention
handlers that can intercept messages before they are sent or published. Defaults to None.
tracer_provider (TracerProvider, optional): The tracer provider to use for tracing. Defaults to None.

Examples:

A simple example of creating a runtime, registering an agent, sending a message and stopping the runtime:

.. code-block:: python

import asyncio
from dataclasses import dataclass

from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler


@dataclass
class MyMessage:
content: str


class MyAgent(RoutedAgent):
@message_handler
async def handle_my_message(self, message: MyMessage, ctx: MessageContext) -> None:
print(f"Received message: {message.content}")


async def main() -> None:
# Create a runtime and register the agent
runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My agent"))

# Start the runtime, send a message and stop the runtime
runtime.start()
await runtime.send_message(MyMessage("Hello, world!"), recipient=AgentId("my_agent", "default"))
await runtime.stop()


asyncio.run(main())

An example of creating a runtime, registering an agent, publishing a message and stopping the runtime:

.. code-block:: python

import asyncio
from dataclasses import dataclass

from autogen_core import (
DefaultTopicId,
MessageContext,
RoutedAgent,
SingleThreadedAgentRuntime,
default_subscription,
message_handler,
)


@dataclass
class MyMessage:
content: str


# The agent is subscribed to the default topic.
@default_subscription
class MyAgent(RoutedAgent):
@message_handler
async def handle_my_message(self, message: MyMessage, ctx: MessageContext) -> None:
print(f"Received message: {message.content}")


async def main() -> None:
# Create a runtime and register the agent
runtime = SingleThreadedAgentRuntime()
await MyAgent.register(runtime, "my_agent", lambda: MyAgent("My agent"))

# Start the runtime.
runtime.start()
# Publish a message to the default topic that the agent is subscribed to.
await runtime.publish_message(MyMessage("Hello, world!"), DefaultTopicId())
# Wait for the message to be processed and then stop the runtime.
await runtime.stop_when_idle()


asyncio.run(main())

"""

def __init__(
self,
*,
Expand Down Expand Up @@ -455,8 +551,8 @@ async def _process_response(self, message_envelope: ResponseMessageEnvelope) ->
message_envelope.future.set_result(message_envelope.message)
self._message_queue.task_done()

@deprecated("Manually stepping the runtime processing is deprecated. Use start() instead.")
async def process_next(self) -> None:
"""Process the next message in the queue."""
await self._process_next()

async def _process_next(self) -> None:
Expand Down
Loading