diff --git a/python/requirements/ml/rllib-requirements.txt b/python/requirements/ml/rllib-requirements.txt
index 63f5b42e0bc27..abbad8ba62315 100644
--- a/python/requirements/ml/rllib-requirements.txt
+++ b/python/requirements/ml/rllib-requirements.txt
@@ -9,4 +9,4 @@ tf2onnx==1.15.1; sys_platform != 'darwin' or platform_machine != 'arm64'
 rich==13.3.2
 # Msgpack checkpoint stuff.
 msgpack
-msgpack_numpy
\ No newline at end of file
+msgpack-numpy
\ No newline at end of file
diff --git a/rllib/env/single_agent_episode.py b/rllib/env/single_agent_episode.py
index ea869b108a5e0..dd4f480394705 100644
--- a/rllib/env/single_agent_episode.py
+++ b/rllib/env/single_agent_episode.py
@@ -11,6 +11,7 @@
 from ray.rllib.core.columns import Columns
 from ray.rllib.env.utils.infinite_lookback_buffer import InfiniteLookbackBuffer
 from ray.rllib.policy.sample_batch import SampleBatch
+from ray.rllib.utils.serialization import gym_space_from_dict, gym_space_to_dict
 from ray.rllib.utils.typing import AgentID, ModuleID
 from ray.util.annotations import PublicAPI

@@ -323,7 +324,7 @@ def __init__(
             if isinstance(v, InfiniteLookbackBuffer):
                 self.extra_model_outputs[k] = v
             else:
-                # We cannot use the defaultdict's own constructore here as this would
+                # We cannot use the defaultdict's own constructor here as this would
                 # auto-set the lookback buffer to 0 (there is no data passed to that
                 # constructor). Then, when we manually have to set the data property,
                 # the lookback buffer would still be (incorrectly) 0.
@@ -1695,32 +1696,44 @@ def get_state(self) -> Dict[str, Any]:
         """Returns the pickable state of an episode.

         The data in the episode is stored into a dictionary. Note that episodes
-        can also be generated from states (see `self.from_state()`).
+        can also be generated from states (see `SingleAgentEpisode.from_state()`).

         Returns:
             A dict containing all the data from the episode.
         """
+        infos = self.infos.get_state()
+        infos["data"] = np.array([info if info else None for info in infos["data"]])
         return {
             "id_": self.id_,
             "agent_id": self.agent_id,
             "module_id": self.module_id,
             "multi_agent_episode_id": self.multi_agent_episode_id,
-            # TODO (simon): Check, if we need to have a `get_state` method for
-            # `InfiniteLookbackBuffer` and call it here.
-            "observations": self.observations,
-            "actions": self.actions,
-            "rewards": self.rewards,
-            "infos": self.infos,
-            "extra_model_outputs": self.extra_model_outputs,
+            # Note, all data is stored in `InfiniteLookbackBuffer`s.
+            "observations": self.observations.get_state(),
+            "actions": self.actions.get_state(),
+            "rewards": self.rewards.get_state(),
+            "infos": self.infos.get_state(),
+            "extra_model_outputs": {
+                k: v.get_state() if v else v
+                for k, v in self.extra_model_outputs.items()
+            }
+            if len(self.extra_model_outputs) > 0
+            else None,
             "is_terminated": self.is_terminated,
             "is_truncated": self.is_truncated,
             "t_started": self.t_started,
             "t": self.t,
-            "_observation_space": self._observation_space,
-            "_action_space": self._action_space,
+            "_observation_space": gym_space_to_dict(self._observation_space)
+            if self._observation_space
+            else None,
+            "_action_space": gym_space_to_dict(self._action_space)
+            if self._action_space
+            else None,
             "_start_time": self._start_time,
             "_last_step_time": self._last_step_time,
-            "_temporary_timestep_data": self._temporary_timestep_data,
+            "_temporary_timestep_data": dict(self._temporary_timestep_data)
+            if len(self._temporary_timestep_data) > 0
+            else None,
         }

     @staticmethod
@@ -1739,21 +1752,48 @@ def from_state(state: Dict[str, Any]) -> "SingleAgentEpisode":
         episode.agent_id = state["agent_id"]
         episode.module_id = state["module_id"]
         episode.multi_agent_episode_id = state["multi_agent_episode_id"]
-        episode.observations = state["observations"]
-        episode.actions = state["actions"]
-        episode.rewards = state["rewards"]
-        episode.infos = state["infos"]
-        episode.extra_model_outputs = state["extra_model_outputs"]
+        # Convert data back to `InfiniteLookbackBuffer`s.
+        episode.observations = InfiniteLookbackBuffer.from_state(state["observations"])
+        episode.actions = InfiniteLookbackBuffer.from_state(state["actions"])
+        episode.rewards = InfiniteLookbackBuffer.from_state(state["rewards"])
+        episode.infos = InfiniteLookbackBuffer.from_state(state["infos"])
+        episode.extra_model_outputs = (
+            defaultdict(
+                functools.partial(
+                    InfiniteLookbackBuffer, lookback=episode.observations.lookback
+                ),
+                {
+                    k: InfiniteLookbackBuffer.from_state(v)
+                    for k, v in state["extra_model_outputs"].items()
+                },
+            )
+            if state["extra_model_outputs"]
+            else defaultdict(
+                functools.partial(
+                    InfiniteLookbackBuffer, lookback=episode.observations.lookback
+                ),
+            )
+        )
         episode.is_terminated = state["is_terminated"]
         episode.is_truncated = state["is_truncated"]
         episode.t_started = state["t_started"]
         episode.t = state["t"]
-        episode._observation_space = state["_observation_space"]
-        episode._action_space = state["_action_space"]
+        # Convert the spaces back from their serialized (dict) representations.
+        episode._observation_space = (
+            gym_space_from_dict(state["_observation_space"])
+            if state["_observation_space"]
+            else None
+        )
+        episode._action_space = (
+            gym_space_from_dict(state["_action_space"])
+            if state["_action_space"]
+            else None
+        )
         episode._start_time = state["_start_time"]
         episode._last_step_time = state["_last_step_time"]
-        episode._temporary_timestep_data = state["_temporary_timestep_data"]
-
+        episode._temporary_timestep_data = defaultdict(
+            list, state["_temporary_timestep_data"] or {}
+        )
         # Validate the episode.
         episode.validate()

diff --git a/rllib/env/utils/infinite_lookback_buffer.py b/rllib/env/utils/infinite_lookback_buffer.py
index 49e2a24962b15..269e3827ca205 100644
--- a/rllib/env/utils/infinite_lookback_buffer.py
+++ b/rllib/env/utils/infinite_lookback_buffer.py
@@ -1,10 +1,11 @@
-from typing import Any, List, Optional, Union
+from typing import Any, Dict, List, Optional, Union

 import gymnasium as gym
 import numpy as np
 import tree  # pip install dm_tree

 from ray.rllib.utils.numpy import LARGE_INTEGER, one_hot, one_hot_multidiscrete
+from ray.rllib.utils.serialization import gym_space_from_dict, gym_space_to_dict
 from ray.rllib.utils.spaces.space_utils import (
     batch,
     get_dummy_batch_for_space,
@@ -34,6 +35,77 @@ def __init__(
         self.space_struct = None
         self.space = space

+    def __eq__(
+        self,
+        other: "InfiniteLookbackBuffer",
+    ) -> bool:
+        """Compares two `InfiniteLookbackBuffer`s.
+
+        Args:
+            other: Another object. If another `InfiniteLookbackBuffer` instance,
+                all their attributes are compared.
+
+        Returns:
+            `True`, if `other` is an `InfiniteLookbackBuffer` instance and all
+            attributes are identical. Otherwise, returns `False`.
+        """
+        if isinstance(other, InfiniteLookbackBuffer):
+            if (
+                self.data == other.data
+                and self.lookback == other.lookback
+                and self.finalized == other.finalized
+                and self.space_struct == other.space_struct
+                and self.space == other.space
+            ):
+                return True
+        return False
+
+    def get_state(self) -> Dict[str, Any]:
+        """Returns the picklable state of a buffer.
+
+        The data in the buffer is stored into a dictionary. Note that
+        buffers can also be generated from picklable states (see
+        `InfiniteLookbackBuffer.from_state`).
+
+        Returns:
+            A dict containing all the data and metadata from the buffer.
+        """
+        return {
+            "data": self.data,
+            "lookback": self.lookback,
+            "finalized": self.finalized,
+            "space_struct": gym_space_to_dict(self.space_struct)
+            if self.space_struct
+            else self.space_struct,
+            "space": gym_space_to_dict(self.space) if self.space else self.space,
+        }
+
+    @staticmethod
+    def from_state(state: Dict[str, Any]) -> "InfiniteLookbackBuffer":
+        """Creates a new `InfiniteLookbackBuffer` from a state dict.
+
+        Args:
+            state: The state dict, as returned by
+                `InfiniteLookbackBuffer.get_state()`.
+
+        Returns:
+            A new `InfiniteLookbackBuffer` instance with the data and metadata
+            from the state dict.
+        """
+        buffer = InfiniteLookbackBuffer()
+        buffer.data = state["data"]
+        buffer.lookback = state["lookback"]
+        buffer.finalized = state["finalized"]
+        buffer.space_struct = (
+            gym_space_from_dict(state["space_struct"])
+            if state["space_struct"]
+            else state["space_struct"]
+        )
+        buffer.space = (
+            gym_space_from_dict(state["space"]) if state["space"] else state["space"]
+        )
+
+        return buffer
+
     def append(self, item) -> None:
         """Appends the given item to the end of this buffer."""
         if self.finalized:
diff --git a/rllib/offline/offline_env_runner.py b/rllib/offline/offline_env_runner.py
index dce60c1c993fb..9da38b60bd6ae 100644
--- a/rllib/offline/offline_env_runner.py
+++ b/rllib/offline/offline_env_runner.py
@@ -13,11 +13,12 @@
 from ray.rllib.utils.compression import pack_if_needed
 from ray.rllib.utils.spaces.space_utils import to_jsonable_if_needed
 from ray.rllib.utils.typing import EpisodeType
+from ray.util.debug import log_once

 logger = logging.Logger(__file__)


 # TODO (simon): This class can be agnostic to the episode type as it
-# calls only get_state.
+# calls only get_state.
 class OfflineSingleAgentEnvRunner(SingleAgentEnvRunner):
@@ -123,7 +124,21 @@ def sample(

         # Add data to the buffers.
         if self.output_write_episodes:
-            self._samples.extend(samples)
+
+            import msgpack
+            import msgpack_numpy as mnp
+
+            if log_once("msgpack"):
+                logger.info(
+                    "Packing episodes with `msgpack` and encoding arrays with "
+                    "`msgpack_numpy` for serialization. This is needed for "
+                    "recording episodes."
+                )
+            # Note, we serialize episodes with `msgpack` and `msgpack_numpy` to
+            # ensure version compatibility.
+            self._samples.extend(
+                [msgpack.packb(eps.get_state(), default=mnp.encode) for eps in samples]
+            )
         else:
             self._map_episodes_to_data(samples)

diff --git a/rllib/offline/offline_prelearner.py b/rllib/offline/offline_prelearner.py
index 030d79c69eb74..ae4c01dd01c09 100644
--- a/rllib/offline/offline_prelearner.py
+++ b/rllib/offline/offline_prelearner.py
@@ -1,10 +1,10 @@
 import gymnasium as gym
 import numpy as np
 import random
-import ray
-from ray.actor import ActorHandle
 from typing import Any, Dict, List, Optional, Union, Tuple, TYPE_CHECKING

+import ray
+from ray.actor import ActorHandle
 from ray.rllib.core.columns import Columns
 from ray.rllib.core.learner import Learner
 from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
@@ -144,7 +144,17 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, List[EpisodeType]]

         # If we directly read in episodes we just convert to list.
         if self.input_read_episodes:
-            episodes = batch["item"].tolist()
+            # Import `msgpack` for decoding.
+            import msgpack
+            import msgpack_numpy as mnp
+
+            # Read the episodes and decode them.
+            episodes = [
+                SingleAgentEpisode.from_state(
+                    msgpack.unpackb(state, object_hook=mnp.decode)
+                )
+                for state in batch["item"]
+            ]
         # Else, if we have old stack `SampleBatch`es.
         elif self.input_read_sample_batches:
             episodes = OfflinePreLearner._map_sample_batch_to_episode(
diff --git a/rllib/offline/tests/test_offline_env_runner.py b/rllib/offline/tests/test_offline_env_runner.py
index e16734c0af5c0..eaeef106f0178 100644
--- a/rllib/offline/tests/test_offline_env_runner.py
+++ b/rllib/offline/tests/test_offline_env_runner.py
@@ -1,3 +1,5 @@
+import msgpack
+import msgpack_numpy as m
 import pathlib
 import shutil
 import unittest
@@ -60,7 +62,7 @@ def test_offline_env_runner_record_episodes(self):
         )
         offline_env_runner = OfflineSingleAgentEnvRunner(config, worker_index=1)

-        # Sample 1ßß episodes.
+        # Sample 100 episodes.
         _ = offline_env_runner.sample(
             num_episodes=100,
             random_actions=True,
@@ -81,7 +83,14 @@ def test_offline_env_runner_record_episodes(self):
         # Assert the dataset has only 100 rows (each row containing an episode).
         self.assertEqual(offline_data.data.count(), 100)
         # Take a single row and ensure its a `SingleAgentEpisode` instance.
-        self.assertIsInstance(offline_data.data.take(1)[0]["item"], SingleAgentEpisode)
+        self.assertIsInstance(
+            SingleAgentEpisode.from_state(
+                msgpack.unpackb(
+                    offline_data.data.take(1)[0]["item"], object_hook=m.decode
+                )
+            ),
+            SingleAgentEpisode,
+        )
         # The batch contains now episodes (in a numpy.NDArray).
         episodes = offline_data.data.take_batch(100)["item"]
         # The batch should contain 100 episodes (not 100 env steps).
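
Reviewer note (illustrative, not part of the patch): the serialization round-trip introduced here can be sanity-checked in isolation using the new `InfiniteLookbackBuffer.get_state()` / `from_state()` together with the same `msgpack` / `msgpack_numpy` calls used in `OfflineSingleAgentEnvRunner` and `OfflinePreLearner`. The snippet below is a minimal sketch; the sample data and the `data=` / `lookback=` constructor arguments are assumptions based on existing RLlib usage, not something this diff changes.

import msgpack
import msgpack_numpy as mnp

from ray.rllib.env.utils.infinite_lookback_buffer import InfiniteLookbackBuffer

# Build a small buffer (constructor kwargs assumed from existing usage).
buffer = InfiniteLookbackBuffer(data=[0, 1, 2, 3], lookback=2)

# Pack the picklable state dict; `msgpack_numpy` handles any ndarray payloads.
packed = msgpack.packb(buffer.get_state(), default=mnp.encode)

# Unpack and rebuild the buffer, then compare via the new `__eq__`.
restored = InfiniteLookbackBuffer.from_state(
    msgpack.unpackb(packed, object_hook=mnp.decode)
)
assert restored == buffer

`OfflineSingleAgentEnvRunner.sample()` applies the same `packb` / `unpackb` pattern to `SingleAgentEpisode.get_state()` / `from_state()` when recording episodes, and `OfflinePreLearner.__call__()` reverses it when reading them back.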