Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RLlib] New ConnectorV2 API #03: Introduce actual ConnectorV2 API. (#41074) #41212

Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
57e79f9
wip
sven1977 Nov 16, 2023
99d9019
wip
sven1977 Nov 17, 2023
d3dca2f
wip
sven1977 Nov 17, 2023
009a7fd
Merge branch 'master' of https://github.com/ray-project/ray into env_…
sven1977 Nov 30, 2023
b84b544
Merge branch 'master' of https://github.com/ray-project/ray into env_…
sven1977 Dec 14, 2023
b0b3c37
LINT
sven1977 Dec 14, 2023
4df7dfe
wip
sven1977 Dec 14, 2023
1de7ebb
wip
sven1977 Dec 14, 2023
a9acbee
wip
sven1977 Dec 14, 2023
5fe97e1
LINT
sven1977 Dec 14, 2023
213f0d1
LINT
sven1977 Dec 14, 2023
50b7fc6
Merge branch 'master' of https://github.com/ray-project/ray into env_…
sven1977 Dec 14, 2023
90e9c34
Merge branch 'master' of https://github.com/ray-project/ray into env_…
sven1977 Dec 15, 2023
91b4399
Merge branch 'master' into env_runner_support_connectors_03_connector…
sven1977 Dec 16, 2023
3102238
merge
sven1977 Dec 18, 2023
7618d52
Merge remote-tracking branch 'origin/env_runner_support_connectors_03…
sven1977 Dec 18, 2023
4958597
wip
sven1977 Dec 18, 2023
c40f5b0
Merge branch 'master' of https://github.com/ray-project/ray into env_…
sven1977 Dec 19, 2023
bdf803d
wip
sven1977 Dec 19, 2023
2649e70
wip
sven1977 Dec 21, 2023
34f8827
Merge branch 'master' of https://github.com/ray-project/ray into env_…
sven1977 Dec 21, 2023
b58ad31
wip
sven1977 Dec 21, 2023
7bc0ac6
wip
sven1977 Dec 21, 2023
8b8cf06
Merge branch 'master' of https://github.com/ray-project/ray into env_…
sven1977 Dec 21, 2023
f7dde73
wip
sven1977 Dec 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion rllib/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ py_test(


# --------------------------------------------------------------------
# Connector tests
# Connector(V1) tests
# rllib/connector/
#
# Tag: connector
Expand All @@ -774,6 +774,20 @@ py_test(
srcs = ["connectors/tests/test_agent.py"]
)

# --------------------------------------------------------------------
# ConnectorV2 tests
# rllib/connector/
#
# Tag: connector_v2
# --------------------------------------------------------------------

py_test(
name = "connectors/tests/test_connector_v2",
tags = ["team:rllib", "connector_v2"],
size = "small",
srcs = ["connectors/tests/test_connector_v2.py"]
)

# --------------------------------------------------------------------
# Env tests
# rllib/env/
Expand Down
114 changes: 114 additions & 0 deletions rllib/algorithms/algorithm_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@

if TYPE_CHECKING:
from ray.rllib.algorithms.algorithm import Algorithm
from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.core.learner import Learner
from ray.rllib.core.learner.learner_group import LearnerGroup
from ray.rllib.core.rl_module.rl_module import RLModule
from ray.rllib.evaluation.episode import Episode as OldEpisode

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -327,6 +329,8 @@ def __init__(self, algo_class=None):
self.num_envs_per_worker = 1
self.create_env_on_local_worker = False
self.enable_connectors = True
self._env_to_module_connector = None
self._module_to_env_connector = None
# TODO (sven): Rename into `sample_timesteps` (or `sample_duration`
# and `sample_duration_unit` (replacing batch_mode), like we do it
# in the evaluation config).
Expand Down Expand Up @@ -374,6 +378,7 @@ def __init__(self, algo_class=None):
except AttributeError:
pass

self._learner_connector = None
self.optimizer = {}
self.max_requests_in_flight_per_sampler_worker = 2
self._learner_class = None
Expand Down Expand Up @@ -1152,6 +1157,92 @@ class directly. Note that this arg can also be specified via
logger_creator=self.logger_creator,
)

def build_env_to_module_connector(self, env):
custom_connectors = []

# Create an env-to-module connector pipeline (including RLlib's default
# env->module connector piece) and return it.
if self._env_to_module_connector is not None:
val_ = self._env_to_module_connector(env)

from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.connectors.connector_pipeline_v2 import ConnectorPipelineV2

if isinstance(val_, ConnectorV2) and not isinstance(
val_, ConnectorPipelineV2
):
custom_connectors = [val_]
else:
return val_

from ray.rllib.connectors.env_to_module.env_to_module_pipeline import (
EnvToModulePipeline,
)

return EnvToModulePipeline(
connectors=custom_connectors,
input_observation_space=env.single_observation_space,
input_action_space=env.single_action_space,
env=env,
)

def build_module_to_env_connector(self, env):
custom_connectors = []

# Create a module-to-env connector pipeline (including RLlib's default
# module->env connector piece) and return it.
if self._module_to_env_connector is not None:
val_ = self._module_to_env_connector(env)

from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.connectors.connector_pipeline_v2 import ConnectorPipelineV2

if isinstance(val_, ConnectorV2) and not isinstance(
val_, ConnectorPipelineV2
):
custom_connectors = [val_]
else:
return val_

from ray.rllib.connectors.module_to_env.module_to_env_pipeline import (
ModuleToEnvPipeline,
)

return ModuleToEnvPipeline(
connectors=custom_connectors,
input_observation_space=env.single_observation_space,
input_action_space=env.single_action_space,
env=env,
)

def build_learner_connector(self, input_observation_space, input_action_space):
custom_connectors = []

# Create a learner connector pipeline (including RLlib's default
# learner connector piece) and return it.
if self._learner_connector is not None:
val_ = self._learner_connector(input_observation_space, input_action_space)

from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.connectors.connector_pipeline_v2 import ConnectorPipelineV2

if isinstance(val_, ConnectorV2) and not isinstance(
val_, ConnectorPipelineV2
):
custom_connectors = [val_]
else:
return val_

from ray.rllib.connectors.learner.learner_connector_pipeline import (
LearnerConnectorPipeline,
)

return LearnerConnectorPipeline(
connectors=custom_connectors,
input_observation_space=input_observation_space,
input_action_space=input_action_space,
)

def build_learner_group(
self,
*,
Expand Down Expand Up @@ -1605,6 +1696,12 @@ def rollouts(
create_env_on_local_worker: Optional[bool] = NotProvided,
sample_collector: Optional[Type[SampleCollector]] = NotProvided,
enable_connectors: Optional[bool] = NotProvided,
env_to_module_connector: Optional[
Callable[[EnvType], "ConnectorV2"]
] = NotProvided,
module_to_env_connector: Optional[
Callable[[EnvType, "RLModule"], "ConnectorV2"]
] = NotProvided,
use_worker_filter_stats: Optional[bool] = NotProvided,
update_worker_filter_stats: Optional[bool] = NotProvided,
rollout_fragment_length: Optional[Union[int, str]] = NotProvided,
Expand Down Expand Up @@ -1650,6 +1747,11 @@ def rollouts(
enable_connectors: Use connector based environment runner, so that all
preprocessing of obs and postprocessing of actions are done in agent
and action connectors.
env_to_module_connector: A callable taking an Env as input arg and returning
an env-to-module ConnectorV2 (might be a pipeline) object.
module_to_env_connector: A callable taking an Env and an RLModule as input
args and returning a module-to-env ConnectorV2 (might be a pipeline)
object.
use_worker_filter_stats: Whether to use the workers in the WorkerSet to
update the central filters (held by the local worker). If False, stats
from the workers will not be used and discarded.
Expand Down Expand Up @@ -1737,6 +1839,10 @@ def rollouts(
self.create_env_on_local_worker = create_env_on_local_worker
if enable_connectors is not NotProvided:
self.enable_connectors = enable_connectors
if env_to_module_connector is not NotProvided:
self._env_to_module_connector = env_to_module_connector
if module_to_env_connector is not NotProvided:
self._module_to_env_connector = module_to_env_connector
if use_worker_filter_stats is not NotProvided:
self.use_worker_filter_stats = use_worker_filter_stats
if update_worker_filter_stats is not NotProvided:
Expand Down Expand Up @@ -1855,6 +1961,9 @@ def training(
optimizer: Optional[dict] = NotProvided,
max_requests_in_flight_per_sampler_worker: Optional[int] = NotProvided,
learner_class: Optional[Type["Learner"]] = NotProvided,
learner_connector: Optional[
Callable[["RLModule"], "ConnectorV2"]
] = NotProvided,
# Deprecated arg.
_enable_learner_api: Optional[bool] = NotProvided,
) -> "AlgorithmConfig":
Expand Down Expand Up @@ -1916,6 +2025,9 @@ def training(
in your experiment of timesteps.
learner_class: The `Learner` class to use for (distributed) updating of the
RLModule. Only used when `_enable_new_api_stack=True`.
learner_connector: A callable taking an env observation space and an env
action space as inputs and returning a learner ConnectorV2 (might be
a pipeline) object.

Returns:
This updated AlgorithmConfig object.
Expand Down Expand Up @@ -1960,6 +2072,8 @@ def training(
)
if learner_class is not NotProvided:
self._learner_class = learner_class
if learner_connector is not NotProvided:
self._learner_connector = learner_connector

return self

Expand Down
Loading
Loading