[RLlib] Add APPO/IMPALA multi-agent StatelessCartPole learning tests to CI (+ fix some bugs related to this). #47245

134 changes: 100 additions & 34 deletions rllib/BUILD
@@ -184,23 +184,6 @@ py_test(
srcs = ["tuned_examples/appo/cartpole_appo.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-gpus=2"]
)
# StatelessCartPole
py_test(
name = "learning_tests_stateless_cartpole_appo",
main = "tuned_examples/appo/stateless_cartpole_appo.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_discrete", "learning_tests_pytorch_use_all_core"],
size = "large",
srcs = ["tuned_examples/appo/stateless_cartpole_appo.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-gpus=1"]
)
py_test(
name = "learning_tests_stateless_cartpole_appo_multi_gpu",
main = "tuned_examples/appo/stateless_cartpole_appo.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_discrete", "learning_tests_pytorch_use_all_core", "multi_gpu"],
size = "large",
srcs = ["tuned_examples/appo/stateless_cartpole_appo.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-gpus=2"]
)
# MultiAgentCartPole
py_test(
name = "learning_tests_multi_agent_cartpole_appo",
@@ -234,6 +217,72 @@ py_test(
srcs = ["tuned_examples/appo/multi_agent_cartpole_appo.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-agents=2", "--num-gpus=2", "--num-cpus=7"]
)
# StatelessCartPole
py_test(
name = "learning_tests_stateless_cartpole_appo",
main = "tuned_examples/appo/stateless_cartpole_appo.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_discrete", "learning_tests_pytorch_use_all_core"],
Collaborator:
Is a "gpu" tag missing here (given num_gpus=1), or do we simply want to test a remote Learner here?

Contributor Author:
Correct, we test the simple case here: 1 (remote) Learner on 1 CPU.

size = "large",
srcs = ["tuned_examples/appo/stateless_cartpole_appo.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-gpus=1"]
)
py_test(
name = "learning_tests_stateless_cartpole_appo_gpu",
main = "tuned_examples/appo/stateless_cartpole_appo.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_discrete", "learning_tests_pytorch_use_all_core", "gpu"],
size = "large",
srcs = ["tuned_examples/appo/stateless_cartpole_appo.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-agents=2", "--num-gpus=1"]
)
py_test(
name = "learning_tests_stateless_cartpole_appo_multi_cpu",
main = "tuned_examples/appo/stateless_cartpole_appo.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_discrete", "learning_tests_pytorch_use_all_core"],
size = "large",
srcs = ["tuned_examples/appo/stateless_cartpole_appo.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-gpus=2"]
)
py_test(
name = "learning_tests_stateless_cartpole_appo_multi_gpu",
main = "tuned_examples/appo/stateless_cartpole_appo.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_discrete", "learning_tests_pytorch_use_all_core", "multi_gpu"],
size = "large",
srcs = ["tuned_examples/appo/stateless_cartpole_appo.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-gpus=2"]
)
# MultiAgentStatelessCartPole
py_test(
name = "learning_tests_multi_agent_stateless_cartpole_appo",
main = "tuned_examples/appo/multi_agent_stateless_cartpole_appo.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_discrete", "learning_tests_pytorch_use_all_core"],
size = "large",
srcs = ["tuned_examples/appo/multi_agent_stateless_cartpole_appo.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-gpus=1"]
Collaborator:
Same here.

)
py_test(
name = "learning_tests_multi_agent_stateless_cartpole_appo_gpu",
main = "tuned_examples/appo/multi_agent_stateless_cartpole_appo.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_discrete", "learning_tests_pytorch_use_all_core", "gpu"],
size = "large",
srcs = ["tuned_examples/appo/multi_agent_stateless_cartpole_appo.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-agents=2", "--num-gpus=1"]
)
py_test(
name = "learning_tests_multi_agent_stateless_cartpole_appo_multi_cpu",
main = "tuned_examples/appo/multi_agent_stateless_cartpole_appo.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_discrete", "learning_tests_pytorch_use_all_core"],
size = "large",
srcs = ["tuned_examples/appo/multi_agent_stateless_cartpole_appo.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-gpus=2"]
)
py_test(
name = "learning_tests_multi_agent_stateless_cartpole_appo_multi_gpu",
main = "tuned_examples/appo/multi_agent_stateless_cartpole_appo.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_discrete", "learning_tests_pytorch_use_all_core", "multi_gpu"],
size = "large",
srcs = ["tuned_examples/appo/multi_agent_stateless_cartpole_appo.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-gpus=2"]
)

#@OldAPIStack
py_test(
@@ -462,23 +511,6 @@ py_test(
srcs = ["tuned_examples/impala/cartpole_impala.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-gpus=2"]
)
# StatelessCartPole
py_test(
name = "learning_tests_stateless_cartpole_impala",
main = "tuned_examples/impala/stateless_cartpole_impala.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_discrete", "learning_tests_pytorch_use_all_core"],
size = "large",
srcs = ["tuned_examples/impala/stateless_cartpole_impala.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-gpus=1"]
)
py_test(
name = "learning_tests_stateless_cartpole_impala_multi_gpu",
main = "tuned_examples/impala/stateless_cartpole_impala.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_discrete", "learning_tests_pytorch_use_all_core", "multi_gpu"],
size = "large",
srcs = ["tuned_examples/impala/stateless_cartpole_impala.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-gpus=2"]
)
# MultiAgentCartPole
py_test(
name = "learning_tests_multi_agent_cartpole_impala",
@@ -512,6 +544,40 @@ py_test(
srcs = ["tuned_examples/impala/multi_agent_cartpole_impala.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-agents=2", "--num-gpus=2", "--num-cpus=7"]
)
# StatelessCartPole
py_test(
name = "learning_tests_stateless_cartpole_impala",
main = "tuned_examples/impala/stateless_cartpole_impala.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_discrete", "learning_tests_pytorch_use_all_core"],
size = "large",
srcs = ["tuned_examples/impala/stateless_cartpole_impala.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-gpus=1"]
Collaborator:
And here. I guess this gives us num_learners=1, doesn't it?

Contributor Author:
This actually tries to put the 1 (remote) Learner on 1 GPU.

Sorry, you are right that these command line options are very confusing:

On a CPU machine:
--num-gpus=1 -> 1 (remote) Learner (on CPU!)
--num-gpus=2 -> 2 (remote) Learners (on CPUs!)

On a GPU machine:
--num-gpus=1 -> 1 (remote) Learner (on GPU)
--num-gpus=2 -> 2 (remote) Learners (on GPUs)

We should probably rename these args.

)
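To make the explanation above concrete, here is a minimal, hypothetical sketch of what `--num-gpus` effectively resolves to. The function name and exact mapping are illustrative assumptions, not the actual helper in `ray.rllib.utils.test_utils`:

```python
import torch

def resolve_learner_resources(num_gpus_arg: int) -> dict:
    # Hypothetical helper (illustration only): `--num-gpus=N` really requests
    # N remote Learners; whether each Learner gets a GPU depends on whether
    # the machine actually has GPUs available.
    has_gpu = torch.cuda.is_available()
    return {
        "num_learners": num_gpus_arg,
        "num_gpus_per_learner": 1 if has_gpu else 0,
    }

# On a CPU machine: {'num_learners': 2, 'num_gpus_per_learner': 0}
# On a GPU machine: {'num_learners': 2, 'num_gpus_per_learner': 1}
print(resolve_learner_resources(num_gpus_arg=2))
```

This also illustrates why renaming the flag (e.g. to something like `--num-learners`) would be less confusing.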
py_test(
name = "learning_tests_stateless_cartpole_impala_multi_gpu",
main = "tuned_examples/impala/stateless_cartpole_impala.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_discrete", "learning_tests_pytorch_use_all_core", "multi_gpu"],
size = "large",
srcs = ["tuned_examples/impala/stateless_cartpole_impala.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-gpus=2"]
)
# MultiAgentStatelessCartPole
py_test(
name = "learning_tests_multi_agent_stateless_cartpole_impala",
main = "tuned_examples/impala/multi_agent_stateless_cartpole_impala.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_discrete", "learning_tests_pytorch_use_all_core"],
size = "large",
srcs = ["tuned_examples/impala/multi_agent_stateless_cartpole_impala.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-gpus=1"]
)
py_test(
name = "learning_tests_multi_agent_stateless_cartpole_impala_multi_gpu",
main = "tuned_examples/impala/multi_agent_stateless_cartpole_impala.py",
tags = ["team:rllib", "exclusive", "learning_tests", "torch_only", "learning_tests_discrete", "learning_tests_pytorch_use_all_core", "multi_gpu"],
size = "large",
srcs = ["tuned_examples/impala/multi_agent_stateless_cartpole_impala.py"],
args = ["--as-test", "--enable-new-api-stack", "--num-gpus=2"]
)

#@OldAPIstack
py_test(
19 changes: 14 additions & 5 deletions rllib/connectors/common/add_states_from_episodes_to_batch.py
@@ -228,7 +228,7 @@ def __call__(
# Also, let module-to-env pipeline know that we had added a single timestep
# time rank to the data (to remove it again).
if not self._as_learner_connector:
for column, column_data in data.copy().items():
for column in data.keys():
Contributor Author:
Simplify.

self.foreach_batch_item_change_in_place(
batch=data,
column=column,
@@ -250,11 +250,20 @@ def __call__(
# Before adding STATE_IN to the `data`, zero-pad existing data and batch
# into max_seq_len chunks.
for column, column_data in data.copy().items():
# Do not zero-pad INFOS column.
if column == Columns.INFOS:
continue
for key, item_list in column_data.items():
if column != Columns.INFOS:
column_data[key] = split_and_zero_pad_list(
item_list, T=self.max_seq_len
)
# Multi-agent case AND RLModule is not stateful -> Do not zero-pad
# for this model.
Contributor Author:
Bug fix: In the multi-agent case, when some RLModules are NOT stateful, we should NOT zero-pad anything for those modules.

Collaborator:
Does this already work when using it on full-length episodes coming from OfflineData?

assert isinstance(key, tuple)
if len(key) == 3:
eps_id, aid, mid = key
if not rl_module[mid].is_stateful():
continue
column_data[key] = split_and_zero_pad_list(
item_list, T=self.max_seq_len
)

for sa_episode in self.single_agent_episode_iterator(
episodes,
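For context on the zero-padding step that the bug fix above now skips for non-stateful modules, here is a simplified, self-contained illustration of the idea behind splitting data into `max_seq_len` chunks with zero-padding. This is not RLlib's actual `split_and_zero_pad_list` implementation, only a sketch of the same behavior:

```python
import numpy as np

def split_and_zero_pad(items, T):
    # Illustration only: chop a per-episode list of items into chunks of
    # length T and zero-pad the last (incomplete) chunk.
    chunks = []
    for start in range(0, len(items), T):
        chunk = list(items[start:start + T])
        while len(chunk) < T:
            chunk.append(np.zeros_like(np.asarray(chunk[0])))
        chunks.append(chunk)
    return chunks

# An episode of length 7 with max_seq_len=4 yields two chunks:
# [o0, o1, o2, o3] and [o4, o5, o6, <zeros>].
obs = [np.full((2,), float(i)) for i in range(7)]
for chunk in split_and_zero_pad(obs, T=4):
    print([o.tolist() for o in chunk])
```

Zero-padding the batch of a module whose RLModule is not stateful would corrupt its (non-sequence) data, which is why the multi-agent branch now checks `rl_module[mid].is_stateful()` before padding.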
13 changes: 10 additions & 3 deletions rllib/connectors/env_to_module/mean_std_filter.py
@@ -116,9 +116,16 @@ def __call__(
# anymore to the original observations).
for sa_episode in self.single_agent_episode_iterator(episodes):
sa_obs = sa_episode.get_observations(indices=-1)
normalized_sa_obs = self._filters[sa_episode.agent_id](
sa_obs, update=self._update_stats
)
try:
Contributor Author:
Improve the error that shows up when the multi_agent=True constructor arg was forgotten.

normalized_sa_obs = self._filters[sa_episode.agent_id](
sa_obs, update=self._update_stats
)
except KeyError:
raise KeyError(
"KeyError trying to access a filter by agent ID "
f"`{sa_episode.agent_id}`! You probably did NOT pass the "
f"`multi_agent=True` flag into the `MeanStdFilter()` constructor. "
)
sa_episode.set_observations(at_indices=-1, new_data=normalized_sa_obs)
# We set the Episode's observation space to ours so that we can safely
# set the last obs to the new value (without causing a space mismatch
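A rough usage sketch of the situation the improved error message targets. The env name is a placeholder and the exact config wiring is an assumption, not copied from this PR:

```python
from ray.rllib.algorithms.appo import APPOConfig
from ray.rllib.connectors.env_to_module.mean_std_filter import MeanStdFilter

# Sketch (assumed wiring): in a multi-agent setup, the filter keeps one running
# mean/std per agent ID, so `multi_agent=True` must be passed explicitly.
# Forgetting it now raises the descriptive KeyError added above instead of a
# bare KeyError on the agent ID.
config = (
    APPOConfig()
    .environment("my_multi_agent_env")  # placeholder env name
    .env_runners(
        env_to_module_connector=lambda env: MeanStdFilter(multi_agent=True),
    )
)
```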
@@ -3,6 +3,7 @@
from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.core.columns import Columns
from ray.rllib.core.rl_module.rl_module import RLModule
from ray.rllib.env.multi_agent_episode import MultiAgentEpisode
from ray.rllib.utils.annotations import override
from ray.rllib.utils.postprocessing.episodes import add_one_ts_to_episodes_and_truncate
from ray.rllib.utils.typing import EpisodeType
@@ -101,10 +102,23 @@ def __call__(
# batch: - - - - - - - T B0- - - - - R Bx- - - - R Bx
# mask : t t t t t t t t f t t t t t t f t t t t t f

# TODO (sven): Same situation as in TODO below, but for multi-agent episode.
# Maybe add a dedicated connector piece for this task?
# We extend the MultiAgentEpisode's ID by a running number here to make sure
Collaborator:
Ah, tricky. This kind of trick also needs to go into the connector docs. It can solve problems, but we need to know how.

Contributor Author:
Yup, it's getting to a point where the default pipelines do become quite complex. We should spend some time soon to either simplify these again or make the ConnectorV2 helper methods even better, e.g. self.foreach_batch_item_change_in_place.

# we treat each MAEpisode chunk as separate (for potentially upcoming v-trace
# and LSTM zero-padding) and don't mix data from different chunks.
if isinstance(episodes[0], MultiAgentEpisode):
for i, ma_episode in enumerate(episodes):
ma_episode.id_ += "_" + str(i)
# Also change the underlying single-agent episode's
# `multi_agent_episode_id` properties.
for sa_episode in ma_episode.agent_episodes.values():
sa_episode.multi_agent_episode_id = ma_episode.id_

for i, sa_episode in enumerate(
self.single_agent_episode_iterator(episodes, agents_that_stepped_only=False)
):
# TODO (sven): This is a little bit of a hack: By expanding the Episode's
# TODO (sven): This is a little bit of a hack: By extending the Episode's
# ID, we make sure that each episode chunk in `episodes` is treated as a
# separate episode in the `self.add_n_batch_items` below. Some algos (e.g.
# APPO) may have >1 episode chunks from the same episode (same ID) in the
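A minimal illustration (with hypothetical chunk IDs) of why the suffix trick above matters: batch items are later grouped by episode ID, so two chunks of the same MultiAgentEpisode must not be merged back into one sequence for v-trace and LSTM zero-padding.

```python
# Two chunks of the same (hypothetical) episode, e.g. timesteps 0-49 and 50-99.
chunk_ids = ["abc", "abc"]

# Without the trick, grouping by ID would concatenate both chunks into one
# 100-step sequence. With the running-number suffix, each chunk stays separate:
unique_ids = [f"{eid}_{i}" for i, eid in enumerate(chunk_ids)]
print(unique_ids)  # -> ['abc_0', 'abc_1']
```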
@@ -50,9 +50,21 @@ def __call__(
if shared_data is None or not shared_data.get("_added_single_ts_time_rank"):
return data

data = tree.map_structure_with_path(
lambda p, s: s if Columns.STATE_OUT in p else np.squeeze(s, axis=0),
data,
)
def _remove_single_ts(item, eps_id, aid, mid):
Contributor Author:
Bug fix: For mixed MultiRLModules in which some RLModules are NOT stateful, the old code would crash.

# Only remove the time rank for modules that are stateful (only for those
# has a time rank been added).
if mid is None or rl_module[mid].is_stateful():
return tree.map_structure(lambda s: np.squeeze(s, axis=0), item)
return item

for column, column_data in data.copy().items():
# Skip state_out (doesn't have a time rank).
if column == Columns.STATE_OUT:
continue
self.foreach_batch_item_change_in_place(
data,
column=column,
func=_remove_single_ts,
)

return data
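A small sketch (with assumed array shapes) of why removing the single-timestep time rank must be restricted to stateful modules, which is exactly the crash the bug fix above avoids:

```python
import numpy as np

# A stateful module's item got a [T=1, ...] time rank added on the way in;
# a stateless module's item did not.
stateful_item = np.arange(4.0).reshape(1, 4)   # shape (1, 4)
stateless_item = np.arange(4.0)                # shape (4,)

print(np.squeeze(stateful_item, axis=0).shape)  # -> (4,)

# Squeezing axis 0 of the stateless item raises a ValueError, because that
# axis has size 4, not 1 -- the crash the is_stateful() check now prevents.
try:
    np.squeeze(stateless_item, axis=0)
except ValueError as e:
    print("ValueError:", e)
```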
70 changes: 16 additions & 54 deletions rllib/core/learner/learner.py
@@ -53,6 +53,7 @@
NUM_ENV_STEPS_TRAINED,
NUM_MODULE_STEPS_TRAINED,
LEARNER_CONNECTOR_TIMER,
MODULE_TRAIN_BATCH_SIZE_MEAN,
)
from ray.rllib.utils.metrics.metrics_logger import MetricsLogger
from ray.rllib.utils.minibatch_utils import (
@@ -1294,24 +1295,8 @@ def _update_from_batch_or_episodes(
if not self.should_module_be_updated(module_id, batch):
del batch.policy_batches[module_id]

# Log all timesteps (env, agent, modules) based on given episodes.
Collaborator:
Finally, this goes away, haha.

Collaborator:
We probably need to remove this from learn_from_iterator as well.

Contributor Author:
Great catch. Will check ...

Contributor Author:
Done.

if self._learner_connector is not None and episodes is not None:
self._log_steps_trained_metrics(episodes, batch, shared_data)
# TODO (sven): Possibly remove this if-else block entirely. We might be in a
# world soon where we always learn from episodes, never from an incoming batch.
else:
self.metrics.log_dict(
{
(ALL_MODULES, NUM_ENV_STEPS_TRAINED): batch.env_steps(),
(ALL_MODULES, NUM_MODULE_STEPS_TRAINED): batch.agent_steps(),
**{
(mid, NUM_MODULE_STEPS_TRAINED): len(b)
for mid, b in batch.policy_batches.items()
},
},
reduce="sum",
clear_on_reduce=True,
)
# Log all timesteps (env, agent, modules) based on given episodes/batch.
self._log_steps_trained_metrics(batch)
Contributor Author:
Simplify.


if minibatch_size:
if self._learner_connector is not None:
@@ -1581,49 +1566,26 @@ def _set_optimizer_lr(optimizer: Optimizer, lr: float) -> None:
def _get_clip_function() -> Callable:
"""Returns the gradient clipping function to use, given the framework."""

def _log_steps_trained_metrics(self, episodes, batch, shared_data):
def _log_steps_trained_metrics(self, batch: MultiAgentBatch):
Contributor Author:
Simplify.

# Logs this iteration's steps trained, based on given `episodes`.
env_steps = sum(len(e) for e in episodes)
env_steps = batch.count
log_dict = defaultdict(dict)
orig_lengths = shared_data.get("_sa_episodes_lengths", {})
for sa_episode in self._learner_connector.single_agent_episode_iterator(
episodes, agents_that_stepped_only=False
):
mid = (
sa_episode.module_id
if sa_episode.module_id is not None
else DEFAULT_MODULE_ID
)
# Do not log steps trained for those ModuleIDs that should not be updated.
if mid != ALL_MODULES and mid not in batch.policy_batches:
continue

_len = (
orig_lengths[sa_episode.id_]
if sa_episode.id_ in orig_lengths
else len(sa_episode)

for mid, module_batch in batch.policy_batches.items():
module_batch_size = len(module_batch)
self.metrics.log_value(
key=(mid, MODULE_TRAIN_BATCH_SIZE_MEAN),
value=module_batch_size,
)
# TODO (sven): Decide, whether agent_ids should be part of LEARNER_RESULTS.
# Currently and historically, only ModuleID keys and ALL_MODULES were used
# and expected. Does it make sense to include e.g. agent steps trained?
# I'm not sure atm.
# aid = (
# sa_episode.agent_id if sa_episode.agent_id is not None
# else DEFAULT_AGENT_ID
# )
if NUM_MODULE_STEPS_TRAINED not in log_dict[mid]:
log_dict[mid][NUM_MODULE_STEPS_TRAINED] = _len
log_dict[mid][NUM_MODULE_STEPS_TRAINED] = module_batch_size
else:
log_dict[mid][NUM_MODULE_STEPS_TRAINED] += _len
# TODO (sven): See above.
# if NUM_AGENT_STEPS_TRAINED not in log_dict[aid]:
# log_dict[aid][NUM_AGENT_STEPS_TRAINED] = _len
# else:
# log_dict[aid][NUM_AGENT_STEPS_TRAINED] += _len
log_dict[mid][NUM_MODULE_STEPS_TRAINED] += module_batch_size

if NUM_MODULE_STEPS_TRAINED not in log_dict[ALL_MODULES]:
log_dict[ALL_MODULES][NUM_MODULE_STEPS_TRAINED] = _len
log_dict[ALL_MODULES][NUM_MODULE_STEPS_TRAINED] = module_batch_size
else:
log_dict[ALL_MODULES][NUM_MODULE_STEPS_TRAINED] += _len
log_dict[ALL_MODULES][NUM_MODULE_STEPS_TRAINED] += module_batch_size

# Log env steps (all modules).
self.metrics.log_value(
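Condensed into a standalone sketch, the new per-module logging in `_log_steps_trained_metrics()` boils down to roughly the following (simplified; the metric key names below are illustrative placeholders, not the actual constants):

```python
from collections import defaultdict

def summarize_steps_trained(policy_batches: dict) -> dict:
    # Simplified sketch of the new logic: per-module step counts are just the
    # per-module batch lengths, accumulated per module and across all modules.
    log_dict = defaultdict(lambda: defaultdict(int))
    for mid, module_batch in policy_batches.items():
        module_batch_size = len(module_batch)
        log_dict[mid]["num_module_steps_trained"] += module_batch_size
        log_dict["__all_modules__"]["num_module_steps_trained"] += module_batch_size
    return {k: dict(v) for k, v in log_dict.items()}

# Two module batches with 128 and 64 rows:
print(summarize_steps_trained({"p0": [0] * 128, "p1": [0] * 64}))
# -> {'p0': {'num_module_steps_trained': 128},
#     'p1': {'num_module_steps_trained': 64},
#     '__all_modules__': {'num_module_steps_trained': 192}}
```

Env steps are taken from `batch.count`, and each module's batch size is additionally logged under the new MODULE_TRAIN_BATCH_SIZE_MEAN key, so the episode-based counting (and the `shared_data` length bookkeeping) is no longer needed.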
4 changes: 2 additions & 2 deletions rllib/tuned_examples/appo/multi_agent_cartpole_appo.py
@@ -8,7 +8,7 @@
from ray.rllib.utils.test_utils import add_rllib_example_script_args
from ray.tune.registry import register_env

parser = add_rllib_example_script_args()
parser = add_rllib_example_script_args(default_timesteps=2000000)
parser.set_defaults(
enable_new_api_stack=True,
num_agents=2,
@@ -46,7 +46,7 @@

stop = {
f"{ENV_RUNNER_RESULTS}/{EPISODE_RETURN_MEAN}": 400.0 * args.num_agents,
f"{NUM_ENV_STEPS_SAMPLED_LIFETIME}": 2000000,
f"{NUM_ENV_STEPS_SAMPLED_LIFETIME}": args.stop_timesteps,
}
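For a two-agent run with the new parser default, the stop criteria resolve roughly to the following. The metric key strings are assumed typical values of the constants, not copied from the PR:

```python
# Hypothetical resolved values for --num-agents=2 under the new
# default_timesteps=2_000_000:
args_stop_timesteps = 2_000_000  # parser default now supplies this
stop = {
    "env_runners/episode_return_mean": 400.0 * 2,            # 800.0 for 2 agents
    "num_env_steps_sampled_lifetime": args_stop_timesteps,   # was hard-coded before
}
print(stop)
```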

