From a8f4c8340e4988be5451cea7669bafb216f350fd Mon Sep 17 00:00:00 2001 From: "panxuchen.pxc" Date: Fri, 13 Sep 2024 12:30:53 +0800 Subject: [PATCH] server support set capacity --- .../en/source/tutorial/208-distribute.md | 6 +-- .../zh_CN/source/tutorial/208-distribute.md | 6 +-- src/agentscope/rpc/rpc_client.py | 6 ++- src/agentscope/server/async_result_pool.py | 6 +-- src/agentscope/server/launcher.py | 48 +++++++++++++++---- src/agentscope/server/servicer.py | 5 +- 6 files changed, 56 insertions(+), 21 deletions(-) diff --git a/docs/sphinx_doc/en/source/tutorial/208-distribute.md b/docs/sphinx_doc/en/source/tutorial/208-distribute.md index 8f48dd0d0..33b639dec 100644 --- a/docs/sphinx_doc/en/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/en/source/tutorial/208-distribute.md @@ -418,7 +418,7 @@ The `call_agent_func` method is called when the client calls methods or properti #### `ResultPool` -The implementation of `ResultPool` is located at `src/agentscope/server/async_result_pool.py`. It manages the execution results of asynchronous methods, with two implementations: `local` and `redis`. The `local` implementation uses Python's dictionary type (`dict`), while the `redis` implementation is based on Redis. To avoid excessive memory usage by results, both implementations include an automatic expiration mechanism. The `local` can be set for timeout deletion (`max_timeout`) or deletion when exceeding a certain length (`max_len`), while `redis` only supports timeout deletion (`max_timeout`). When launching the `AgentServerLauncher`, you can specify which implementation to use by passing in `pool_type`, with `local` being the default. If specifying `redis`, you must also pass in `redis_url`. Below are examples of usage through code and command line. +The implementation of `ResultPool` is located at `src/agentscope/server/async_result_pool.py`. It manages the execution results of asynchronous methods, with two implementations: `local` and `redis`. 
The `local` implementation uses Python's dictionary type (`dict`), while the `redis` implementation is based on Redis. To avoid excessive memory usage by results, both implementations include an automatic expiration mechanism. The `local` implementation can delete results after an expiration time (`max_expire`) or once the pool exceeds a maximum length (`max_len`), while the `redis` implementation only supports expiration-based deletion (`max_expire`). When launching the `RpcAgentServerLauncher`, you can specify which implementation to use by passing in `pool_type`, with `local` being the default. If specifying `redis`, you must also pass in `redis_url`. Below are examples of usage through code and command line. ```python # ... @@ -428,12 +428,12 @@ launcher = RpcAgentServerLauncher( custom_agent_classes=[], pool_type="redis", redis_url="redis://localhost:6379", - max_timeout=7200, # 2 hours + max_expire=7200, # 2 hours ) ``` ```shell -as_server --host localhost --port 12345 --model-config-path model_config_path --agent-dir parent_dir_of_myagents --pool-type redis --redis-url redis://localhost:6379 --max-timeout 7200 +as_server --host localhost --port 12345 --model-config-path model_config_path --agent-dir parent_dir_of_myagents --pool-type redis --redis-url redis://localhost:6379 --max-expire 7200 ``` [[Back to the top]](#208-distribute-en) diff --git a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md index 7bfce429a..5bf0e5a30 100644 --- a/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md +++ b/docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md @@ -426,7 +426,7 @@ Server 端主要基于 gRPC 实现,主要包含 `AgentServerServicer` 和 `Rpc #### `ResultPool` -`ResultPool` 的实现位于 `src/agentscope/server/async_result_pool.py`,用于管理异步方法的执行结果,目前有两种实现分别为 `local` 和 `redis`。其中 `local` 基于 Python 的字典类型 (`dict`) 实现,而 `redis` 则是基于 Redis 实现。为了避免结果占用过多内存两种实现都包含了过期自动删除机制,其中 `local` 可以设置超时删除 (`max_timeout`) 或超过条数删除 (`max_len`),而 `redis` 则仅支持超时删除 (`max_timeout`)。 +`ResultPool` 
的实现位于 `src/agentscope/server/async_result_pool.py`,用于管理异步方法的执行结果,目前有两种实现分别为 `local` 和 `redis`。其中 `local` 基于 Python 的字典类型 (`dict`) 实现,而 `redis` 则是基于 Redis 实现。为了避免结果占用过多内存两种实现都包含了过期自动删除机制,其中 `local` 可以设置超时删除 (`max_expire`) 或超过条数删除 (`max_len`),而 `redis` 则仅支持超时删除 (`max_expire`)。 在启动 `AgentServerLauncher` 时可以通过传入 `pool_type` 来指定使用哪种实现,默认为`local`。 如果指定为 `redis` 则还必须传入 `redis_url`,如下是代码以及命令行的使用案例。 @@ -438,12 +438,12 @@ launcher = RpcAgentServerLauncher( custom_agent_classes=[], pool_type="redis", redis_url="redis://localhost:6379", - max_timeout=7200, # 2 hours + max_expire=7200, # 2 hours ) ``` ```shell -as_server --host localhost --port 12345 --model-config-path model_config_path --agent-dir parent_dir_of_myagents --pool-type redis --redis-url redis://localhost:6379 --max-timeout 7200 +as_server --host localhost --port 12345 --model-config-path model_config_path --agent-dir parent_dir_of_myagents --pool-type redis --redis-url redis://localhost:6379 --max-expire 7200 ``` [[回到顶部]](#208-distribute-zh) diff --git a/src/agentscope/rpc/rpc_client.py b/src/agentscope/rpc/rpc_client.py index 6e5e74a28..35016acbc 100644 --- a/src/agentscope/rpc/rpc_client.py +++ b/src/agentscope/rpc/rpc_client.py @@ -248,7 +248,11 @@ def update_placeholder( ) except grpc.RpcError: # wait for a random time between retries - time.sleep((random.random() + 0.5) * retry_interval) + interval = (random.random() + 0.5) * retry_interval + logger.debug( + f"Update placeholder timeout, retrying after {interval} s...", + ) + time.sleep(interval) retry_interval *= 2 continue break diff --git a/src/agentscope/server/async_result_pool.py b/src/agentscope/server/async_result_pool.py index 5b080d087..b78c25a09 100644 --- a/src/agentscope/server/async_result_pool.py +++ b/src/agentscope/server/async_result_pool.py @@ -52,10 +52,10 @@ def get(self, key: int, timeout: int = 5) -> bytes: class LocalPool(AsyncResultPool): """Local pool for storing results.""" - def __init__(self, max_len: int, max_timeout: int) -> 
None: + def __init__(self, max_len: int, max_expire: int) -> None: self.pool = expiringdict.ExpiringDict( max_len=max_len, - max_age_seconds=max_timeout, + max_age_seconds=max_expire, ) self.object_id_cnt = 0 self.object_id_lock = threading.Lock() @@ -175,4 +175,4 @@ def get_pool( if pool_type == "redis": return RedisPool(url=redis_url, max_expire=max_expire) else: - return LocalPool(max_len=max_len, max_timeout=max_expire) + return LocalPool(max_len=max_len, max_expire=max_expire) diff --git a/src/agentscope/server/launcher.py b/src/agentscope/server/launcher.py index c36e81630..0c854f70b 100644 --- a/src/agentscope/server/launcher.py +++ b/src/agentscope/server/launcher.py @@ -41,6 +41,7 @@ def _setup_agent_server( stop_event: EventClass = None, pipe: int = None, local_mode: bool = True, + capacity: int = 32, pool_type: str = "local", redis_url: str = "redis://localhost:6379", max_pool_size: int = 8192, @@ -71,6 +72,8 @@ def _setup_agent_server( A pipe instance used to pass the actual port of the server. local_mode (`bool`, defaults to `True`): Only listen to local requests. + capacity (`int`, default to `32`): + The number of concurrent agents in the server. pool-type (`str`, defaults to `"local"`): The type of the async message pool, which can be `local` or `redis`. 
If `redis` is specified, you need to start a redis server before launching @@ -104,6 +107,7 @@ def _setup_agent_server( stop_event=stop_event, pipe=pipe, local_mode=local_mode, + capacity=capacity, pool_type=pool_type, redis_url=redis_url, max_pool_size=max_pool_size, @@ -125,6 +129,7 @@ async def _setup_agent_server_async( # pylint: disable=R0912 stop_event: EventClass = None, pipe: int = None, local_mode: bool = True, + capacity: int = 32, pool_type: str = "local", redis_url: str = "redis://localhost:6379", max_pool_size: int = 8192, @@ -153,6 +158,8 @@ async def _setup_agent_server_async( # pylint: disable=R0912 local_mode (`bool`, defaults to `True`): If `True`, only listen to requests from "localhost", otherwise, listen to requests from all hosts. + capacity (`int`, default to `32`): + The number of concurrent agents in the server. pool-type (`str`, defaults to `"local"`): The type of the async message pool, which can be `local` or `redis`. If `redis` is specified, you need to start a redis server before launching @@ -190,6 +197,7 @@ async def _setup_agent_server_async( # pylint: disable=R0912 port=port, server_id=server_id, studio_url=studio_url, + capacity=capacity, pool_type=pool_type, redis_url=redis_url, max_pool_size=max_pool_size, @@ -226,7 +234,7 @@ async def shutdown_signal_handler() -> None: port = _check_port(port) servicer.port = port server = grpc.aio.server( - futures.ThreadPoolExecutor(max_workers=None), + futures.ThreadPoolExecutor(max_workers=capacity), # set max message size to 32 MB options=_DEFAULT_RPC_OPTIONS, ) @@ -325,6 +333,7 @@ def __init__( self, host: str = "localhost", port: int = None, + capacity: int = 32, pool_type: str = "local", redis_url: str = "redis://localhost:6379", max_pool_size: int = 8192, @@ -343,6 +352,8 @@ def __init__( Hostname of the agent server. port (`int`, defaults to `None`): Socket port of the agent server. + capacity (`int`, default to `32`): + The number of concurrent agents in the server. 
pool-type (`str`, defaults to `"local"`): The type of the async message pool, which can be `local` or `redis`. If `redis` is specified, you need to start a redis server before launching @@ -374,6 +385,7 @@ def __init__( """ self.host = host self.port = _check_port(port) + self.capacity = capacity self.pool_type = pool_type self.redis_url = redis_url self.max_pool_size = max_pool_size @@ -408,6 +420,7 @@ def _launch_in_main(self) -> None: _setup_agent_server_async( host=self.host, port=self.port, + capacity=self.capacity, stop_event=self.stop_event, server_id=self.server_id, pool_type=self.pool_type, @@ -510,6 +523,7 @@ def as_server() -> None: * `--host`: the hostname of the server. * `--port`: the socket port of the server. + * `--capacity`: the number of concurrent agents in the server. * `--pool-type`: the type of the async message pool, which can be `local` or `redis`. If `redis` is specified, you need to start a redis server before launching the server. Defaults to `local`. @@ -518,8 +532,8 @@ def as_server() -> None: * `--max-pool-size`: max number of agent reply messages that the server can accommodate. Note that the oldest message will be deleted after exceeding the pool size. - * `--max-timeout-seconds`: max time for reply messages to be cached - in the server. Note that expired messages will be deleted. + * `--max-expire`: max expire time for async function result. + * `--max-timeout-seconds`: max timeout for rpc call. * `--local-mode`: whether the started agent server only listens to local requests. * `--model-config-path`: the path to the model config json file @@ -550,6 +564,15 @@ def as_server() -> None: default=12310, help="socket port of the server", ) + parser.add_argument( + "--capacity", + type=int, + default=os.cpu_count(), + help=( + "the number of concurrent agents in the server, exceeding this " + "may cause severe performance degradation or even deadlock." 
+ ), + ) parser.add_argument( "--pool-type", type=str, @@ -568,19 +591,22 @@ def as_server() -> None: type=int, default=8192, help=( - "max number of agent reply messages that the server " - "can accommodate. Note that the oldest message will be deleted " + "the max number of async result that the server " + "can accommodate. Note that the oldest result will be deleted " "after exceeding the pool size." ), ) parser.add_argument( - "--max-timeout-seconds", + "--max-expire", type=int, default=7200, - help=( - "max time for agent reply messages to be cached" - "in the server. Note that expired messages will be deleted." - ), + help="max expire time in second for async results.", + ) + parser.add_argument( + "--max-timeout-seconds", + type=int, + default=5, + help="max timeout for rpc call in seconds", ) parser.add_argument( "--local-mode", @@ -643,10 +669,12 @@ def as_server() -> None: host=args.host, port=args.port, server_id=args.server_id, + capacity=args.capacity, pool_type=args.pool_type, redis_url=args.redis_url, max_pool_size=args.max_pool_size, max_expire_time=args.max_expire_time, + max_timeout_seconds=args.max_timeout_seconds, local_mode=args.local_mode, studio_url=args.studio_url, ) diff --git a/src/agentscope/server/servicer.py b/src/agentscope/server/servicer.py index 570b7217f..dbd48b4e9 100644 --- a/src/agentscope/server/servicer.py +++ b/src/agentscope/server/servicer.py @@ -75,6 +75,7 @@ def __init__( port: int = None, server_id: str = None, studio_url: str = None, + capacity: int = 32, pool_type: str = "local", redis_url: str = "redis://localhost:6379", max_pool_size: int = 8192, @@ -93,6 +94,8 @@ def __init__( Server id of the rpc agent server. studio_url (`str`, defaults to `None`): URL of the AgentScope Studio. + capacity (`int`, default to `32`): + The number of concurrent agents in the servicer. max_pool_size (`int`, defaults to `8192`): The max number of async results that the server can accommodate. 
Note that the oldest result will be deleted @@ -126,7 +129,7 @@ def __init__( max_len=max_pool_size, max_expire=max_expire_time, ) - self.executor = futures.ThreadPoolExecutor(max_workers=None) + self.executor = futures.ThreadPoolExecutor(max_workers=capacity) self.task_id_lock = threading.Lock() self.agent_id_lock = threading.Lock() self.task_id_counter = 0