Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CAP] User supplied threads for agents #2812

Merged
merged 7 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 34 additions & 15 deletions samples/apps/cap/py/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## I just want to run the remote AutoGen agents!
*Python Instructions (Windows, Linux, MacOS):*

pip install autogencap
pip install autogencap-rajan.jedi

1) AutoGen require OAI_CONFIG_LIST.
AutoGen python requirements: 3.8 <= python <= 3.11
Expand All @@ -14,26 +14,45 @@ pip install autogencap
AutoGen is about Agents and Agent Orchestration. CAP extends AutoGen to allows Agents to communicate via a message bus. CAP, therefore, deals with the space between these components. CAP is a message based, actor platform that allows actors to be composed into arbitrary graphs.

Actors can register themselves with CAP, find other agents, construct arbitrary graphs, send and receive messages independently and many, many, many other things.

```python
# CAP Platform
network = LocalActorNetwork()
# Register an agent
network.register(GreeterAgent())
# Tell agents to connect to other agents
network.connect()
# Get a channel to the agent
greeter_link = network.lookup_agent("Greeter")
# Send a message to the agent
greeter_link.send_txt_msg("Hello World!")
# Cleanup
greeter_link.close()
network.disconnect()
# CAP Library
from autogencap.ComponentEnsemble import ComponentEnsemble
from autogencap.Actor import Actor

# A simple Agent
class GreeterAgent(Actor):
def __init__(self):
super().__init__(
agent_name="Greeter",
description="This is the greeter agent, who knows how to greet people.")

# Prints out the message it receives
def on_txt_msg(self, msg):
print(f"Greeter received: {msg}")
return True

ensemble = ComponentEnsemble()
# Create an agent
agent = GreeterAgent()
# Register an agent
ensemble.register(agent) # start message processing
# call on_connect() on all Agents
ensemble.connect()
# Get a channel to the agent
greeter_link = ensemble.find_by_name("Greeter")
#Send a message to the agent
greeter_link.send_txt_msg("Hello World!")
# Cleanup
greeter_link.close()
ensemble.disconnect()
```

### Check out other demos in the `py/demo` directory. We show the following: ###
1) Hello World shown above
2) Many CAP Actors interacting with each other
3) A pair of interacting AutoGen Agents wrapped in CAP Actors
4) CAP wrapped AutoGen Agents in a group chat
5) Two AutoGen Agents running in different processes and communicating through CAP
6) List all registered agents in CAP
7) AutoGen integration to list all registered agents
7) Run Agent in user supplied message loop
89 changes: 53 additions & 36 deletions samples/apps/cap/py/autogencap/Actor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import threading
import time
import traceback

import zmq
Expand All @@ -9,11 +8,12 @@


class Actor:
def __init__(self, agent_name: str, description: str):
def __init__(self, agent_name: str, description: str, start_thread: bool = True):
self.actor_name: str = agent_name
self.agent_description: str = description
self.run = False
self._start_event = threading.Event()
self._start_thread = start_thread

def on_connect(self, network):
Debug(self.actor_name, f"is connecting to {network}")
Expand All @@ -27,37 +27,50 @@ def on_bin_msg(self, msg: bytes, msg_type: str, receiver: str, sender: str) -> b
Info(self.actor_name, f"Msg: receiver=[{receiver}], msg_type=[{msg_type}]")
return True

def _recv_thread(self):
def _msg_loop_init(self):
Debug(self.actor_name, "recv thread started")
self._socket: zmq.Socket = self._context.socket(zmq.SUB)
self._socket.setsockopt(zmq.RCVTIMEO, 500)
self._socket.connect(xpub_url)
str_topic = f"{self.actor_name}"
Debug(self.actor_name, f"subscribe to: {str_topic}")
self._socket.setsockopt_string(zmq.SUBSCRIBE, f"{str_topic}")
self._start_event.set()

def get_message(self):
try:
Debug(self.actor_name, "recv thread started")
self._socket: zmq.Socket = self._context.socket(zmq.SUB)
self._socket.setsockopt(zmq.RCVTIMEO, 500)
self._socket.connect(xpub_url)
str_topic = f"{self.actor_name}"
Debug(self.actor_name, f"subscribe to: {str_topic}")
self._socket.setsockopt_string(zmq.SUBSCRIBE, f"{str_topic}")
self._start_event.set()
topic, msg_type, sender_topic, msg = self._socket.recv_multipart()
topic = topic.decode("utf-8") # Convert bytes to string
msg_type = msg_type.decode("utf-8") # Convert bytes to string
sender_topic = sender_topic.decode("utf-8") # Convert bytes to string
except zmq.Again:
return None # No message received, continue to next iteration
except Exception as e:
Error(self.actor_name, f"recv thread encountered an error: {e}")
traceback.print_exc()
return None
return topic, msg_type, sender_topic, msg

def dispatch_message(self, message):
if message is None:
return
topic, msg_type, sender_topic, msg = message
if msg_type == "text":
msg = msg.decode("utf-8") # Convert bytes to string
if not self.on_txt_msg(msg, msg_type, topic, sender_topic):
msg = "quit"
if msg.lower() == "quit":
self.run = False
else:
if not self.on_bin_msg(msg, msg_type, topic, sender_topic):
self.run = False

def _msg_loop(self):
try:
self._msg_loop_init()
while self.run:
try:
topic, msg_type, sender_topic, msg = self._socket.recv_multipart()
topic = topic.decode("utf-8") # Convert bytes to string
msg_type = msg_type.decode("utf-8") # Convert bytes to string
sender_topic = sender_topic.decode("utf-8") # Convert bytes to string
except zmq.Again:
continue # No message received, continue to next iteration
except Exception as e:
Error(self.actor_name, f"recv thread encountered an error: {e}")
traceback.print_exc()
continue
if msg_type == "text":
msg = msg.decode("utf-8") # Convert bytes to string
if not self.on_txt_msg(msg, msg_type, topic, sender_topic):
msg = "quit"
if msg.lower() == "quit":
break
else:
if not self.on_bin_msg(msg, msg_type, topic, sender_topic):
break
message = self.get_message()
self.dispatch_message(message)
except Exception as e:
Debug(self.actor_name, f"recv thread encountered an error: {e}")
traceback.print_exc()
Expand All @@ -68,12 +81,15 @@ def _recv_thread(self):
self._start_event.set()
Debug(self.actor_name, "recv thread ended")

def start(self, context: zmq.Context):
def on_start(self, context: zmq.Context):
self._context = context
self.run: bool = True
self._thread = threading.Thread(target=self._recv_thread)
self._thread.start()
self._start_event.wait()
if self._start_thread:
self._thread = threading.Thread(target=self._msg_loop)
self._thread.start()
self._start_event.wait()
else:
self._msg_loop_init()

def disconnect_network(self, network):
Debug(self.actor_name, f"is disconnecting from {network}")
Expand All @@ -82,6 +98,7 @@ def disconnect_network(self, network):

def stop(self):
self.run = False
self._thread.join()
if self._start_thread:
self._thread.join()
self._socket.setsockopt(zmq.LINGER, 0)
self._socket.close()
4 changes: 1 addition & 3 deletions samples/apps/cap/py/autogencap/ComponentEnsemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
from .DirectorySvc import DirectorySvc
from .proto.CAP_pb2 import ActorInfo, ActorInfoCollection

# TODO: remove time import


class ComponentEnsemble:
def __init__(self, name: str = "Local Actor Network", start_broker: bool = True):
Expand Down Expand Up @@ -43,7 +41,7 @@ def register(self, actor: Actor):
# that we can look up the actor by name
self._directory_svc.register_actor_by_name(actor.actor_name)
self.local_actors[actor.actor_name] = actor
actor.start(self._context)
actor.on_start(self._context)
Debug("Local_Actor_Network", f"{actor.actor_name} registered in the network.")

def connect(self):
Expand Down
2 changes: 1 addition & 1 deletion samples/apps/cap/py/autogencap/DirectorySvc.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def start(self):
self._directory_connector = ActorConnector(self._context, Directory_Svc_Topic)
if self._no_other_directory():
self._directory_actor = DirectoryActor(Directory_Svc_Topic, "Directory Service")
self._directory_actor.start(self._context)
self._directory_actor.on_start(self._context)
Info("DirectorySvc", "Directory service started.")
else:
Info("DirectorySvc", "Another directory service is running. This instance will not start.")
Expand Down
4 changes: 2 additions & 2 deletions samples/apps/cap/py/autogencap/ag_adapter/AGActor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@


class AGActor(Actor):
def start(self, context: zmq.Context):
super().start(context)
def on_start(self, context: zmq.Context):
super().on_start(context)
str_topic = Termination_Topic
Debug(self.actor_name, f"subscribe to: {str_topic}")
self._socket.setsockopt_string(zmq.SUBSCRIBE, f"{str_topic}")
4 changes: 4 additions & 0 deletions samples/apps/cap/py/demo/App.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from list_agents import list_agents
from RemoteAGDemo import remote_ag_demo
from SimpleActorDemo import simple_actor_demo
from single_threaded import single_threaded_demo

####################################################################################################

Expand Down Expand Up @@ -46,6 +47,7 @@ def main():
print("4. AutoGen GroupChat")
print("5. AutoGen Agents in different processes")
print("6. List Actors in CAP (Registry)")
print("7. Agent loop in main thread (no background thread for Agent)")
choice = input("Enter your choice (1-6): ")

if choice == "1":
Expand All @@ -64,6 +66,8 @@ def main():
remote_ag_demo()
elif choice == "6":
list_agents()
elif choice == "7":
single_threaded_demo()
else:
print("Quitting...")
break
Expand Down
3 changes: 2 additions & 1 deletion samples/apps/cap/py/demo/AppAgents.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ class GreeterAgent(Actor):

def __init__(
self,
start_thread=True,
agent_name="Greeter",
description="This is the greeter agent, who knows how to greet people.",
):
super().__init__(agent_name, description)
super().__init__(agent_name, description, start_thread=start_thread)


class FidelityAgent(Actor):
Expand Down
13 changes: 1 addition & 12 deletions samples/apps/cap/py/demo/SimpleActorDemo.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import time

from AppAgents import GreeterAgent
from autogencap.ComponentEnsemble import ComponentEnsemble
from autogencap.DebugLog import Error
from autogencap.proto.CAP_pb2 import Ping


def simple_actor_demo():
Expand All @@ -17,12 +13,5 @@ def simple_actor_demo():
ensemble.register(agent)
ensemble.connect()
greeter_link = ensemble.find_by_name("Greeter")
ensemble.disconnect()

ping = Ping()
# Serialize and send the message
msg_type_str = Ping.__name__
msg_bytes = ping.SerializeToString()
greeter_link.send_txt_msg("Hello World!")
greeter_link.send_bin_msg(msg_type_str, msg_bytes)
_, resp_type, resp_msg_bytes = greeter_link.send_recv_msg(msg_type_str, msg_bytes)
ensemble.disconnect()
39 changes: 39 additions & 0 deletions samples/apps/cap/py/demo/single_threaded.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import _paths
from AppAgents import GreeterAgent
from autogencap.ComponentEnsemble import ComponentEnsemble
from autogencap.DebugLog import Error
from autogencap.proto.CAP_pb2 import Ping


def single_threaded_demo():
"""
Demonstrates the usage of the CAP platform by registering an actor, connecting to the actor,
sending a message, and performing cleanup operations.
"""
# CAP Platform
ensemble = ComponentEnsemble()
agent = GreeterAgent(start_thread=False)
ensemble.register(agent)
ensemble.connect()
greeter_link = ensemble.find_by_name("Greeter")
greeter_link.send_txt_msg("Hello World!")

no_msg = 0
while no_msg < 5:
message = agent.get_message()
agent.dispatch_message(message)
if message is None:
no_msg += 1

message = agent.get_message()
agent.dispatch_message(message)

ensemble.disconnect()


def main():
single_threaded_demo()


if __name__ == "__main__":
main()
2 changes: 1 addition & 1 deletion samples/apps/cap/py/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "autogencap_rajan.jedi"
version = "0.0.9"
version = "0.0.10"
authors = [
{ name="Rajan Chari", email="rajan.jedi@gmail.com" },
]
Expand Down
Loading