diff --git a/.github/workflows/ci-format.yml b/.github/workflows/ci-format.yml index 693dacf251..9581faba1d 100644 --- a/.github/workflows/ci-format.yml +++ b/.github/workflows/ci-format.yml @@ -34,7 +34,7 @@ jobs: -e missing-function-docstring \ -e missing-class-docstring \ -s n \ - --ignore examples,scenarios,docs,manager_pb2_grpc.py,worker_pb2_grpc.py \ + --ignore examples,scenarios,docs \ --msg-template='{path}: line {line}: {msg_id}: {msg}' \ ./smarts ./envision ./baselines diff --git a/CHANGELOG.md b/CHANGELOG.md index d9d8e3ca67..2fea1abd55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,6 +60,8 @@ Copy and pasting the git commit messages is __NOT__ enough. - Removed `examples/rl/racing` as base repository `danijar/dreamerv2` is not updated anymore and this example will be superseded by `examples/rl/drive`. - Removed `FrameStack` environment wrapper. - Removed `SMARTS.traffic_sim` property. +- Removed remote agent modules. +- Removed `protobuf` as a core package dependency. ### Security ## [1.1.0] # 2023-04-28 diff --git a/docs/setup.rst b/docs/setup.rst index 42f09c1935..483ceeac0c 100644 --- a/docs/setup.rst +++ b/docs/setup.rst @@ -60,9 +60,9 @@ Run the following commands to setup the SMARTS simulator. # Install smarts with extras as needed. Extras include the following: # `camera_obs` - needed for rendering camera observations, and for testing. + # `sumo` - needed for using SUMO scenarios. # `test` - needed for testing. - # `train` - needed for RL training and testing. - $ pip install -e '.[camera_obs,test,train,sumo]' + $ pip install -e .[camera_obs,sumo,test] # Run sanity-test and verify they are passing. # If tests fail, check './sanity_test_result.xml' for test report. diff --git a/setup.cfg b/setup.cfg index 6303da0711..2364a6b99c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -38,7 +38,6 @@ install_requires = # The following are planned to be made optional gymnasium==0.27.0 pybullet>=3,<4.0 - protobuf>=3.17.3,<4.0.0 # for /smarts/zoo and remote agents # The following are planned for removal gym>=0.17.3,<=0.19.0 diff --git a/smarts/core/agent_manager.py b/smarts/core/agent_manager.py index 7f705b9661..1c29a51fae 100644 --- a/smarts/core/agent_manager.py +++ b/smarts/core/agent_manager.py @@ -28,11 +28,10 @@ from smarts.core.agent_interface import AgentInterface from smarts.core.bubble_manager import BubbleManager from smarts.core.data_model import SocialAgent -from smarts.core.heterogenous_agent_buffer import HeterogenousAgentBuffer +from smarts.core.local_agent_buffer import LocalAgentBuffer from smarts.core.observations import Observation -from smarts.core.plan import Mission, Plan, PositionalGoal +from smarts.core.plan import Plan from smarts.core.sensor_manager import SensorManager -from smarts.core.sensors import Sensors from smarts.core.utils.id import SocialAgentId from smarts.core.vehicle_state import VehicleState from smarts.sstudio.types.actor.social_agent_actor import SocialAgentActor @@ -47,7 +46,7 @@ class AgentManager: time. """ - def __init__(self, sim, interfaces, zoo_addrs=None): + def __init__(self, sim, interfaces): from smarts.core.vehicle_index import VehicleIndex self._log = logging.getLogger(self.__class__.__name__) @@ -55,7 +54,6 @@ def __init__(self, sim, interfaces, zoo_addrs=None): self._vehicle_index: VehicleIndex = sim.vehicle_index self._sensor_manager: SensorManager = sim.sensor_manager self._agent_buffer = None - self._zoo_addrs = zoo_addrs self._ego_agent_ids = set() self._social_agent_ids = set() @@ -500,9 +498,7 @@ def init_ego_agents(self): def _setup_agent_buffer(self): if not self._agent_buffer: - self._agent_buffer = HeterogenousAgentBuffer( - zoo_manager_addrs=self._zoo_addrs - ) + self._agent_buffer = LocalAgentBuffer() def _setup_social_agents(self): """Initialize all social agents.""" diff --git a/smarts/core/heterogenous_agent_buffer.py b/smarts/core/heterogenous_agent_buffer.py deleted file mode 100644 index 4e9711eec4..0000000000 --- a/smarts/core/heterogenous_agent_buffer.py +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -from typing import Optional - -from smarts.core.agent_buffer import AgentBuffer -from smarts.core.buffer_agent import BufferAgent - - -class HeterogenousAgentBuffer(AgentBuffer): - """A buffer that manages social agents.""" - - def __init__(self, **kwargs): - if kwargs.get("zoo_manager_addrs"): - from smarts.core.remote_agent_buffer import RemoteAgentBuffer - - self._agent_buffer = RemoteAgentBuffer( - zoo_manager_addrs=kwargs.get("zoo_manager_addrs") - ) - else: - from smarts.core.local_agent_buffer import LocalAgentBuffer - - self._agent_buffer = LocalAgentBuffer() - - def destroy(self): - self._agent_buffer.destroy() - - def acquire_agent( - self, retries: int = 3, timeout: Optional[float] = None - ) -> BufferAgent: - return self._agent_buffer.acquire_agent(retries=retries, timeout=timeout) diff --git a/smarts/core/local_agent_buffer.py b/smarts/core/local_agent_buffer.py index 39479544b3..289c0aaf44 100644 --- a/smarts/core/local_agent_buffer.py +++ b/smarts/core/local_agent_buffer.py @@ -17,11 +17,8 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. -from concurrent.futures import ProcessPoolExecutor from typing import Optional -import psutil - from smarts.core.agent_buffer import AgentBuffer from smarts.core.buffer_agent import BufferAgent from smarts.core.local_agent import LocalAgent diff --git a/smarts/core/remote_agent.py b/smarts/core/remote_agent.py deleted file mode 100644 index c9697edbe5..0000000000 --- a/smarts/core/remote_agent.py +++ /dev/null @@ -1,133 +0,0 @@ -# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -import logging -from functools import wraps -from typing import Tuple - -import cloudpickle -import grpc - -from smarts.core.buffer_agent import BufferAgent -from smarts.zoo import manager_pb2, manager_pb2_grpc, worker_pb2, worker_pb2_grpc -from smarts.zoo.agent_spec import AgentSpec - - -class RemoteAgentException(Exception): - """An exception describing issues relating to maintaining connection with a remote agent.""" - - pass - - -class RemoteAgent(BufferAgent): - """A remotely controlled agent.""" - - def __init__( - self, - manager_address: Tuple[str, int], - worker_address: Tuple[str, int], - timeout: float = 10, - ): - """Executes an agent in a worker (i.e., a gRPC server). - - Args: - manager_address (Tuple[str,int]): Manager's server address (ip, port). - worker_address (Tuple[str,int]): Worker's server address (ip, port). - timeout (float, optional): Time (seconds) to wait for startup or response from - server. Defaults to 10. - - Raises: - RemoteAgentException: If timeout occurs while connecting to the manager or worker. - """ - self._log = logging.getLogger(self.__class__.__name__) - - # Track the last action future. - self._grpc_future = None - - self._manager_channel = grpc.insecure_channel( - f"{manager_address[0]}:{manager_address[1]}" - ) - self._worker_address = worker_address - self._worker_channel = grpc.insecure_channel( - f"{worker_address[0]}:{worker_address[1]}" - ) - try: - # Wait until the grpc server is ready or timeout seconds. - grpc.channel_ready_future(self._manager_channel).result(timeout=timeout) - grpc.channel_ready_future(self._worker_channel).result(timeout=timeout) - except grpc.FutureTimeoutError as e: - raise RemoteAgentException( - "Timeout while connecting to remote worker process." - ) from e - self._manager_stub = manager_pb2_grpc.ManagerStub(self._manager_channel) - self._worker_stub = worker_pb2_grpc.WorkerStub(self._worker_channel) - - def act(self, obs): - """Call the agent's act function asynchronously and return a Future.""" - self._grpc_future = self._worker_stub.act.future( - worker_pb2.Observation(payload=cloudpickle.dumps(obs)) - ) - - def result_wrapper(f): - @wraps(f) - def wrapper(): - action = cloudpickle.loads(f().action) - return action - - return wrapper - - setattr(self._grpc_future, "result", result_wrapper(self._grpc_future.result)) - - return self._grpc_future - - def start(self, agent_spec: AgentSpec): - """Send the AgentSpec to the agent runner.""" - # Cloudpickle used only for the agent_spec to allow for serialization of lambdas. - self._worker_stub.build( - worker_pb2.Specification(payload=cloudpickle.dumps(agent_spec)) - ) - - def terminate(self): - """Close the agent connection and invalidate this agent.""" - # If the last action future returned is incomplete, cancel it first. - if (self._grpc_future is not None) and (not self._grpc_future.done()): - self._grpc_future.cancel() - - try: - # Stop the remote worker process - self._manager_stub.stop_worker( - manager_pb2.Port(num=self._worker_address[1]) - ) - except grpc.RpcError as e: - if e.code() == grpc.StatusCode.UNAVAILABLE: - # Do nothing as RPC server has been terminated. - pass - elif ( - e.code() == grpc.StatusCode.INVALID_ARGUMENT - and "nonexistent worker" in e.details() - ): - pass - else: - raise e - finally: - try: - # Close manager channel - self._manager_channel.close() - except: - pass diff --git a/smarts/core/remote_agent_buffer.py b/smarts/core/remote_agent_buffer.py deleted file mode 100644 index 96120ada6c..0000000000 --- a/smarts/core/remote_agent_buffer.py +++ /dev/null @@ -1,271 +0,0 @@ -# Copyright (C) 2020. Huawei Technologies Co., Ltd. All rights reserved. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -import logging -import pathlib -import random -import signal -import subprocess -import sys -import time -from concurrent import futures -from typing import Optional, Tuple - -import grpc - -from smarts.core.agent_buffer import AgentBuffer -from smarts.core.buffer_agent import BufferAgent -from smarts.core.remote_agent import RemoteAgent, RemoteAgentException -from smarts.core.utils.networking import find_free_port -from smarts.zoo import manager_pb2, manager_pb2_grpc - - -class RemoteAgentBuffer(AgentBuffer): - """A buffer that manages social agents.""" - - def __init__( - self, - zoo_manager_addrs: Optional[Tuple[str, int]] = None, - buffer_size: int = 3, - max_workers: int = 4, - timeout: float = 10, - ): - """Creates a local manager (if `zoo_manager_addrs=None`) or connects to a remote manager, to create and manage workers - which execute agents. - - Args: - zoo_manager_addrs (Optional[Tuple[str,int]], optional): List of (ip, port) tuples for manager processes. If none - is provided, a manager is spawned in localhost. Defaults to None. - buffer_size (int, optional): Number of RemoteAgents to pre-initialize and keep running in the background, must be - non-zero. Defaults to 3. - max_workers (int, optional): Maximum number of threads used for creation of workers. Defaults to 4. - timeout (float, optional): Time (seconds) to wait for startup or response from server. Defaults to 10. - """ - assert buffer_size > 0 - - self._log = logging.getLogger(self.__class__.__name__) - self._timeout = timeout - - # self._zoo_manager_conns is a list of dictionaries. - # Each dictionary provides connection info for a zoo manager. - # Example: - # [ - # {"address": ("127.0.0.1", 7432)), - # "process": , - # "channel": , - # "stub" : - # }, - # { - # ... - # } - # ... - # ] - self._zoo_manager_conns = [] - self._local_zoo_manager = False - - # Populate zoo manager connection with address and process handles. - if not zoo_manager_addrs: - # Spawn a local zoo manager since no remote zoo managers were provided. - self._local_zoo_manager = True - port = find_free_port() - self._zoo_manager_conns = [ - { - "address": ("localhost", port), - "process": spawn_local_zoo_manager(port), - } - ] - else: - self._zoo_manager_conns = [{"address": addr} for addr in zoo_manager_addrs] - - # Populate zoo manager connection with channel and stub details. - for conn in self._zoo_manager_conns: - conn["channel"], conn["stub"] = get_manager_channel_stub( - conn["address"], self._timeout - ) - - self._buffer_size = buffer_size - self._replenish_threadpool = futures.ThreadPoolExecutor(max_workers=max_workers) - self._agent_buffer = [ - self._remote_agent_future() for _ in range(self._buffer_size) - ] - - # Catch abrupt terminate signals - signal.signal(signal.SIGTERM, self._stop_servers) - - def _stop_servers(self, *args): - self.destroy() - self._log.debug( - f"Shutting down zoo manager and zoo workers due to abrupt process stop." - ) - sys.exit(0) - - def destroy(self): - """Tear-down any remaining remote agents and the local zoo manager (if it exists.)""" - for remote_agent_future in self._agent_buffer: - try: - remote_agent = remote_agent_future.result() - remote_agent.terminate() - except Exception as e: - self._log.error( - f"Exception while tearing down buffered remote agent. {repr(e)}" - ) - raise e - - # If available, teardown local zoo manager. - if self._local_zoo_manager: - self._zoo_manager_conns[0]["channel"].close() - self._zoo_manager_conns[0]["process"].terminate() - self._zoo_manager_conns[0]["process"].wait() - - def _build_remote_agent(self, zoo_manager_conns): - # Get a random zoo manager connection. - zoo_manager_conn = random.choice(zoo_manager_conns) - - # Spawn remote worker and get its port. - retries = 3 - worker_port = None - for retry in range(retries): - try: - response = zoo_manager_conn["stub"].spawn_worker(manager_pb2.Machine()) - worker_port = response.num - break - except grpc.RpcError as e: - self._log.debug( - f"Failed {retry+1}/{retries} times in attempt to spawn a remote worker process. {e}" - ) - - if worker_port == None: - raise RemoteAgentException( - "Remote worker process could not be spawned by the zoo manager." - ) - - # Instantiate and return a local RemoteAgent. - return RemoteAgent( - zoo_manager_conn["address"], (zoo_manager_conn["address"][0], worker_port) - ) - - def _remote_agent_future(self): - return self._replenish_threadpool.submit( - self._build_remote_agent, self._zoo_manager_conns - ) - - def _try_to_acquire_remote_agent(self, timeout: float): - assert len(self._agent_buffer) == self._buffer_size - - # Check if we have any done remote agent futures. - done_future_indices = [ - idx - for idx, agent_future in enumerate(self._agent_buffer) - if agent_future.done() - ] - - if len(done_future_indices) > 0: - # If so, prefer one of these done ones to avoid sim delays. - future = self._agent_buffer.pop(done_future_indices[0]) - else: - # Otherwise, we will block, waiting on a remote agent future. - self._log.debug( - "No ready remote agents, simulation will block until one is available." - ) - future = self._agent_buffer.pop(0) - - # Schedule the next remote agent and add it to the buffer. - self._agent_buffer.append(self._remote_agent_future()) - - remote_agent = future.result(timeout=timeout) - return remote_agent - - def acquire_agent( - self, retries: int = 3, timeout: Optional[float] = None - ) -> BufferAgent: - """Creates RemoteAgent objects. - - Args: - retries (int, optional): Number of attempts in creating or connecting to an available - RemoteAgent. Defaults to 3. - timeout (Optional[float], optional): Time (seconds) to wait in acquiring a RemoteAgent. - Defaults to None, which does not timeout. - - Raises: - RemoteAgentException: If fail to acquire a RemoteAgent. - - Returns: - RemoteAgent: A new RemoteAgent object. - """ - if timeout == None: - timeout = self._timeout - - for retry in range(retries): - try: - return self._try_to_acquire_remote_agent(timeout) - except Exception as e: - self._log.debug( - f"Failed {retry+1}/{retries} times in acquiring remote agent. {repr(e)}" - ) - time.sleep(0.1) - - raise RemoteAgentException("Failed to acquire remote agent.") - - -def spawn_local_zoo_manager(port): - """Generates a local manager subprocess.""" - cmd = [ - sys.executable, # Path to the current Python binary. - str( - (pathlib.Path(__file__).parent.parent / "zoo" / "manager.py") - .absolute() - .resolve() - ), - "--port", - str(port), - ] - - manager = subprocess.Popen(cmd) - if manager.poll() == None: - return manager - - raise RuntimeError("Zoo manager subprocess is not running.") - - -def get_manager_channel_stub(addr: Tuple[str, int], timeout: float = 10): - """Connects to the `gRPC` server at `addr` and returns the channel and stub. - - .. spelling:word-list:: - - grpc - - Args: - addr (Tuple[str,int]): `gRPC` server address. - timeout (float, optional): Time to wait for the `gRPC` server to be ready. Defaults to 10. - - Raises: - RemoteAgentException: If timeout occurs while connecting to the `gRPC` server. - - Returns: - grpc.Channel: Channel to the `gRPC` server. - :spelling:ignore:`manager_pb2_grpc.ManagerStub`: `gRPC` stub. - """ - channel = grpc.insecure_channel(f"{addr[0]}:{addr[1]}") - try: - grpc.channel_ready_future(channel).result(timeout=timeout) - except grpc.FutureTimeoutError: - raise RemoteAgentException("Timeout in connecting to remote zoo manager.") - stub = manager_pb2_grpc.ManagerStub(channel) - return channel, stub diff --git a/smarts/core/smarts.py b/smarts/core/smarts.py index 0680c1525a..eb99f4e6b1 100644 --- a/smarts/core/smarts.py +++ b/smarts/core/smarts.py @@ -110,7 +110,6 @@ class SMARTS(ProviderManager): visdom (Union[bool, Any], optional): Deprecated. Use SMARTS_VISDOM_ENABLED. A visdom client for connecting to a visdom visualization server. fixed_timestep_sec (Optional[float], optional): The fixed timestep that will be default if time is not otherwise specified at step. Defaults to 0.1. reset_agents_only (bool, optional): When specified the simulation will continue use of the current scenario. Defaults to False. - zoo_addrs (Optional[Tuple[str, int]], optional): The `{ip:port}` values of remote agent workers for externally hosted agents. Defaults to None. external_provider (bool, optional): Creates a special provider `SMARTS.external_provider` that allows for inserting state. Defaults to False. """ @@ -124,7 +123,6 @@ def __init__( visdom: Optional[Union[bool, Any]] = False, fixed_timestep_sec: Optional[float] = 0.1, reset_agents_only: bool = False, - zoo_addrs: Optional[Tuple[str, int]] = None, external_provider: bool = False, ): conf = config() @@ -214,7 +212,7 @@ def __init__( # Set up indices self._sensor_manager = SensorManager() self._vehicle_index = VehicleIndex() - self._agent_manager = AgentManager(self, agent_interfaces, zoo_addrs) + self._agent_manager = AgentManager(self, agent_interfaces) # TODO: Should not be stored in SMARTS self._vehicle_collisions: Dict[str, List[Collision]] = dict() diff --git a/smarts/env/gymnasium/hiway_env_v1.py b/smarts/env/gymnasium/hiway_env_v1.py index 7eaae5c434..1b749533de 100644 --- a/smarts/env/gymnasium/hiway_env_v1.py +++ b/smarts/env/gymnasium/hiway_env_v1.py @@ -126,9 +126,6 @@ class HiWayEnvV1(gym.Env): visualization_client_builder: A method that must must construct an object that follows the Envision interface. Allows tapping into a direct data stream from the simulation. - zoo_addrs (str, optional): List of (`ip`, `port`) tuples of - zoo server, used to instantiate remote social agents. Defaults - to None. observation_options (ObservationOptions, str): Defines the options for how the formatting matches the observation space. String version can be used instead. See :class:`~smarts.env.utils.observation_conversion.ObservationOptions`. Defaults to @@ -170,7 +167,6 @@ def __init__( seed: int = 42, sumo_options: Union[Dict[str, Any], SumoOptions] = SumoOptions(), visualization_client_builder: partial = DEFAULT_VISUALIZATION_CLIENT_BUILDER, - zoo_addrs: Optional[str] = None, observation_options: Union[ ObservationOptions, str ] = ObservationOptions.default, @@ -244,7 +240,6 @@ def __init__( envision=visualization_client, visdom=visdom, fixed_timestep_sec=fixed_timestep_sec, - zoo_addrs=zoo_addrs, ) def step( diff --git a/smarts/env/hiway_env.py b/smarts/env/hiway_env.py index 313c810154..d8c74222f4 100644 --- a/smarts/env/hiway_env.py +++ b/smarts/env/hiway_env.py @@ -71,9 +71,6 @@ class HiWayEnv(gym.Env): Defaults to None. envision_record_data_replay_path (Optional[str], optional): Envision's data replay output directory. Defaults to None. - zoo_addrs (Optional[str], optional): List of (ip, port) tuples of - zoo server, used to instantiate remote social agents. Defaults - to None. timestep_sec (Optional[float], optional): [description]. Defaults to None. agent_interfaces (Dict[str, AgentInterface]): Specification of the agents @@ -101,7 +98,6 @@ def __init__( sumo_auto_start: bool = True, envision_endpoint: Optional[str] = None, envision_record_data_replay_path: Optional[str] = None, - zoo_addrs: Optional[str] = None, agent_interfaces: Optional[Dict[str, AgentInterface]] = None, timestep_sec: Optional[ float @@ -188,7 +184,6 @@ def __init__( envision=envision_client, visdom=visdom, fixed_timestep_sec=fixed_timestep_sec, - zoo_addrs=zoo_addrs, ) @property diff --git a/smarts/zoo/manager.py b/smarts/zoo/manager.py deleted file mode 100644 index 404bf44f80..0000000000 --- a/smarts/zoo/manager.py +++ /dev/null @@ -1,75 +0,0 @@ -# MIT License -# -# Copyright (C) 2021. Huawei Technologies Co., Ltd. All rights reserved. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -import argparse -import logging -import os -import signal -from concurrent import futures - -import grpc - -from smarts.zoo import manager_pb2_grpc, manager_servicer - -logging.basicConfig(level=logging.INFO) -log = logging.getLogger(f"manager.py - pid({os.getpid()})") - - -def serve(port): - """Starts a SMARTS agent worker server to offload agent action processing.""" - ip = "[::]" - server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) - manager_servicer_object = manager_servicer.ManagerServicer() - manager_pb2_grpc.add_ManagerServicer_to_server(manager_servicer_object, server) - server.add_insecure_port(f"{ip}:{port}") - server.start() - log.debug(f"Manager - ip({ip}), port({port}), pid({os.getpid()}): Started serving.") - - def stop_server(*args): - manager_servicer_object.destroy() - server.stop(0) - log.debug( - f"Manager - ip({ip}), port({port}), pid({os.getpid()}): Received interrupt signal." - ) - - # Catch keyboard interrupt and terminate signal - signal.signal(signal.SIGINT, stop_server) - signal.signal(signal.SIGTERM, stop_server) - - # Wait to receive server termination signal - server.wait_for_termination() - log.debug(f"Manager - ip({ip}), port({port}), pid({os.getpid()}): Server exited") - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - "Listen for requests to allocate agents and execute them on-demand." - ) - parser.add_argument( - "--port", - type=int, - default=7432, - help="Port to listen for remote client connections.", - ) - - args = parser.parse_args() - serve(args.port) diff --git a/smarts/zoo/manager_pb2.py b/smarts/zoo/manager_pb2.py deleted file mode 100644 index 927e1c1923..0000000000 --- a/smarts/zoo/manager_pb2.py +++ /dev/null @@ -1,182 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: manager.proto - -from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection -from google.protobuf import symbol_database as _symbol_database - -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - -DESCRIPTOR = _descriptor.FileDescriptor( - name="manager.proto", - package="manager", - syntax="proto3", - serialized_options=None, - create_key=_descriptor._internal_create_key, - serialized_pb=b'\n\rmanager.proto\x12\x07manager"\t\n\x07Machine"\x13\n\x04Port\x12\x0b\n\x03num\x18\x01 \x01(\x05"\x08\n\x06Status2m\n\x07Manager\x12\x31\n\x0cspawn_worker\x12\x10.manager.Machine\x1a\r.manager.Port"\x00\x12/\n\x0bstop_worker\x12\r.manager.Port\x1a\x0f.manager.Status"\x00\x62\x06proto3', -) - - -_MACHINE = _descriptor.Descriptor( - name="Machine", - full_name="manager.Machine", - filename=None, - file=DESCRIPTOR, - containing_type=None, - create_key=_descriptor._internal_create_key, - fields=[], - extensions=[], - nested_types=[], - enum_types=[], - serialized_options=None, - is_extendable=False, - syntax="proto3", - extension_ranges=[], - oneofs=[], - serialized_start=26, - serialized_end=35, -) - - -_PORT = _descriptor.Descriptor( - name="Port", - full_name="manager.Port", - filename=None, - file=DESCRIPTOR, - containing_type=None, - create_key=_descriptor._internal_create_key, - fields=[ - _descriptor.FieldDescriptor( - name="num", - full_name="manager.Port.num", - index=0, - number=1, - type=5, - cpp_type=1, - label=1, - has_default_value=False, - default_value=0, - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - create_key=_descriptor._internal_create_key, - ), - ], - extensions=[], - nested_types=[], - enum_types=[], - serialized_options=None, - is_extendable=False, - syntax="proto3", - extension_ranges=[], - oneofs=[], - serialized_start=37, - serialized_end=56, -) - - -_STATUS = _descriptor.Descriptor( - name="Status", - full_name="manager.Status", - filename=None, - file=DESCRIPTOR, - containing_type=None, - create_key=_descriptor._internal_create_key, - fields=[], - extensions=[], - nested_types=[], - enum_types=[], - serialized_options=None, - is_extendable=False, - syntax="proto3", - extension_ranges=[], - oneofs=[], - serialized_start=58, - serialized_end=66, -) - -DESCRIPTOR.message_types_by_name["Machine"] = _MACHINE -DESCRIPTOR.message_types_by_name["Port"] = _PORT -DESCRIPTOR.message_types_by_name["Status"] = _STATUS -_sym_db.RegisterFileDescriptor(DESCRIPTOR) - -Machine = _reflection.GeneratedProtocolMessageType( - "Machine", - (_message.Message,), - { - "DESCRIPTOR": _MACHINE, - "__module__": "manager_pb2" - # @@protoc_insertion_point(class_scope:manager.Machine) - }, -) -_sym_db.RegisterMessage(Machine) - -Port = _reflection.GeneratedProtocolMessageType( - "Port", - (_message.Message,), - { - "DESCRIPTOR": _PORT, - "__module__": "manager_pb2" - # @@protoc_insertion_point(class_scope:manager.Port) - }, -) -_sym_db.RegisterMessage(Port) - -Status = _reflection.GeneratedProtocolMessageType( - "Status", - (_message.Message,), - { - "DESCRIPTOR": _STATUS, - "__module__": "manager_pb2" - # @@protoc_insertion_point(class_scope:manager.Status) - }, -) -_sym_db.RegisterMessage(Status) - - -_MANAGER = _descriptor.ServiceDescriptor( - name="Manager", - full_name="manager.Manager", - file=DESCRIPTOR, - index=0, - serialized_options=None, - create_key=_descriptor._internal_create_key, - serialized_start=68, - serialized_end=177, - methods=[ - _descriptor.MethodDescriptor( - name="spawn_worker", - full_name="manager.Manager.spawn_worker", - index=0, - containing_service=None, - input_type=_MACHINE, - output_type=_PORT, - serialized_options=None, - create_key=_descriptor._internal_create_key, - ), - _descriptor.MethodDescriptor( - name="stop_worker", - full_name="manager.Manager.stop_worker", - index=1, - containing_service=None, - input_type=_PORT, - output_type=_STATUS, - serialized_options=None, - create_key=_descriptor._internal_create_key, - ), - ], -) -_sym_db.RegisterServiceDescriptor(_MANAGER) - -DESCRIPTOR.services_by_name["Manager"] = _MANAGER - -# @@protoc_insertion_point(module_scope) diff --git a/smarts/zoo/manager_pb2_grpc.py b/smarts/zoo/manager_pb2_grpc.py deleted file mode 100644 index db7831f1ae..0000000000 --- a/smarts/zoo/manager_pb2_grpc.py +++ /dev/null @@ -1,122 +0,0 @@ -# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! -"""Client and server classes corresponding to protobuf-defined services.""" -import grpc - -import smarts.zoo.manager_pb2 as manager__pb2 - - -class ManagerStub(object): - """Interface exported by the manager.""" - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.spawn_worker = channel.unary_unary( - "/manager.Manager/spawn_worker", - request_serializer=manager__pb2.Machine.SerializeToString, - response_deserializer=manager__pb2.Port.FromString, - ) - self.stop_worker = channel.unary_unary( - "/manager.Manager/stop_worker", - request_serializer=manager__pb2.Port.SerializeToString, - response_deserializer=manager__pb2.Status.FromString, - ) - - -class ManagerServicer(object): - """Interface exported by the manager.""" - - def spawn_worker(self, request, context): - """Spawn worker processes. - Returns the address (ip, port) of new worker process. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details("Method not implemented!") - raise NotImplementedError("Method not implemented!") - - def stop_worker(self, request, context): - """Stop worker process.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details("Method not implemented!") - raise NotImplementedError("Method not implemented!") - - -def add_ManagerServicer_to_server(servicer, server): - rpc_method_handlers = { - "spawn_worker": grpc.unary_unary_rpc_method_handler( - servicer.spawn_worker, - request_deserializer=manager__pb2.Machine.FromString, - response_serializer=manager__pb2.Port.SerializeToString, - ), - "stop_worker": grpc.unary_unary_rpc_method_handler( - servicer.stop_worker, - request_deserializer=manager__pb2.Port.FromString, - response_serializer=manager__pb2.Status.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - "manager.Manager", rpc_method_handlers - ) - server.add_generic_rpc_handlers((generic_handler,)) - - -# This class is part of an EXPERIMENTAL API. -class Manager(object): - """Interface exported by the manager.""" - - @staticmethod - def spawn_worker( - request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None, - ): - return grpc.experimental.unary_unary( - request, - target, - "/manager.Manager/spawn_worker", - manager__pb2.Machine.SerializeToString, - manager__pb2.Port.FromString, - options, - channel_credentials, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - ) - - @staticmethod - def stop_worker( - request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None, - ): - return grpc.experimental.unary_unary( - request, - target, - "/manager.Manager/stop_worker", - manager__pb2.Port.SerializeToString, - manager__pb2.Status.FromString, - options, - channel_credentials, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - ) diff --git a/smarts/zoo/manager_servicer.py b/smarts/zoo/manager_servicer.py deleted file mode 100644 index 7c10f717b9..0000000000 --- a/smarts/zoo/manager_servicer.py +++ /dev/null @@ -1,98 +0,0 @@ -# MIT License -# -# Copyright (C) 2021. Huawei Technologies Co., Ltd. All rights reserved. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -import logging -import os -import pathlib -import subprocess -import sys - -import grpc - -from smarts.core.utils.networking import find_free_port -from smarts.zoo import manager_pb2, manager_pb2_grpc - -logging.basicConfig(level=logging.INFO) -log = logging.getLogger(f"manager_servicer.py - pid({os.getpid()})") - - -class ManagerServicer(manager_pb2_grpc.ManagerServicer): - """Provides methods that implement functionality of ``ManagerServicer``.""" - - def __init__(self): - self._workers = {} - - def __del__(self): - self.destroy() - - def spawn_worker(self, request, context): - port = find_free_port() - - cmd = [ - sys.executable, # Path to the current Python binary. - str((pathlib.Path(__file__).parent / "worker.py").absolute().resolve()), - "--port", - str(port), - ] - - worker = subprocess.Popen(cmd) - if worker.poll() == None: - self._workers[port] = worker - return manager_pb2.Port(num=port) - - context.set_details("Error in spawning worker subprocess.") - context.set_code(grpc.StatusCode.INTERNAL) - return manager_pb2.Port() - - def stop_worker(self, request, context): - log.debug( - f"Manager - pid({os.getpid()}), received stop signal for worker at port {request.num}." - ) - - # Get worker_process corresponding to the received port number. - worker = self._workers.get(request.num, None) - if worker == None: - context.set_details( - f"Trying to stop nonexistent worker with a port {request.num}." - ) - context.set_code(grpc.StatusCode.INVALID_ARGUMENT) - return manager_pb2.Status() - - # Terminate worker process. - worker.terminate() - worker.wait() - - # Delete worker process entry from dictionary. - del self._workers[request.num] - - return manager_pb2.Status() - - def destroy(self): - """Cleans up unmanaged resources.""" - log.debug( - f"Manager - pid({os.getpid()}), shutting down remaining agent worker processes." - ) - workers_to_kill = list(self._workers.values()) - for worker in workers_to_kill: - if worker.poll() == None: - worker.terminate() - worker.wait() diff --git a/smarts/zoo/worker.py b/smarts/zoo/worker.py deleted file mode 100755 index 4dd7bacadd..0000000000 --- a/smarts/zoo/worker.py +++ /dev/null @@ -1,119 +0,0 @@ -# MIT License -# -# Copyright (C) 2021. Huawei Technologies Co., Ltd. All rights reserved. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -""" -Run an agent in it's own (independent) process. - -What Agent code does is out of our direct control, we want to avoid any interactions with global state that might be present in the SMARTS process. - -To protect and isolate Agents from any pollution of global state in the main SMARTS process, we spawn Agents in their fresh and independent python process. - -This script is called from within SMARTS to instantiate a remote agent. -The protocol is as follows: - -1. SMARTS calls: ``worker.py`` --port 5467 # sets a unique port per agent -2. ``worker.py`` will begin listening on port 5467. -3. SMARTS connects to (ip, 5467) as a client. -4. SMARTS calls `build()` rpc with `AgentSpec` as input. -5. ``worker.py`` receives the `AgentSpec` instances and builds the Agent. -6. SMARTS calls `act()` rpc with observation as input and receives the actions as response from ``worker.py``. -""" - -import argparse -import importlib -import logging -import os -import signal -from concurrent import futures - -import grpc - -from smarts.zoo import worker_pb2_grpc, worker_servicer - -# Front-load some expensive imports as to not block the simulation -modules = [ - "smarts.core.utils.pybullet", - "smarts.core.utils.sumo", - "smarts.core.road_map", - "numpy", - "sklearn", - "shapely", - "scipy", - "trimesh", - "panda3d", - "gym", -] - -for mod in modules: - try: - importlib.import_module(mod) - except ImportError: - if mod == "panda3d": - print( - "You need to install the panda3d dependency using pip install -e .[camera_obs] first" - ) - pass - - -# End front-loaded imports - -logging.basicConfig(level=logging.INFO) -log = logging.getLogger(f"worker.py - pid({os.getpid()})") - - -def serve(port): - """Start an agent worker server.""" - ip = "[::]" - server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) - worker_pb2_grpc.add_WorkerServicer_to_server( - worker_servicer.WorkerServicer(), server - ) - server.add_insecure_port(f"{ip}:{port}") - server.start() - log.debug(f"Worker - ip({ip}), port({port}), pid({os.getpid()}): Started serving.") - - def stop_server(*args): - server.stop(0) - log.debug( - f"Worker - ip({ip}), port({port}), pid({os.getpid()}): Received interrupt signal." - ) - - # Catch keyboard interrupt and terminate signal - signal.signal(signal.SIGINT, stop_server) - signal.signal(signal.SIGTERM, stop_server) - - # Wait to receive server termination signal - server.wait_for_termination() - log.debug(f"Worker - ip({ip}), port({port}), pid({os.getpid()}): Server exited") - - -if __name__ == "__main__": - parser = argparse.ArgumentParser("Run an agent in an independent process.") - parser.add_argument( - "--port", - type=int, - required=True, - help="Port to listen for remote client connections.", - ) - - args = parser.parse_args() - serve(args.port) diff --git a/smarts/zoo/worker_pb2.py b/smarts/zoo/worker_pb2.py deleted file mode 100644 index 7a76dad381..0000000000 --- a/smarts/zoo/worker_pb2.py +++ /dev/null @@ -1,255 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: worker.proto - -from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection -from google.protobuf import symbol_database as _symbol_database - -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - -DESCRIPTOR = _descriptor.FileDescriptor( - name="worker.proto", - package="worker", - syntax="proto3", - serialized_options=None, - create_key=_descriptor._internal_create_key, - serialized_pb=b'\n\x0cworker.proto\x12\x06worker" \n\rSpecification\x12\x0f\n\x07payload\x18\x01 \x01(\x0c"\x08\n\x06Status"\x1e\n\x0bObservation\x12\x0f\n\x07payload\x18\x01 \x01(\x0c"\x18\n\x06\x41\x63tion\x12\x0e\n\x06\x61\x63tion\x18\x01 \x01(\x0c\x32h\n\x06Worker\x12\x30\n\x05\x62uild\x12\x15.worker.Specification\x1a\x0e.worker.Status"\x00\x12,\n\x03\x61\x63t\x12\x13.worker.Observation\x1a\x0e.worker.Action"\x00\x62\x06proto3', -) - - -_SPECIFICATION = _descriptor.Descriptor( - name="Specification", - full_name="worker.Specification", - filename=None, - file=DESCRIPTOR, - containing_type=None, - create_key=_descriptor._internal_create_key, - fields=[ - _descriptor.FieldDescriptor( - name="payload", - full_name="worker.Specification.payload", - index=0, - number=1, - type=12, - cpp_type=9, - label=1, - has_default_value=False, - default_value=b"", - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - create_key=_descriptor._internal_create_key, - ), - ], - extensions=[], - nested_types=[], - enum_types=[], - serialized_options=None, - is_extendable=False, - syntax="proto3", - extension_ranges=[], - oneofs=[], - serialized_start=24, - serialized_end=56, -) - - -_STATUS = _descriptor.Descriptor( - name="Status", - full_name="worker.Status", - filename=None, - file=DESCRIPTOR, - containing_type=None, - create_key=_descriptor._internal_create_key, - fields=[], - extensions=[], - nested_types=[], - enum_types=[], - serialized_options=None, - is_extendable=False, - syntax="proto3", - extension_ranges=[], - oneofs=[], - serialized_start=58, - serialized_end=66, -) - - -_OBSERVATION = _descriptor.Descriptor( - name="Observation", - full_name="worker.Observation", - filename=None, - file=DESCRIPTOR, - containing_type=None, - create_key=_descriptor._internal_create_key, - fields=[ - _descriptor.FieldDescriptor( - name="payload", - full_name="worker.Observation.payload", - index=0, - number=1, - type=12, - cpp_type=9, - label=1, - has_default_value=False, - default_value=b"", - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - create_key=_descriptor._internal_create_key, - ), - ], - extensions=[], - nested_types=[], - enum_types=[], - serialized_options=None, - is_extendable=False, - syntax="proto3", - extension_ranges=[], - oneofs=[], - serialized_start=68, - serialized_end=98, -) - - -_ACTION = _descriptor.Descriptor( - name="Action", - full_name="worker.Action", - filename=None, - file=DESCRIPTOR, - containing_type=None, - create_key=_descriptor._internal_create_key, - fields=[ - _descriptor.FieldDescriptor( - name="action", - full_name="worker.Action.action", - index=0, - number=1, - type=12, - cpp_type=9, - label=1, - has_default_value=False, - default_value=b"", - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - create_key=_descriptor._internal_create_key, - ), - ], - extensions=[], - nested_types=[], - enum_types=[], - serialized_options=None, - is_extendable=False, - syntax="proto3", - extension_ranges=[], - oneofs=[], - serialized_start=100, - serialized_end=124, -) - -DESCRIPTOR.message_types_by_name["Specification"] = _SPECIFICATION -DESCRIPTOR.message_types_by_name["Status"] = _STATUS -DESCRIPTOR.message_types_by_name["Observation"] = _OBSERVATION -DESCRIPTOR.message_types_by_name["Action"] = _ACTION -_sym_db.RegisterFileDescriptor(DESCRIPTOR) - -Specification = _reflection.GeneratedProtocolMessageType( - "Specification", - (_message.Message,), - { - "DESCRIPTOR": _SPECIFICATION, - "__module__": "worker_pb2" - # @@protoc_insertion_point(class_scope:worker.Specification) - }, -) -_sym_db.RegisterMessage(Specification) - -Status = _reflection.GeneratedProtocolMessageType( - "Status", - (_message.Message,), - { - "DESCRIPTOR": _STATUS, - "__module__": "worker_pb2" - # @@protoc_insertion_point(class_scope:worker.Status) - }, -) -_sym_db.RegisterMessage(Status) - -Observation = _reflection.GeneratedProtocolMessageType( - "Observation", - (_message.Message,), - { - "DESCRIPTOR": _OBSERVATION, - "__module__": "worker_pb2" - # @@protoc_insertion_point(class_scope:worker.Observation) - }, -) -_sym_db.RegisterMessage(Observation) - -Action = _reflection.GeneratedProtocolMessageType( - "Action", - (_message.Message,), - { - "DESCRIPTOR": _ACTION, - "__module__": "worker_pb2" - # @@protoc_insertion_point(class_scope:worker.Action) - }, -) -_sym_db.RegisterMessage(Action) - - -_WORKER = _descriptor.ServiceDescriptor( - name="Worker", - full_name="worker.Worker", - file=DESCRIPTOR, - index=0, - serialized_options=None, - create_key=_descriptor._internal_create_key, - serialized_start=126, - serialized_end=230, - methods=[ - _descriptor.MethodDescriptor( - name="build", - full_name="worker.Worker.build", - index=0, - containing_service=None, - input_type=_SPECIFICATION, - output_type=_STATUS, - serialized_options=None, - create_key=_descriptor._internal_create_key, - ), - _descriptor.MethodDescriptor( - name="act", - full_name="worker.Worker.act", - index=1, - containing_service=None, - input_type=_OBSERVATION, - output_type=_ACTION, - serialized_options=None, - create_key=_descriptor._internal_create_key, - ), - ], -) -_sym_db.RegisterServiceDescriptor(_WORKER) - -DESCRIPTOR.services_by_name["Worker"] = _WORKER - -# @@protoc_insertion_point(module_scope) diff --git a/smarts/zoo/worker_pb2_grpc.py b/smarts/zoo/worker_pb2_grpc.py deleted file mode 100644 index a7ddd64e60..0000000000 --- a/smarts/zoo/worker_pb2_grpc.py +++ /dev/null @@ -1,120 +0,0 @@ -# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! -"""Client and server classes corresponding to protobuf-defined services.""" -import grpc - -import smarts.zoo.worker_pb2 as worker__pb2 - - -class WorkerStub(object): - """Interface exported by the worker server.""" - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.build = channel.unary_unary( - "/worker.Worker/build", - request_serializer=worker__pb2.Specification.SerializeToString, - response_deserializer=worker__pb2.Status.FromString, - ) - self.act = channel.unary_unary( - "/worker.Worker/act", - request_serializer=worker__pb2.Observation.SerializeToString, - response_deserializer=worker__pb2.Action.FromString, - ) - - -class WorkerServicer(object): - """Interface exported by the worker server.""" - - def build(self, request, context): - """Builds Agent according the AgentSpec.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details("Method not implemented!") - raise NotImplementedError("Method not implemented!") - - def act(self, request, context): - """Agent processes observations and returns action.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details("Method not implemented!") - raise NotImplementedError("Method not implemented!") - - -def add_WorkerServicer_to_server(servicer, server): - rpc_method_handlers = { - "build": grpc.unary_unary_rpc_method_handler( - servicer.build, - request_deserializer=worker__pb2.Specification.FromString, - response_serializer=worker__pb2.Status.SerializeToString, - ), - "act": grpc.unary_unary_rpc_method_handler( - servicer.act, - request_deserializer=worker__pb2.Observation.FromString, - response_serializer=worker__pb2.Action.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - "worker.Worker", rpc_method_handlers - ) - server.add_generic_rpc_handlers((generic_handler,)) - - -# This class is part of an EXPERIMENTAL API. -class Worker(object): - """Interface exported by the worker server.""" - - @staticmethod - def build( - request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None, - ): - return grpc.experimental.unary_unary( - request, - target, - "/worker.Worker/build", - worker__pb2.Specification.SerializeToString, - worker__pb2.Status.FromString, - options, - channel_credentials, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - ) - - @staticmethod - def act( - request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None, - ): - return grpc.experimental.unary_unary( - request, - target, - "/worker.Worker/act", - worker__pb2.Observation.SerializeToString, - worker__pb2.Action.FromString, - options, - channel_credentials, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - ) diff --git a/smarts/zoo/worker_servicer.py b/smarts/zoo/worker_servicer.py deleted file mode 100644 index 7bbaa4065a..0000000000 --- a/smarts/zoo/worker_servicer.py +++ /dev/null @@ -1,64 +0,0 @@ -# MIT License -# -# Copyright (C) 2021. Huawei Technologies Co., Ltd. All rights reserved. -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -import logging -import os -import time - -import cloudpickle -import grpc - -from smarts.zoo import worker_pb2, worker_pb2_grpc - -logging.basicConfig(level=logging.INFO) -log = logging.getLogger(f"worker_servicer.py - pid({os.getpid()})") - - -class WorkerServicer(worker_pb2_grpc.WorkerServicer): - """Provides methods that implement functionality of ``WorkerServicer``.""" - - def __init__(self): - self._agent = None - self._agent_spec = None - - def build(self, request, context): - time_start = time.time() - self._agent_spec = cloudpickle.loads(request.payload) - pickle_load_time = time.time() - self._agent = self._agent_spec.build_agent() - agent_build_time = time.time() - log.debug( - "Build agent timings:\n" - f" total ={agent_build_time - time_start:.2}\n" - f" pickle={pickle_load_time - time_start:.2}\n" - f" build ={agent_build_time - pickle_load_time:.2}\n" - ) - return worker_pb2.Status() - - def act(self, request, context): - if self._agent == None or self._agent_spec == None: - context.set_details(f"Remote agent not built yet.") - context.set_code(grpc.StatusCode.FAILED_PRECONDITION) - return worker_pb2.Action() - - obs = cloudpickle.loads(request.payload) - action = self._agent.act(obs) - return worker_pb2.Action(action=cloudpickle.dumps(action))