Skip to content

Commit

Permalink
Renaming things (#3)
Browse files Browse the repository at this point in the history
* Rename event to message
* rename SendMessage etc to SendMessageEnvelope etc
  • Loading branch information
ekzhu authored May 17, 2024
1 parent ebee8b8 commit 5f7ef08
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 45 deletions.
20 changes: 10 additions & 10 deletions examples/futures.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
from dataclasses import dataclass

from agnext.agent_components.type_routed_agent import TypeRoutedAgent, event_handler
from agnext.agent_components.type_routed_agent import TypeRoutedAgent, message_handler
from agnext.application_components.single_threaded_agent_runtime import SingleThreadedAgentRuntime
from agnext.core.agent import Agent
from agnext.core.agent_runtime import AgentRuntime
Expand All @@ -10,37 +10,37 @@

@dataclass
class MessageType(Message):
message: str
body: str
sender: str


class Inner(TypeRoutedAgent[MessageType]):
def __init__(self, name: str, router: AgentRuntime[MessageType]) -> None:
super().__init__(name, router)

@event_handler(MessageType)
async def on_new_event(self, event: MessageType) -> MessageType:
return MessageType(message=f"Inner: {event.message}", sender=self.name)
@message_handler(MessageType)
async def on_new_message(self, message: MessageType) -> MessageType:
return MessageType(body=f"Inner: {message.body}", sender=self.name)


class Outer(TypeRoutedAgent[MessageType]):
def __init__(self, name: str, router: AgentRuntime[MessageType], inner: Agent[MessageType]) -> None:
super().__init__(name, router)
self._inner = inner

@event_handler(MessageType)
async def on_new_event(self, event: MessageType) -> MessageType:
inner_response = self._send_message(event, self._inner)
@message_handler(MessageType)
async def on_new_message(self, message: MessageType) -> MessageType:
inner_response = self._send_message(message, self._inner)
inner_message = await inner_response
return MessageType(message=f"Outer: {inner_message.message}", sender=self.name)
return MessageType(body=f"Outer: {inner_message.body}", sender=self.name)


async def main() -> None:
router = SingleThreadedAgentRuntime[MessageType]()

inner = Inner("inner", router)
outer = Outer("outer", router, inner)
response = router.send_message(MessageType(message="Hello", sender="external"), outer)
response = router.send_message(MessageType(body="Hello", sender="external"), outer)

while not response.done():
await router.process_next()
Expand Down
12 changes: 6 additions & 6 deletions src/agnext/agent_components/type_routed_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


# NOTE: this works on concrete types and not inheritance
def event_handler(target_type: Type[T]) -> Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]:
def message_handler(target_type: Type[T]) -> Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]:
def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
func._target_type = target_type # type: ignore
return func
Expand All @@ -37,12 +37,12 @@ def __init__(self, name: str, router: AgentRuntime[T]) -> None:
def subscriptions(self) -> Sequence[Type[T]]:
return list(self._handlers.keys())

async def on_event(self, event: T) -> T:
handler = self._handlers.get(type(event))
async def on_message(self, message: T) -> T:
handler = self._handlers.get(type(message))
if handler is not None:
return await handler(event)
return await handler(message)
else:
return await self.on_unhandled_event(event)
return await self.on_unhandled_message(message)

async def on_unhandled_event(self, event: T) -> T:
async def on_unhandled_message(self, message: T) -> T:
raise CantHandleException()
63 changes: 36 additions & 27 deletions src/agnext/application_components/single_threaded_agent_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,79 +11,88 @@


@dataclass
class BroadcastMessage(Generic[T]):
class BroadcastMessageEnvolope(Generic[T]):
"""A message envelope for broadcasting messages to all agents that can handle
the message of the type T."""

message: T
future: Future[List[T]]


@dataclass
class SendMessage(Generic[T]):
class SendMessageEnvelope(Generic[T]):
"""A message envelope for sending a message to a specific agent that can handle
the message of the type T."""

message: T
destination: Agent[T]
future: Future[T]


@dataclass
class ResponseMessage(Generic[T]): ...
class ResponseMessageEnvelope(Generic[T]):
"""A message envelope for sending a response to a message."""

...


class SingleThreadedAgentRuntime(AgentRuntime[T]):
def __init__(self) -> None:
self._event_queue: List[BroadcastMessage[T] | SendMessage[T]] = []
self._message_queue: List[BroadcastMessageEnvolope[T] | SendMessageEnvelope[T]] = []
self._per_type_subscribers: Dict[Type[T], List[Agent[T]]] = {}
self._agents: Set[Agent[T]] = set()

def add_agent(self, agent: Agent[T]) -> None:
for event_type in agent.subscriptions:
if event_type not in self._per_type_subscribers:
self._per_type_subscribers[event_type] = []
self._per_type_subscribers[event_type].append(agent)
for message_type in agent.subscriptions:
if message_type not in self._per_type_subscribers:
self._per_type_subscribers[message_type] = []
self._per_type_subscribers[message_type].append(agent)
self._agents.add(agent)

# Returns the response of the message
def send_message(self, message: T, destination: Agent[T]) -> Future[T]:
loop = asyncio.get_event_loop()
future: Future[T] = loop.create_future()

self._event_queue.append(SendMessage(message, destination, future))
self._message_queue.append(SendMessageEnvelope(message, destination, future))

return future

# Returns the response of all handling agents
def broadcast_message(self, message: T) -> Future[List[T]]:
future: Future[List[T]] = asyncio.get_event_loop().create_future()
self._event_queue.append(BroadcastMessage(message, future))
self._message_queue.append(BroadcastMessageEnvolope(message, future))
return future

async def _process_send(self, message: SendMessage[T]) -> None:
recipient = message.destination
async def _process_send(self, message_envelope: SendMessageEnvelope[T]) -> None:
recipient = message_envelope.destination
if recipient not in self._agents:
message.future.set_exception(Exception("Recipient not found"))
message_envelope.future.set_exception(Exception("Recipient not found"))
return

response = await recipient.on_event(message.message)
message.future.set_result(response)
response = await recipient.on_message(message_envelope.message)
message_envelope.future.set_result(response)

async def _process_broadcast(self, message: BroadcastMessage[T]) -> None:
async def _process_broadcast(self, message_envelope: BroadcastMessageEnvolope[T]) -> None:
responses: List[T] = []
for agent in self._per_type_subscribers.get(type(message.message), []):
response = await agent.on_event(message.message)
for agent in self._per_type_subscribers.get(type(message_envelope.message), []):
response = await agent.on_message(message_envelope.message)
responses.append(response)
message.future.set_result(responses)
message_envelope.future.set_result(responses)

async def process_next(self) -> None:
if len(self._event_queue) == 0:
if len(self._message_queue) == 0:
# Yield control to the event loop to allow other tasks to run
await asyncio.sleep(0)
return

event = self._event_queue.pop(0)
message_envelope = self._message_queue.pop(0)

match event:
case SendMessage(message, destination, future):
asyncio.create_task(self._process_send(SendMessage(message, destination, future)))
case BroadcastMessage(message, future):
asyncio.create_task(self._process_broadcast(BroadcastMessage(message, future)))
match message_envelope:
case SendMessageEnvelope(message, destination, future):
asyncio.create_task(self._process_send(SendMessageEnvelope(message, destination, future)))
case BroadcastMessageEnvolope(message, future):
asyncio.create_task(self._process_broadcast(BroadcastMessageEnvolope(message, future)))

# Yield control to the event loop to allow other tasks to run
# Yield control to the message loop to allow other tasks to run
await asyncio.sleep(0)
2 changes: 1 addition & 1 deletion src/agnext/core/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ def name(self) -> str: ...
@property
def subscriptions(self) -> Sequence[Type[T]]: ...

async def on_event(self, event: T) -> T: ...
async def on_message(self, message: T) -> T: ...
2 changes: 1 addition & 1 deletion src/agnext/core/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def subscriptions(self) -> Sequence[Type[T]]:
return []

@abstractmethod
async def on_event(self, event: T) -> T: ...
async def on_message(self, message: T) -> T: ...

def _send_message(self, message: T, destination: Agent[T]) -> Future[T]:
return self._router.send_message(message, destination)
Expand Down

0 comments on commit 5f7ef08

Please sign in to comment.