Skip to content

Commit

Permalink
[CAP] Improved AutoGen Agents support & Pip Install (#2711)
Browse files Browse the repository at this point in the history
* 1) Removed most framework sleeps 2) refactored connection code

* pre-commit fixes

* pre-commit

* ignore protobuf files in pre-commit checks

* Fix duplicate actor registration

* refactor change

* Nicer printing of Actors

* 1) Report recv_multipart errors 4) Always send 4 parts

* AutoGen generate_reply expects to wait indefinitely for an answer.  CAP can wait a certain amount and give up.   In order to reconcile the two, AutoGenConnector is set to wait indefinitely.

* pre-commit formatting fixes

* pre-commit format changes

* don't check autogenerated proto py files

* Iterating on CAP interface for AutoGen

* User proxy must initiate chat

* autogencap pypi package

* added dependencies

* serialize/deserialize dictionary elements to json when dealing with ReceiveReq

* 1) Removed most framework sleeps 2) refactored connection code

* Nicer printing of Actors

* AutoGen generate_reply expects to wait indefinitely for an answer.  CAP can wait a certain amount and give up.   In order to reconcile the two, AutoGenConnector is set to wait indefinitely.

* pre-commit formatting fixes

* pre-commit format changes

* Iterating on CAP interface for AutoGen

* User proxy must initiate chat

* autogencap pypi package

* added dependencies

* serialize/deserialize dictionary elements to json when dealing with ReceiveReq

* pre-commit check fixes

* fix pre-commit issues

* Better encapsulation of logging

* pre-commit fix

* pip package update
  • Loading branch information
rajan-chari authored May 19, 2024
1 parent 11d9336 commit 31d2d37
Show file tree
Hide file tree
Showing 12 changed files with 235 additions and 52 deletions.
21 changes: 12 additions & 9 deletions samples/apps/cap/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# Composable Actor Platform (CAP) for AutoGen

## I just want to run the demo!
## I just want to run the remote AutoGen agents!
*Python Instructions (Windows, Linux, MacOS):*

0) cd py
1) pip install -r autogencap/requirements.txt
2) python ./demo/App.py
3) Choose (5) and follow instructions to run standalone Agents
4) Choose other options for other demos

*Demo Notes:*
1) Options involving AutoGen require OAI_CONFIG_LIST.
Expand All @@ -15,14 +17,15 @@

*Demo Reference:*
```
Select the Composable Actor Platform (CAP) demo app to run:
(enter anything else to quit)
1. Hello World Actor
2. Complex Actor Graph
3. AutoGen Pair
4. AutoGen GroupChat
5. AutoGen Agents in different processes
Enter your choice (1-5):
Select the Composable Actor Platform (CAP) demo app to run:
(enter anything else to quit)
1. Hello World
2. Complex Agent (e.g. Name or Quit)
3. AutoGen Pair
4. AutoGen GroupChat
5. AutoGen Agents in different processes
6. List Actors in CAP (Registry)
Enter your choice (1-6):
```

## What is Composable Actor Platform (CAP)?
Expand Down
39 changes: 39 additions & 0 deletions samples/apps/cap/py/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Composable Actor Platform (CAP) for AutoGen

## I just want to run the remote AutoGen agents!
*Python Instructions (Windows, Linux, MacOS):*

pip install autogencap

1) AutoGen require OAI_CONFIG_LIST.
AutoGen python requirements: 3.8 <= python <= 3.11

```
## What is Composable Actor Platform (CAP)?
AutoGen is about Agents and Agent Orchestration. CAP extends AutoGen to allows Agents to communicate via a message bus. CAP, therefore, deals with the space between these components. CAP is a message based, actor platform that allows actors to be composed into arbitrary graphs.
Actors can register themselves with CAP, find other agents, construct arbitrary graphs, send and receive messages independently and many, many, many other things.
```python
# CAP Platform
network = LocalActorNetwork()
# Register an agent
network.register(GreeterAgent())
# Tell agents to connect to other agents
network.connect()
# Get a channel to the agent
greeter_link = network.lookup_agent("Greeter")
# Send a message to the agent
greeter_link.send_txt_msg("Hello World!")
# Cleanup
greeter_link.close()
network.disconnect()
```
### Check out other demos in the `py/demo` directory. We show the following: ###
1) Hello World shown above
2) Many CAP Actors interacting with each other
3) A pair of interacting AutoGen Agents wrapped in CAP Actors
4) CAP wrapped AutoGen Agents in a group chat
5) Two AutoGen Agents running in different processes and communicating through CAP
6) List all registered agents in CAP
7) AutoGen integration to list all registered agents
24 changes: 14 additions & 10 deletions samples/apps/cap/py/autogencap/ActorConnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ def _connect_pub_socket(self):
evt: Dict[str, Any] = {}
mon_evt = recv_monitor_message(monitor)
evt.update(mon_evt)
if evt["event"] == zmq.EVENT_MONITOR_STOPPED or evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED:
Debug("ActorSender", "Handshake received (Or Monitor stopped)")
if evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED:
Debug("ActorSender", "Handshake received")
break
elif evt["event"] == zmq.EVENT_MONITOR_STOPPED:
Debug("ActorSender", "Monitor stopped")
break
self._pub_socket.disable_monitor()
monitor.close()
Expand Down Expand Up @@ -117,32 +120,33 @@ def send_txt_msg(self, msg):
def send_bin_msg(self, msg_type: str, msg):
self._sender.send_bin_msg(msg_type, msg)

def binary_request(self, msg_type: str, msg, retry=5):
def binary_request(self, msg_type: str, msg, num_attempts=5):
original_timeout: int = 0
if retry == -1:
if num_attempts == -1:
original_timeout = self._resp_socket.getsockopt(zmq.RCVTIMEO)
self._resp_socket.setsockopt(zmq.RCVTIMEO, 1000)

try:
self._sender.send_bin_request_msg(msg_type, msg, self._resp_topic)
while retry == -1 or retry > 0:
while num_attempts == -1 or num_attempts > 0:
try:
topic, resp_msg_type, _, resp = self._resp_socket.recv_multipart()
return topic, resp_msg_type, resp
except zmq.Again:
Debug(
"ActorConnector", f"{self._topic}: No response received. retry_count={retry}, max_retry={retry}"
"ActorConnector",
f"{self._topic}: No response received. retry_count={num_attempts}, max_retry={num_attempts}",
)
time.sleep(0.01)
if retry != -1:
retry -= 1
if num_attempts != -1:
num_attempts -= 1
finally:
if retry == -1:
if num_attempts == -1:
self._resp_socket.setsockopt(zmq.RCVTIMEO, original_timeout)

Error("ActorConnector", f"{self._topic}: No response received. Giving up.")
return None, None, None

def close(self):
self._sender.close()
self._pub_socket.close()
self._resp_socket.close()
64 changes: 40 additions & 24 deletions samples/apps/cap/py/autogencap/DebugLog.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,58 @@
LEVEL_NAMES = ["DBG", "INF", "WRN", "ERR"]
LEVEL_COLOR = ["dark_grey", "green", "yellow", "red"]

console_lock = threading.Lock()


def Log(level, context, msg):
# Check if the current level meets the threshold
if level >= Config.LOG_LEVEL: # Use the LOG_LEVEL from the Config module
# Check if the context is in the list of ignored contexts
if context in Config.IGNORED_LOG_CONTEXTS:
return
with console_lock:
timestamp = colored(datetime.datetime.now().strftime("%m/%d/%y %H:%M:%S"), "dark_grey")
# Translate level number to name and color
level_name = colored(LEVEL_NAMES[level], LEVEL_COLOR[level])
# Left justify the context and color it blue
context = colored(context.ljust(14), "blue")
# Left justify the threadid and color it blue
thread_id = colored(str(threading.get_ident()).ljust(5), "blue")
# color the msg based on the level
msg = colored(msg, LEVEL_COLOR[level])
print(f"{thread_id} {timestamp} {level_name}: [{context}] {msg}")

class BaseLogger:
def __init__(self):
self._lock = threading.Lock()

def Log(self, level, context, msg):
# Check if the current level meets the threshold
if level >= Config.LOG_LEVEL: # Use the LOG_LEVEL from the Config module
# Check if the context is in the list of ignored contexts
if context in Config.IGNORED_LOG_CONTEXTS:
return
with self._lock:
self.WriteLog(level, context, msg)

def WriteLog(self, level, context, msg):
raise NotImplementedError("Subclasses must implement this method")


class ConsoleLogger(BaseLogger):
def __init__(self):
super().__init__()

def WriteLog(self, level, context, msg):
timestamp = colored(datetime.datetime.now().strftime("%m/%d/%y %H:%M:%S"), "pink")
# Translate level number to name and color
level_name = colored(LEVEL_NAMES[level], LEVEL_COLOR[level])
# Left justify the context and color it blue
context = colored(context.ljust(14), "blue")
# Left justify the threadid and color it blue
thread_id = colored(str(threading.get_ident()).ljust(5), "blue")
# color the msg based on the level
msg = colored(msg, LEVEL_COLOR[level])
print(f"{thread_id} {timestamp} {level_name}: [{context}] {msg}")


LOGGER = ConsoleLogger()


def Debug(context, message):
Log(DEBUG, context, message)
LOGGER.Log(DEBUG, context, message)


def Info(context, message):
Log(INFO, context, message)
LOGGER.Log(INFO, context, message)


def Warn(context, message):
Log(WARN, context, message)
LOGGER.Log(WARN, context, message)


def Error(context, message):
Log(ERROR, context, message)
LOGGER.Log(ERROR, context, message)


def shorten(msg, num_parts=5, max_len=100):
Expand Down
6 changes: 4 additions & 2 deletions samples/apps/cap/py/autogencap/ag_adapter/AutoGenConnector.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from typing import Dict, Optional, Union

from autogen import Agent
Expand Down Expand Up @@ -37,7 +38,7 @@ def send_gen_reply_req(self):
# Setting retry to -1 to keep trying until a response is received
# This normal AutoGen behavior but does not handle the case when an AutoGen agent
# is not running. In that case, the connector will keep trying indefinitely.
_, _, resp = self._can_channel.binary_request(type(msg).__name__, serialized_msg, retry=-1)
_, _, resp = self._can_channel.binary_request(type(msg).__name__, serialized_msg, num_attempts=-1)
gen_reply_resp = GenReplyResp()
gen_reply_resp.ParseFromString(resp)
return gen_reply_resp.data
Expand All @@ -55,7 +56,8 @@ def send_receive_req(
msg = ReceiveReq()
if isinstance(message, dict):
for key, value in message.items():
msg.data_map.data[key] = value
json_serialized_value = json.dumps(value)
msg.data_map.data[key] = json_serialized_value
elif isinstance(message, str):
msg.data = message
msg.sender = sender.name
Expand Down
7 changes: 6 additions & 1 deletion samples/apps/cap/py/autogencap/ag_adapter/CAP2AG.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from enum import Enum
from typing import Optional

Expand Down Expand Up @@ -72,7 +73,11 @@ def _call_agent_receive(self, receive_params: ReceiveReq):
save_name = self._ag2can_other_agent.name
self._ag2can_other_agent.set_name(receive_params.sender)
if receive_params.HasField("data_map"):
data = dict(receive_params.data_map.data)
json_data = dict(receive_params.data_map.data)
data = {}
for key, json_value in json_data.items():
value = json.loads(json_value)
data[key] = value
else:
data = receive_params.data
self._the_ag_agent.receive(data, self._ag2can_other_agent, request_reply, silent)
Expand Down
22 changes: 22 additions & 0 deletions samples/apps/cap/py/autogencap/ag_adapter/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import time

from autogen import ConversableAgent

from ..DebugLog import Info, Warn
from .CAP2AG import CAP2AG


class Agent:
def __init__(self, agent: ConversableAgent, counter_party_name="user_proxy", init_chat=False):
self._agent = agent
self._the_other_name = counter_party_name
self._agent_adptr = CAP2AG(
ag_agent=self._agent, the_other_name=self._the_other_name, init_chat=init_chat, self_recursive=True
)

def register(self, network):
Info("Agent", f"Running Standalone {self._agent.name}")
network.register(self._agent_adptr)

def running(self):
return self._agent_adptr.run
2 changes: 1 addition & 1 deletion samples/apps/cap/py/demo/App.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def main():
print("3. AutoGen Pair")
print("4. AutoGen GroupChat")
print("5. AutoGen Agents in different processes")
print("6. List Actors in CAP")
print("6. List Actors in CAP (Registry)")
choice = input("Enter your choice (1-6): ")

if choice == "1":
Expand Down
9 changes: 4 additions & 5 deletions samples/apps/cap/py/demo/RemoteAGDemo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
def remote_ag_demo():
print("Remote Agent Demo")
instructions = """
In this demo, Broker, Assistant, and UserProxy are running in separate processes.
demo/standalone/UserProxy.py will initiate a conversation by sending UserProxy a message.
In this demo, Assistant, and UserProxy are running in separate processes.
demo/standalone/user_proxy.py will initiate a conversation by sending UserProxy Agent a message.
Please do the following:
1) Start Broker (python demo/standalone/Broker.py)
2) Start Assistant (python demo/standalone/Assistant.py)
3) Start UserProxy (python demo/standalone/UserProxy.py)
1) Start Assistant (python demo/standalone/assistant.py)
2) Start UserProxy (python demo/standalone/user_proxy.py)
"""
print(instructions)
input("Press Enter to return to demo menu...")
Expand Down
57 changes: 57 additions & 0 deletions samples/apps/cap/py/demo/standalone/user_proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import time

import _paths
from autogencap.ag_adapter.agent import Agent
from autogencap.Config import IGNORED_LOG_CONTEXTS
from autogencap.LocalActorNetwork import LocalActorNetwork

from autogen import UserProxyAgent

# Filter out some Log message contexts
IGNORED_LOG_CONTEXTS.extend(["BROKER"])


def main():
# Standard AutoGen
user_proxy = UserProxyAgent(
"user_proxy",
code_execution_config={"work_dir": "coding"},
is_termination_msg=lambda x: "TERMINATE" in x.get("content"),
)

# Wrap AutoGen Agent in CAP
cap_user_proxy = Agent(user_proxy, counter_party_name="assistant", init_chat=True)
# Create the message bus
network = LocalActorNetwork()
# Add the user_proxy to the message bus
cap_user_proxy.register(network)
# Start message processing
network.connect()

# Wait for the user_proxy to finish
interact_with_user(network, cap_user_proxy)
# Cleanup
network.disconnect()


# Starts the Broker and the Assistant. The UserProxy is started separately.
def interact_with_user(network, cap_assistant):
user_proxy_conn = network.lookup_actor("user_proxy")
example = "Plot a chart of MSFT daily closing prices for last 1 Month."
print(f"Example: {example}")
try:
user_input = input("Please enter your command: ")
if user_input == "":
user_input = example
print(f"Sending: {user_input}")
user_proxy_conn.send_txt_msg(user_input)

# Hang around for a while
while cap_assistant.running():
time.sleep(0.5)
except KeyboardInterrupt:
print("Interrupted by user, shutting down.")


if __name__ == "__main__":
main()
Loading

0 comments on commit 31d2d37

Please sign in to comment.