From 93adc0cbc1bfd401470d9f4757d0ef663c82dad0 Mon Sep 17 00:00:00 2001 From: Xuchen Pan <32844285+pan-x-c@users.noreply.github.com> Date: Fri, 17 May 2024 16:28:53 +0800 Subject: [PATCH] Add `as_server` command and add `server_id` (#211) --- docs/sphinx_doc/en/source/index.rst | 1 + .../en/source/tutorial/208-distribute.md | 26 +- docs/sphinx_doc/zh_CN/source/index.rst | 1 + .../zh_CN/source/tutorial/208-distribute.md | 27 +- .../distributed_basic/distributed_dialog.py | 2 +- .../distributed_debate/distributed_debate.py | 2 +- examples/distributed_simulation/main.py | 2 +- setup.py | 1 + src/agentscope/agents/__init__.py | 3 +- src/agentscope/agents/agent.py | 20 +- src/agentscope/agents/rpc_agent.py | 641 +----------------- src/agentscope/message.py | 2 +- src/agentscope/rpc/__init__.py | 1 - src/agentscope/server/__init__.py | 10 + src/agentscope/server/launcher.py | 449 ++++++++++++ src/agentscope/server/servicer.py | 313 +++++++++ src/agentscope/utils/tools.py | 39 +- tests/rpc_agent_test.py | 2 +- 18 files changed, 879 insertions(+), 663 deletions(-) create mode 100644 src/agentscope/server/__init__.py create mode 100644 src/agentscope/server/launcher.py create mode 100644 src/agentscope/server/servicer.py diff --git a/docs/sphinx_doc/en/source/index.rst b/docs/sphinx_doc/en/source/index.rst index fb81e2e64..1aad67356 100644 --- a/docs/sphinx_doc/en/source/index.rst +++ b/docs/sphinx_doc/en/source/index.rst @@ -38,6 +38,7 @@ AgentScope Documentation agentscope.pipelines agentscope.service agentscope.rpc + agentscope.server agentscope.web agentscope.prompt agentscope.utils diff --git a/docs/sphinx_doc/en/source/tutorial/208-distribute.md b/docs/sphinx_doc/en/source/tutorial/208-distribute.md index 714f2e05f..0381a13f1 100644 --- a/docs/sphinx_doc/en/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/en/source/tutorial/208-distribute.md @@ -59,14 +59,16 @@ b = AgentB( #### Independent Process Mode In the Independent Process Mode, we need to start the agent server process on the target machine first. +When starting the agent server process, you need to specify a model config file, which contains the models which can be used in the agent server, the IP address and port of the agent server process For example, start two agent server processes on the two different machines with IP `ip_a` and `ip_b`(called `Machine1` and `Machine2` accrodingly). -You can run the following code on `Machine1`: +You can run the following code on `Machine1`.Before running, make sure that the machine has access to all models that used in your application, specifically, you need to put your model config file in `model_config_path_a` and set environment variables such as your model API key correctly in `Machine1`. The example model config file instances are located under `examples/model_configs_template`. ```python # import some packages +# register models which can be used in the server agentscope.init( - ... + model_configs=model_config_path_a, ) # Create an agent service process server = RpcAgentServerLauncher( @@ -79,13 +81,20 @@ server.launch() server.wait_until_terminate() ``` -And run the following code on `Machine2`: +> For similarity, you can run the following command in your terminal rather than the above code: +> +> ```shell +> as_server --host ip_a --port 12001 --model-config-path model_config_path_a +> ``` + +Then put your model config file accordingly in `model_config_path_b`, set environment variables, and run the following code on `Machine2`. ```python # import some packages +# register models which can be used in the server agentscope.init( - ... + model_configs=model_config_path_b, ) # Create an agent service process server = RpcAgentServerLauncher( @@ -98,6 +107,12 @@ server.launch() server.wait_until_terminate() ``` +> Similarly, you can run the following command in your terminal to setup the agent server: +> +> ```shell +> as_server --host ip_b --port 12002 --model-config-path model_config_path_b +> ``` + Then, you can connect to the agent servers from the main process with the following code. ```python @@ -254,6 +269,9 @@ About more detailed technical implementation solutions, please refer to our [pap In agentscope, the agent server provides a running platform for various types of agents. Multiple agents can run in the same agent server and hold independent memory and other local states but they will share the same computation resources. + +After installing the distributed version of AgentScope, you can use the `as_server` command to start the agent server, and the detailed startup arguments can be found in the documentation of the {func}`as_server` function. + As long as the code is not modified, an agent server can provide services for multiple main processes. This means that when running mutliple applications, you only need to start the agent server for the first time, and it can be reused subsequently. diff --git a/docs/sphinx_doc/zh_CN/source/index.rst b/docs/sphinx_doc/zh_CN/source/index.rst index 7f6c48275..662fb267c 100644 --- a/docs/sphinx_doc/zh_CN/source/index.rst +++ b/docs/sphinx_doc/zh_CN/source/index.rst @@ -38,6 +38,7 @@ AgentScope 文档 agentscope.pipelines agentscope.service agentscope.rpc + agentscope.server agentscope.web agentscope.prompt agentscope.utils diff --git a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md index ef50f123f..a185bd5da 100644 --- a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md @@ -57,15 +57,16 @@ b = AgentB( #### 独立进程模式 -在独立进程模式中,需要首先在目标机器上启动智能体服务器进程。 +在独立进程模式中,需要首先在目标机器上启动智能体服务器进程,启动时需要提供该服务器能够使用的模型的配置信息,以及服务器的 IP 和端口号。 例如想要将两个智能体服务进程部署在 IP 分别为 `ip_a` 和 `ip_b` 的机器上(假设这两台机器分别为`Machine1` 和 `Machine2`)。 -你可以先在 `Machine1` 上运行如下代码: +你可以在 `Machine1` 上运行如下代码。在运行之前请确保该机器能够正确访问到应用中所使用的所有模型。具体来讲,需要将用到的所有模型的配置信息放置在 `model_config_path_a` 文件中,并检查API key 等环境变量是否正确设置,模型配置文件样例可参考 `examples/model_configs_template`。 ```python # import some packages +# register models which can be used in the server agentscope.init( - ... + model_configs=model_config_path_a, ) # Create an agent service process server = RpcAgentServerLauncher( @@ -78,13 +79,20 @@ server.launch() server.wait_until_terminate() ``` -之后在 `Machine2` 上运行如下代码: +> 为了进一步简化使用,可以在命令行中输入如下指令来代替上述代码: +> +> ```shell +> as_server --host ip_a --port 12001 --model-config-path model_config_path_a +> ``` + +在 `Machine2` 上运行如下代码,这里同样要确保已经将模型配置文件放置在 `model_config_path_b` 位置并设置环境变量,从而确保运行在该机器上的 Agent 能够正常访问到模型。 ```python # import some packages +# register models which can be used in the server agentscope.init( - ... + model_configs=model_config_path_b, ) # Create an agent service process server = RpcAgentServerLauncher( @@ -97,6 +105,12 @@ server.launch() server.wait_until_terminate() ``` +> 这里也同样可以用如下指令来代替上面的代码。 +> +> ```shell +> as_server --host ip_b --port 12002 --model-config-path model_config_path_b +> ``` + 接下来,就可以使用如下代码从主进程中连接这两个智能体服务器进程。 ```python @@ -251,6 +265,9 @@ Placeholder 内部包含了该消息产生方的联络方法,可以通过网 #### Agent Server Agent Server 也就是智能体服务器。在 AgentScope 中,Agent Server 提供了一个让不同 Agent 实例运行的平台。多个不同类型的 Agent 可以运行在同一个 Agent Server 中并保持独立的记忆以及其他本地状态信息,但是他们将共享同一份计算资源。 + +在安装 AgentScope 的分布式版本后就可以通过 `as_server` 命令来启动 Agent Server,具体的启动参数在 {func}`as_server` 函数文档中可以找到。 + 只要没有对代码进行修改,一个已经启动的 Agent Server 可以为多个主流程提供服务。 这意味着在运行多个应用时,只需要在第一次运行前启动 Agent Server,后续这些 Agent Server 进程就可以持续复用。 diff --git a/examples/distributed_basic/distributed_dialog.py b/examples/distributed_basic/distributed_dialog.py index e558c54fa..ab0de4235 100644 --- a/examples/distributed_basic/distributed_dialog.py +++ b/examples/distributed_basic/distributed_dialog.py @@ -7,7 +7,7 @@ import agentscope from agentscope.agents.user_agent import UserAgent from agentscope.agents.dialog_agent import DialogAgent -from agentscope.agents.rpc_agent import RpcAgentServerLauncher +from agentscope.server import RpcAgentServerLauncher def parse_args() -> argparse.Namespace: diff --git a/examples/distributed_debate/distributed_debate.py b/examples/distributed_debate/distributed_debate.py index f5813e6f2..a4e0a4287 100644 --- a/examples/distributed_debate/distributed_debate.py +++ b/examples/distributed_debate/distributed_debate.py @@ -8,7 +8,7 @@ import agentscope from agentscope.agents import DialogAgent from agentscope.msghub import msghub -from agentscope.agents.rpc_agent import RpcAgentServerLauncher +from agentscope.server import RpcAgentServerLauncher from agentscope.message import Msg from agentscope.utils.logging_utils import logger diff --git a/examples/distributed_simulation/main.py b/examples/distributed_simulation/main.py index 7fd0cf19b..bb26fe533 100644 --- a/examples/distributed_simulation/main.py +++ b/examples/distributed_simulation/main.py @@ -11,7 +11,7 @@ import agentscope from agentscope.agents import AgentBase -from agentscope.agents.rpc_agent import RpcAgentServerLauncher +from agentscope.server import RpcAgentServerLauncher from agentscope.message import Msg diff --git a/setup.py b/setup.py index 2f2a75c34..ee22169f4 100644 --- a/setup.py +++ b/setup.py @@ -123,6 +123,7 @@ "console_scripts": [ "as_studio=agentscope.web.studio.studio:run_app", "as_workflow=agentscope.web.workstation.workflow:main", + "as_server=agentscope.server.launcher:as_server", ], }, ) diff --git a/src/agentscope/agents/__init__.py b/src/agentscope/agents/__init__.py index 0d5f6f84d..7bc5f83e5 100644 --- a/src/agentscope/agents/__init__.py +++ b/src/agentscope/agents/__init__.py @@ -6,7 +6,7 @@ from .dict_dialog_agent import DictDialogAgent from .user_agent import UserAgent from .text_to_image_agent import TextToImageAgent -from .rpc_agent import RpcAgent, RpcAgentServerLauncher +from .rpc_agent import RpcAgent from .react_agent import ReActAgent @@ -20,5 +20,4 @@ "ReActAgent", "DistConf", "RpcAgent", - "RpcAgentServerLauncher", ] diff --git a/src/agentscope/agents/agent.py b/src/agentscope/agents/agent.py index bdf657df2..4341952d9 100644 --- a/src/agentscope/agents/agent.py +++ b/src/agentscope/agents/agent.py @@ -24,7 +24,7 @@ class _AgentMeta(ABCMeta): """ def __init__(cls, name: Any, bases: Any, attrs: Any) -> None: - if not hasattr(cls, "registry"): + if not hasattr(cls, "_registry"): cls._registry = {} else: if name in cls._registry: @@ -245,7 +245,7 @@ def register_agent_class(cls, agent_class: Type[AgentBase]) -> None: """ agent_class_name = agent_class.__name__ if agent_class_name in cls._registry: - logger.warning( + logger.info( f"Agent class with name [{agent_class_name}] already exists.", ) else: @@ -384,14 +384,22 @@ def to_dist( port (`int`, defaults to `None`): Port of the rpc agent server. max_pool_size (`int`, defaults to `8192`): - Max number of task results that the server can accommodate. + Only takes effect when `host` and `port` are not filled in. + The max number of agent reply messages that the started agent + server can accommodate. Note that the oldest message will be + deleted after exceeding the pool size. max_timeout_seconds (`int`, defaults to `1800`): - Timeout for task results. + Only takes effect when `host` and `port` are not filled in. + Maximum time for reply messages to be cached in the launched + agent server. Note that expired messages will be deleted. local_mode (`bool`, defaults to `True`): - Whether the started rpc server only listens to local + Only takes effect when `host` and `port` are not filled in. + Whether the started agent server only listens to local requests. lazy_launch (`bool`, defaults to `True`): - Only launch the server when the agent is called. + Only takes effect when `host` and `port` are not filled in. + If `True`, launch the agent server when the agent is called, + otherwise, launch the agent server immediately. launch_server(`bool`, defaults to `None`): This field has been deprecated and will be removed in future releases. diff --git a/src/agentscope/agents/rpc_agent.py b/src/agentscope/agents/rpc_agent.py index 47d32ce3a..306dfa900 100644 --- a/src/agentscope/agents/rpc_agent.py +++ b/src/agentscope/agents/rpc_agent.py @@ -1,63 +1,14 @@ # -*- coding: utf-8 -*- """ Base class for Rpc Agent """ - -from multiprocessing import Process, Event, Pipe -from multiprocessing.synchronize import Event as EventClass -import socket -import threading -import json -import base64 -import traceback -import asyncio from typing import Type, Optional, Union, Sequence -from concurrent import futures -from loguru import logger - -try: - import dill - import grpc - from grpc import ServicerContext - from expiringdict import ExpiringDict -except ImportError as import_error: - from agentscope.utils.tools import ImportErrorReporter - - dill = ImportErrorReporter(import_error, "distribute") - grpc = ImportErrorReporter(import_error, "distribute") - ServicerContext = ImportErrorReporter(import_error, "distribute") - ExpiringDict = ImportErrorReporter(import_error, "distribute") -from agentscope._init import init_process, _INIT_SETTINGS from agentscope.agents.agent import AgentBase from agentscope.message import ( - Msg, PlaceholderMessage, - deserialize, serialize, ) -from agentscope.rpc import ( - RpcAgentClient, - RpcMsg, - RpcAgentServicer, - add_RpcAgentServicer_to_server, -) - - -def rpc_servicer_method( # type: ignore[no-untyped-def] - func, -): - """A decorator used to identify that the specific method is an rpc agent - servicer method, which can only be run in the rpc server process. - """ - - def inner(rpc_agent, msg): # type: ignore[no-untyped-def] - if not rpc_agent.is_servicer: - error_msg = f"Detect main process try to use rpc servicer method \ - [{func.__name__}]" - logger.error(error_msg) - raise RuntimeError(error_msg) - return func(rpc_agent, msg) - - return inner +from agentscope.rpc import RpcAgentClient +from agentscope.server.launcher import RpcAgentServerLauncher class RpcAgent(AgentBase): @@ -219,591 +170,3 @@ def stop(self) -> None: def __del__(self) -> None: self.stop() - - -def setup_rpc_agent_server( - host: str, - port: int, - init_settings: dict = None, - start_event: EventClass = None, - stop_event: EventClass = None, - pipe: int = None, - local_mode: bool = True, - max_pool_size: int = 8192, - max_timeout_seconds: int = 1800, - custom_agents: list = None, -) -> None: - """Setup gRPC server rpc agent. - - Args: - host (`str`, defaults to `"localhost"`): - Hostname of the rpc agent server. - port (`int`): - The socket port monitored by grpc server. - init_settings (`dict`, defaults to `None`): - Init settings for agentscope.init. - start_event (`EventClass`, defaults to `None`): - An Event instance used to determine whether the child process - has been started. - stop_event (`EventClass`, defaults to `None`): - The stop Event instance used to determine whether the child - process has been stopped. - pipe (`int`, defaults to `None`): - A pipe instance used to pass the actual port of the server. - local_mode (`bool`, defaults to `None`): - Only listen to local requests. - max_pool_size (`int`, defaults to `8192`): - Max number of task results that the server can accommodate. - max_timeout_seconds (`int`, defaults to `1800`): - Timeout for task results. - custom_agents (`list`, defaults to `None`): - A list of custom agent classes that are not in `agentscope.agents`. - """ - asyncio.run( - setup_rpc_agent_server_async( - host=host, - port=port, - init_settings=init_settings, - start_event=start_event, - stop_event=stop_event, - pipe=pipe, - local_mode=local_mode, - max_pool_size=max_pool_size, - max_timeout_seconds=max_timeout_seconds, - custom_agents=custom_agents, - ), - ) - - -async def setup_rpc_agent_server_async( - host: str, - port: int, - init_settings: dict = None, - start_event: EventClass = None, - stop_event: EventClass = None, - pipe: int = None, - local_mode: bool = True, - max_pool_size: int = 8192, - max_timeout_seconds: int = 1800, - custom_agents: list = None, -) -> None: - """Setup gRPC server rpc agent in an async way. - - Args: - host (`str`, defaults to `"localhost"`): - Hostname of the rpc agent server. - port (`int`): - The socket port monitored by grpc server. - init_settings (`dict`, defaults to `None`): - Init settings for agentscope.init. - start_event (`EventClass`, defaults to `None`): - An Event instance used to determine whether the child process - has been started. - stop_event (`EventClass`, defaults to `None`): - The stop Event instance used to determine whether the child - process has been stopped. - pipe (`int`, defaults to `None`): - A pipe instance used to pass the actual port of the server. - local_mode (`bool`, defaults to `None`): - Only listen to local requests. - max_pool_size (`int`, defaults to `8192`): - Max number of task results that the server can accommodate. - max_timeout_seconds (`int`, defaults to `1800`): - Timeout for task results. - custom_agents (`list`, defaults to `None`): - A list of custom agent classes that are not in `agentscope.agents`. - """ - - if init_settings is not None: - init_process(**init_settings) - servicer = AgentPlatform( - host=host, - port=port, - max_pool_size=max_pool_size, - max_timeout_seconds=max_timeout_seconds, - ) - # update agent registry - if custom_agents is not None: - for agent_class in custom_agents: - AgentBase.register_agent_class(agent_class=agent_class) - while True: - try: - port = check_port(port) - servicer.port = port - logger.info( - f"Starting rpc server at port [{port}]...", - ) - server = grpc.aio.server( - futures.ThreadPoolExecutor(max_workers=None), - ) - add_RpcAgentServicer_to_server(servicer, server) - if local_mode: - server.add_insecure_port(f"localhost:{port}") - else: - server.add_insecure_port(f"0.0.0.0:{port}") - await server.start() - break - except OSError: - logger.warning( - f"Failed to start rpc server at port [{port}]" - f"try another port", - ) - logger.info( - f"rpc server at port [{port}] started successfully", - ) - if start_event is not None: - pipe.send(port) - start_event.set() - while not stop_event.is_set(): - await asyncio.sleep(1) - logger.info( - f"Stopping rpc server at port [{port}]", - ) - await server.stop(10.0) - else: - await server.wait_for_termination() - logger.info( - f"rpc server at port [{port}] stopped successfully", - ) - - -def find_available_port() -> int: - """Get an unoccupied socket port number.""" - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(("", 0)) - return s.getsockname()[1] - - -def check_port(port: Optional[int] = None) -> int: - """Check if the port is available. - - Args: - port (`int`): - the port number being checked. - - Returns: - `int`: the port number that passed the check. If the port is found - to be occupied, an available port number will be automatically - returned. - """ - if port is None: - new_port = find_available_port() - logger.warning( - "gRpc server port is not provided, automatically select " - f"[{new_port}] as the port number.", - ) - return new_port - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - if s.connect_ex(("localhost", port)) == 0: - new_port = find_available_port() - logger.warning( - f"Port [{port}] is occupied, use [{new_port}] instead", - ) - return new_port - return port - - -class RpcAgentServerLauncher: - """The launcher of AgentPlatform (formerly RpcAgentServer).""" - - def __init__( - self, - host: str = "localhost", - port: int = None, - max_pool_size: int = 8192, - max_timeout_seconds: int = 1800, - local_mode: bool = False, - custom_agents: list = None, - agent_class: Type[AgentBase] = None, - agent_args: tuple = (), - agent_kwargs: dict = None, - ) -> None: - """Init a rpc agent server launcher. - - Args: - host (`str`, defaults to `"localhost"`): - Hostname of the rpc agent server. - port (`int`, defaults to `None`): - Port of the rpc agent server. - max_pool_size (`int`, defaults to `8192`): - Max number of task results that the server can accommodate. - max_timeout_seconds (`int`, defaults to `1800`): - Timeout for task results. - local_mode (`bool`, defaults to `False`): - Whether the started rpc server only listens to local - requests. - custom_agents (`list`, defaults to `None`): - A list of custom agent classes that are not in - `agentscope.agents`. - agent_class (`Type[AgentBase]`, deprecated): - The AgentBase subclass encapsulated by this wrapper. - agent_args (`tuple`, deprecated): The args tuple used to - initialize the agent_class. - agent_kwargs (`dict`, deprecated): The args dict used to - initialize the agent_class. - """ - self.host = host - self.port = check_port(port) - self.max_pool_size = max_pool_size - self.max_timeout_seconds = max_timeout_seconds - self.local_mode = local_mode - self.server = None - self.stop_event = None - self.parent_con = None - self.custom_agents = custom_agents - if ( - agent_class is not None - or len(agent_args) > 0 - or agent_kwargs is not None - ): - logger.warning( - "`agent_class`, `agent_args` and `agent_kwargs` is deprecated" - " in `RpcAgentServerLauncher`", - ) - - def _launch_in_main(self) -> None: - """Launch gRPC server in main-process""" - logger.info( - f"Launching agent server at [{self.host}:{self.port}]...", - ) - asyncio.run( - setup_rpc_agent_server_async( - host=self.host, - port=self.port, - max_pool_size=self.max_pool_size, - max_timeout_seconds=self.max_timeout_seconds, - local_mode=self.local_mode, - custom_agents=self.custom_agents, - ), - ) - - def _launch_in_sub(self) -> None: - """Launch gRPC server in sub-process.""" - self.stop_event = Event() - self.parent_con, child_con = Pipe() - start_event = Event() - server_process = Process( - target=setup_rpc_agent_server, - kwargs={ - "host": self.host, - "port": self.port, - "init_settings": _INIT_SETTINGS, - "start_event": start_event, - "stop_event": self.stop_event, - "pipe": child_con, - "max_pool_size": self.max_pool_size, - "max_timeout_seconds": self.max_timeout_seconds, - "local_mode": self.local_mode, - "custom_agents": self.custom_agents, - }, - ) - server_process.start() - self.port = self.parent_con.recv() - start_event.wait() - self.server = server_process - logger.info( - f"Launch agent server at [{self.host}:{self.port}] success", - ) - - def launch(self, in_subprocess: bool = True) -> None: - """launch a rpc agent server. - - Args: - in_subprocess (bool, optional): launch the server in subprocess. - Defaults to True. For agents that need to obtain command line - input, such as UserAgent, please set this value to False. - """ - if in_subprocess: - self._launch_in_sub() - else: - self._launch_in_main() - - def wait_until_terminate(self) -> None: - """Wait for server process""" - if self.server is not None: - self.server.join() - - def shutdown(self) -> None: - """Shutdown the rpc agent server.""" - if self.server is not None: - if self.stop_event is not None: - self.stop_event.set() - self.stop_event = None - self.server.join() - if self.server.is_alive(): - self.server.kill() - logger.info( - f"Agent server at port [{self.port}] is killed.", - ) - self.server = None - - -class AgentPlatform(RpcAgentServicer): - """A platform for agent to run on (formerly RpcServerSideWrapper)""" - - def __init__( - self, - host: str = "localhost", - port: int = None, - max_pool_size: int = 8192, - max_timeout_seconds: int = 1800, - ): - """Init the AgentPlatform. - - Args: - host (`str`, defaults to "localhost"): - Hostname of the rpc agent server. - port (`int`, defaults to `None`): - Port of the rpc agent server. - max_pool_size (`int`, defaults to `8192`): - The max number of task results that the server can - accommodate. Note that the oldest result will be deleted - after exceeding the pool size. - max_timeout_seconds (`int`, defaults to `1800`): - Timeout for task results. Note that expired results will be - deleted. - """ - self.host = host - self.port = port - self.result_pool = ExpiringDict( - max_len=max_pool_size, - max_age_seconds=max_timeout_seconds, - ) - self.executor = futures.ThreadPoolExecutor(max_workers=None) - self.task_id_lock = threading.Lock() - self.agent_id_lock = threading.Lock() - self.task_id_counter = 0 - self.agent_pool: dict[str, AgentBase] = {} - - def get_task_id(self) -> int: - """Get the auto-increment task id.""" - with self.task_id_lock: - self.task_id_counter += 1 - return self.task_id_counter - - def agent_exists(self, agent_id: str) -> bool: - """Check whether the agent exists. - - Args: - agent_id (`str`): the agent id. - - Returns: - bool: whether the agent exists. - """ - return agent_id in self.agent_pool - - def check_and_generate_agent( - self, - agent_id: str, - agent_configs: dict, - ) -> None: - """ - Check whether the agent exists, and create new agent instance - for new agent. - - Args: - agent_id (`str`): the agent id. - agent_configs (`dict`): configuration used to initialize the agent, - with three fields (generated in `_AgentMeta`): - - .. code-block:: python - - { - "class_name": {name of the agent} - "args": {args in tuple type to init the agent} - "kwargs": {args in dict type to init the agent} - } - - """ - with self.agent_id_lock: - if agent_id not in self.agent_pool: - agent_class_name = agent_configs["class_name"] - agent_instance = AgentBase.get_agent_class(agent_class_name)( - *agent_configs["args"], - **agent_configs["kwargs"], - ) - agent_instance._agent_id = agent_id # pylint: disable=W0212 - self.agent_pool[agent_id] = agent_instance - logger.info(f"create agent instance [{agent_id}]") - - def check_and_delete_agent(self, agent_id: str) -> None: - """ - Check whether the agent exists, and delete the agent instance - for the agent_id. - - Args: - agent_id (`str`): the agent id. - """ - with self.agent_id_lock: - if agent_id in self.agent_pool: - self.agent_pool.pop(agent_id) - logger.info(f"delete agent instance [{agent_id}]") - - def call_func( # pylint: disable=W0236 - self, - request: RpcMsg, - context: ServicerContext, - ) -> RpcMsg: - """Call the specific servicer function.""" - if hasattr(self, request.target_func): - if request.target_func not in ["_create_agent", "_get"]: - if not self.agent_exists(request.agent_id): - return context.abort( - grpc.StatusCode.INVALID_ARGUMENT, - f"Agent [{request.agent_id}] not exists.", - ) - return getattr(self, request.target_func)(request) - else: - # TODO: support other user defined method - logger.error(f"Unsupported method {request.target_func}") - return context.abort( - grpc.StatusCode.INVALID_ARGUMENT, - f"Unsupported method {request.target_func}", - ) - - def _reply(self, request: RpcMsg) -> RpcMsg: - """Call function of RpcAgentService - - Args: - request (`RpcMsg`): - Message containing input parameters or input parameter - placeholders. - - Returns: - `RpcMsg`: A serialized Msg instance with attributes name, host, - port and task_id - """ - if request.value: - msg = deserialize(request.value) - else: - msg = None - task_id = self.get_task_id() - self.result_pool[task_id] = threading.Condition() - self.executor.submit( - self.process_messages, - task_id, - request.agent_id, - msg, # type: ignore[arg-type] - ) - return RpcMsg( - value=Msg( - name=self.agent_pool[request.agent_id].name, - content=None, - task_id=task_id, - ).serialize(), - ) - - def _get(self, request: RpcMsg) -> RpcMsg: - """Get function of RpcAgentService - - Args: - request (`RpcMsg`): - Identifier of message, with json format:: - - { - 'task_id': int - } - - Returns: - `RpcMsg`: Concrete values of the specific message (or part of it). - """ - msg = json.loads(request.value) - while True: - result = self.result_pool.get(msg["task_id"]) - if isinstance(result, threading.Condition): - with result: - result.wait(timeout=1) - else: - break - return RpcMsg(value=result.serialize()) - - def _observe(self, request: RpcMsg) -> RpcMsg: - """Observe function of RpcAgentService - - Args: - request (`RpcMsg`): - The serialized input to be observed. - - Returns: - `RpcMsg`: Empty RpcMsg. - """ - msgs = deserialize(request.value) - for msg in msgs: - if isinstance(msg, PlaceholderMessage): - msg.update_value() - self.agent_pool[request.agent_id].observe(msgs) - return RpcMsg() - - def _create_agent(self, request: RpcMsg) -> RpcMsg: - """Create a new agent instance for the agent_id. - - Args: - request (RpcMsg): request message with a `agent_id` field. - """ - self.check_and_generate_agent( - request.agent_id, - agent_configs=( - dill.loads(base64.b64decode(request.value)) - if request.value - else None - ), - ) - return RpcMsg() - - def _clone_agent(self, request: RpcMsg) -> RpcMsg: - """Clone a new agent instance from the origin instance. - - Args: - request (RpcMsg): The `agent_id` field is the agent_id of the - agent to be cloned. - - Returns: - `RpcMsg`: The `value` field contains the agent_id of generated - agent. - """ - agent_id = request.agent_id - with self.agent_id_lock: - if agent_id not in self.agent_pool: - raise ValueError(f"Agent [{agent_id}] not exists") - ori_agent = self.agent_pool[agent_id] - new_agent = ori_agent.__class__( - *ori_agent._init_settings["args"], # pylint: disable=W0212 - **ori_agent._init_settings["kwargs"], # pylint: disable=W0212 - ) - with self.agent_id_lock: - self.agent_pool[new_agent.agent_id] = new_agent - return RpcMsg(value=new_agent.agent_id) - - def _delete_agent(self, request: RpcMsg) -> RpcMsg: - """Delete the agent instance of the specific sesssion_id. - - Args: - request (RpcMsg): request message with a `agent_id` field. - """ - self.check_and_delete_agent(request.agent_id) - return RpcMsg() - - def process_messages( - self, - task_id: int, - agent_id: str, - task_msg: dict = None, - ) -> None: - """Task processing.""" - if isinstance(task_msg, PlaceholderMessage): - task_msg.update_value() - cond = self.result_pool[task_id] - try: - result = self.agent_pool[agent_id].reply(task_msg) - self.result_pool[task_id] = result - except Exception: - error_msg = traceback.format_exc() - logger.error(f"Error in agent [{agent_id}]:\n{error_msg}") - self.result_pool[task_id] = Msg( - name="ERROR", - role="assistant", - __status="ERROR", - content=f"Error in agent [{agent_id}]:\n{error_msg}", - ) - with cond: - cond.notify_all() diff --git a/src/agentscope/message.py b/src/agentscope/message.py index cd8a0a9a5..372d6e624 100644 --- a/src/agentscope/message.py +++ b/src/agentscope/message.py @@ -426,7 +426,7 @@ def serialize(self) -> str: } -def deserialize(s: str) -> Union[MessageBase, Sequence]: +def deserialize(s: Union[str, bytes]) -> Union[MessageBase, Sequence]: """Deserialize json string into MessageBase""" js_msg = json.loads(s) msg_type = js_msg.pop("__type") diff --git a/src/agentscope/rpc/__init__.py b/src/agentscope/rpc/__init__.py index 2c7703a90..42d3b5fe5 100644 --- a/src/agentscope/rpc/__init__.py +++ b/src/agentscope/rpc/__init__.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- """Import all rpc related modules in the package.""" -from typing import Any from .rpc_agent_client import RpcAgentClient, ResponseStub, call_in_thread try: diff --git a/src/agentscope/server/__init__.py b/src/agentscope/server/__init__.py new file mode 100644 index 000000000..8b69a542a --- /dev/null +++ b/src/agentscope/server/__init__.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +"""Import all server related modules in the package.""" +from .launcher import RpcAgentServerLauncher, as_server +from .servicer import AgentServerServicer + +__all__ = [ + "RpcAgentServerLauncher", + "AgentServerServicer", + "as_server", +] diff --git a/src/agentscope/server/launcher.py b/src/agentscope/server/launcher.py new file mode 100644 index 000000000..ed5ed7f67 --- /dev/null +++ b/src/agentscope/server/launcher.py @@ -0,0 +1,449 @@ +# -*- coding: utf-8 -*- +""" Server of distributed agent""" +import os +from multiprocessing import Process, Event, Pipe +from multiprocessing.synchronize import Event as EventClass +import asyncio +import signal +import argparse +from typing import Type +from concurrent import futures +from loguru import logger + +try: + import grpc + from agentscope.rpc.rpc_agent_pb2_grpc import ( + add_RpcAgentServicer_to_server, + ) +except ImportError as import_error: + from agentscope.utils.tools import ImportErrorReporter + + grpc = ImportErrorReporter(import_error, "distribute") + add_RpcAgentServicer_to_server = ImportErrorReporter( + import_error, + "distribute", + ) + +import agentscope +from agentscope.server.servicer import AgentServerServicer +from agentscope.agents.agent import AgentBase +from agentscope.utils.tools import ( + _get_timestamp, + check_port, +) + + +def _setup_agent_server( + host: str, + port: int, + server_id: str, + init_settings: dict = None, + start_event: EventClass = None, + stop_event: EventClass = None, + pipe: int = None, + local_mode: bool = True, + max_pool_size: int = 8192, + max_timeout_seconds: int = 1800, + custom_agents: list = None, +) -> None: + """Setup agent server. + + Args: + host (`str`, defaults to `"localhost"`): + Hostname of the agent server. + port (`int`): + The socket port monitored by the agent server. + server_id (`str`): + The id of the server. + init_settings (`dict`, defaults to `None`): + Init settings for agentscope.init. + start_event (`EventClass`, defaults to `None`): + An Event instance used to determine whether the child process + has been started. + stop_event (`EventClass`, defaults to `None`): + The stop Event instance used to determine whether the child + process has been stopped. + pipe (`int`, defaults to `None`): + A pipe instance used to pass the actual port of the server. + local_mode (`bool`, defaults to `None`): + Only listen to local requests. + max_pool_size (`int`, defaults to `8192`): + Max number of agent replies that the server can accommodate. + max_timeout_seconds (`int`, defaults to `1800`): + Timeout for agent replies. + custom_agents (`list`, defaults to `None`): + A list of custom agent classes that are not in `agentscope.agents`. + """ + asyncio.run( + _setup_agent_server_async( + host=host, + port=port, + server_id=server_id, + init_settings=init_settings, + start_event=start_event, + stop_event=stop_event, + pipe=pipe, + local_mode=local_mode, + max_pool_size=max_pool_size, + max_timeout_seconds=max_timeout_seconds, + custom_agents=custom_agents, + ), + ) + + +async def _setup_agent_server_async( + host: str, + port: int, + server_id: str, + init_settings: dict = None, + start_event: EventClass = None, + stop_event: EventClass = None, + pipe: int = None, + local_mode: bool = True, + max_pool_size: int = 8192, + max_timeout_seconds: int = 1800, + custom_agents: list = None, +) -> None: + """Setup agent server in an async way. + + Args: + host (`str`, defaults to `"localhost"`): + Hostname of the agent server. + port (`int`): + The socket port monitored by the agent server. + server_id (`str`): + The id of the server. + init_settings (`dict`, defaults to `None`): + Init settings for agentscope.init. + start_event (`EventClass`, defaults to `None`): + An Event instance used to determine whether the child process + has been started. + stop_event (`EventClass`, defaults to `None`): + The stop Event instance used to determine whether the child + process has been stopped. + pipe (`int`, defaults to `None`): + A pipe instance used to pass the actual port of the server. + local_mode (`bool`, defaults to `None`): + If `True`, only listen to requests from "localhost", otherwise, + listen to requests from all hosts. + max_pool_size (`int`, defaults to `8192`): + The max number of agent reply messages that the server can + accommodate. Note that the oldest message will be deleted + after exceeding the pool size. + max_timeout_seconds (`int`, defaults to `1800`): + Maximum time for reply messages to be cached in the server. + Note that expired messages will be deleted. + custom_agents (`list`, defaults to `None`): + A list of custom agent classes that are not in `agentscope.agents`. + """ + from agentscope._init import init_process + + if init_settings is not None: + init_process(**init_settings) + servicer = AgentServerServicer( + host=host, + port=port, + max_pool_size=max_pool_size, + max_timeout_seconds=max_timeout_seconds, + ) + # update agent registry + if custom_agents is not None: + for agent_class in custom_agents: + AgentBase.register_agent_class(agent_class=agent_class) + + async def shutdown_signal_handler() -> None: + logger.info( + f"Received shutdown signal. Gracefully stopping the server at " + f"[{host}:{port}].", + ) + await server.stop(grace=5) + + loop = asyncio.get_running_loop() + if os.name != "nt": + # windows does not support add_signal_handler + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler( + sig, + lambda: asyncio.create_task(shutdown_signal_handler()), + ) + while True: + try: + port = check_port(port) + servicer.port = port + server = grpc.aio.server( + futures.ThreadPoolExecutor(max_workers=None), + ) + add_RpcAgentServicer_to_server(servicer, server) + if local_mode: + server.add_insecure_port(f"localhost:{port}") + else: + server.add_insecure_port(f"0.0.0.0:{port}") + await server.start() + break + except OSError: + logger.warning( + f"Failed to start agent server at port [{port}]" + f"try another port", + ) + logger.info( + f"agent server [{server_id}] at {host}:{port} started successfully", + ) + if start_event is not None: + pipe.send(port) + start_event.set() + while not stop_event.is_set(): + await asyncio.sleep(1) + logger.info( + f"Stopping agent server at [{host}:{port}]", + ) + await server.stop(grace=10.0) + else: + await server.wait_for_termination() + logger.info( + f"agent server [{server_id}] at {host}:{port} stopped successfully", + ) + + +class RpcAgentServerLauncher: + """The launcher of AgentServer.""" + + def __init__( + self, + host: str = "localhost", + port: int = None, + max_pool_size: int = 8192, + max_timeout_seconds: int = 1800, + local_mode: bool = False, + custom_agents: list = None, + server_id: str = None, + agent_class: Type[AgentBase] = None, + agent_args: tuple = (), + agent_kwargs: dict = None, + ) -> None: + """Init a launcher of agent server. + + Args: + host (`str`, defaults to `"localhost"`): + Hostname of the agent server. + port (`int`, defaults to `None`): + Socket port of the agent server. + max_pool_size (`int`, defaults to `8192`): + The max number of agent reply messages that the server can + accommodate. Note that the oldest message will be deleted + after exceeding the pool size. + max_timeout_seconds (`int`, defaults to `1800`): + Maximum time for reply messages to be cached in the server. + Note that expired messages will be deleted. + local_mode (`bool`, defaults to `False`): + If `True`, only listen to requests from "localhost", otherwise, + listen to requests from all hosts. + custom_agents (`list`, defaults to `None`): + A list of custom agent classes that are not in + `agentscope.agents`. + server_id (`str`, defaults to `None`): + The id of the agent server. If not specified, a random id + will be generated. + agent_class (`Type[AgentBase]`, deprecated): + The AgentBase subclass encapsulated by this wrapper. + agent_args (`tuple`, deprecated): The args tuple used to + initialize the agent_class. + agent_kwargs (`dict`, deprecated): The args dict used to + initialize the agent_class. + """ + self.host = host + self.port = check_port(port) + self.max_pool_size = max_pool_size + self.max_timeout_seconds = max_timeout_seconds + self.local_mode = local_mode + self.server = None + self.stop_event = None + self.parent_con = None + self.custom_agents = custom_agents + self.server_id = ( + self.generate_server_id() if server_id is None else server_id + ) + if ( + agent_class is not None + or len(agent_args) > 0 + or agent_kwargs is not None + ): + logger.warning( + "`agent_class`, `agent_args` and `agent_kwargs` is deprecated" + " in `RpcAgentServerLauncher`", + ) + + def generate_server_id(self) -> str: + """Generate server id""" + return f"{self.host}:{self.port}-{_get_timestamp('%y%m%d-%H:%M:%S')}" + + def _launch_in_main(self) -> None: + """Launch agent server in main-process""" + logger.info( + f"Launching agent server at [{self.host}:{self.port}]...", + ) + asyncio.run( + _setup_agent_server_async( + host=self.host, + port=self.port, + server_id=self.server_id, + max_pool_size=self.max_pool_size, + max_timeout_seconds=self.max_timeout_seconds, + local_mode=self.local_mode, + custom_agents=self.custom_agents, + ), + ) + + def _launch_in_sub(self) -> None: + """Launch an agent server in sub-process.""" + from agentscope._init import _INIT_SETTINGS + + self.stop_event = Event() + self.parent_con, child_con = Pipe() + start_event = Event() + server_process = Process( + target=_setup_agent_server, + kwargs={ + "host": self.host, + "port": self.port, + "server_id": self.server_id, + "init_settings": _INIT_SETTINGS, + "start_event": start_event, + "stop_event": self.stop_event, + "pipe": child_con, + "max_pool_size": self.max_pool_size, + "max_timeout_seconds": self.max_timeout_seconds, + "local_mode": self.local_mode, + "custom_agents": self.custom_agents, + }, + ) + server_process.start() + self.port = self.parent_con.recv() + start_event.wait() + self.server = server_process + logger.info( + f"Launch agent server at [{self.host}:{self.port}] success", + ) + + def launch(self, in_subprocess: bool = True) -> None: + """launch an agent server. + + Args: + in_subprocess (bool, optional): launch the server in subprocess. + Defaults to True. For agents that need to obtain command line + input, such as UserAgent, please set this value to False. + """ + if in_subprocess: + self._launch_in_sub() + else: + self._launch_in_main() + + def wait_until_terminate(self) -> None: + """Wait for server process""" + if self.server is not None: + self.server.join() + + def shutdown(self) -> None: + """Shutdown the agent server.""" + if self.server is not None: + if self.stop_event is not None: + self.stop_event.set() + self.stop_event = None + self.server.join() + if self.server.is_alive(): + self.server.kill() + logger.info( + f"Agent server at port [{self.port}] is killed.", + ) + self.server = None + + +def as_server() -> None: + """Launch an agent server with terminal command. + + Note: + + The arguments of `as_server` are listed as follows: + + * `--host`: the hostname of the server. + * `--port`: the socket port of the server. + * `--max-pool-size`: max number of agent reply messages that the server + can accommodate. Note that the oldest message will be deleted + after exceeding the pool size. + * `--max-timeout-seconds`: max time for reply messages to be cached + in the server. Note that expired messages will be deleted. + * `--local-mode`: whether the started agent server only listens to + local requests. + * `--model-config-path`: the path to the model config json file + + In most cases, you only need to specify the `--host`, `--port` and + `--model-config-path`. + + .. code-block:: shell + + as_server --host localhost --port 12345 --model-config-path config.json + + """ # noqa + parser = argparse.ArgumentParser() + parser.add_argument( + "--host", + type=str, + default="localhost", + help="hostname of the server", + ) + parser.add_argument( + "--port", + type=int, + default=12310, + help="socket port of the server", + ) + parser.add_argument( + "--max-pool-size", + type=int, + default=8192, + help=( + "max number of agent reply messages that the server " + "can accommodate. Note that the oldest message will be deleted " + "after exceeding the pool size." + ), + ) + parser.add_argument( + "--max-timeout-seconds", + type=int, + default=1800, + help=( + "max time for agent reply messages to be cached" + "in the server. Note that expired messages will be deleted." + ), + ) + parser.add_argument( + "--local-mode", + type=bool, + default=False, + help=( + "If `True`, only listen to requests from 'localhost', otherwise, " + "listen to requests from all hosts." + ), + ) + parser.add_argument( + "--model-config-path", + type=str, + help="path to the model config json file", + ) + args = parser.parse_args() + agentscope.init( + project="agent_server", + name=f"server_{args.host}:{args.port}", + runtime_id=_get_timestamp( + "server_{}_{}_%y%m%d-%H%M%S", + ).format(args.host, args.port), + model_configs=args.model_config_path, + ) + launcher = RpcAgentServerLauncher( + host=args.host, + port=args.port, + max_pool_size=args.max_pool_size, + max_timeout_seconds=args.max_timeout_seconds, + local_mode=args.local_mode, + ) + launcher.launch(in_subprocess=False) + launcher.wait_until_terminate() diff --git a/src/agentscope/server/servicer.py b/src/agentscope/server/servicer.py new file mode 100644 index 000000000..53c63425f --- /dev/null +++ b/src/agentscope/server/servicer.py @@ -0,0 +1,313 @@ +# -*- coding: utf-8 -*- +""" Server of distributed agent""" +import threading +import base64 +import json +import traceback +from concurrent import futures +from loguru import logger + +try: + import dill + import grpc + from grpc import ServicerContext + from expiringdict import ExpiringDict + from ..rpc.rpc_agent_pb2 import RpcMsg # pylint: disable=E0611 + from ..rpc.rpc_agent_pb2_grpc import RpcAgentServicer +except ImportError as import_error: + from agentscope.utils.tools import ImportErrorReporter + + dill = ImportErrorReporter(import_error, "distribute") + grpc = ImportErrorReporter(import_error, "distribute") + ServicerContext = ImportErrorReporter(import_error, "distribute") + ExpiringDict = ImportErrorReporter(import_error, "distribute") + RpcMsg = ImportErrorReporter( # type: ignore[misc] + import_error, + "distribute", + ) + RpcAgentServicer = ImportErrorReporter(import_error, "distribute") + +from ..agents.agent import AgentBase +from ..message import ( + Msg, + PlaceholderMessage, + deserialize, +) + + +class AgentServerServicer(RpcAgentServicer): + """A Servicer for RPC Agent Server (formerly RpcServerSideWrapper)""" + + def __init__( + self, + host: str = "localhost", + port: int = None, + max_pool_size: int = 8192, + max_timeout_seconds: int = 1800, + ): + """Init the AgentServerServicer. + + Args: + host (`str`, defaults to "localhost"): + Hostname of the rpc agent server. + port (`int`, defaults to `None`): + Port of the rpc agent server. + max_pool_size (`int`, defaults to `8192`): + The max number of agent reply messages that the server can + accommodate. Note that the oldest message will be deleted + after exceeding the pool size. + max_timeout_seconds (`int`, defaults to `1800`): + Maximum time for reply messages to be cached in the server. + Note that expired messages will be deleted. + """ + self.host = host + self.port = port + self.result_pool = ExpiringDict( + max_len=max_pool_size, + max_age_seconds=max_timeout_seconds, + ) + self.executor = futures.ThreadPoolExecutor(max_workers=None) + self.task_id_lock = threading.Lock() + self.agent_id_lock = threading.Lock() + self.task_id_counter = 0 + self.agent_pool: dict[str, AgentBase] = {} + + def get_task_id(self) -> int: + """Get the auto-increment task id. + Each reply call will get a unique task id.""" + with self.task_id_lock: + self.task_id_counter += 1 + return self.task_id_counter + + def agent_exists(self, agent_id: str) -> bool: + """Check whether the agent exists. + + Args: + agent_id (`str`): the agent id. + + Returns: + bool: whether the agent exists. + """ + return agent_id in self.agent_pool + + def check_and_generate_agent( + self, + agent_id: str, + agent_configs: dict, + ) -> None: + """ + Check whether the agent exists, and create new agent instance + for new agent. + + Args: + agent_id (`str`): the agent id. + agent_configs (`dict`): configuration used to initialize the agent, + with three fields (generated in `_AgentMeta`): + + .. code-block:: python + + { + "class_name": {name of the agent} + "args": {args in tuple type to init the agent} + "kwargs": {args in dict type to init the agent} + } + + """ + with self.agent_id_lock: + if agent_id not in self.agent_pool: + agent_class_name = agent_configs["class_name"] + agent_instance = AgentBase.get_agent_class(agent_class_name)( + *agent_configs["args"], + **agent_configs["kwargs"], + ) + agent_instance._agent_id = agent_id # pylint: disable=W0212 + self.agent_pool[agent_id] = agent_instance + logger.info(f"create agent instance [{agent_id}]") + + def check_and_delete_agent(self, agent_id: str) -> None: + """ + Check whether the agent exists, and delete the agent instance + for the agent_id. + + Args: + agent_id (`str`): the agent id. + """ + with self.agent_id_lock: + if agent_id in self.agent_pool: + self.agent_pool.pop(agent_id) + logger.info(f"delete agent instance [{agent_id}]") + + def call_func( # pylint: disable=W0236 + self, + request: RpcMsg, + context: ServicerContext, + ) -> RpcMsg: + """Call the specific servicer function.""" + if hasattr(self, request.target_func): + if request.target_func not in ["_create_agent", "_get"]: + if not self.agent_exists(request.agent_id): + return context.abort( + grpc.StatusCode.INVALID_ARGUMENT, + f"Agent [{request.agent_id}] not exists.", + ) + return getattr(self, request.target_func)(request) + else: + # TODO: support other user defined method + logger.error(f"Unsupported method {request.target_func}") + return context.abort( + grpc.StatusCode.INVALID_ARGUMENT, + f"Unsupported method {request.target_func}", + ) + + def _reply(self, request: RpcMsg) -> RpcMsg: + """Call function of RpcAgentService + + Args: + request (`RpcMsg`): + Message containing input parameters or input parameter + placeholders. + + Returns: + `RpcMsg`: A serialized Msg instance with attributes name, host, + port and task_id + """ + if request.value: + msg = deserialize(request.value) + else: + msg = None + task_id = self.get_task_id() + self.result_pool[task_id] = threading.Condition() + self.executor.submit( + self.process_messages, + task_id, + request.agent_id, + msg, # type: ignore[arg-type] + ) + return RpcMsg( + value=Msg( # type: ignore[arg-type] + name=self.agent_pool[request.agent_id].name, + content=None, + task_id=task_id, + ).serialize(), + ) + + def _get(self, request: RpcMsg) -> RpcMsg: + """Get a reply message with specific task_id. + + Args: + request (`RpcMsg`): + The task id that generated this message, with json format:: + + { + 'task_id': int + } + + Returns: + `RpcMsg`: Concrete values of the specific message (or part of it). + """ + msg = json.loads(request.value) + while True: + result = self.result_pool.get(msg["task_id"]) + if isinstance(result, threading.Condition): + with result: + result.wait(timeout=1) + else: + break + return RpcMsg(value=result.serialize()) + + def _observe(self, request: RpcMsg) -> RpcMsg: + """Observe function of the original agent. + + Args: + request (`RpcMsg`): + The serialized input to be observed. + + Returns: + `RpcMsg`: Empty RpcMsg. + """ + msgs = deserialize(request.value) + for msg in msgs: + if isinstance(msg, PlaceholderMessage): + msg.update_value() + self.agent_pool[request.agent_id].observe(msgs) + return RpcMsg() + + def _create_agent(self, request: RpcMsg) -> RpcMsg: + """Create a new agent instance with the given agent_id. + + Args: + request (RpcMsg): request message with a `agent_id` field. + """ + self.check_and_generate_agent( + request.agent_id, + agent_configs=( + dill.loads(base64.b64decode(request.value)) + if request.value + else None + ), + ) + return RpcMsg() + + def _clone_agent(self, request: RpcMsg) -> RpcMsg: + """Clone a new agent instance from the origin instance. + + Args: + request (RpcMsg): The `agent_id` field is the agent_id of the + agent to be cloned. + + Returns: + `RpcMsg`: The `value` field contains the agent_id of generated + agent. + """ + agent_id = request.agent_id + with self.agent_id_lock: + if agent_id not in self.agent_pool: + raise ValueError(f"Agent [{agent_id}] not exists") + ori_agent = self.agent_pool[agent_id] + new_agent = ori_agent.__class__( + *ori_agent._init_settings["args"], # pylint: disable=W0212 + **ori_agent._init_settings["kwargs"], # pylint: disable=W0212 + ) + with self.agent_id_lock: + self.agent_pool[new_agent.agent_id] = new_agent + return RpcMsg(value=new_agent.agent_id) # type: ignore[arg-type] + + def _delete_agent(self, request: RpcMsg) -> RpcMsg: + """Delete the agent instance of the specific agent_id. + + Args: + request (RpcMsg): request message with a `agent_id` field. + """ + self.check_and_delete_agent(request.agent_id) + return RpcMsg() + + def process_messages( + self, + task_id: int, + agent_id: str, + task_msg: dict = None, + ) -> None: + """Processing an input message and generate its reply message. + + Args: + task_id (`int`): task id of the input message, . + agent_id (`str`): the id of the agent that accepted the message. + task_msg (`dict`): the input message. + """ + if isinstance(task_msg, PlaceholderMessage): + task_msg.update_value() + cond = self.result_pool[task_id] + try: + result = self.agent_pool[agent_id].reply(task_msg) + self.result_pool[task_id] = result + except Exception: + error_msg = traceback.format_exc() + logger.error(f"Error in agent [{agent_id}]:\n{error_msg}") + self.result_pool[task_id] = Msg( + name="ERROR", + role="assistant", + __status="ERROR", + content=f"Error in agent [{agent_id}]:\n{error_msg}", + ) + with cond: + cond.notify_all() diff --git a/src/agentscope/utils/tools.py b/src/agentscope/utils/tools.py index 8ebd23777..8888d99e6 100644 --- a/src/agentscope/utils/tools.py +++ b/src/agentscope/utils/tools.py @@ -6,7 +6,8 @@ import os.path import secrets import string -from typing import Any, Literal, List +import socket +from typing import Any, Literal, List, Optional from urllib.parse import urlparse @@ -61,6 +62,42 @@ def to_dialog_str(item: dict) -> str: return f"{speaker}: {content}" +def find_available_port() -> int: + """Get an unoccupied socket port number.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + return s.getsockname()[1] + + +def check_port(port: Optional[int] = None) -> int: + """Check if the port is available. + + Args: + port (`int`): + the port number being checked. + + Returns: + `int`: the port number that passed the check. If the port is found + to be occupied, an available port number will be automatically + returned. + """ + if port is None: + new_port = find_available_port() + logger.warning( + "agent server port is not provided, automatically select " + f"[{new_port}] as the port number.", + ) + return new_port + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + if s.connect_ex(("localhost", port)) == 0: + new_port = find_available_port() + logger.warning( + f"Port [{port}] is occupied, use [{new_port}] instead", + ) + return new_port + return port + + def _guess_type_by_extension( url: str, ) -> Literal["image", "audio", "video", "file"]: diff --git a/tests/rpc_agent_test.py b/tests/rpc_agent_test.py index d010587cd..d70613268 100644 --- a/tests/rpc_agent_test.py +++ b/tests/rpc_agent_test.py @@ -9,7 +9,7 @@ import agentscope from agentscope.agents import AgentBase, DistConf -from agentscope.agents.rpc_agent import RpcAgentServerLauncher +from agentscope.server import RpcAgentServerLauncher from agentscope.message import Msg from agentscope.message import PlaceholderMessage from agentscope.message import deserialize