Skip to content

Commit

Permalink
Merge branch 'main' into autogenstudio-orm
Browse files Browse the repository at this point in the history
  • Loading branch information
victordibia committed May 11, 2024
2 parents 85bc73a + 60c6658 commit 1c15cd6
Show file tree
Hide file tree
Showing 43 changed files with 1,877 additions and 45 deletions.
287 changes: 287 additions & 0 deletions autogen/agentchat/groupchat.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import copy
import json
import logging
import random
import re
Expand All @@ -12,6 +14,7 @@
from ..io.base import IOStream
from ..runtime_logging import log_new_agent, logging_enabled
from .agent import Agent
from .chat import ChatResult
from .conversable_agent import ConversableAgent

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -1116,6 +1119,290 @@ async def a_run_chat(
a.previous_cache = None
return True, None

def resume(
self,
messages: Union[List[Dict], str],
remove_termination_string: str = None,
silent: Optional[bool] = False,
) -> Tuple[ConversableAgent, Dict]:
"""Resumes a group chat using the previous messages as a starting point. Requires the agents, group chat, and group chat manager to be established
as per the original group chat.
Args:
- messages Union[List[Dict], str]: The content of the previous chat's messages, either as a Json string or a list of message dictionaries.
- remove_termination_string str: Remove the provided string from the last message to prevent immediate termination
- silent (bool or None): (Experimental) whether to print the messages for this conversation. Default is False.
Returns:
- Tuple[ConversableAgent, Dict]: A tuple containing the last agent who spoke and their message
"""

# Convert messages from string to messages list, if needed
if isinstance(messages, str):
messages = self.messages_from_string(messages)
elif isinstance(messages, list) and all(isinstance(item, dict) for item in messages):
messages = copy.deepcopy(messages)
else:
raise Exception("Messages is not of type str or List[Dict]")

# Clean up the objects, ensuring there are no messages in the agents and group chat

# Clear agent message history
for agent in self._groupchat.agents:
if isinstance(agent, ConversableAgent):
agent.clear_history()

# Clear Manager message history
self.clear_history()

# Clear GroupChat messages
self._groupchat.reset()

# Validation of message and agents

try:
self._valid_resume_messages(messages)
except:
raise

# Load the messages into the group chat
for i, message in enumerate(messages):

if "name" in message:
message_speaker_agent = self._groupchat.agent_by_name(message["name"])
else:
# If there's no name, assign the group chat manager (this is an indication the ChatResult messages was used instead of groupchat.messages as state)
message_speaker_agent = self
message["name"] = self.name

# If it wasn't an agent speaking, it may be the manager
if not message_speaker_agent and message["name"] == self.name:
message_speaker_agent = self

# Add previous messages to each agent (except their own messages and the last message, as we'll kick off the conversation with it)
if i != len(messages) - 1:
for agent in self._groupchat.agents:
if agent.name != message["name"]:
self.send(message, self._groupchat.agent_by_name(agent.name), request_reply=False, silent=True)

# Add previous message to the new groupchat, if it's an admin message the name may not match so add the message directly
if message_speaker_agent:
self._groupchat.append(message, message_speaker_agent)
else:
self._groupchat.messages.append(message)

# Last speaker agent
last_speaker_name = message["name"]

# Last message to check for termination (we could avoid this by ignoring termination check for resume in the future)
last_message = message

# Get last speaker as an agent
previous_last_agent = self._groupchat.agent_by_name(name=last_speaker_name)

# If we didn't match a last speaker agent, we check that it's the group chat's admin name and assign the manager, if so
if not previous_last_agent and (
last_speaker_name == self._groupchat.admin_name or last_speaker_name == self.name
):
previous_last_agent = self

# Termination removal and check
self._process_resume_termination(remove_termination_string, messages)

if not silent:
iostream = IOStream.get_default()
iostream.print(
f"Prepared group chat with {len(messages)} messages, the last speaker is",
colored(last_speaker_name, "yellow"),
flush=True,
)

# Update group chat settings for resuming
self._groupchat.send_introductions = False

return previous_last_agent, last_message

async def a_resume(
self,
messages: Union[List[Dict], str],
remove_termination_string: str = None,
silent: Optional[bool] = False,
) -> Tuple[ConversableAgent, Dict]:
"""Resumes a group chat using the previous messages as a starting point, asynchronously. Requires the agents, group chat, and group chat manager to be established
as per the original group chat.
Args:
- messages Union[List[Dict], str]: The content of the previous chat's messages, either as a Json string or a list of message dictionaries.
- remove_termination_string str: Remove the provided string from the last message to prevent immediate termination
- silent (bool or None): (Experimental) whether to print the messages for this conversation. Default is False.
Returns:
- Tuple[ConversableAgent, Dict]: A tuple containing the last agent who spoke and their message
"""

# Convert messages from string to messages list, if needed
if isinstance(messages, str):
messages = self.messages_from_string(messages)
elif isinstance(messages, list) and all(isinstance(item, dict) for item in messages):
messages = copy.deepcopy(messages)
else:
raise Exception("Messages is not of type str or List[Dict]")

# Clean up the objects, ensuring there are no messages in the agents and group chat

# Clear agent message history
for agent in self._groupchat.agents:
if isinstance(agent, ConversableAgent):
agent.clear_history()

# Clear Manager message history
self.clear_history()

# Clear GroupChat messages
self._groupchat.reset()

# Validation of message and agents

try:
self._valid_resume_messages(messages)
except:
raise

# Load the messages into the group chat
for i, message in enumerate(messages):

if "name" in message:
message_speaker_agent = self._groupchat.agent_by_name(message["name"])
else:
# If there's no name, assign the group chat manager (this is an indication the ChatResult messages was used instead of groupchat.messages as state)
message_speaker_agent = self
message["name"] = self.name

# If it wasn't an agent speaking, it may be the manager
if not message_speaker_agent and message["name"] == self.name:
message_speaker_agent = self

# Add previous messages to each agent (except their own messages and the last message, as we'll kick off the conversation with it)
if i != len(messages) - 1:
for agent in self._groupchat.agents:
if agent.name != message["name"]:
await self.a_send(
message, self._groupchat.agent_by_name(agent.name), request_reply=False, silent=True
)

# Add previous message to the new groupchat, if it's an admin message the name may not match so add the message directly
if message_speaker_agent:
self._groupchat.append(message, message_speaker_agent)
else:
self._groupchat.messages.append(message)

# Last speaker agent
last_speaker_name = message["name"]

# Last message to check for termination (we could avoid this by ignoring termination check for resume in the future)
last_message = message

# Get last speaker as an agent
previous_last_agent = self._groupchat.agent_by_name(name=last_speaker_name)

# If we didn't match a last speaker agent, we check that it's the group chat's admin name and assign the manager, if so
if not previous_last_agent and (
last_speaker_name == self._groupchat.admin_name or last_speaker_name == self.name
):
previous_last_agent = self

# Termination removal and check
self._process_resume_termination(remove_termination_string, messages)

if not silent:
iostream = IOStream.get_default()
iostream.print(
f"Prepared group chat with {len(messages)} messages, the last speaker is",
colored(last_speaker_name, "yellow"),
flush=True,
)

# Update group chat settings for resuming
self._groupchat.send_introductions = False

return previous_last_agent, last_message

def _valid_resume_messages(self, messages: List[Dict]):
"""Validates the messages used for resuming
args:
messages (List[Dict]): list of messages to resume with
returns:
- bool: Whether they are valid for resuming
"""
# Must have messages to start with, otherwise they should run run_chat
if not messages:
raise Exception(
"Cannot resume group chat as no messages were provided. Use GroupChatManager.run_chat or ConversableAgent.initiate_chat to start a new chat."
)

# Check that all agents in the chat messages exist in the group chat
for message in messages:
if message.get("name"):
if (
not self._groupchat.agent_by_name(message["name"])
and not message["name"] == self._groupchat.admin_name # ignore group chat's name
and not message["name"] == self.name # ignore group chat manager's name
):
raise Exception(f"Agent name in message doesn't exist as agent in group chat: {message['name']}")

def _process_resume_termination(self, remove_termination_string: str, messages: List[Dict]):
"""Removes termination string, if required, and checks if termination may occur.
args:
remove_termination_string (str): termination string to remove from the last message
returns:
None
"""

last_message = messages[-1]

# Replace any given termination string in the last message
if remove_termination_string:
if messages[-1].get("content") and remove_termination_string in messages[-1]["content"]:
messages[-1]["content"] = messages[-1]["content"].replace(remove_termination_string, "")

# Check if the last message meets termination (if it has one)
if self._is_termination_msg:
if self._is_termination_msg(last_message):
logger.warning("WARNING: Last message meets termination criteria and this may terminate the chat.")

def messages_from_string(self, message_string: str) -> List[Dict]:
"""Reads the saved state of messages in Json format for resume and returns as a messages list
args:
- message_string: Json string, the saved state
returns:
- List[Dict]: List of messages
"""
try:
state = json.loads(message_string)
except json.JSONDecodeError:
raise Exception("Messages string is not a valid JSON string")

return state

def messages_to_string(self, messages: List[Dict]) -> str:
"""Converts the provided messages into a Json string that can be used for resuming the chat.
The state is made up of a list of messages
args:
- messages (List[Dict]): set of messages to convert to a string
returns:
- str: Json representation of the messages which can be persisted for resuming later
"""

return json.dumps(messages)

def _raise_exception_on_async_reply_functions(self) -> None:
"""Raise an exception if any async reply functions are registered.
Expand Down
18 changes: 18 additions & 0 deletions autogen/code_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import subprocess
import sys
import time
import venv
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from hashlib import md5
from types import SimpleNamespace
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import docker
Expand Down Expand Up @@ -719,3 +721,19 @@ def implement(
# cost += metrics["gen_cost"]
# if metrics["succeed_assertions"] or i == len(configs) - 1:
# return responses[metrics["index_selected"]], cost, i


def create_virtual_env(dir_path: str, **env_args) -> SimpleNamespace:
"""Creates a python virtual environment and returns the context.
Args:
dir_path (str): Directory path where the env will be created.
**env_args: Any extra args to pass to the `EnvBuilder`
Returns:
SimpleNamespace: the virtual env context object."""
if not env_args:
env_args = {"with_pip": True}
env_builder = venv.EnvBuilder(**env_args)
env_builder.create(dir_path)
return env_builder.ensure_directories(dir_path)
Loading

0 comments on commit 1c15cd6

Please sign in to comment.