Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resuming a GroupChat #2627

Merged
merged 17 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
287 changes: 287 additions & 0 deletions autogen/agentchat/groupchat.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import copy
import json
import logging
import random
import re
Expand All @@ -12,6 +14,7 @@
from ..io.base import IOStream
from ..runtime_logging import log_new_agent, logging_enabled
from .agent import Agent
from .chat import ChatResult
from .conversable_agent import ConversableAgent

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -1116,6 +1119,290 @@ async def a_run_chat(
a.previous_cache = None
return True, None

def resume(
self,
messages: Union[List[Dict], str],
remove_termination_string: str = None,
silent: Optional[bool] = False,
) -> Tuple[ConversableAgent, Dict]:
"""Resumes a group chat using the previous messages as a starting point. Requires the agents, group chat, and group chat manager to be established
as per the original group chat.

Args:
- messages Union[List[Dict], str]: The content of the previous chat's messages, either as a Json string or a list of message dictionaries.
- remove_termination_string str: Remove the provided string from the last message to prevent immediate termination
- silent (bool or None): (Experimental) whether to print the messages for this conversation. Default is False.

Returns:
- Tuple[ConversableAgent, Dict]: A tuple containing the last agent who spoke and their message
"""

# Convert messages from string to messages list, if needed
if isinstance(messages, str):
messages = self.messages_from_string(messages)
elif isinstance(messages, list) and all(isinstance(item, dict) for item in messages):
messages = copy.deepcopy(messages)
else:
raise Exception("Messages is not of type str or List[Dict]")

# Clean up the objects, ensuring there are no messages in the agents and group chat

# Clear agent message history
for agent in self._groupchat.agents:
if isinstance(agent, ConversableAgent):
agent.clear_history()

# Clear Manager message history
self.clear_history()

# Clear GroupChat messages
self._groupchat.reset()

# Validation of message and agents

try:
self._valid_resume_messages(messages)
except:
raise

# Load the messages into the group chat
for i, message in enumerate(messages):

if "name" in message:
message_speaker_agent = self._groupchat.agent_by_name(message["name"])
else:
# If there's no name, assign the group chat manager (this is an indication the ChatResult messages was used instead of groupchat.messages as state)
message_speaker_agent = self
message["name"] = self.name

# If it wasn't an agent speaking, it may be the manager
if not message_speaker_agent and message["name"] == self.name:
message_speaker_agent = self

# Add previous messages to each agent (except their own messages and the last message, as we'll kick off the conversation with it)
if i != len(messages) - 1:
for agent in self._groupchat.agents:
if agent.name != message["name"]:
self.send(message, self._groupchat.agent_by_name(agent.name), request_reply=False, silent=True)

# Add previous message to the new groupchat, if it's an admin message the name may not match so add the message directly
if message_speaker_agent:
self._groupchat.append(message, message_speaker_agent)
else:
self._groupchat.messages.append(message)

# Last speaker agent
last_speaker_name = message["name"]

# Last message to check for termination (we could avoid this by ignoring termination check for resume in the future)
last_message = message

# Get last speaker as an agent
previous_last_agent = self._groupchat.agent_by_name(name=last_speaker_name)

# If we didn't match a last speaker agent, we check that it's the group chat's admin name and assign the manager, if so
if not previous_last_agent and (
last_speaker_name == self._groupchat.admin_name or last_speaker_name == self.name
):
previous_last_agent = self

# Termination removal and check
self._process_resume_termination(remove_termination_string, messages)

if not silent:
iostream = IOStream.get_default()
iostream.print(
f"Prepared group chat with {len(messages)} messages, the last speaker is",
colored(last_speaker_name, "yellow"),
flush=True,
)

# Update group chat settings for resuming
self._groupchat.send_introductions = False

return previous_last_agent, last_message

async def a_resume(
self,
messages: Union[List[Dict], str],
remove_termination_string: str = None,
silent: Optional[bool] = False,
) -> Tuple[ConversableAgent, Dict]:
"""Resumes a group chat using the previous messages as a starting point, asynchronously. Requires the agents, group chat, and group chat manager to be established
as per the original group chat.

Args:
- messages Union[List[Dict], str]: The content of the previous chat's messages, either as a Json string or a list of message dictionaries.
- remove_termination_string str: Remove the provided string from the last message to prevent immediate termination
- silent (bool or None): (Experimental) whether to print the messages for this conversation. Default is False.

Returns:
- Tuple[ConversableAgent, Dict]: A tuple containing the last agent who spoke and their message
"""

# Convert messages from string to messages list, if needed
if isinstance(messages, str):
messages = self.messages_from_string(messages)
elif isinstance(messages, list) and all(isinstance(item, dict) for item in messages):
messages = copy.deepcopy(messages)
else:
raise Exception("Messages is not of type str or List[Dict]")

# Clean up the objects, ensuring there are no messages in the agents and group chat

# Clear agent message history
for agent in self._groupchat.agents:
if isinstance(agent, ConversableAgent):
agent.clear_history()

# Clear Manager message history
self.clear_history()

# Clear GroupChat messages
self._groupchat.reset()

# Validation of message and agents

try:
self._valid_resume_messages(messages)
except:
raise

# Load the messages into the group chat
for i, message in enumerate(messages):

if "name" in message:
message_speaker_agent = self._groupchat.agent_by_name(message["name"])
else:
# If there's no name, assign the group chat manager (this is an indication the ChatResult messages was used instead of groupchat.messages as state)
message_speaker_agent = self
message["name"] = self.name

# If it wasn't an agent speaking, it may be the manager
if not message_speaker_agent and message["name"] == self.name:
message_speaker_agent = self

# Add previous messages to each agent (except their own messages and the last message, as we'll kick off the conversation with it)
if i != len(messages) - 1:
for agent in self._groupchat.agents:
if agent.name != message["name"]:
await self.a_send(
message, self._groupchat.agent_by_name(agent.name), request_reply=False, silent=True
)

# Add previous message to the new groupchat, if it's an admin message the name may not match so add the message directly
if message_speaker_agent:
self._groupchat.append(message, message_speaker_agent)
else:
self._groupchat.messages.append(message)

# Last speaker agent
last_speaker_name = message["name"]

# Last message to check for termination (we could avoid this by ignoring termination check for resume in the future)
last_message = message

# Get last speaker as an agent
previous_last_agent = self._groupchat.agent_by_name(name=last_speaker_name)

# If we didn't match a last speaker agent, we check that it's the group chat's admin name and assign the manager, if so
if not previous_last_agent and (
last_speaker_name == self._groupchat.admin_name or last_speaker_name == self.name
):
previous_last_agent = self

# Termination removal and check
self._process_resume_termination(remove_termination_string, messages)

if not silent:
iostream = IOStream.get_default()
iostream.print(
f"Prepared group chat with {len(messages)} messages, the last speaker is",
colored(last_speaker_name, "yellow"),
flush=True,
)

# Update group chat settings for resuming
self._groupchat.send_introductions = False

return previous_last_agent, last_message

def _valid_resume_messages(self, messages: List[Dict]):
"""Validates the messages used for resuming

args:
messages (List[Dict]): list of messages to resume with

returns:
- bool: Whether they are valid for resuming
"""
# Must have messages to start with, otherwise they should run run_chat
if not messages:
raise Exception(
"Cannot resume group chat as no messages were provided. Use GroupChatManager.run_chat or ConversableAgent.initiate_chat to start a new chat."
)

# Check that all agents in the chat messages exist in the group chat
for message in messages:
if message.get("name"):
if (
not self._groupchat.agent_by_name(message["name"])
and not message["name"] == self._groupchat.admin_name # ignore group chat's name
and not message["name"] == self.name # ignore group chat manager's name
):
raise Exception(f"Agent name in message doesn't exist as agent in group chat: {message['name']}")

def _process_resume_termination(self, remove_termination_string: str, messages: List[Dict]):
"""Removes termination string, if required, and checks if termination may occur.

args:
remove_termination_string (str): termination string to remove from the last message

returns:
None
"""

last_message = messages[-1]

# Replace any given termination string in the last message
if remove_termination_string:
if messages[-1].get("content") and remove_termination_string in messages[-1]["content"]:
messages[-1]["content"] = messages[-1]["content"].replace(remove_termination_string, "")

# Check if the last message meets termination (if it has one)
if self._is_termination_msg:
if self._is_termination_msg(last_message):
logger.warning("WARNING: Last message meets termination criteria and this may terminate the chat.")

def messages_from_string(self, message_string: str) -> List[Dict]:
"""Reads the saved state of messages in Json format for resume and returns as a messages list

args:
- message_string: Json string, the saved state

returns:
- List[Dict]: List of messages
"""
try:
state = json.loads(message_string)
except json.JSONDecodeError:
raise Exception("Messages string is not a valid JSON string")

return state

def messages_to_string(self, messages: List[Dict]) -> str:
"""Converts the provided messages into a Json string that can be used for resuming the chat.
The state is made up of a list of messages

args:
- messages (List[Dict]): set of messages to convert to a string

returns:
- str: Json representation of the messages which can be persisted for resuming later
"""

return json.dumps(messages)

def _raise_exception_on_async_reply_functions(self) -> None:
"""Raise an exception if any async reply functions are registered.

Expand Down
Loading
Loading