Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize launcher of RPCAgent #47

Merged
merged 4 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 40 additions & 8 deletions src/agentscope/agents/rpc_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,12 @@ def setup_rcp_agent_server(
f"rpc server [{agent_class.__name__}] at port [{port}] started "
"successfully",
)
pipe.send(port)
start_event.set()
stop_event.wait()
if start_event is not None:
pipe.send(port)
start_event.set()
stop_event.wait()
else:
server.wait_for_termination()
logger.info(
f"Stopping rpc server [{agent_class.__name__}] at port [{port}]",
)
Expand Down Expand Up @@ -335,13 +338,35 @@ def __init__(
self.port = check_port(port)
self.max_pool_size = max_pool_size
self.max_timeout_seconds = max_timeout_seconds
self.local_model = local_mode
self.local_mode = local_mode
self.server = None
self.stop_event = None
self.parent_con = None

def launch(self) -> None:
"""launch a local rpc agent server."""
def _launch_in_main(self) -> None:
"""Launch gRPC server in main-process"""
server_thread = threading.Thread(
target=setup_rcp_agent_server,
kwargs={
"agent_class": self.agent_class,
"agent_args": self.agent_args,
"agent_kwargs": self.agent_kwargs,
"host": self.host,
"port": self.port,
"max_pool_size": self.max_pool_size,
"max_timeout_seconds": self.max_timeout_seconds,
"local_mode": self.local_mode,
},
)
server_thread.start()
logger.info(
f"Launch [{self.agent_class.__name__}] server at "
f"[{self.host}:{self.port}] success",
)
server_thread.join()

def _launch_in_sub(self) -> None:
"""Launch gRPC server in sub-process."""
self.stop_event = Event()
self.parent_con, child_con = Pipe()
start_event = Event()
Expand All @@ -359,7 +384,7 @@ def launch(self) -> None:
"pipe": child_con,
"max_pool_size": self.max_pool_size,
"max_timeout_seconds": self.max_timeout_seconds,
"local_mode": self.local_model,
"local_mode": self.local_mode,
},
)
server_process.start()
Expand All @@ -371,6 +396,13 @@ def launch(self) -> None:
f"[{self.host}:{self.port}] success",
)

def launch(self, in_subprocess: bool = True) -> None:
"""launch a local rpc agent server."""
if in_subprocess:
self._launch_in_sub()
else:
self._launch_in_main()

def wait_until_terminate(self) -> None:
"""Wait for server process"""
if self.server is not None:
Expand All @@ -382,8 +414,8 @@ def shutdown(self) -> None:
if self.stop_event is not None:
self.stop_event.set()
self.stop_event = None
self.server.terminate()
self.server.join(timeout=5)
self.server.terminate()
if self.server.is_alive():
self.server.kill()
logger.info(
Expand Down
6 changes: 2 additions & 4 deletions src/agentscope/models/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,8 @@ def __init__(self, config_name: str, **kwargs: Any) -> None:
from the config file.
"""
self.config_name = config_name
logger.info(
f"Initialize model [{config_name}] with config:\n"
f"{json.dumps(kwargs, indent=2)}",
)
logger.info(f"Initialize model [{config_name}]")
logger.debug(f"[{config_name}]:\n {json.dumps(kwargs, indent=2)}")

def __call__(self, *args: Any, **kwargs: Any) -> ModelResponse:
"""Processing input with the model."""
Expand Down