Skip to content

Commit

Permalink
server support set capacity
Browse files Browse the repository at this point in the history
  • Loading branch information
pan-x-c committed Sep 13, 2024
1 parent 0f98548 commit a8f4c83
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 21 deletions.
6 changes: 3 additions & 3 deletions docs/sphinx_doc/en/source/tutorial/208-distribute.md
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ The `call_agent_func` method is called when the client calls methods or properti

#### `ResultPool`

The implementation of `ResultPool` is located at `src/agentscope/server/async_result_pool.py`. It manages the execution results of asynchronous methods, with two implementations: `local` and `redis`. The `local` implementation uses Python's dictionary type (`dict`), while the `redis` implementation is based on Redis. To avoid excessive memory usage by results, both implementations include an automatic expiration mechanism. The `local` can be set for timeout deletion (`max_timeout`) or deletion when exceeding a certain length (`max_len`), while `redis` only supports timeout deletion (`max_timeout`). When launching the `AgentServerLauncher`, you can specify which implementation to use by passing in `pool_type`, with `local` being the default. If specifying `redis`, you must also pass in `redis_url`. Below are examples of usage through code and command line.
The implementation of `ResultPool` is located at `src/agentscope/server/async_result_pool.py`. It manages the execution results of asynchronous methods, with two implementations: `local` and `redis`. The `local` implementation uses Python's dictionary type (`dict`), while the `redis` implementation is based on Redis. To avoid excessive memory usage by results, both implementations include an automatic expiration mechanism. The `local` can be set for expire time deletion (`max_expire`) or deletion when exceeding a certain length (`max_len`), while `redis` only supports expire time deletion (`max_expire`). When launching the `AgentServerLauncher`, you can specify which implementation to use by passing in `pool_type`, with `local` being the default. If specifying `redis`, you must also pass in `redis_url`. Below are examples of usage through code and command line.

```python
# ...
Expand All @@ -428,12 +428,12 @@ launcher = RpcAgentServerLauncher(
custom_agent_classes=[],
pool_type="redis",
redis_url="redis://localhost:6379",
max_timeout=7200, # 2 hours
max_expire=7200, # 2 hours
)
```

```shell
as_server --host localhost --port 12345 --model-config-path model_config_path --agent-dir parent_dir_of_myagents --pool-type redis --redis-url redis://localhost:6379 --max-timeout 7200
as_server --host localhost --port 12345 --model-config-path model_config_path --agent-dir parent_dir_of_myagents --pool-type redis --redis-url redis://localhost:6379 --max-expire 7200
```

[[Back to the top]](#208-distribute-en)
6 changes: 3 additions & 3 deletions docs/sphinx_doc/zh_CN/source/tutorial/208-distribute.md
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ Server 端主要基于 gRPC 实现,主要包含 `AgentServerServicer` 和 `Rpc

#### `ResultPool`

`ResultPool` 的实现位于 `src/agentscope/server/async_result_pool.py`,用于管理异步方法的执行结果,目前有两种实现分别为 `local``redis`。其中 `local` 基于 Python 的字典类型 (`dict`) 实现,而 `redis` 则是基于 Redis 实现。为了避免结果占用过多内存两种实现都包含了过期自动删除机制,其中 `local` 可以设置超时删除 (`max_timeout`) 或超过条数删除 (`max_len`),而 `redis` 则仅支持超时删除 (`max_timeout`)。
`ResultPool` 的实现位于 `src/agentscope/server/async_result_pool.py`,用于管理异步方法的执行结果,目前有两种实现分别为 `local``redis`。其中 `local` 基于 Python 的字典类型 (`dict`) 实现,而 `redis` 则是基于 Redis 实现。为了避免结果占用过多内存两种实现都包含了过期自动删除机制,其中 `local` 可以设置超时删除 (`max_expire`) 或超过条数删除 (`max_len`),而 `redis` 则仅支持超时删除 (`max_expire`)。
在启动 `AgentServerLauncher` 时可以通过传入 `pool_type` 来指定使用哪种实现,默认为`local`
如果指定为 `redis` 则还必须传入 `redis_url`,如下是代码以及命令行的使用案例。

Expand All @@ -438,12 +438,12 @@ launcher = RpcAgentServerLauncher(
custom_agent_classes=[],
pool_type="redis",
redis_url="redis://localhost:6379",
max_timeout=7200, # 2 hours
max_expire=7200, # 2 hours
)
```

```shell
as_server --host localhost --port 12345 --model-config-path model_config_path --agent-dir parent_dir_of_myagents --pool-type redis --redis-url redis://localhost:6379 --max-timeout 7200
as_server --host localhost --port 12345 --model-config-path model_config_path --agent-dir parent_dir_of_myagents --pool-type redis --redis-url redis://localhost:6379 --max-expire 7200
```

[[回到顶部]](#208-distribute-zh)
6 changes: 5 additions & 1 deletion src/agentscope/rpc/rpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,11 @@ def update_placeholder(
)
except grpc.RpcError:
# wait for a random time between retries
time.sleep((random.random() + 0.5) * retry_interval)
interval = (random.random() + 0.5) * retry_interval
logger.debug(
f"Update placeholder timeout, retrying after {interval} s...",
)
time.sleep(interval)
retry_interval *= 2
continue
break
Expand Down
6 changes: 3 additions & 3 deletions src/agentscope/server/async_result_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ def get(self, key: int, timeout: int = 5) -> bytes:
class LocalPool(AsyncResultPool):
"""Local pool for storing results."""

def __init__(self, max_len: int, max_timeout: int) -> None:
def __init__(self, max_len: int, max_expire: int) -> None:
self.pool = expiringdict.ExpiringDict(
max_len=max_len,
max_age_seconds=max_timeout,
max_age_seconds=max_expire,
)
self.object_id_cnt = 0
self.object_id_lock = threading.Lock()
Expand Down Expand Up @@ -175,4 +175,4 @@ def get_pool(
if pool_type == "redis":
return RedisPool(url=redis_url, max_expire=max_expire)
else:
return LocalPool(max_len=max_len, max_timeout=max_expire)
return LocalPool(max_len=max_len, max_expire=max_expire)
48 changes: 38 additions & 10 deletions src/agentscope/server/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def _setup_agent_server(
stop_event: EventClass = None,
pipe: int = None,
local_mode: bool = True,
capacity: int = 32,
pool_type: str = "local",
redis_url: str = "redis://localhost:6379",
max_pool_size: int = 8192,
Expand Down Expand Up @@ -71,6 +72,8 @@ def _setup_agent_server(
A pipe instance used to pass the actual port of the server.
local_mode (`bool`, defaults to `True`):
Only listen to local requests.
capacity (`int`, default to `32`):
The number of concurrent agents in the server.
pool-type (`str`, defaults to `"local"`): The type of the async
message pool, which can be `local` or `redis`. If `redis` is
specified, you need to start a redis server before launching
Expand Down Expand Up @@ -104,6 +107,7 @@ def _setup_agent_server(
stop_event=stop_event,
pipe=pipe,
local_mode=local_mode,
capacity=capacity,
pool_type=pool_type,
redis_url=redis_url,
max_pool_size=max_pool_size,
Expand All @@ -125,6 +129,7 @@ async def _setup_agent_server_async( # pylint: disable=R0912
stop_event: EventClass = None,
pipe: int = None,
local_mode: bool = True,
capacity: int = 32,
pool_type: str = "local",
redis_url: str = "redis://localhost:6379",
max_pool_size: int = 8192,
Expand Down Expand Up @@ -153,6 +158,8 @@ async def _setup_agent_server_async( # pylint: disable=R0912
local_mode (`bool`, defaults to `True`):
If `True`, only listen to requests from "localhost", otherwise,
listen to requests from all hosts.
capacity (`int`, default to `32`):
The number of concurrent agents in the server.
pool-type (`str`, defaults to `"local"`): The type of the async
message pool, which can be `local` or `redis`. If `redis` is
specified, you need to start a redis server before launching
Expand Down Expand Up @@ -190,6 +197,7 @@ async def _setup_agent_server_async( # pylint: disable=R0912
port=port,
server_id=server_id,
studio_url=studio_url,
capacity=capacity,
pool_type=pool_type,
redis_url=redis_url,
max_pool_size=max_pool_size,
Expand Down Expand Up @@ -226,7 +234,7 @@ async def shutdown_signal_handler() -> None:
port = _check_port(port)
servicer.port = port
server = grpc.aio.server(
futures.ThreadPoolExecutor(max_workers=None),
futures.ThreadPoolExecutor(max_workers=capacity),
# set max message size to 32 MB
options=_DEFAULT_RPC_OPTIONS,
)
Expand Down Expand Up @@ -325,6 +333,7 @@ def __init__(
self,
host: str = "localhost",
port: int = None,
capacity: int = 32,
pool_type: str = "local",
redis_url: str = "redis://localhost:6379",
max_pool_size: int = 8192,
Expand All @@ -343,6 +352,8 @@ def __init__(
Hostname of the agent server.
port (`int`, defaults to `None`):
Socket port of the agent server.
capacity (`int`, default to `32`):
The number of concurrent agents in the server.
pool-type (`str`, defaults to `"local"`): The type of the async
message pool, which can be `local` or `redis`. If `redis` is
specified, you need to start a redis server before launching
Expand Down Expand Up @@ -374,6 +385,7 @@ def __init__(
"""
self.host = host
self.port = _check_port(port)
self.capacity = capacity
self.pool_type = pool_type
self.redis_url = redis_url
self.max_pool_size = max_pool_size
Expand Down Expand Up @@ -408,6 +420,7 @@ def _launch_in_main(self) -> None:
_setup_agent_server_async(
host=self.host,
port=self.port,
capacity=self.capacity,
stop_event=self.stop_event,
server_id=self.server_id,
pool_type=self.pool_type,
Expand Down Expand Up @@ -510,6 +523,7 @@ def as_server() -> None:
* `--host`: the hostname of the server.
* `--port`: the socket port of the server.
* `--capacity`: the number of concurrent agents in the server.
* `--pool-type`: the type of the async message pool, which can be
`local` or `redis`. If `redis` is specified, you need to start a
redis server before launching the server. Defaults to `local`.
Expand All @@ -518,8 +532,8 @@ def as_server() -> None:
* `--max-pool-size`: max number of agent reply messages that the server
can accommodate. Note that the oldest message will be deleted
after exceeding the pool size.
* `--max-timeout-seconds`: max time for reply messages to be cached
in the server. Note that expired messages will be deleted.
* `--max-expire`: max expire time for async function result.
* `--max-timeout-seconds`: max timeout for rpc call.
* `--local-mode`: whether the started agent server only listens to
local requests.
* `--model-config-path`: the path to the model config json file
Expand Down Expand Up @@ -550,6 +564,15 @@ def as_server() -> None:
default=12310,
help="socket port of the server",
)
parser.add_argument(
"--capacity",
type=int,
default=os.cpu_count(),
help=(
"the number of concurrent agents in the server, exceeding this "
"may cause severe performance degradation or even deadlock."
),
)
parser.add_argument(
"--pool-type",
type=str,
Expand All @@ -568,19 +591,22 @@ def as_server() -> None:
type=int,
default=8192,
help=(
"max number of agent reply messages that the server "
"can accommodate. Note that the oldest message will be deleted "
"the max number of async result that the server "
"can accommodate. Note that the oldest result will be deleted "
"after exceeding the pool size."
),
)
parser.add_argument(
"--max-timeout-seconds",
"--max-expire",
type=int,
default=7200,
help=(
"max time for agent reply messages to be cached"
"in the server. Note that expired messages will be deleted."
),
help="max expire time in second for async results.",
)
parser.add_argument(
"--max-timeout-seconds",
type=int,
default=5,
help="max timeout for rpc call in seconds",
)
parser.add_argument(
"--local-mode",
Expand Down Expand Up @@ -643,10 +669,12 @@ def as_server() -> None:
host=args.host,
port=args.port,
server_id=args.server_id,
capacity=args.capacity,
pool_type=args.pool_type,
redis_url=args.redis_url,
max_pool_size=args.max_pool_size,
max_expire_time=args.max_expire_time,
max_timeout_seconds=args.max_timeout_seconds,
local_mode=args.local_mode,
studio_url=args.studio_url,
)
Expand Down
5 changes: 4 additions & 1 deletion src/agentscope/server/servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def __init__(
port: int = None,
server_id: str = None,
studio_url: str = None,
capacity: int = 32,
pool_type: str = "local",
redis_url: str = "redis://localhost:6379",
max_pool_size: int = 8192,
Expand All @@ -93,6 +94,8 @@ def __init__(
Server id of the rpc agent server.
studio_url (`str`, defaults to `None`):
URL of the AgentScope Studio.
capacity (`int`, default to `32`):
The number of concurrent agents in the servicer.
max_pool_size (`int`, defaults to `8192`):
The max number of async results that the server can
accommodate. Note that the oldest result will be deleted
Expand Down Expand Up @@ -126,7 +129,7 @@ def __init__(
max_len=max_pool_size,
max_expire=max_expire_time,
)
self.executor = futures.ThreadPoolExecutor(max_workers=None)
self.executor = futures.ThreadPoolExecutor(max_workers=capacity)
self.task_id_lock = threading.Lock()
self.agent_id_lock = threading.Lock()
self.task_id_counter = 0
Expand Down

0 comments on commit a8f4c83

Please sign in to comment.