diff --git a/rllib/BUILD b/rllib/BUILD index 1b281b62ba65..0481d9c46a5a 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -1436,7 +1436,7 @@ py_test( srcs = ["core/rl_module/tests/test_rl_module_specs.py"] ) -# Learner +# LearnerGroup py_test( name = "TestLearnerGroupSyncUpdate", main = "core/learner/tests/test_learner_group.py", @@ -1473,16 +1473,17 @@ py_test( args = ["TestLearnerGroupSaveLoadState"] ) +# Learner py_test( name = "test_learner", - tags = ["team:rllib", "core", "ray_data"], + tags = ["team:rllib", "core", "ray_data", "exclusive"], size = "medium", srcs = ["core/learner/tests/test_learner.py"] ) py_test( name = "test_torch_learner_compile", - tags = ["team:rllib", "core", "ray_data"], + tags = ["team:rllib", "core", "ray_data", "exclusive"], size = "medium", srcs = ["core/learner/torch/tests/test_torch_learner_compile.py"] ) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index d6b7a99e6712..bf01aff4be89 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -122,6 +122,7 @@ NUM_AGENT_STEPS_TRAINED_LIFETIME, NUM_ENV_STEPS_SAMPLED, NUM_ENV_STEPS_SAMPLED_LIFETIME, + NUM_ENV_STEPS_SAMPLED_PER_SECOND, NUM_ENV_STEPS_SAMPLED_THIS_ITER, NUM_ENV_STEPS_SAMPLED_FOR_EVALUATION_THIS_ITER, NUM_ENV_STEPS_TRAINED, @@ -539,7 +540,7 @@ def __init__( # env id. timestr = datetime.today().strftime("%Y-%m-%d_%H-%M-%S") env_descr_for_dir = re.sub("[/\\\\]", "-", str(env_descr)) - logdir_prefix = f"{str(self)}_{env_descr_for_dir}_{timestr}" + logdir_prefix = f"{type(self).__name__}_{env_descr_for_dir}_{timestr}" if not os.path.exists(DEFAULT_STORAGE_PATH): # Possible race condition if dir is created several times on # rollout workers @@ -721,7 +722,7 @@ def setup(self, config: AlgorithmConfig) -> None: # Evaluation EnvRunnerGroup setup. # User would like to setup a separate evaluation worker set. # Note: We skip EnvRunnerGroup creation if we need to do offline evaluation. - if self._should_create_evaluation_rollout_workers(self.evaluation_config): + if self._should_create_evaluation_env_runners(self.evaluation_config): _, env_creator = self._get_env_id_and_creator( self.evaluation_config.env, self.evaluation_config ) @@ -825,7 +826,7 @@ def setup(self, config: AlgorithmConfig) -> None: self.env_runner_group.sync_env_runner_states( config=self.config, env_steps_sampled=self.metrics.peek( - NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0 + (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED_LIFETIME), default=0 ), rl_module_state=rl_module_state, ) @@ -930,7 +931,13 @@ def step(self) -> ResultDict: # - We have to evaluate in this training iteration, but no parallelism -> # evaluate after the training iteration is entirely done. else: - train_results, train_iter_ctx = self._run_one_training_iteration() + if self.config.enable_env_runner_and_connector_v2: + train_results, train_iter_ctx = self._run_one_training_iteration() + else: + ( + train_results, + train_iter_ctx, + ) = self._run_one_training_iteration_old_api_stack() # Sequential: Train (already done above), then evaluate. if evaluate_this_iter and not self.config.evaluation_parallel_to_training: @@ -950,9 +957,6 @@ def step(self) -> ResultDict: with self.metrics.log_time((TIMERS, SYNCH_ENV_CONNECTOR_STATES_TIMER)): self.env_runner_group.sync_env_runner_states( config=self.config, - env_steps_sampled=self.metrics.peek( - NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0 - ), ) # Compile final ResultDict from `train_results` and `eval_results`. 
Note # that, as opposed to the old API stack, EnvRunner stats should already be @@ -960,7 +964,6 @@ def step(self) -> ResultDict: results = self._compile_iteration_results_new_api_stack( train_results=train_results, eval_results=eval_results, - step_ctx=train_iter_ctx, ) else: self._sync_filters_if_needed( @@ -1034,13 +1037,12 @@ def evaluate( if self.config.enable_env_runner_and_connector_v2: # Synchronize EnvToModule and ModuleToEnv connector states and broadcast # new states back to all eval EnvRunners. - with self._timers[SYNCH_EVAL_ENV_CONNECTOR_STATES_TIMER]: + with self.metrics.log_time( + (TIMERS, SYNCH_EVAL_ENV_CONNECTOR_STATES_TIMER) + ): self.eval_env_runner_group.sync_env_runner_states( config=self.evaluation_config, from_worker=self.env_runner_group.local_env_runner, - env_steps_sampled=self.metrics.peek( - NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0 - ), ) else: self._sync_filters_if_needed( @@ -1106,19 +1108,6 @@ def evaluate( eval_results = {} if self.config.enable_env_runner_and_connector_v2: - # Lifetime eval counters. - self.metrics.log_dict( - { - NUM_ENV_STEPS_SAMPLED_LIFETIME: env_steps, - NUM_AGENT_STEPS_SAMPLED_LIFETIME: agent_steps, - NUM_EPISODES_LIFETIME: self.metrics.peek( - (EVALUATION_RESULTS, ENV_RUNNER_RESULTS, NUM_EPISODES), - default=0, - ), - }, - key=EVALUATION_RESULTS, - reduce="sum", - ) eval_results = self.metrics.reduce( key=EVALUATION_RESULTS, return_stats_obj=False ) @@ -1280,7 +1269,12 @@ def _env_runner_remote(worker, num, round, iter): return env_steps, agent_steps, metrics, iter env_steps = agent_steps = 0 - train_mean_time = self._timers[TRAINING_ITERATION_TIMER].mean + if self.config.enable_env_runner_and_connector_v2: + train_mean_time = self.metrics.peek( + (TIMERS, TRAINING_ITERATION_TIMER), default=0.0 + ) + else: + train_mean_time = self._timers[TRAINING_ITERATION_TIMER].mean t0 = time.time() algo_iteration = self.iteration @@ -1310,7 +1304,14 @@ def _env_runner_remote(worker, num, round, iter): (train_mean_time - (time.time() - t0)) # Multiply by our own (eval) throughput to get the timesteps # to do (per worker). - * self._timers[EVALUATION_ITERATION_TIMER].mean_throughput + * self.metrics.peek( + ( + EVALUATION_RESULTS, + ENV_RUNNER_RESULTS, + NUM_ENV_STEPS_SAMPLED_PER_SECOND, + ), + default=0.0, + ) / num_healthy_workers ), ), @@ -1638,8 +1639,8 @@ def restore_workers(self, workers: EnvRunnerGroup) -> None: pol_states.pop("connector_configs", None) state_ref = ray.put(state) - # By default, entire local worker state is synced after restoration - # to bring these workers up to date. + # By default, entire local EnvRunner state is synced after restoration + # to bring the previously failed EnvRunner up to date. workers.foreach_worker( func=lambda w: w.set_state(ray.get(state_ref)), remote_worker_ids=restored, @@ -1658,7 +1659,7 @@ def restore_workers(self, workers: EnvRunnerGroup) -> None: ) @OverrideToImplementCustomLogic - def training_step(self) -> ResultDict: + def training_step(self) -> None: """Default single iteration logic of an algorithm. - Collect on-policy samples (SampleBatches) in parallel using the @@ -1671,7 +1672,13 @@ def training_step(self) -> ResultDict: - Return all collected metrics for the iteration. Returns: - The results dict from executing the training iteration. + For the new API stack, returns None. Results are compiled and extracted + automatically through a single `self.metrics.reduce()` call at the very end + of an iteration (which might contain more than one call to + `training_step()`). 
This way, we make sure that we account for all + results generated by each individual `training_step()` call. + For the old API stack, returns the results dict from executing the training + step. """ if not self.config.enable_env_runner_and_connector_v2: raise NotImplementedError( @@ -1725,12 +1732,6 @@ def training_step(self) -> ResultDict: inference_only=True, ) - # Return reduced metrics (Dict). - # Note that these training results will further be processed (e.g. - # merged with evaluation results) before eventually being returned from the - # encapsulating `Algorithm.step()` call as a plain nested ResultDict. - return self.metrics.reduce() - @PublicAPI def get_module(self, module_id: ModuleID = DEFAULT_MODULE_ID) -> RLModule: """Returns the (single-agent) RLModule with `model_id` (None if ID not found). @@ -2898,7 +2899,7 @@ def default_resource_request( ] # resources for remote evaluation env samplers or datasets (if any) - if cls._should_create_evaluation_rollout_workers(eval_cf): + if cls._should_create_evaluation_env_runners(eval_cf): # Evaluation workers. # Note: The local eval worker is located on the driver CPU. evaluation_bundles = [ @@ -3165,412 +3166,154 @@ def validate_env(env: EnvType, env_context: EnvContext) -> None: """ pass - @override(Trainable) - def _export_model( - self, export_formats: List[str], export_dir: str - ) -> Dict[str, str]: - ExportFormat.validate(export_formats) - exported = {} - if ExportFormat.CHECKPOINT in export_formats: - path = os.path.join(export_dir, ExportFormat.CHECKPOINT) - self.export_policy_checkpoint(path) - exported[ExportFormat.CHECKPOINT] = path - if ExportFormat.MODEL in export_formats: - path = os.path.join(export_dir, ExportFormat.MODEL) - self.export_policy_model(path) - exported[ExportFormat.MODEL] = path - if ExportFormat.ONNX in export_formats: - path = os.path.join(export_dir, ExportFormat.ONNX) - self.export_policy_model(path, onnx=int(os.getenv("ONNX_OPSET", "11"))) - exported[ExportFormat.ONNX] = path - return exported + def _run_one_training_iteration(self) -> Tuple[ResultDict, "TrainIterCtx"]: + """Runs one training iteration (`self.iteration` will be +1 after this). - @OldAPIStack - def __getstate__(self) -> Dict: - """Returns current state of Algorithm, sufficient to restore it from scratch. + Calls `self.training_step()` repeatedly until the configured minimum time (sec), + minimum sample- or minimum training steps have been reached. Returns: - The current state dict of this Algorithm, which can be used to sufficiently - restore the algorithm from scratch without any other information. + The ResultDict from the last call to `training_step()`. Note that even + though we only return the last ResultDict, the user still has full control + over the history and reduce behavior of individual metrics at the time these + metrics are logged with `self.metrics.log_...()`. """ - if self.config.enable_env_runner_and_connector_v2: - raise RuntimeError( - "Algorithm.__getstate__() not supported anymore on the new API stack! " - "Use Algorithm.get_state() instead." - ) - - # Add config to state so complete Algorithm can be reproduced w/o it. - state = { - "algorithm_class": type(self), - "config": self.config.get_state(), - } - - if hasattr(self, "env_runner_group"): - state["worker"] = self.env_runner_group.local_env_runner.get_state() - - # Also store eval `policy_mapping_fn` (in case it's different from main - # one). Note, the new `EnvRunner API` has no policy mapping function. 
- if ( - hasattr(self, "eval_env_runner_group") - and self.eval_env_runner_group is not None - ): - state["eval_policy_mapping_fn"] = self.eval_env_runner.policy_mapping_fn - - # Save counters. - state["counters"] = self._counters + with self.metrics.log_time((TIMERS, TRAINING_ITERATION_TIMER)): + # In case we are training (in a thread) parallel to evaluation, + # we may have to re-enable eager mode here (gets disabled in the + # thread). + if self.config.get("framework") == "tf2" and not tf.executing_eagerly(): + tf1.enable_eager_execution() - # TODO: Experimental functionality: Store contents of replay buffer - # to checkpoint, only if user has configured this. - if self.local_replay_buffer is not None and self.config.get( - "store_buffer_in_checkpoints" - ): - state["local_replay_buffer"] = self.local_replay_buffer.get_state() + has_run_once = False + # Create a step context ... + with TrainIterCtx(algo=self) as train_iter_ctx: + # .. so we can query it whether we should stop the iteration loop (e.g. + # when we have reached `min_time_s_per_iteration`). + while not train_iter_ctx.should_stop(has_run_once): + # Before training step, try to bring failed workers back. + with self.metrics.log_time((TIMERS, RESTORE_WORKERS_TIMER)): + self.restore_workers(self.env_runner_group) - # Save current `training_iteration`. - state[TRAINING_ITERATION] = self.training_iteration + # Try to train one step. + with self.metrics.log_time((TIMERS, TRAINING_STEP_TIMER)): + training_step_return_value = self.training_step() + has_run_once = True + + # On the new API stack, results should NOT be returned anymore as + # a dict, but purely logged through the `MetricsLogger` API. This + # way, we make sure to never miss a single stats/counter/timer + # when calling `self.training_step()` more than once within the same + # iteration. + if training_step_return_value is not None: + raise ValueError( + "`Algorithm.training_step()` should NOT return a result " + "dict anymore on the new API stack! Instead, log all " + "results, timers, counters through the `self.metrics` " + "(MetricsLogger) instance of the Algorithm and return " + "None. The logged results are compiled automatically into " + "one single result dict per training iteration." + ) - return state + # Only here, reduce the results into a single result dict. + return self.metrics.reduce(), train_iter_ctx - @OldAPIStack - def __setstate__(self, state) -> None: - """Sets the algorithm to the provided state. + def _run_one_evaluation( + self, + parallel_train_future: Optional[concurrent.futures.ThreadPoolExecutor] = None, + ) -> ResultDict: + """Runs evaluation step via `self.evaluate()` and handling worker failures. Args: - state: The state dict to restore this Algorithm instance to. `state` may - have been returned by a call to an Algorithm's `__getstate__()` method. + parallel_train_future: In case, we are training and avaluating in parallel, + this arg carries the currently running ThreadPoolExecutor object that + runs the training iteration. Use `parallel_train_future.done()` to + check, whether the parallel training job has completed and + `parallel_train_future.result()` to get its return values. + + Returns: + The results dict from the evaluation call. """ - if self.config.enable_env_runner_and_connector_v2: - raise RuntimeError( - "Algorithm.__setstate__() not supported anymore on the new API stack! " - "Use Algorithm.set_state() instead." 
- ) + if self.eval_env_runner_group is not None: + if self.config.enable_env_runner_and_connector_v2: + with self.metrics.log_time((TIMERS, RESTORE_EVAL_WORKERS_TIMER)): + self.restore_workers(self.eval_env_runner_group) + else: + with self._timers[RESTORE_EVAL_WORKERS_TIMER]: + self.restore_workers(self.eval_env_runner_group) - # Old API stack: The local worker stores its state (together with all the - # Module information) in state['worker']. - if hasattr(self, "env_runner_group") and "worker" in state and state["worker"]: - self.env_runner.set_state(state["worker"]) - remote_state_ref = ray.put(state["worker"]) - self.env_runner_group.foreach_worker( - lambda w: w.set_state(ray.get(remote_state_ref)), - local_env_runner=False, + # Run `self.evaluate()` only once per training iteration. + if self.config.enable_env_runner_and_connector_v2: + with self.metrics.log_time((TIMERS, EVALUATION_ITERATION_TIMER)): + eval_results = self.evaluate( + parallel_train_future=parallel_train_future + ) + # TODO (sven): Properly support throughput/sec measurements within + # `self.metrics.log_time()` call. + self.metrics.log_value( + key=( + EVALUATION_RESULTS, + ENV_RUNNER_RESULTS, + NUM_ENV_STEPS_SAMPLED_PER_SECOND, + ), + value=( + eval_results.get(ENV_RUNNER_RESULTS, {}).get( + NUM_ENV_STEPS_SAMPLED, 0 + ) + / self.metrics.peek((TIMERS, EVALUATION_ITERATION_TIMER)) + ), ) - if self.eval_env_runner_group: - # Avoid `state` being pickled into the remote function below. - _eval_policy_mapping_fn = state.get("eval_policy_mapping_fn") - - def _setup_eval_worker(w): - w.set_state(ray.get(remote_state_ref)) - # Override `policy_mapping_fn` as it might be different for eval - # workers. - w.set_policy_mapping_fn(_eval_policy_mapping_fn) - # If evaluation workers are used, also restore the policies - # there in case they are used for evaluation purpose. - self.eval_env_runner_group.foreach_worker(_setup_eval_worker) - - # Restore replay buffer data. - if self.local_replay_buffer is not None: - # TODO: Experimental functionality: Restore contents of replay - # buffer from checkpoint, only if user has configured this. - if self.config.store_buffer_in_checkpoints: - if "local_replay_buffer" in state: - self.local_replay_buffer.set_state(state["local_replay_buffer"]) - else: - logger.warning( - "`store_buffer_in_checkpoints` is True, but no replay " - "data found in state!" - ) - elif "local_replay_buffer" in state and log_once( - "no_store_buffer_in_checkpoints_but_data_found" - ): - logger.warning( - "`store_buffer_in_checkpoints` is False, but some replay " - "data found in state!" + else: + with self._timers[EVALUATION_ITERATION_TIMER]: + eval_results = self.evaluate( + parallel_train_future=parallel_train_future ) + self._timers[EVALUATION_ITERATION_TIMER].push_units_processed( + self._counters[NUM_ENV_STEPS_SAMPLED_FOR_EVALUATION_THIS_ITER] + ) - if "counters" in state: - self._counters = state["counters"] - - if TRAINING_ITERATION in state: - self._iteration = state[TRAINING_ITERATION] + # After evaluation, do a round of health check on remote eval workers to see if + # any of the failed workers are back. + if self.eval_env_runner_group is not None: + # Add number of healthy evaluation workers after this iteration. 
+ eval_results[ + "num_healthy_workers" + ] = self.eval_env_runner_group.num_healthy_remote_workers() + eval_results[ + "num_in_flight_async_reqs" + ] = self.eval_env_runner_group.num_in_flight_async_reqs() + eval_results[ + "num_remote_worker_restarts" + ] = self.eval_env_runner_group.num_remote_worker_restarts() - @OldAPIStack - @staticmethod - def _checkpoint_info_to_algorithm_state( - checkpoint_info: dict, - *, - policy_ids: Optional[Collection[PolicyID]] = None, - policy_mapping_fn: Optional[Callable[[AgentID, EpisodeID], PolicyID]] = None, - policies_to_train: Optional[ - Union[ - Collection[PolicyID], - Callable[[PolicyID, Optional[SampleBatchType]], bool], - ] - ] = None, - ) -> Dict: - """Converts a checkpoint info or object to a proper Algorithm state dict. + return {EVALUATION_RESULTS: eval_results} - The returned state dict can be used inside self.__setstate__(). + def _run_one_training_iteration_and_evaluation_in_parallel( + self, + ) -> Tuple[ResultDict, ResultDict, "TrainIterCtx"]: + """Runs one training iteration and one evaluation step in parallel. - Args: - checkpoint_info: A checkpoint info dict as returned by - `ray.rllib.utils.checkpoints.get_checkpoint_info( - [checkpoint dir or AIR Checkpoint])`. - policy_ids: Optional list/set of PolicyIDs. If not None, only those policies - listed here will be included in the returned state. Note that - state items such as filters, the `is_policy_to_train` function, as - well as the multi-agent `policy_ids` dict will be adjusted as well, - based on this arg. - policy_mapping_fn: An optional (updated) policy mapping function - to include in the returned state. - policies_to_train: An optional list of policy IDs to be trained - or a callable taking PolicyID and SampleBatchType and - returning a bool (trainable or not?) to include in the returned state. + First starts the training iteration (via `self._run_one_training_iteration()`) + within a ThreadPoolExecutor, then runs the evaluation step in parallel. + In auto-duration mode (config.evaluation_duration=auto), makes sure the + evaluation step takes roughly the same time as the training iteration. Returns: - The state dict usable within the `self.__setstate__()` method. + A tuple containing the training results, the evaluation results, and + the `TrainIterCtx` object returned by the training call. """ - if checkpoint_info["type"] != "Algorithm": - raise ValueError( - "`checkpoint` arg passed to " - "`Algorithm._checkpoint_info_to_algorithm_state()` must be an " - f"Algorithm checkpoint (but is {checkpoint_info['type']})!" - ) - - msgpack = None - if checkpoint_info.get("format") == "msgpack": - msgpack = try_import_msgpack(error=True) + with concurrent.futures.ThreadPoolExecutor() as executor: - with open(checkpoint_info["state_file"], "rb") as f: - if msgpack is not None: - data = f.read() - state = msgpack.unpackb(data, raw=False) + if self.config.enable_env_runner_and_connector_v2: + parallel_train_future = executor.submit( + lambda: self._run_one_training_iteration() + ) else: - state = pickle.load(f) - - # Old API stack: Policies are in separate sub-dirs. - if ( - checkpoint_info["checkpoint_version"] > version.Version("0.1") - and state.get("worker") is not None - and state.get("worker") - ): - worker_state = state["worker"] + parallel_train_future = executor.submit( + lambda: self._run_one_training_iteration_old_api_stack() + ) - # Retrieve the set of all required policy IDs. 
- policy_ids = set( - policy_ids if policy_ids is not None else worker_state["policy_ids"] - ) - - # Remove those policies entirely from filters that are not in - # `policy_ids`. - worker_state["filters"] = { - pid: filter - for pid, filter in worker_state["filters"].items() - if pid in policy_ids - } - - # Get Algorithm class. - if isinstance(state["algorithm_class"], str): - # Try deserializing from a full classpath. - # Or as a last resort: Tune registered algorithm name. - state["algorithm_class"] = deserialize_type( - state["algorithm_class"] - ) or get_trainable_cls(state["algorithm_class"]) - # Compile actual config object. - default_config = state["algorithm_class"].get_default_config() - if isinstance(default_config, AlgorithmConfig): - new_config = default_config.update_from_dict(state["config"]) - else: - new_config = Algorithm.merge_algorithm_configs( - default_config, state["config"] - ) - - # Remove policies from multiagent dict that are not in `policy_ids`. - new_policies = new_config.policies - if isinstance(new_policies, (set, list, tuple)): - new_policies = {pid for pid in new_policies if pid in policy_ids} - else: - new_policies = { - pid: spec for pid, spec in new_policies.items() if pid in policy_ids - } - new_config.multi_agent( - policies=new_policies, - policies_to_train=policies_to_train, - **( - {"policy_mapping_fn": policy_mapping_fn} - if policy_mapping_fn is not None - else {} - ), - ) - state["config"] = new_config - - # Prepare local `worker` state to add policies' states into it, - # read from separate policy checkpoint files. - worker_state["policy_states"] = {} - for pid in policy_ids: - policy_state_file = os.path.join( - checkpoint_info["checkpoint_dir"], - "policies", - pid, - "policy_state." - + ("msgpck" if checkpoint_info["format"] == "msgpack" else "pkl"), - ) - if not os.path.isfile(policy_state_file): - raise ValueError( - "Given checkpoint does not seem to be valid! No policy " - f"state file found for PID={pid}. " - f"The file not found is: {policy_state_file}." - ) - - with open(policy_state_file, "rb") as f: - if msgpack is not None: - worker_state["policy_states"][pid] = msgpack.load(f) - else: - worker_state["policy_states"][pid] = pickle.load(f) - - # These two functions are never serialized in a msgpack checkpoint (which - # does not store code, unlike a cloudpickle checkpoint). Hence the user has - # to provide them with the `Algorithm.from_checkpoint()` call. - if policy_mapping_fn is not None: - worker_state["policy_mapping_fn"] = policy_mapping_fn - if ( - policies_to_train is not None - # `policies_to_train` might be left None in case all policies should be - # trained. - or worker_state["is_policy_to_train"] == NOT_SERIALIZABLE - ): - worker_state["is_policy_to_train"] = policies_to_train - - if state["config"].enable_rl_module_and_learner: - state["learner_state_dir"] = os.path.join( - checkpoint_info["checkpoint_dir"], "learner" - ) - - return state - - @DeveloperAPI - def _create_local_replay_buffer_if_necessary( - self, config: PartialAlgorithmConfigDict - ) -> Optional[MultiAgentReplayBuffer]: - """Create a MultiAgentReplayBuffer instance if necessary. - - Args: - config: Algorithm-specific configuration data. - - Returns: - MultiAgentReplayBuffer instance based on algorithm config. - None, if local replay buffer is not needed. 
- """ - if not config.get("replay_buffer_config") or config["replay_buffer_config"].get( - "no_local_replay_buffer" - ): - return - - return from_config(ReplayBuffer, config["replay_buffer_config"]) - - def _run_one_training_iteration(self) -> Tuple[ResultDict, "TrainIterCtx"]: - """Runs one training iteration (`self.iteration` will be +1 after this). - - Calls `self.training_step()` repeatedly until the configured minimum time (sec), - minimum sample- or minimum training steps have been reached. - - Returns: - The ResultDict from the last call to `training_step()`. Note that even - though we only return the last ResultDict, the user stil has full control - over the history and reduce behavior of individual metrics at the time these - metrics are logged with `self.metrics.log_...()`. - """ - with self._timers[TRAINING_ITERATION_TIMER]: - # In case we are training (in a thread) parallel to evaluation, - # we may have to re-enable eager mode here (gets disabled in the - # thread). - if self.config.get("framework") == "tf2" and not tf.executing_eagerly(): - tf1.enable_eager_execution() - - results = {} - training_step_results = None - # Create a step context ... - with TrainIterCtx(algo=self) as train_iter_ctx: - # .. so we can query it whether we should stop the iteration loop (e.g. - # when we have reached `min_time_s_per_iteration`). - while not train_iter_ctx.should_stop(training_step_results): - # Before training step, try to bring failed workers back. - with self._timers[RESTORE_WORKERS_TIMER]: - self.restore_workers(self.env_runner_group) - - # Try to train one step. - with self._timers[TRAINING_STEP_TIMER]: - # TODO (sven): Should we reduce the different - # `training_step_results` over time with MetricsLogger. - training_step_results = self.training_step() - - if training_step_results: - results = training_step_results - - return results, train_iter_ctx - - def _run_one_evaluation( - self, - parallel_train_future: Optional[concurrent.futures.ThreadPoolExecutor] = None, - ) -> ResultDict: - """Runs evaluation step via `self.evaluate()` and handling worker failures. - - Args: - parallel_train_future: In case, we are training and avaluating in parallel, - this arg carries the currently running ThreadPoolExecutor object that - runs the training iteration. Use `parallel_train_future.done()` to - check, whether the parallel training job has completed and - `parallel_train_future.result()` to get its return values. - - Returns: - The results dict from the evaluation call. - """ - if self.eval_env_runner_group is not None: - with self._timers[RESTORE_EVAL_WORKERS_TIMER]: - self.restore_workers(self.eval_env_runner_group) - - # Run `self.evaluate()` only once per training iteration. - # TODO (sven): Move this timer into new metrics-logger API. - with self._timers[EVALUATION_ITERATION_TIMER]: - eval_results = self.evaluate(parallel_train_future=parallel_train_future) - self._timers[EVALUATION_ITERATION_TIMER].push_units_processed( - self._counters[NUM_ENV_STEPS_SAMPLED_FOR_EVALUATION_THIS_ITER] - ) - - # After evaluation, do a round of health check on remote eval workers to see if - # any of the failed workers are back. - if self.eval_env_runner_group is not None: - # Add number of healthy evaluation workers after this iteration. 
- eval_results[ - "num_healthy_workers" - ] = self.eval_env_runner_group.num_healthy_remote_workers() - eval_results[ - "num_in_flight_async_reqs" - ] = self.eval_env_runner_group.num_in_flight_async_reqs() - eval_results[ - "num_remote_worker_restarts" - ] = self.eval_env_runner_group.num_remote_worker_restarts() - - return {EVALUATION_RESULTS: eval_results} - - def _run_one_training_iteration_and_evaluation_in_parallel( - self, - ) -> Tuple[ResultDict, ResultDict, "TrainIterCtx"]: - """Runs one training iteration and one evaluation step in parallel. - - First starts the training iteration (via `self._run_one_training_iteration()`) - within a ThreadPoolExecutor, then runs the evaluation step in parallel. - In auto-duration mode (config.evaluation_duration=auto), makes sure the - evaluation step takes roughly the same time as the training iteration. - - Returns: - A tuple containing the training results, the evaluation results, and - the `TrainIterCtx` object returned by the training call. - """ - with concurrent.futures.ThreadPoolExecutor() as executor: - parallel_train_future = executor.submit( - lambda: self._run_one_training_iteration() - ) evaluation_results = {} # If the debug setting _run_training_always_in_thread is used, do NOT # evaluate, no matter what the settings are, @@ -3645,153 +3388,515 @@ def _run_one_training_iteration_and_evaluation_in_parallel_wo_thread( # Run training and collect the training results. train_results, train_iter_ctx = self._run_one_training_iteration() - # Collect the evaluation results. - eval_results = self.eval_env_runner_group.fetch_ready_async_reqs( - return_obj_refs=False, timeout_seconds=time_out - ) - for wid, (batch, metrics, iter) in eval_results: - # Skip results from an older iteration. - if iter != self.iteration: - continue - agent_steps += batch.agent_steps() - env_steps += batch.env_steps() - all_metrics.append(metrics) + # Collect the evaluation results. + eval_results = self.eval_env_runner_group.fetch_ready_async_reqs( + return_obj_refs=False, timeout_seconds=time_out + ) + for wid, (batch, metrics, iter) in eval_results: + # Skip results from an older iteration. + if iter != self.iteration: + continue + agent_steps += batch.agent_steps() + env_steps += batch.env_steps() + all_metrics.append(metrics) + + if not self.config.enable_env_runner_and_connector_v2: + eval_results = summarize_episodes( + all_metrics, + all_metrics, + keep_custom_metrics=( + self.evaluation_config.keep_per_episode_custom_metrics + ), + ) + eval_results[NUM_AGENT_STEPS_SAMPLED_THIS_ITER] = agent_steps + eval_results[NUM_ENV_STEPS_SAMPLED_THIS_ITER] = env_steps + # TODO: Remove this key at some point. Here for backward compatibility. + eval_results["timesteps_this_iter"] = eval_results.get( + NUM_ENV_STEPS_SAMPLED_THIS_ITER, 0 + ) + else: + self.metrics.merge_and_log_n_dicts( + all_metrics, + key=(EVALUATION_RESULTS, ENV_RUNNER_RESULTS), + ) + eval_results = self.metrics.reduce((EVALUATION_RESULTS, ENV_RUNNER_RESULTS)) + + # Warn if results are empty, it could be that this is because the eval timesteps + # are not enough to run through one full episode. + if eval_results[ENV_RUNNER_RESULTS][NUM_EPISODES] == 0: + logger.warning( + "This evaluation iteration resulted in an empty set of episode summary " + "results! It's possible that your configured duration timesteps are not" + " enough to finish even a single episode. You have configured " + f"{self.config.evaluation_duration}" + f"{self.config.evaluation_duration_unit}. 
For 'timesteps', try " + "increasing this value via the `config.evaluation(evaluation_duration=" + "...)` OR change the unit to 'episodes' via `config.evaluation(" + "evaluation_duration_unit='episodes')` OR try increasing the timeout " + "threshold via `config.evaluation(evaluation_sample_timeout_s=...)` OR " + "you can also set `config.evaluation_force_reset_envs_before_iteration`" + " to False. However, keep in mind that in the latter case, the " + "evaluation results may contain some episode stats generated with " + "earlier weights versions." + ) + + # After evaluation, do a round of health check on remote eval workers to see if + # any of the failed workers are back. + # Add number of healthy evaluation workers after this iteration. + eval_results[ + "num_healthy_workers" + ] = self.eval_env_runner_group.num_healthy_remote_workers() + eval_results[ + "num_in_flight_async_reqs" + ] = self.eval_env_runner_group.num_in_flight_async_reqs() + eval_results[ + "num_remote_worker_restarts" + ] = self.eval_env_runner_group.num_remote_worker_restarts() + + return train_results, {"evaluation": eval_results}, train_iter_ctx + + def _run_offline_evaluation(self): + """Runs offline evaluation via `OfflineEvaluator.estimate_on_dataset()` API. + + This method will be used when `evaluation_dataset` is provided. + Note: This will only work if the policy is a single agent policy. + + Returns: + The results dict from the offline evaluation call. + """ + assert len(self.env_runner_group.local_env_runner.policy_map) == 1 + + parallelism = self.evaluation_config.evaluation_num_env_runners or 1 + offline_eval_results = {"off_policy_estimator": {}} + for evaluator_name, offline_evaluator in self.reward_estimators.items(): + offline_eval_results["off_policy_estimator"][ + evaluator_name + ] = offline_evaluator.estimate_on_dataset( + self.evaluation_dataset, + n_parallelism=parallelism, + ) + return offline_eval_results + + @classmethod + def _should_create_evaluation_env_runners(cls, eval_config: "AlgorithmConfig"): + """Determines whether we need to create evaluation workers. + + Returns False if we need to run offline evaluation + (with ope.estimate_on_dastaset API) or when local worker is to be used for + evaluation. Note: We only use estimate_on_dataset API with bandits for now. + That is when ope_split_batch_by_episode is False. + TODO: In future we will do the same for episodic RL OPE. + """ + run_offline_evaluation = ( + eval_config.off_policy_estimation_methods + and not eval_config.ope_split_batch_by_episode + ) + return not run_offline_evaluation and ( + eval_config.evaluation_num_env_runners > 0 + or eval_config.evaluation_interval + ) + + def _compile_iteration_results_new_api_stack(self, *, train_results, eval_results): + # Error if users still use `self._timers`. + if self._timers: + raise ValueError( + "`Algorithm._timers` is no longer supported on the new API stack! " + "Instead, use `Algorithm.metrics.log_time(" + "[some key (str) or nested key sequence (tuple)])`, e.g. inside your " + "custom `training_step()` method, do: " + "`with self.metrics.log_time(('timers', 'my_block_to_be_timed')): ...`" + ) + + # Return dict (shallow copy of `train_results`). + results: ResultDict = train_results.copy() + # Backward compatibility `NUM_ENV_STEPS_SAMPLED_LIFETIME` is now: + # `ENV_RUNNER_RESULTS/NUM_ENV_STEPS_SAMPLED_LIFETIME`. + results[NUM_ENV_STEPS_SAMPLED_LIFETIME] = results.get( + ENV_RUNNER_RESULTS, {} + ).get(NUM_ENV_STEPS_SAMPLED_LIFETIME, 0) + + # Evaluation results. 
+ if eval_results: + assert ( + isinstance(eval_results, dict) + and len(eval_results) == 1 + and EVALUATION_RESULTS in eval_results + ) + results.update(eval_results) + + # Fault tolerance stats. + results[FAULT_TOLERANCE_STATS] = { + "num_healthy_workers": self.env_runner_group.num_healthy_remote_workers(), + "num_in_flight_async_reqs": ( + self.env_runner_group.num_in_flight_async_reqs() + ), + "num_remote_worker_restarts": ( + self.env_runner_group.num_remote_worker_restarts() + ), + } + # Resolve all `Stats` leafs by peeking (get their reduced values). + return tree.map_structure( + lambda s: s.peek() if isinstance(s, Stats) else s, + results, + ) + + def __repr__(self): + if self.config.enable_rl_module_and_learner: + return ( + f"{type(self).__name__}(" + f"env={self.config.env}; env-runners={self.config.num_env_runners}; " + f"learners={self.config.num_learners}; " + f"multi-agent={self.config.is_multi_agent()}" + f")" + ) + else: + return type(self).__name__ + + @property + def env_runner(self): + return self.env_runner_group.local_env_runner + + @property + def eval_env_runner(self): + return self.eval_env_runner_group.local_env_runner + + def _record_usage(self, config): + """Record the framework and algorithm used. + + Args: + config: Algorithm config dict. + """ + record_extra_usage_tag(TagKey.RLLIB_FRAMEWORK, config["framework"]) + record_extra_usage_tag(TagKey.RLLIB_NUM_WORKERS, str(config["num_env_runners"])) + alg = self.__class__.__name__ + # We do not want to collect user defined algorithm names. + if alg not in ALL_ALGORITHMS: + alg = "USER_DEFINED" + record_extra_usage_tag(TagKey.RLLIB_ALGORITHM, alg) + + @OldAPIStack + def _export_model( + self, export_formats: List[str], export_dir: str + ) -> Dict[str, str]: + ExportFormat.validate(export_formats) + exported = {} + if ExportFormat.CHECKPOINT in export_formats: + path = os.path.join(export_dir, ExportFormat.CHECKPOINT) + self.export_policy_checkpoint(path) + exported[ExportFormat.CHECKPOINT] = path + if ExportFormat.MODEL in export_formats: + path = os.path.join(export_dir, ExportFormat.MODEL) + self.export_policy_model(path) + exported[ExportFormat.MODEL] = path + if ExportFormat.ONNX in export_formats: + path = os.path.join(export_dir, ExportFormat.ONNX) + self.export_policy_model(path, onnx=int(os.getenv("ONNX_OPSET", "11"))) + exported[ExportFormat.ONNX] = path + return exported + + @OldAPIStack + def __getstate__(self) -> Dict: + """Returns current state of Algorithm, sufficient to restore it from scratch. + + Returns: + The current state dict of this Algorithm, which can be used to sufficiently + restore the algorithm from scratch without any other information. + """ + if self.config.enable_env_runner_and_connector_v2: + raise RuntimeError( + "Algorithm.__getstate__() not supported anymore on the new API stack! " + "Use Algorithm.get_state() instead." + ) + + # Add config to state so complete Algorithm can be reproduced w/o it. + state = { + "algorithm_class": type(self), + "config": self.config.get_state(), + } + + if hasattr(self, "env_runner_group"): + state["worker"] = self.env_runner_group.local_env_runner.get_state() + + # Also store eval `policy_mapping_fn` (in case it's different from main + # one). Note, the new `EnvRunner API` has no policy mapping function. + if ( + hasattr(self, "eval_env_runner_group") + and self.eval_env_runner_group is not None + ): + state["eval_policy_mapping_fn"] = self.eval_env_runner.policy_mapping_fn + + # Save counters. 
+ state["counters"] = self._counters + + # TODO: Experimental functionality: Store contents of replay buffer + # to checkpoint, only if user has configured this. + if self.local_replay_buffer is not None and self.config.get( + "store_buffer_in_checkpoints" + ): + state["local_replay_buffer"] = self.local_replay_buffer.get_state() + + # Save current `training_iteration`. + state[TRAINING_ITERATION] = self.training_iteration + + return state + + @OldAPIStack + def __setstate__(self, state) -> None: + """Sets the algorithm to the provided state. + + Args: + state: The state dict to restore this Algorithm instance to. `state` may + have been returned by a call to an Algorithm's `__getstate__()` method. + """ + if self.config.enable_env_runner_and_connector_v2: + raise RuntimeError( + "Algorithm.__setstate__() not supported anymore on the new API stack! " + "Use Algorithm.set_state() instead." + ) + + # Old API stack: The local worker stores its state (together with all the + # Module information) in state['worker']. + if hasattr(self, "env_runner_group") and "worker" in state and state["worker"]: + self.env_runner.set_state(state["worker"]) + remote_state_ref = ray.put(state["worker"]) + self.env_runner_group.foreach_worker( + lambda w: w.set_state(ray.get(remote_state_ref)), + local_env_runner=False, + ) + if self.eval_env_runner_group: + # Avoid `state` being pickled into the remote function below. + _eval_policy_mapping_fn = state.get("eval_policy_mapping_fn") + + def _setup_eval_worker(w): + w.set_state(ray.get(remote_state_ref)) + # Override `policy_mapping_fn` as it might be different for eval + # workers. + w.set_policy_mapping_fn(_eval_policy_mapping_fn) + + # If evaluation workers are used, also restore the policies + # there in case they are used for evaluation purpose. + self.eval_env_runner_group.foreach_worker(_setup_eval_worker) + + # Restore replay buffer data. + if self.local_replay_buffer is not None: + # TODO: Experimental functionality: Restore contents of replay + # buffer from checkpoint, only if user has configured this. + if self.config.store_buffer_in_checkpoints: + if "local_replay_buffer" in state: + self.local_replay_buffer.set_state(state["local_replay_buffer"]) + else: + logger.warning( + "`store_buffer_in_checkpoints` is True, but no replay " + "data found in state!" + ) + elif "local_replay_buffer" in state and log_once( + "no_store_buffer_in_checkpoints_but_data_found" + ): + logger.warning( + "`store_buffer_in_checkpoints` is False, but some replay " + "data found in state!" + ) + + if "counters" in state: + self._counters = state["counters"] + + if TRAINING_ITERATION in state: + self._iteration = state[TRAINING_ITERATION] + + @OldAPIStack + @staticmethod + def _checkpoint_info_to_algorithm_state( + checkpoint_info: dict, + *, + policy_ids: Optional[Collection[PolicyID]] = None, + policy_mapping_fn: Optional[Callable[[AgentID, EpisodeID], PolicyID]] = None, + policies_to_train: Optional[ + Union[ + Collection[PolicyID], + Callable[[PolicyID, Optional[SampleBatchType]], bool], + ] + ] = None, + ) -> Dict: + """Converts a checkpoint info or object to a proper Algorithm state dict. + + The returned state dict can be used inside self.__setstate__(). + + Args: + checkpoint_info: A checkpoint info dict as returned by + `ray.rllib.utils.checkpoints.get_checkpoint_info( + [checkpoint dir or AIR Checkpoint])`. + policy_ids: Optional list/set of PolicyIDs. If not None, only those policies + listed here will be included in the returned state. 
Note that + state items such as filters, the `is_policy_to_train` function, as + well as the multi-agent `policy_ids` dict will be adjusted as well, + based on this arg. + policy_mapping_fn: An optional (updated) policy mapping function + to include in the returned state. + policies_to_train: An optional list of policy IDs to be trained + or a callable taking PolicyID and SampleBatchType and + returning a bool (trainable or not?) to include in the returned state. + + Returns: + The state dict usable within the `self.__setstate__()` method. + """ + if checkpoint_info["type"] != "Algorithm": + raise ValueError( + "`checkpoint` arg passed to " + "`Algorithm._checkpoint_info_to_algorithm_state()` must be an " + f"Algorithm checkpoint (but is {checkpoint_info['type']})!" + ) + + msgpack = None + if checkpoint_info.get("format") == "msgpack": + msgpack = try_import_msgpack(error=True) + + with open(checkpoint_info["state_file"], "rb") as f: + if msgpack is not None: + data = f.read() + state = msgpack.unpackb(data, raw=False) + else: + state = pickle.load(f) + + # Old API stack: Policies are in separate sub-dirs. + if ( + checkpoint_info["checkpoint_version"] > version.Version("0.1") + and state.get("worker") is not None + and state.get("worker") + ): + worker_state = state["worker"] + + # Retrieve the set of all required policy IDs. + policy_ids = set( + policy_ids if policy_ids is not None else worker_state["policy_ids"] + ) + + # Remove those policies entirely from filters that are not in + # `policy_ids`. + worker_state["filters"] = { + pid: filter + for pid, filter in worker_state["filters"].items() + if pid in policy_ids + } + + # Get Algorithm class. + if isinstance(state["algorithm_class"], str): + # Try deserializing from a full classpath. + # Or as a last resort: Tune registered algorithm name. + state["algorithm_class"] = deserialize_type( + state["algorithm_class"] + ) or get_trainable_cls(state["algorithm_class"]) + # Compile actual config object. + default_config = state["algorithm_class"].get_default_config() + if isinstance(default_config, AlgorithmConfig): + new_config = default_config.update_from_dict(state["config"]) + else: + new_config = Algorithm.merge_algorithm_configs( + default_config, state["config"] + ) - if not self.config.enable_env_runner_and_connector_v2: - eval_results = summarize_episodes( - all_metrics, - all_metrics, - keep_custom_metrics=( - self.evaluation_config.keep_per_episode_custom_metrics + # Remove policies from multiagent dict that are not in `policy_ids`. + new_policies = new_config.policies + if isinstance(new_policies, (set, list, tuple)): + new_policies = {pid for pid in new_policies if pid in policy_ids} + else: + new_policies = { + pid: spec for pid, spec in new_policies.items() if pid in policy_ids + } + new_config.multi_agent( + policies=new_policies, + policies_to_train=policies_to_train, + **( + {"policy_mapping_fn": policy_mapping_fn} + if policy_mapping_fn is not None + else {} ), ) - eval_results[NUM_AGENT_STEPS_SAMPLED_THIS_ITER] = agent_steps - eval_results[NUM_ENV_STEPS_SAMPLED_THIS_ITER] = env_steps - # TODO: Remove this key at some point. Here for backward compatibility. 
- eval_results["timesteps_this_iter"] = eval_results.get( - NUM_ENV_STEPS_SAMPLED_THIS_ITER, 0 - ) - else: - self.metrics.merge_and_log_n_dicts( - all_metrics, - key=(EVALUATION_RESULTS, ENV_RUNNER_RESULTS), - ) - eval_results = self.metrics.reduce((EVALUATION_RESULTS, ENV_RUNNER_RESULTS)) + state["config"] = new_config - # Warn if results are empty, it could be that this is because the eval timesteps - # are not enough to run through one full episode. - if eval_results[ENV_RUNNER_RESULTS][NUM_EPISODES] == 0: - logger.warning( - "This evaluation iteration resulted in an empty set of episode summary " - "results! It's possible that your configured duration timesteps are not" - " enough to finish even a single episode. You have configured " - f"{self.config.evaluation_duration} " - f"{self.config.evaluation_duration_unit}. For 'timesteps', try " - "increasing this value via the `config.evaluation(evaluation_duration=" - "...)` OR change the unit to 'episodes' via `config.evaluation(" - "evaluation_duration_unit='episodes')` OR try increasing the timeout " - "threshold via `config.evaluation(evaluation_sample_timeout_s=...)` OR " - "you can also set `config.evaluation_force_reset_envs_before_iteration`" - " to False. However, keep in mind that in the latter case, the " - "evaluation results may contain some episode stats generated with " - "earlier weights versions." - ) + # Prepare local `worker` state to add policies' states into it, + # read from separate policy checkpoint files. + worker_state["policy_states"] = {} + for pid in policy_ids: + policy_state_file = os.path.join( + checkpoint_info["checkpoint_dir"], + "policies", + pid, + "policy_state." + + ("msgpck" if checkpoint_info["format"] == "msgpack" else "pkl"), + ) + if not os.path.isfile(policy_state_file): + raise ValueError( + "Given checkpoint does not seem to be valid! No policy " + f"state file found for PID={pid}. " + f"The file not found is: {policy_state_file}." + ) - # After evaluation, do a round of health check on remote eval workers to see if - # any of the failed workers are back. - # Add number of healthy evaluation workers after this iteration. - eval_results[ - "num_healthy_workers" - ] = self.eval_env_runner_group.num_healthy_remote_workers() - eval_results[ - "num_in_flight_async_reqs" - ] = self.eval_env_runner_group.num_in_flight_async_reqs() - eval_results[ - "num_remote_worker_restarts" - ] = self.eval_env_runner_group.num_remote_worker_restarts() + with open(policy_state_file, "rb") as f: + if msgpack is not None: + worker_state["policy_states"][pid] = msgpack.load(f) + else: + worker_state["policy_states"][pid] = pickle.load(f) - return train_results, {"evaluation": eval_results}, train_iter_ctx + # These two functions are never serialized in a msgpack checkpoint (which + # does not store code, unlike a cloudpickle checkpoint). Hence the user has + # to provide them with the `Algorithm.from_checkpoint()` call. + if policy_mapping_fn is not None: + worker_state["policy_mapping_fn"] = policy_mapping_fn + if ( + policies_to_train is not None + # `policies_to_train` might be left None in case all policies should be + # trained. + or worker_state["is_policy_to_train"] == NOT_SERIALIZABLE + ): + worker_state["is_policy_to_train"] = policies_to_train - def _run_offline_evaluation(self): - """Runs offline evaluation via `OfflineEvaluator.estimate_on_dataset()` API. 
+ if state["config"].enable_rl_module_and_learner: + state["learner_state_dir"] = os.path.join( + checkpoint_info["checkpoint_dir"], "learner" + ) - This method will be used when `evaluation_dataset` is provided. - Note: This will only work if the policy is a single agent policy. + return state + + @OldAPIStack + def _create_local_replay_buffer_if_necessary( + self, config: PartialAlgorithmConfigDict + ) -> Optional[MultiAgentReplayBuffer]: + """Create a MultiAgentReplayBuffer instance if necessary. + + Args: + config: Algorithm-specific configuration data. Returns: - The results dict from the offline evaluation call. + MultiAgentReplayBuffer instance based on algorithm config. + None, if local replay buffer is not needed. """ - assert len(self.env_runner_group.local_env_runner.policy_map) == 1 + if not config.get("replay_buffer_config") or config["replay_buffer_config"].get( + "no_local_replay_buffer" + ): + return - parallelism = self.evaluation_config.evaluation_num_env_runners or 1 - offline_eval_results = {"off_policy_estimator": {}} - for evaluator_name, offline_evaluator in self.reward_estimators.items(): - offline_eval_results["off_policy_estimator"][ - evaluator_name - ] = offline_evaluator.estimate_on_dataset( - self.evaluation_dataset, - n_parallelism=parallelism, - ) - return offline_eval_results + return from_config(ReplayBuffer, config["replay_buffer_config"]) - @classmethod - def _should_create_evaluation_rollout_workers(cls, eval_config: "AlgorithmConfig"): - """Determines whether we need to create evaluation workers. + @OldAPIStack + def _run_one_training_iteration_old_api_stack(self): + with self._timers[TRAINING_ITERATION_TIMER]: + if self.config.get("framework") == "tf2" and not tf.executing_eagerly(): + tf1.enable_eager_execution() - Returns False if we need to run offline evaluation - (with ope.estimate_on_dastaset API) or when local worker is to be used for - evaluation. Note: We only use estimate_on_dataset API with bandits for now. - That is when ope_split_batch_by_episode is False. - TODO: In future we will do the same for episodic RL OPE. - """ - run_offline_evaluation = ( - eval_config.off_policy_estimation_methods - and not eval_config.ope_split_batch_by_episode - ) - return not run_offline_evaluation and ( - eval_config.evaluation_num_env_runners > 0 - or eval_config.evaluation_interval - ) + results = {} + training_step_results = None + with TrainIterCtx(algo=self) as train_iter_ctx: + while not train_iter_ctx.should_stop(training_step_results): + with self._timers[RESTORE_WORKERS_TIMER]: + self.restore_workers(self.env_runner_group) - def _compile_iteration_results_new_api_stack( - self, *, train_results, eval_results, step_ctx - ): - # Return dict (shallow copy of `train_results`). - results: ResultDict = train_results.copy() + with self._timers[TRAINING_STEP_TIMER]: + training_step_results = self.training_step() - # Collect old-API-stack-style `self._timers` results. - for k, timer in self._timers.items(): - if TIMERS not in results: - results[TIMERS] = {} - results[TIMERS]["{}_time_sec".format(k)] = timer.mean - if timer.has_units_processed(): - results[TIMERS]["{}_throughput".format(k)] = round( - timer.mean_throughput, 3 - ) + if training_step_results: + results = training_step_results - # Evaluation results. - if eval_results: - assert ( - isinstance(eval_results, dict) - and len(eval_results) == 1 - and EVALUATION_RESULTS in eval_results - ) - results.update(eval_results) - # Fault tolerance stats. 
- results[FAULT_TOLERANCE_STATS] = { - "num_healthy_workers": self.env_runner_group.num_healthy_remote_workers(), - "num_in_flight_async_reqs": ( - self.env_runner_group.num_in_flight_async_reqs() - ), - "num_remote_worker_restarts": ( - self.env_runner_group.num_remote_worker_restarts() - ), - } - # Resolve all `Stats` leafs by peeking (get their reduced values). - return tree.map_structure( - lambda s: s.peek() if isinstance(s, Stats) else s, - results, - ) + return results, train_iter_ctx @OldAPIStack def _compile_iteration_results_old_api_stack( @@ -3912,39 +4017,6 @@ def _compile_iteration_results_old_api_stack( return results - def __repr__(self): - return type(self).__name__ - - @property - def env_runner(self): - return self.env_runner_group.local_env_runner - - @property - def eval_env_runner(self): - return self.eval_env_runner_group.local_env_runner - - def _record_usage(self, config): - """Record the framework and algorithm used. - - Args: - config: Algorithm config dict. - """ - record_extra_usage_tag(TagKey.RLLIB_FRAMEWORK, config["framework"]) - record_extra_usage_tag(TagKey.RLLIB_NUM_WORKERS, str(config["num_env_runners"])) - alg = self.__class__.__name__ - # We do not want to collect user defined algorithm names. - if alg not in ALL_ALGORITHMS: - alg = "USER_DEFINED" - record_extra_usage_tag(TagKey.RLLIB_ALGORITHM, alg) - - @Deprecated(error=True) - def import_policy_model_from_h5(self, *args, **kwargs): - pass - - @Deprecated(error=True) - def import_model(self, *args, **kwargs): - pass - @Deprecated( new="Algorithm.env_runner_group", error=False, @@ -3979,19 +4051,20 @@ def __enter__(self): self.trained = 0 if self.algo.config.enable_env_runner_and_connector_v2: self.init_env_steps_sampled = self.algo.metrics.peek( - NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0 + (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED_LIFETIME), default=0 ) self.init_env_steps_trained = self.algo.metrics.peek( - NUM_ENV_STEPS_TRAINED_LIFETIME, default=0 + (LEARNER_RESULTS, ALL_MODULES, NUM_ENV_STEPS_TRAINED_LIFETIME), + default=0, ) self.init_agent_steps_sampled = sum( self.algo.metrics.peek( - NUM_AGENT_STEPS_SAMPLED_LIFETIME, default={} + (ENV_RUNNER_RESULTS, NUM_AGENT_STEPS_SAMPLED_LIFETIME), default={} ).values() ) self.init_agent_steps_trained = sum( self.algo.metrics.peek( - NUM_AGENT_STEPS_TRAINED_LIFETIME, default={} + (LEARNER_RESULTS, NUM_AGENT_STEPS_TRAINED_LIFETIME), default={} ).values() ) else: @@ -4013,7 +4086,7 @@ def get_time_taken_sec(self) -> float: def should_stop(self, results): # Before first call to `step()`. - if results is None: + if results in [None, False]: # Fail after n retries. 
self.failures += 1 if self.failures > self.failure_tolerance: @@ -4032,7 +4105,8 @@ def should_stop(self, results): self.sampled = ( sum( self.algo.metrics.peek( - NUM_AGENT_STEPS_SAMPLED_LIFETIME, default={} + (ENV_RUNNER_RESULTS, NUM_AGENT_STEPS_SAMPLED_LIFETIME), + default={}, ).values() ) - self.init_agent_steps_sampled @@ -4040,18 +4114,24 @@ def should_stop(self, results): self.trained = ( sum( self.algo.metrics.peek( - NUM_AGENT_STEPS_TRAINED_LIFETIME, default={} + (LEARNER_RESULTS, NUM_AGENT_STEPS_TRAINED_LIFETIME), + default={}, ).values() ) - self.init_agent_steps_trained ) else: self.sampled = ( - self.algo.metrics.peek(NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0) + self.algo.metrics.peek( + (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED_LIFETIME), default=0 + ) - self.init_env_steps_sampled ) self.trained = ( - self.algo.metrics.peek(NUM_ENV_STEPS_TRAINED_LIFETIME, default=0) + self.algo.metrics.peek( + (LEARNER_RESULTS, ALL_MODULES, NUM_ENV_STEPS_TRAINED_LIFETIME), + default=0, + ) - self.init_env_steps_trained ) else: diff --git a/rllib/algorithms/appo/appo.py b/rllib/algorithms/appo/appo.py index 99d32099b16b..37b8fd863c66 100644 --- a/rllib/algorithms/appo/appo.py +++ b/rllib/algorithms/appo/appo.py @@ -26,9 +26,6 @@ NUM_TARGET_UPDATES, ) from ray.rllib.utils.metrics import LEARNER_STATS_KEY -from ray.rllib.utils.typing import ( - ResultDict, -) logger = logging.getLogger(__name__) @@ -291,60 +288,51 @@ def __init__(self, config, *args, **kwargs): self.env_runner.foreach_policy_to_train(lambda p, _: p.update_target()) @override(IMPALA) - def training_step(self) -> ResultDict: - train_results = super().training_step() + def training_step(self) -> None: + if self.config.enable_rl_module_and_learner: + return super().training_step() + train_results = super().training_step() # Update the target network and the KL coefficient for the APPO-loss. # The target network update frequency is calculated automatically by the product # of `num_epochs` setting (usually 1 for APPO) and `minibatch_buffer_size`. - if self.config.enable_rl_module_and_learner: - if NUM_TARGET_UPDATES in train_results: - self._counters[NUM_TARGET_UPDATES] += train_results[NUM_TARGET_UPDATES] - self._counters[LAST_TARGET_UPDATE_TS] = train_results[ - LAST_TARGET_UPDATE_TS - ] - else: - last_update = self._counters[LAST_TARGET_UPDATE_TS] - cur_ts = self._counters[ - ( - NUM_AGENT_STEPS_SAMPLED - if self.config.count_steps_by == "agent_steps" - else NUM_ENV_STEPS_SAMPLED - ) - ] - target_update_freq = ( - self.config.num_epochs * self.config.minibatch_buffer_size + last_update = self._counters[LAST_TARGET_UPDATE_TS] + cur_ts = self._counters[ + ( + NUM_AGENT_STEPS_SAMPLED + if self.config.count_steps_by == "agent_steps" + else NUM_ENV_STEPS_SAMPLED ) - if cur_ts - last_update > target_update_freq: - self._counters[NUM_TARGET_UPDATES] += 1 - self._counters[LAST_TARGET_UPDATE_TS] = cur_ts - - # Update our target network. - self.env_runner.foreach_policy_to_train(lambda p, _: p.update_target()) - - # Also update the KL-coefficient for the APPO loss, if necessary. - if self.config.use_kl_loss: - - def update(pi, pi_id): - assert LEARNER_STATS_KEY not in train_results, ( - "{} should be nested under policy id key".format( - LEARNER_STATS_KEY - ), - train_results, - ) - if pi_id in train_results: - kl = train_results[pi_id][LEARNER_STATS_KEY].get("kl") - assert kl is not None, (train_results, pi_id) - # Make the actual `Policy.update_kl()` call. 
- pi.update_kl(kl) - else: - logger.warning( - "No data for {}, not updating kl".format(pi_id) - ) - - # Update KL on all trainable policies within the local (trainer) - # Worker. - self.env_runner.foreach_policy_to_train(update) + ] + target_update_freq = self.config.num_epochs * self.config.minibatch_buffer_size + if cur_ts - last_update > target_update_freq: + self._counters[NUM_TARGET_UPDATES] += 1 + self._counters[LAST_TARGET_UPDATE_TS] = cur_ts + + # Update our target network. + self.env_runner.foreach_policy_to_train(lambda p, _: p.update_target()) + + # Also update the KL-coefficient for the APPO loss, if necessary. + if self.config.use_kl_loss: + + def update(pi, pi_id): + assert LEARNER_STATS_KEY not in train_results, ( + "{} should be nested under policy id key".format( + LEARNER_STATS_KEY + ), + train_results, + ) + if pi_id in train_results: + kl = train_results[pi_id][LEARNER_STATS_KEY].get("kl") + assert kl is not None, (train_results, pi_id) + # Make the actual `Policy.update_kl()` call. + pi.update_kl(kl) + else: + logger.warning("No data for {}, not updating kl".format(pi_id)) + + # Update KL on all trainable policies within the local (trainer) + # Worker. + self.env_runner.foreach_policy_to_train(update) return train_results diff --git a/rllib/algorithms/appo/tests/test_appo.py b/rllib/algorithms/appo/tests/test_appo.py index e58eea2c782d..25814cc0688e 100644 --- a/rllib/algorithms/appo/tests/test_appo.py +++ b/rllib/algorithms/appo/tests/test_appo.py @@ -83,13 +83,11 @@ def test_appo_two_optimizers_two_lrs(self): algo.stop() def test_appo_entropy_coeff_schedule(self): - # Initial lr, doesn't really matter because of the schedule below. config = ( appo.APPOConfig() .environment("CartPole-v1") .env_runners( num_env_runners=1, - batch_mode="truncate_episodes", rollout_fragment_length=10, ) .training( diff --git a/rllib/algorithms/bc/bc.py b/rllib/algorithms/bc/bc.py index fc95b530d6c0..936bb35bb73d 100644 --- a/rllib/algorithms/bc/bc.py +++ b/rllib/algorithms/bc/bc.py @@ -3,7 +3,7 @@ from ray.rllib.algorithms.marwil.marwil import MARWIL, MARWILConfig from ray.rllib.core.rl_module.rl_module import RLModuleSpec from ray.rllib.utils.annotations import override -from ray.rllib.utils.typing import ResultDict, RLModuleSpecType +from ray.rllib.utils.typing import RLModuleSpecType class BCConfig(MARWILConfig): @@ -113,15 +113,10 @@ def validate(self) -> None: class BC(MARWIL): """Behavioral Cloning (derived from MARWIL). - Simply uses MARWIL with beta force-set to 0.0. + Uses MARWIL with beta force-set to 0.0. """ @classmethod @override(MARWIL) def get_default_config(cls) -> AlgorithmConfig: return BCConfig() - - @override(MARWIL) - def training_step(self) -> ResultDict: - # Call MARWIL's training step. 
- return super().training_step() diff --git a/rllib/algorithms/cql/cql.py b/rllib/algorithms/cql/cql.py index 865c9c85c14f..b5e97ff90580 100644 --- a/rllib/algorithms/cql/cql.py +++ b/rllib/algorithms/cql/cql.py @@ -24,7 +24,7 @@ train_one_step, ) from ray.rllib.policy.policy import Policy -from ray.rllib.utils.annotations import override +from ray.rllib.utils.annotations import OldAPIStack, override from ray.rllib.utils.deprecation import ( DEPRECATED_VALUE, deprecation_warning, @@ -39,9 +39,6 @@ NUM_AGENT_STEPS_TRAINED, NUM_ENV_STEPS_SAMPLED, NUM_ENV_STEPS_TRAINED, - NUM_ENV_STEPS_TRAINED_LIFETIME, - NUM_MODULE_STEPS_TRAINED, - NUM_MODULE_STEPS_TRAINED_LIFETIME, NUM_TARGET_UPDATES, OFFLINE_SAMPLING_TIMER, TARGET_NET_UPDATE_TIMER, @@ -301,14 +298,11 @@ def get_default_policy_class( return CQLTFPolicy @override(SAC) - def training_step(self) -> ResultDict: - if self.config.enable_env_runner_and_connector_v2: - return self._training_step_new_api_stack() - else: + def training_step(self) -> None: + # Old API stack (Policy, RolloutWorker, Connector). + if not self.config.enable_env_runner_and_connector_v2: return self._training_step_old_api_stack() - def _training_step_new_api_stack(self) -> ResultDict: - # Sampling from offline data. with self.metrics.log_time((TIMERS, OFFLINE_SAMPLING_TIMER)): # Return an iterator in case we are using remote learners. @@ -330,22 +324,6 @@ def _training_step_new_api_stack(self) -> ResultDict: # Log training results. self.metrics.merge_and_log_n_dicts(learner_results, key=LEARNER_RESULTS) - self.metrics.log_value( - NUM_ENV_STEPS_TRAINED_LIFETIME, - self.metrics.peek( - (LEARNER_RESULTS, ALL_MODULES, NUM_ENV_STEPS_TRAINED) - ), - reduce="sum", - ) - self.metrics.log_dict( - { - (LEARNER_RESULTS, mid, NUM_MODULE_STEPS_TRAINED_LIFETIME): ( - stats[NUM_MODULE_STEPS_TRAINED] - ) - for mid, stats in self.metrics.peek(LEARNER_RESULTS).items() - }, - reduce="sum", - ) # Synchronize weights. # As the results contain for each policy the loss and in addition the @@ -364,8 +342,7 @@ def _training_step_new_api_stack(self) -> ResultDict: inference_only=True, ) - return self.metrics.reduce() - + @OldAPIStack def _training_step_old_api_stack(self) -> ResultDict: # Collect SampleBatches from sample workers. with self._timers[SAMPLE_TIMER]: diff --git a/rllib/algorithms/dqn/dqn.py b/rllib/algorithms/dqn/dqn.py index d62cb3242e44..2c5e95602bdb 100644 --- a/rllib/algorithms/dqn/dqn.py +++ b/rllib/algorithms/dqn/dqn.py @@ -55,14 +55,6 @@ NUM_AGENT_STEPS_SAMPLED_LIFETIME, NUM_ENV_STEPS_SAMPLED, NUM_ENV_STEPS_SAMPLED_LIFETIME, - NUM_ENV_STEPS_TRAINED, - NUM_ENV_STEPS_TRAINED_LIFETIME, - NUM_EPISODES, - NUM_EPISODES_LIFETIME, - NUM_MODULE_STEPS_SAMPLED, - NUM_MODULE_STEPS_SAMPLED_LIFETIME, - NUM_MODULE_STEPS_TRAINED, - NUM_MODULE_STEPS_TRAINED_LIFETIME, NUM_TARGET_UPDATES, REPLAY_BUFFER_ADD_DATA_TIMER, REPLAY_BUFFER_SAMPLE_TIMER, @@ -640,7 +632,7 @@ def get_default_policy_class( return DQNTFPolicy @override(Algorithm) - def training_step(self) -> ResultDict: + def training_step(self) -> None: """DQN training iteration function. Each training iteration, we: @@ -655,14 +647,14 @@ def training_step(self) -> ResultDict: Returns: The results dict from executing the training iteration. """ - # New API stack (RLModule, Learner, EnvRunner, ConnectorV2). - if self.config.enable_env_runner_and_connector_v2: - return self._training_step_new_api_stack(with_noise_reset=True) - # Old API stack (Policy, RolloutWorker). - else: + # Old API stack (Policy, RolloutWorker, Connector). 
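+ # (This path still returns the old stack's ResultDict; the new-stack path
+ # below returns None and logs everything via `self.metrics` instead.)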
+ if not self.config.enable_env_runner_and_connector_v2: return self._training_step_old_api_stack() - def _training_step_new_api_stack(self, *, with_noise_reset) -> ResultDict: + # New API stack (RLModule, Learner, EnvRunner, ConnectorV2). + return self._training_step_new_api_stack(with_noise_reset=True) + + def _training_step_new_api_stack(self, *, with_noise_reset): # Alternate between storing and sampling and training. store_weight, sample_and_train_weight = calculate_rr_weights(self.config) @@ -686,38 +678,16 @@ def _training_step_new_api_stack(self, *, with_noise_reset) -> ResultDict: with self.metrics.log_time((TIMERS, REPLAY_BUFFER_ADD_DATA_TIMER)): self.local_replay_buffer.add(episodes) - self.metrics.log_dict( - self.metrics.peek( - (ENV_RUNNER_RESULTS, NUM_AGENT_STEPS_SAMPLED), default={} - ), - key=NUM_AGENT_STEPS_SAMPLED_LIFETIME, - reduce="sum", - ) - self.metrics.log_value( - NUM_ENV_STEPS_SAMPLED_LIFETIME, - self.metrics.peek((ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED), default=0), - reduce="sum", - ) - self.metrics.log_value( - NUM_EPISODES_LIFETIME, - self.metrics.peek((ENV_RUNNER_RESULTS, NUM_EPISODES), default=0), - reduce="sum", - ) - self.metrics.log_dict( - self.metrics.peek( - (ENV_RUNNER_RESULTS, NUM_MODULE_STEPS_SAMPLED), - default={}, - ), - key=NUM_MODULE_STEPS_SAMPLED_LIFETIME, - reduce="sum", - ) - if self.config.count_steps_by == "agent_steps": current_ts = sum( - self.metrics.peek(NUM_AGENT_STEPS_SAMPLED_LIFETIME).values() + self.metrics.peek( + (ENV_RUNNER_RESULTS, NUM_AGENT_STEPS_SAMPLED_LIFETIME), default={} + ).values() ) else: - current_ts = self.metrics.peek(NUM_ENV_STEPS_SAMPLED_LIFETIME) + current_ts = self.metrics.peek( + (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED_LIFETIME), default=0 + ) # If enough experiences have been sampled start training. if current_ts >= self.config.num_steps_sampled_before_learning_starts: @@ -748,10 +718,17 @@ def _training_step_new_api_stack(self, *, with_noise_reset) -> ResultDict: episodes=episodes, timesteps={ NUM_ENV_STEPS_SAMPLED_LIFETIME: ( - self.metrics.peek(NUM_ENV_STEPS_SAMPLED_LIFETIME) + self.metrics.peek( + (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED_LIFETIME) + ) ), NUM_AGENT_STEPS_SAMPLED_LIFETIME: ( - self.metrics.peek(NUM_AGENT_STEPS_SAMPLED_LIFETIME) + self.metrics.peek( + ( + ENV_RUNNER_RESULTS, + NUM_AGENT_STEPS_SAMPLED_LIFETIME, + ) + ) ), }, ) @@ -773,29 +750,6 @@ def _training_step_new_api_stack(self, *, with_noise_reset) -> ResultDict: self.metrics.merge_and_log_n_dicts( learner_results, key=LEARNER_RESULTS ) - self.metrics.log_value( - NUM_ENV_STEPS_TRAINED_LIFETIME, - self.metrics.peek( - (LEARNER_RESULTS, ALL_MODULES, NUM_ENV_STEPS_TRAINED) - ), - reduce="sum", - ) - self.metrics.log_dict( - { - (LEARNER_RESULTS, mid, NUM_MODULE_STEPS_TRAINED_LIFETIME): ( - stats[NUM_MODULE_STEPS_TRAINED] - ) - for mid, stats in self.metrics.peek(LEARNER_RESULTS).items() - if NUM_MODULE_STEPS_TRAINED in stats - }, - reduce="sum", - ) - - # TODO (sven): Uncomment this once agent steps are available in the - # Learner stats. - # self.metrics.log_dict(self.metrics.peek( - # (LEARNER_RESULTS, NUM_AGENT_STEPS_TRAINED), default={} - # ), key=NUM_AGENT_STEPS_TRAINED_LIFETIME, reduce="sum") # Update replay buffer priorities. 
with self.metrics.log_time((TIMERS, REPLAY_BUFFER_UPDATE_PRIOS_TIMER)): @@ -816,8 +770,6 @@ def _training_step_new_api_stack(self, *, with_noise_reset) -> ResultDict: inference_only=True, ) - return self.metrics.reduce() - def _training_step_old_api_stack(self) -> ResultDict: """Training step for the old API stack. diff --git a/rllib/algorithms/dreamerv3/dreamerv3.py b/rllib/algorithms/dreamerv3/dreamerv3.py index 84e0cc6ae252..b583b6720110 100644 --- a/rllib/algorithms/dreamerv3/dreamerv3.py +++ b/rllib/algorithms/dreamerv3/dreamerv3.py @@ -38,13 +38,8 @@ GARBAGE_COLLECTION_TIMER, LEARN_ON_BATCH_TIMER, LEARNER_RESULTS, - NUM_AGENT_STEPS_SAMPLED, - NUM_AGENT_STEPS_SAMPLED_LIFETIME, - NUM_ENV_STEPS_SAMPLED, NUM_ENV_STEPS_SAMPLED_LIFETIME, NUM_ENV_STEPS_TRAINED_LIFETIME, - NUM_EPISODES, - NUM_EPISODES_LIFETIME, NUM_GRAD_UPDATES_LIFETIME, NUM_SYNCH_WORKER_WEIGHTS, SAMPLE_TIMER, @@ -52,7 +47,7 @@ TIMERS, ) from ray.rllib.utils.replay_buffers.episode_replay_buffer import EpisodeReplayBuffer -from ray.rllib.utils.typing import LearningRateOrSchedule, ResultDict +from ray.rllib.utils.typing import LearningRateOrSchedule logger = logging.getLogger(__name__) @@ -511,7 +506,7 @@ def setup(self, config: AlgorithmConfig): ) @override(Algorithm) - def training_step(self) -> ResultDict: + def training_step(self) -> None: # Push enough samples into buffer initially before we start training. if self.training_iteration == 0: logger.info( @@ -563,7 +558,13 @@ def training_step(self) -> ResultDict: # If we have never sampled before (just started the algo and not # recovered from a checkpoint), sample B random actions first. - if self.metrics.peek(NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0) == 0: + if ( + self.metrics.peek( + (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED_LIFETIME), + default=0, + ) + == 0 + ): _episodes, _env_runner_results = synchronous_parallel_sample( worker_set=self.env_runner_group, max_agent_steps=( @@ -581,23 +582,6 @@ def training_step(self) -> ResultDict: self.replay_buffer.add(episodes=_episodes) total_sampled += sum(len(eps) for eps in _episodes) - # Update lifetime counts (now that we gathered results from all - # EnvRunners). - self.metrics.log_dict( - { - NUM_AGENT_STEPS_SAMPLED_LIFETIME: self.metrics.peek( - (ENV_RUNNER_RESULTS, NUM_AGENT_STEPS_SAMPLED) - ), - NUM_ENV_STEPS_SAMPLED_LIFETIME: self.metrics.peek( - (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED) - ), - NUM_EPISODES_LIFETIME: self.metrics.peek( - (ENV_RUNNER_RESULTS, NUM_EPISODES) - ), - }, - reduce="sum", - ) - # Summarize environment interaction and buffer data. report_sampling_and_replay_buffer( metrics=self.metrics, replay_buffer=self.replay_buffer @@ -646,14 +630,12 @@ def training_step(self) -> ResultDict: # time - send the current globally summed/reduced-timesteps. timesteps={ NUM_ENV_STEPS_SAMPLED_LIFETIME: self.metrics.peek( - NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0 + (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED_LIFETIME), + default=0, ) }, ) self.metrics.merge_and_log_n_dicts(learner_results, key=LEARNER_RESULTS) - self.metrics.log_value( - NUM_ENV_STEPS_TRAINED_LIFETIME, replayed_steps, reduce="sum" - ) sub_iter += 1 self.metrics.log_value(NUM_GRAD_UPDATES_LIFETIME, 1, reduce="sum") @@ -723,9 +705,6 @@ def training_step(self) -> ResultDict: # be close to the configured `training_ratio`. self.metrics.log_value("actual_training_ratio", self.training_ratio, window=1) - # Return all results. 
- return self.metrics.reduce() - @property def training_ratio(self) -> float: """Returns the actual training ratio of this Algorithm (not the configured one). @@ -736,7 +715,13 @@ def training_ratio(self) -> float: """ eps = 0.0001 return self.metrics.peek(NUM_ENV_STEPS_TRAINED_LIFETIME, default=0) / ( - (self.metrics.peek(NUM_ENV_STEPS_SAMPLED_LIFETIME, default=eps) or eps) + ( + self.metrics.peek( + (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED_LIFETIME), + default=eps, + ) + or eps + ) ) # TODO (sven): Remove this once DreamerV3 is on the new SingleAgentEnvRunner. diff --git a/rllib/algorithms/impala/impala.py b/rllib/algorithms/impala/impala.py index 78e511931471..b3423f139214 100644 --- a/rllib/algorithms/impala/impala.py +++ b/rllib/algorithms/impala/impala.py @@ -42,14 +42,10 @@ MEAN_NUM_LEARNER_GROUP_RESULTS_RECEIVED, MEAN_NUM_LEARNER_GROUP_UPDATE_CALLED, NUM_AGENT_STEPS_SAMPLED, - NUM_AGENT_STEPS_SAMPLED_LIFETIME, NUM_AGENT_STEPS_TRAINED, NUM_ENV_STEPS_SAMPLED, NUM_ENV_STEPS_SAMPLED_LIFETIME, NUM_ENV_STEPS_TRAINED, - NUM_ENV_STEPS_TRAINED_LIFETIME, - NUM_EPISODES, - NUM_EPISODES_LIFETIME, NUM_MODULE_STEPS_TRAINED, NUM_SYNCH_WORKER_WEIGHTS, NUM_TRAINING_STEP_CALLS_SINCE_LAST_SYNCH_WORKER_WEIGHTS, @@ -608,7 +604,7 @@ def setup(self, config: AlgorithmConfig): self._learner_thread.start() @override(Algorithm) - def training_step(self) -> ResultDict: + def training_step(self): # Old API stack. if not self.config.enable_rl_module_and_learner: return self._training_step_old_api_stack() @@ -627,29 +623,13 @@ def training_step(self) -> ResultDict: ) = self._sample_and_get_connector_states() # Reduce EnvRunner metrics over the n EnvRunners. self.metrics.merge_and_log_n_dicts( - env_runner_metrics, key=ENV_RUNNER_RESULTS + env_runner_metrics, + key=ENV_RUNNER_RESULTS, ) # Log the average number of sample results (list of episodes) received. self.metrics.log_value(MEAN_NUM_EPISODE_LISTS_RECEIVED, len(episode_refs)) - # Log lifetime counts for env- and agent steps. - if env_runner_metrics: - self.metrics.log_dict( - { - NUM_AGENT_STEPS_SAMPLED_LIFETIME: self.metrics.peek( - (ENV_RUNNER_RESULTS, NUM_AGENT_STEPS_SAMPLED) - ), - NUM_ENV_STEPS_SAMPLED_LIFETIME: self.metrics.peek( - (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED) - ), - NUM_EPISODES_LIFETIME: self.metrics.peek( - (ENV_RUNNER_RESULTS, NUM_EPISODES) - ), - }, - reduce="sum", - ) - # "Batch" collected episode refs into groups, such that exactly # `total_train_batch_size` timesteps are sent to # `LearnerGroup.update_from_episodes()`. @@ -681,7 +661,8 @@ def training_step(self) -> ResultDict: return_state=True, timesteps={ NUM_ENV_STEPS_SAMPLED_LIFETIME: self.metrics.peek( - NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0 + (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED_LIFETIME), + default=0, ), }, num_epochs=self.config.num_epochs, @@ -695,7 +676,8 @@ def training_step(self) -> ResultDict: return_state=True, timesteps={ NUM_ENV_STEPS_SAMPLED_LIFETIME: self.metrics.peek( - NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0 + (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED_LIFETIME), + default=0, ), }, num_epochs=self.config.num_epochs, @@ -725,16 +707,6 @@ def training_step(self) -> ResultDict: # Update LearnerGroup's own stats. 
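# (These stats end up under the LEARNER_GROUP key of the algorithm's metrics.)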
self.metrics.log_dict(self.learner_group.get_stats(), key=LEARNER_GROUP) - self.metrics.log_value( - NUM_ENV_STEPS_TRAINED_LIFETIME, - self.metrics.peek( - (LEARNER_RESULTS, ALL_MODULES, NUM_ENV_STEPS_TRAINED), default=0 - ), - reduce="sum", - ) - # self.metrics.log_value(NUM_MODULE_STEPS_TRAINED_LIFETIME, self.metrics.peek( - # (LEARNER_RESULTS, NUM_MODULE_STEPS_TRAINED) - # ), reduce="sum") # Figure out, whether we should sync/broadcast the (remote) EnvRunner states. # Note: `learner_results` is a List of n (num async calls) Lists of m @@ -761,17 +733,10 @@ def training_step(self) -> ResultDict: with self.metrics.log_time((TIMERS, SYNCH_WORKER_WEIGHTS_TIMER)): self.env_runner_group.sync_env_runner_states( config=self.config, - env_steps_sampled=self.metrics.peek( - NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0 - ), connector_states=connector_states, rl_module_state=rl_module_state, ) - if env_runner_metrics or last_good_learner_results: - return self.metrics.reduce() - return {} - def _sample_and_get_connector_states(self): def _remote_sample_get_state_and_metrics(_worker): _episodes = _worker.sample() diff --git a/rllib/algorithms/impala/impala_learner.py b/rllib/algorithms/impala/impala_learner.py index 5d02f5eb0029..72b158ec2547 100644 --- a/rllib/algorithms/impala/impala_learner.py +++ b/rllib/algorithms/impala/impala_learner.py @@ -137,36 +137,53 @@ def update_from_episodes( with self.metrics.log_time((ALL_MODULES, RAY_GET_EPISODES_TIMER)): # Resolve batch/episodes being ray object refs (instead of # actual batch/episodes objects). - episodes = ray.get(episodes) - episodes = tree.flatten(episodes) - env_steps = sum(map(len, episodes)) - - # Call the learner connector pipeline. - with self.metrics.log_time((ALL_MODULES, EPISODES_TO_BATCH_TIMER)): - batch = self._learner_connector( - rl_module=self.module, - batch={}, - episodes=episodes, - shared_data={}, - ) + # If this fails, it might be because some of the EnvRunners that collected + # `episodes` are down (ex. SPOT preemption or single EnvRunner crash). + # In this case, we should ignore those List[EpisodeType] references and not + # use these for the train batch. + try: + episodes = ray.get(episodes) + episodes_flat = tree.flatten(episodes) + except ray.exceptions.RayError: + # Try unreferencing one by one and collect those that are ok. + episodes_flat = [] + for e in episodes: + try: + episodes_flat.extend(ray.get(e)) + # Ignore exceptions and move on with other references. + except Exception: + pass + + env_steps = sum(map(len, episodes_flat)) + + # Only send a batch to the learner pipeline if its size is > 0. + if env_steps > 0: + # Call the learner connector pipeline. + with self.metrics.log_time((ALL_MODULES, EPISODES_TO_BATCH_TIMER)): + batch = self._learner_connector( + rl_module=self.module, + batch={}, + episodes=episodes_flat, + shared_data={}, + ) - # Queue the CPU batch to the GPU-loader thread. - if self.config.num_gpus_per_learner > 0: - self._gpu_loader_in_queue.put((batch, env_steps)) - self.metrics.log_value( - (ALL_MODULES, QUEUE_SIZE_GPU_LOADER_QUEUE), - self._gpu_loader_in_queue.qsize(), - ) - else: - # Enqueue to Learner thread's in-queue. - _LearnerThread.enqueue( - self._learner_thread_in_queue, - MultiAgentBatch( - {mid: SampleBatch(b) for mid, b in batch.items()}, - env_steps=env_steps, - ), - self.metrics, - ) + # Queue the CPU batch to the GPU-loader thread. 
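+ # (Without GPUs, the batch is enqueued directly to the Learner thread's
+ # in-queue below instead.)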
+ if self.config.num_gpus_per_learner > 0: + self._gpu_loader_in_queue.put((batch, env_steps)) + self.metrics.log_value( + (ALL_MODULES, QUEUE_SIZE_GPU_LOADER_QUEUE), + self._gpu_loader_in_queue.qsize(), + ) + else: + # Enqueue to Learner thread's in-queue. + _LearnerThread.enqueue( + self._learner_thread_in_queue, + MultiAgentBatch( + {mid: SampleBatch(b) for mid, b in batch.items()}, + env_steps=env_steps, + ), + self.metrics, + ) # Return all queued result dicts thus far (after reducing over them). results = {} diff --git a/rllib/algorithms/marwil/marwil.py b/rllib/algorithms/marwil/marwil.py index 21dbdbfbe181..d029c967e44b 100644 --- a/rllib/algorithms/marwil/marwil.py +++ b/rllib/algorithms/marwil/marwil.py @@ -19,7 +19,7 @@ train_one_step, ) from ray.rllib.policy.policy import Policy -from ray.rllib.utils.annotations import override +from ray.rllib.utils.annotations import OldAPIStack, override from ray.rllib.utils.deprecation import deprecation_warning from ray.rllib.utils.metrics import ( ALL_MODULES, @@ -27,10 +27,6 @@ LEARNER_UPDATE_TIMER, NUM_AGENT_STEPS_SAMPLED, NUM_ENV_STEPS_SAMPLED, - NUM_ENV_STEPS_TRAINED, - NUM_ENV_STEPS_TRAINED_LIFETIME, - NUM_MODULE_STEPS_TRAINED, - NUM_MODULE_STEPS_TRAINED_LIFETIME, OFFLINE_SAMPLING_TIMER, SAMPLE_TIMER, SYNCH_WORKER_WEIGHTS_TIMER, @@ -426,13 +422,7 @@ def get_default_policy_class( return MARWILTF2Policy @override(Algorithm) - def training_step(self) -> ResultDict: - if self.config.enable_env_runner_and_connector_v2: - return self._training_step_new_api_stack() - else: - return self._training_step_old_api_stack() - - def _training_step_new_api_stack(self) -> ResultDict: + def training_step(self) -> None: """Implements training logic for the new stack Note, this includes so far training with the `OfflineData` @@ -440,7 +430,10 @@ class (multi-/single-learner setup) and evaluation on `EnvRunner`s. Note further, evaluation on the dataset itself using estimators is not implemented, yet. """ - # Implement logic using RLModule and Learner API. + # Old API stack (Policy, RolloutWorker, Connector). + if not self.config.enable_env_runner_and_connector_v2: + return self._training_step_old_api_stack() + # TODO (simon): Take care of sampler metrics: right # now all rewards are `nan`, which possibly confuses # the user that sth. is not right, although it is as @@ -465,22 +458,7 @@ class (multi-/single-learner setup) and evaluation on # Log training results. self.metrics.merge_and_log_n_dicts(learner_results, key=LEARNER_RESULTS) - self.metrics.log_value( - NUM_ENV_STEPS_TRAINED_LIFETIME, - self.metrics.peek( - (LEARNER_RESULTS, ALL_MODULES, NUM_ENV_STEPS_TRAINED) - ), - reduce="sum", - ) - self.metrics.log_dict( - { - (LEARNER_RESULTS, mid, NUM_MODULE_STEPS_TRAINED_LIFETIME): ( - stats[NUM_MODULE_STEPS_TRAINED] - ) - for mid, stats in self.metrics.peek(LEARNER_RESULTS).items() - }, - reduce="sum", - ) + # Synchronize weights. # As the results contain for each policy the loss and in addition the # total loss over all policies is returned, this total loss has to be @@ -497,8 +475,7 @@ class (multi-/single-learner setup) and evaluation on inference_only=True, ) - return self.metrics.reduce() - + @OldAPIStack def _training_step_old_api_stack(self) -> ResultDict: """Implements training step for the old stack. 
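The hunks above and below repeatedly apply the same metrics convention: lifetime counters move from top-level keys into nested key sequences under the owning component's results (ENV_RUNNER_RESULTS, LEARNER_RESULTS/ALL_MODULES). A minimal, self-contained sketch of that access pattern, using only constants imported in these files; the step counts are invented for illustration:

from ray.rllib.utils.metrics import (
    ALL_MODULES,
    ENV_RUNNER_RESULTS,
    LEARNER_RESULTS,
    NUM_ENV_STEPS_SAMPLED_LIFETIME,
    NUM_ENV_STEPS_TRAINED_LIFETIME,
)
from ray.rllib.utils.metrics.metrics_logger import MetricsLogger

metrics = MetricsLogger()
# Log lifetime counters under their component's result key (nested key tuple).
# The value 4000 is an illustrative step count only.
metrics.log_value(
    (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED_LIFETIME), 4000, reduce="sum"
)
metrics.log_value(
    (LEARNER_RESULTS, ALL_MODULES, NUM_ENV_STEPS_TRAINED_LIFETIME), 4000, reduce="sum"
)
# Reading the values back uses the same nested key (plus a default, so the very
# first iteration doesn't fail on a not-yet-existing key).
sampled = metrics.peek((ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED_LIFETIME), default=0)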
diff --git a/rllib/algorithms/ppo/ppo.py b/rllib/algorithms/ppo/ppo.py index 1bb785643a70..ab39972849d7 100644 --- a/rllib/algorithms/ppo/ppo.py +++ b/rllib/algorithms/ppo/ppo.py @@ -24,7 +24,7 @@ multi_gpu_train_one_step, ) from ray.rllib.policy.policy import Policy -from ray.rllib.utils.annotations import override +from ray.rllib.utils.annotations import OldAPIStack, override from ray.rllib.utils.deprecation import DEPRECATED_VALUE from ray.rllib.utils.metrics import ( ENV_RUNNER_RESULTS, @@ -32,13 +32,8 @@ LEARNER_RESULTS, LEARNER_UPDATE_TIMER, NUM_AGENT_STEPS_SAMPLED, - NUM_AGENT_STEPS_SAMPLED_LIFETIME, NUM_ENV_STEPS_SAMPLED, NUM_ENV_STEPS_SAMPLED_LIFETIME, - NUM_ENV_STEPS_TRAINED, - NUM_ENV_STEPS_TRAINED_LIFETIME, - NUM_EPISODES, - NUM_EPISODES_LIFETIME, SYNCH_WORKER_WEIGHTS_TIMER, SAMPLE_TIMER, TIMERS, @@ -402,16 +397,11 @@ def get_default_policy_class( return PPOTF2Policy @override(Algorithm) - def training_step(self): - # New API stack (RLModule, Learner, EnvRunner, ConnectorV2). - if self.config.enable_env_runner_and_connector_v2: - return self._training_step_new_api_stack() - # Old API stack (Policy, RolloutWorker, Connector, maybe RLModule, - # maybe Learner). - else: + def training_step(self) -> None: + # Old API stack (Policy, RolloutWorker, Connector). + if not self.config.enable_env_runner_and_connector_v2: return self._training_step_old_api_stack() - def _training_step_new_api_stack(self) -> ResultDict: # Collect batches from sample workers until we have a full batch. with self.metrics.log_time((TIMERS, ENV_RUNNER_SAMPLING_TIMER)): # Sample in parallel from the workers. @@ -437,27 +427,12 @@ def _training_step_new_api_stack(self) -> ResultDict: ) # Return early if all our workers failed. if not episodes: - return {} + return # Reduce EnvRunner metrics over the n EnvRunners. self.metrics.merge_and_log_n_dicts( env_runner_results, key=ENV_RUNNER_RESULTS ) - # Log lifetime counts for env- and agent steps. - self.metrics.log_dict( - { - NUM_AGENT_STEPS_SAMPLED_LIFETIME: self.metrics.peek( - (ENV_RUNNER_RESULTS, NUM_AGENT_STEPS_SAMPLED) - ), - NUM_ENV_STEPS_SAMPLED_LIFETIME: self.metrics.peek( - (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED) - ), - NUM_EPISODES_LIFETIME: self.metrics.peek( - (ENV_RUNNER_RESULTS, NUM_EPISODES) - ), - }, - reduce="sum", - ) # Perform a learner update step on the collected episodes. with self.metrics.log_time((TIMERS, LEARNER_UPDATE_TIMER)): @@ -465,7 +440,9 @@ def _training_step_new_api_stack(self) -> ResultDict: episodes=episodes, timesteps={ NUM_ENV_STEPS_SAMPLED_LIFETIME: ( - self.metrics.peek(NUM_ENV_STEPS_SAMPLED_LIFETIME) + self.metrics.peek( + (ENV_RUNNER_RESULTS, NUM_ENV_STEPS_SAMPLED_LIFETIME) + ) ), }, num_epochs=self.config.num_epochs, @@ -473,17 +450,6 @@ def _training_step_new_api_stack(self) -> ResultDict: shuffle_batch_per_epoch=self.config.shuffle_batch_per_epoch, ) self.metrics.merge_and_log_n_dicts(learner_results, key=LEARNER_RESULTS) - self.metrics.log_dict( - { - NUM_ENV_STEPS_TRAINED_LIFETIME: self.metrics.peek( - (LEARNER_RESULTS, ALL_MODULES, NUM_ENV_STEPS_TRAINED) - ), - # NUM_MODULE_STEPS_TRAINED_LIFETIME: self.metrics.peek( - # (LEARNER_RESULTS, NUM_MODULE_STEPS_TRAINED) - # ), - }, - reduce="sum", - ) # Update weights - after learning on the local worker - on all remote # workers. @@ -497,19 +463,14 @@ def _training_step_new_api_stack(self) -> ResultDict: # as it might be a very large set (100s of Modules) vs a smaller Modules # set that's present in the current train batch. 
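# (`ALL_MODULES` is only the aggregation key within the learner results, not
# an actual RLModule to sync, hence it gets removed from the set.)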
modules_to_update = set(learner_results[0].keys()) - {ALL_MODULES} - # if self.env_runner_group.num_remote_workers() > 0: self.env_runner_group.sync_weights( # Sync weights from learner_group to all EnvRunners. from_worker_or_learner_group=self.learner_group, policies=modules_to_update, inference_only=True, ) - # else: - # weights = self.learner_group.get_weights(inference_only=True) - # self.env_runner.set_weights(weights) - - return self.metrics.reduce() + @OldAPIStack def _training_step_old_api_stack(self) -> ResultDict: # Collect batches from sample workers until we have a full batch. with self._timers[SAMPLE_TIMER]: diff --git a/rllib/algorithms/sac/sac.py b/rllib/algorithms/sac/sac.py index bcdfa0e69edf..71211fbc303d 100644 --- a/rllib/algorithms/sac/sac.py +++ b/rllib/algorithms/sac/sac.py @@ -18,7 +18,7 @@ from ray.rllib.utils.deprecation import DEPRECATED_VALUE, deprecation_warning from ray.rllib.utils.framework import try_import_tf, try_import_tfp from ray.rllib.utils.replay_buffers.episode_replay_buffer import EpisodeReplayBuffer -from ray.rllib.utils.typing import LearningRateOrSchedule, RLModuleSpecType, ResultDict +from ray.rllib.utils.typing import LearningRateOrSchedule, RLModuleSpecType tf1, tf, tfv = try_import_tf() tfp = try_import_tfp() @@ -587,7 +587,7 @@ def get_default_policy_class( return SACTFPolicy @override(DQN) - def training_step(self) -> ResultDict: + def training_step(self) -> None: """SAC training iteration function. Each training iteration, we: @@ -602,10 +602,8 @@ def training_step(self) -> ResultDict: Returns: The results dict from executing the training iteration. """ - # New API stack (RLModule, Learner, EnvRunner, ConnectorV2). - if self.config.enable_env_runner_and_connector_v2: - return self._training_step_new_api_stack(with_noise_reset=False) - # Old API stack (Policy, RolloutWorker, Connector, maybe RLModule, - # maybe Learner). - else: + # Old API stack (Policy, RolloutWorker, Connector). + if not self.config.enable_env_runner_and_connector_v2: return self._training_step_old_api_stack() + + return self._training_step_new_api_stack(with_noise_reset=False) diff --git a/rllib/core/learner/learner.py b/rllib/core/learner/learner.py index 5787f87d2e46..c26cd6a22a94 100644 --- a/rllib/core/learner/learner.py +++ b/rllib/core/learner/learner.py @@ -24,7 +24,12 @@ from ray.rllib.connectors.learner.learner_connector_pipeline import ( LearnerConnectorPipeline, ) -from ray.rllib.core import COMPONENT_OPTIMIZER, COMPONENT_RL_MODULE, DEFAULT_MODULE_ID +from ray.rllib.core import ( + COMPONENT_METRICS_LOGGER, + COMPONENT_OPTIMIZER, + COMPONENT_RL_MODULE, + DEFAULT_MODULE_ID, +) from ray.rllib.core.rl_module.apis import SelfSupervisedLossAPI from ray.rllib.core.rl_module import validate_module_id from ray.rllib.core.rl_module.multi_rl_module import ( @@ -51,7 +56,9 @@ ALL_MODULES, NUM_ENV_STEPS_SAMPLED_LIFETIME, NUM_ENV_STEPS_TRAINED, + NUM_ENV_STEPS_TRAINED_LIFETIME, NUM_MODULE_STEPS_TRAINED, + NUM_MODULE_STEPS_TRAINED_LIFETIME, LEARNER_CONNECTOR_TIMER, MODULE_TRAIN_BATCH_SIZE_MEAN, WEIGHTS_SEQ_NO, @@ -1212,6 +1219,10 @@ def get_state( if self._check_component(COMPONENT_OPTIMIZER, components, not_components): state[COMPONENT_OPTIMIZER] = self._get_optimizer_state() + if self._check_component(COMPONENT_METRICS_LOGGER, components, not_components): + # TODO (sven): Make `MetricsLogger` a Checkpointable. 
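+ # Until then, store the logger's raw state dict under its own component key.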
+ state[COMPONENT_METRICS_LOGGER] = self.metrics.get_state() + return state @override(Checkpointable) @@ -1236,6 +1247,10 @@ def set_state(self, state: StateDict) -> None: if "should_module_be_updated" in state: self.config.multi_agent(policies_to_train=state["should_module_be_updated"]) + # TODO (sven): Make `MetricsLogger` a Checkpointable. + if COMPONENT_METRICS_LOGGER in state: + self.metrics.set_state(state[COMPONENT_METRICS_LOGGER]) + @override(Checkpointable) def get_ctor_args_and_kwargs(self): return ( @@ -1647,8 +1662,6 @@ def _get_global_norm_function() -> Callable: def _log_steps_trained_metrics(self, batch: MultiAgentBatch): """Logs this iteration's steps trained, based on given `batch`.""" - - log_dict = defaultdict(dict) for mid, module_batch in batch.policy_batches.items(): module_batch_size = len(module_batch) # Log average batch size (for each module). @@ -1657,17 +1670,29 @@ def _log_steps_trained_metrics(self, batch: MultiAgentBatch): value=module_batch_size, ) # Log module steps (for each module). - if NUM_MODULE_STEPS_TRAINED not in log_dict[mid]: - log_dict[mid][NUM_MODULE_STEPS_TRAINED] = module_batch_size - else: - log_dict[mid][NUM_MODULE_STEPS_TRAINED] += module_batch_size - + self.metrics.log_value( + key=(mid, NUM_MODULE_STEPS_TRAINED), + value=module_batch_size, + reduce="sum", + clear_on_reduce=True, + ) + self.metrics.log_value( + key=(mid, NUM_MODULE_STEPS_TRAINED_LIFETIME), + value=module_batch_size, + reduce="sum", + ) # Log module steps (sum of all modules). - if NUM_MODULE_STEPS_TRAINED not in log_dict[ALL_MODULES]: - log_dict[ALL_MODULES][NUM_MODULE_STEPS_TRAINED] = module_batch_size - else: - log_dict[ALL_MODULES][NUM_MODULE_STEPS_TRAINED] += module_batch_size - + self.metrics.log_value( + key=(ALL_MODULES, NUM_MODULE_STEPS_TRAINED), + value=module_batch_size, + reduce="sum", + clear_on_reduce=True, + ) + self.metrics.log_value( + key=(ALL_MODULES, NUM_MODULE_STEPS_TRAINED_LIFETIME), + value=module_batch_size, + reduce="sum", + ) # Log env steps (all modules). self.metrics.log_value( (ALL_MODULES, NUM_ENV_STEPS_TRAINED), @@ -1675,8 +1700,11 @@ def _log_steps_trained_metrics(self, batch: MultiAgentBatch): reduce="sum", clear_on_reduce=True, ) - # Log per-module steps trained (plus all modules) and per-agent steps trained. 
- self.metrics.log_dict(dict(log_dict), reduce="sum", clear_on_reduce=True) + self.metrics.log_value( + (ALL_MODULES, NUM_ENV_STEPS_TRAINED_LIFETIME), + batch.env_steps(), + reduce="sum", + ) @Deprecated( new="Learner.before_gradient_based_update(" diff --git a/rllib/core/learner/tests/test_learner_group.py b/rllib/core/learner/tests/test_learner_group.py index d3ab6510b62c..f4c3b56c17e5 100644 --- a/rllib/core/learner/tests/test_learner_group.py +++ b/rllib/core/learner/tests/test_learner_group.py @@ -1,5 +1,4 @@ import gymnasium as gym -import itertools import numpy as np import tempfile import unittest @@ -9,6 +8,7 @@ from ray.rllib.algorithms.algorithm_config import AlgorithmConfig from ray.rllib.algorithms.bc import BCConfig from ray.rllib.core import ( + Columns, COMPONENT_LEARNER, COMPONENT_MULTI_RL_MODULE_SPEC, COMPONENT_RL_MODULE, @@ -54,10 +54,11 @@ np.array([0.5, 0.6, 0.7, 0.8], dtype=np.float32), np.array([0.9, 1.0, 1.1, 1.2], dtype=np.float32), np.array([0.1, 0.2, 0.3, 0.4], dtype=np.float32), + np.array([-0.1, -0.2, -0.3, -0.4], dtype=np.float32), ], action_space=gym.spaces.Discrete(2), - actions=[0, 1, 1], - rewards=[1.0, -1.0, 0.5], + actions=[0, 1, 1, 0], + rewards=[1.0, -1.0, 0.5, 0.3], terminated=True, len_lookback_buffer=0, # all data part of actual episode ), @@ -80,7 +81,7 @@ 0: FAKE_EPISODES[0].get_observations(i), 1: FAKE_EPISODES[0].get_observations(i), } - for i in range(4) + for i in range(5) ], action_space=gym.spaces.Dict( { @@ -93,14 +94,14 @@ 0: FAKE_EPISODES[0].get_actions(i), 1: FAKE_EPISODES[0].get_actions(i), } - for i in range(3) + for i in range(4) ], rewards=[ { 0: FAKE_EPISODES[0].get_rewards(i), 1: FAKE_EPISODES[0].get_rewards(i), } - for i in range(3) + for i in range(4) ], len_lookback_buffer=0, # all data part of actual episode ), @@ -111,10 +112,10 @@ MultiAgentEpisode( agent_module_ids={0: "p0"}, observation_space=gym.spaces.Dict({0: FAKE_EPISODES[0].observation_space}), - observations=[{0: FAKE_EPISODES[0].get_observations(i)} for i in range(4)], + observations=[{0: FAKE_EPISODES[0].get_observations(i)} for i in range(5)], action_space=gym.spaces.Dict({0: FAKE_EPISODES[0].action_space}), - actions=[{0: FAKE_EPISODES[0].get_actions(i)} for i in range(3)], - rewards=[{0: FAKE_EPISODES[0].get_rewards(i)} for i in range(3)], + actions=[{0: FAKE_EPISODES[0].get_actions(i)} for i in range(4)], + rewards=[{0: FAKE_EPISODES[0].get_rewards(i)} for i in range(4)], len_lookback_buffer=0, # all data part of actual episode ), ] @@ -159,12 +160,10 @@ def test_learner_group_build_from_algorithm_config(self): def test_update_multi_gpu(self): return - fws = ["torch"] scaling_modes = ["multi-gpu-ddp", "remote-gpu"] - test_iterator = itertools.product(fws, scaling_modes) - for fw, scaling_mode in test_iterator: - print(f"Testing framework: {fw}, scaling mode: {scaling_mode}.") + for scaling_mode in scaling_modes: + print(f"Testing scaling mode: {scaling_mode}.") env = gym.make("CartPole-v1") config_overrides = REMOTE_CONFIGS[scaling_mode] @@ -270,13 +269,11 @@ def tearDownClass(cls) -> None: def test_restore_from_path_multi_rl_module_and_individual_modules(self): """Tests whether MultiRLModule- and single RLModule states can be restored.""" - fws = ["torch"] # this is expanded to more scaling modes on the release ci. 
scaling_modes = ["local-cpu", "multi-gpu-ddp"] - test_iterator = itertools.product(fws, scaling_modes) - for fw, scaling_mode in test_iterator: - print(f"Testing framework: {fw}, scaling mode: {scaling_mode}.") + for scaling_mode in scaling_modes: + print(f"Testing scaling mode: {scaling_mode}.") # env will have agent ids 0 and 1 env = MultiAgentCartPole({"num_agents": 2}) @@ -368,6 +365,38 @@ def test_restore_from_path_multi_rl_module_and_individual_modules(self): class TestLearnerGroupSaveLoadState(unittest.TestCase): + + FAKE_BATCH = { + Columns.OBS: np.array( + [ + [0.1, 0.2, 0.3, 0.4], + [0.5, 0.6, 0.7, 0.8], + [0.9, 1.0, 1.1, 1.2], + [1.3, 1.4, 1.5, 1.6], + ], + dtype=np.float32, + ), + Columns.NEXT_OBS: np.array( + [ + [0.1, 0.2, 0.3, 0.4], + [0.5, 0.6, 0.7, 0.8], + [0.9, 1.0, 1.1, 1.2], + [1.3, 1.4, 1.5, 1.6], + ], + dtype=np.float32, + ), + Columns.ACTIONS: np.array([0, 1, 1, 0]), + Columns.REWARDS: np.array([1.0, -1.0, 0.5, 0.6], dtype=np.float32), + Columns.TERMINATEDS: np.array([False, False, True, False]), + Columns.TRUNCATEDS: np.array([False, False, False, False]), + Columns.VF_PREDS: np.array([0.5, 0.6, 0.7, 0.8], dtype=np.float32), + Columns.ACTION_DIST_INPUTS: np.array( + [[-2.0, 0.5], [-3.0, -0.3], [-0.1, 2.5], [-0.2, 3.5]], dtype=np.float32 + ), + Columns.ACTION_LOGP: np.array([-0.5, -0.1, -0.2, -0.3], dtype=np.float32), + Columns.EPS_ID: np.array([0, 0, 0, 0]), + } + @classmethod def setUpClass(cls) -> None: ray.init() @@ -439,7 +468,7 @@ def test_save_to_path_and_restore_from_path(self): check(initial_weights, weights_after_restore) # Perform 2 updates to get to the same state as the previous learners. learner_group.update_from_episodes(FAKE_EPISODES) - results_2nd_without_break = learner_group.update_from_episodes( + results_2nd_update_without_break = learner_group.update_from_episodes( FAKE_EPISODES ) weights_after_2_updates_without_break = learner_group.get_weights() @@ -447,11 +476,15 @@ def test_save_to_path_and_restore_from_path(self): del learner_group # Compare the results of the two updates. - results_2nd_update_with_break[0][ALL_MODULES].pop("learner_connector_timer") - results_2nd_without_break[0][ALL_MODULES].pop("learner_connector_timer") + for r1, r2 in zip( + results_2nd_update_with_break, + results_2nd_update_without_break, + ): + r1[ALL_MODULES].pop("learner_connector_timer") + r2[ALL_MODULES].pop("learner_connector_timer") check( MetricsLogger.peek_results(results_2nd_update_with_break), - MetricsLogger.peek_results(results_2nd_without_break), + MetricsLogger.peek_results(results_2nd_update_without_break), rtol=0.05, ) check( diff --git a/rllib/env/env_runner_group.py b/rllib/env/env_runner_group.py index e0d1883e24a5..2d42084684b1 100644 --- a/rllib/env/env_runner_group.py +++ b/rllib/env/env_runner_group.py @@ -463,8 +463,13 @@ def sync_env_runner_states( ) # Update the global number of environment steps, if necessary. + # Make sure to divide by the number of env runners (such that each EnvRunner + # knows (roughly) its own(!) lifetime count and can infer the global lifetime + # count from it). 
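+ # Example: a global lifetime count of 40_000 split across 4 EnvRunners sets
+ # each EnvRunner's state to 10_000; an EnvRunner recovers the global count by
+ # multiplying with `num_env_runners` again.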
if env_steps_sampled is not None: - env_runner_states[NUM_ENV_STEPS_SAMPLED_LIFETIME] = env_steps_sampled + env_runner_states[NUM_ENV_STEPS_SAMPLED_LIFETIME] = env_steps_sampled // ( + config.num_env_runners or 1 + ) # Update the rl_module component of the EnvRunner states, if necessary: if rl_module_state: diff --git a/rllib/env/multi_agent_env_runner.py b/rllib/env/multi_agent_env_runner.py index 3ec1864bb394..56703258f756 100644 --- a/rllib/env/multi_agent_env_runner.py +++ b/rllib/env/multi_agent_env_runner.py @@ -35,6 +35,7 @@ NUM_ENV_STEPS_SAMPLED, NUM_ENV_STEPS_SAMPLED_LIFETIME, NUM_EPISODES, + NUM_EPISODES_LIFETIME, NUM_MODULE_STEPS_SAMPLED, NUM_MODULE_STEPS_SAMPLED_LIFETIME, WEIGHTS_SEQ_NO, @@ -281,9 +282,10 @@ def _sample_timesteps( # MultiRLModule forward pass: Explore or not. if explore: - env_steps_lifetime = self.metrics.peek( - NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0 - ) + self.metrics.peek(NUM_ENV_STEPS_SAMPLED, default=0) + env_steps_lifetime = ( + self.metrics.peek(NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0) + + self.metrics.peek(NUM_ENV_STEPS_SAMPLED, default=0) + ) * (self.config.num_env_runners or 1) to_env = self.module.forward_exploration( to_module, t=env_steps_lifetime ) @@ -321,7 +323,6 @@ def _sample_timesteps( force_reset=True, ) obs, rewards, terminateds, truncateds, infos = results - ts += self._increase_sampled_metrics(self.num_envs, obs, self._episode) # TODO (sven): This simple approach to re-map `to_env` from a # dict[col, List[MADict]] to a dict[agentID, MADict] would not work for @@ -346,6 +347,8 @@ def _sample_timesteps( extra_model_outputs=extra_model_outputs, ) + ts += self._increase_sampled_metrics(self.num_envs, obs, self._episode) + # Make the `on_episode_step` callback (before finalizing the episode # object). self._make_on_episode_callback("on_episode_step") @@ -482,9 +485,10 @@ def _sample_episodes( # MultiRLModule forward pass: Explore or not. if explore: - env_steps_lifetime = self.metrics.peek( - NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0 - ) + self.metrics.peek(NUM_ENV_STEPS_SAMPLED, default=0) + env_steps_lifetime = ( + self.metrics.peek(NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0) + + self.metrics.peek(NUM_ENV_STEPS_SAMPLED, default=0) + ) * (self.config.num_env_runners or 1) to_env = self.module.forward_exploration( to_module, t=env_steps_lifetime ) @@ -522,8 +526,6 @@ def _sample_episodes( ) obs, rewards, terminateds, truncateds, infos = results - ts += self._increase_sampled_metrics(self.num_envs, obs, _episode) - # TODO (sven): This simple approach to re-map `to_env` from a # dict[col, List[MADict]] to a dict[agentID, MADict] would not work for # a vectorized env. @@ -547,6 +549,8 @@ def _sample_episodes( extra_model_outputs=extra_model_outputs, ) + ts += self._increase_sampled_metrics(self.num_envs, obs, _episode) + # Make `on_episode_step` callback before finalizing the episode. self._make_on_episode_callback("on_episode_step", _episode) @@ -672,14 +676,6 @@ def get_metrics(self) -> ResultDict: dict(agent_steps), ) - # Log num episodes counter for this iteration. - self.metrics.log_value( - NUM_EPISODES, - len(self._done_episodes_for_metrics), - reduce="sum", - clear_on_reduce=True, # Not a lifetime count. - ) - # Now that we have logged everything, clear cache of done episodes. self._done_episodes_for_metrics.clear() @@ -916,10 +912,16 @@ def _make_on_episode_callback(self, which: str, episode=None): ) def _increase_sampled_metrics(self, num_steps, next_obs, episode): + # Env steps. 
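+ # (The per-iteration counter is cleared on each `reduce()` call; the
+ # lifetime counter further below keeps accumulating.)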
self.metrics.log_value( NUM_ENV_STEPS_SAMPLED, num_steps, reduce="sum", clear_on_reduce=True ) self.metrics.log_value(NUM_ENV_STEPS_SAMPLED_LIFETIME, num_steps, reduce="sum") + # Completed episodes. + if episode.is_done: + self.metrics.log_value(NUM_EPISODES, 1, reduce="sum", clear_on_reduce=True) + self.metrics.log_value(NUM_EPISODES_LIFETIME, 1, reduce="sum") + # TODO (sven): obs is not-vectorized. Support vectorized MA envs. for aid in next_obs: self.metrics.log_value( diff --git a/rllib/env/single_agent_env_runner.py b/rllib/env/single_agent_env_runner.py index ef56b54fb5ad..81498a793854 100644 --- a/rllib/env/single_agent_env_runner.py +++ b/rllib/env/single_agent_env_runner.py @@ -41,6 +41,7 @@ NUM_ENV_STEPS_SAMPLED, NUM_ENV_STEPS_SAMPLED_LIFETIME, NUM_EPISODES, + NUM_EPISODES_LIFETIME, NUM_MODULE_STEPS_SAMPLED, NUM_MODULE_STEPS_SAMPLED_LIFETIME, SAMPLE_TIMER, @@ -276,12 +277,14 @@ def _sample( # RLModule forward pass: Explore or not. if explore: - env_steps_lifetime = ( + # Global env steps sampled are (roughly) this EnvRunner's lifetime + # count times the number of env runners in the algo. + global_env_steps_lifetime = ( self.metrics.peek(NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0) + ts - ) + ) * (self.config.num_env_runners or 1) to_env = self.module.forward_exploration( - to_module, t=env_steps_lifetime + to_module, t=global_env_steps_lifetime ) else: to_env = self.module.forward_inference(to_module) @@ -420,7 +423,7 @@ def _sample( # Continue collecting into the cut Episode chunks. self._episodes = ongoing_episodes_continuations - self._increase_sampled_metrics(ts) + self._increase_sampled_metrics(ts, len(done_episodes_to_return)) # Return collected episode data. return done_episodes_to_return + ongoing_episodes_to_return @@ -454,15 +457,6 @@ def get_metrics(self) -> ResultDict: episode_length, episode_return, episode_duration_s ) - # Log num episodes counter for this iteration. - self.metrics.log_value( - NUM_EPISODES, - len(self._done_episodes_for_metrics), - reduce="sum", - # Reset internal data on `reduce()` call below (not a lifetime count). - clear_on_reduce=True, - ) - # Now that we have logged everything, clear cache of done episodes. self._done_episodes_for_metrics.clear() @@ -537,6 +531,7 @@ def set_state(self, state: StateDict) -> None: key=NUM_ENV_STEPS_SAMPLED_LIFETIME, value=state[NUM_ENV_STEPS_SAMPLED_LIFETIME], reduce="sum", + clear_on_reduce=False, # lifetime ) @override(Checkpointable) @@ -714,7 +709,7 @@ def _make_on_episode_callback(self, which: str, idx: int, episodes): env_index=idx, ) - def _increase_sampled_metrics(self, num_steps): + def _increase_sampled_metrics(self, num_steps, num_episodes_completed): # Per sample cycle stats. self.metrics.log_value( NUM_ENV_STEPS_SAMPLED, num_steps, reduce="sum", clear_on_reduce=True @@ -731,6 +726,12 @@ def _increase_sampled_metrics(self, num_steps): reduce="sum", clear_on_reduce=True, ) + self.metrics.log_value( + NUM_EPISODES, + num_episodes_completed, + reduce="sum", + clear_on_reduce=True, + ) # Lifetime stats. 
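# (Summed without `clear_on_reduce`, so these keep accumulating across
# iterations.)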
self.metrics.log_value(NUM_ENV_STEPS_SAMPLED_LIFETIME, num_steps, reduce="sum") self.metrics.log_value( @@ -743,6 +744,11 @@ def _increase_sampled_metrics(self, num_steps): num_steps, reduce="sum", ) + self.metrics.log_value( + NUM_EPISODES_LIFETIME, + num_episodes_completed, + reduce="sum", + ) return num_steps def _log_episode_metrics(self, length, ret, sec): diff --git a/rllib/evaluation/tests/test_rollout_worker.py b/rllib/evaluation/tests/test_rollout_worker.py index f371ba942d5f..52ce25f409b0 100644 --- a/rllib/evaluation/tests/test_rollout_worker.py +++ b/rllib/evaluation/tests/test_rollout_worker.py @@ -1,10 +1,8 @@ import gymnasium as gym from gymnasium.spaces import Box, Discrete -import json import numpy as np import os import random -import tempfile import time import unittest @@ -25,8 +23,6 @@ from ray.rllib.examples.envs.classes.multi_agent import MultiAgentCartPole from ray.rllib.examples.envs.classes.random_env import RandomEnv from ray.rllib.examples._old_api_stack.policy.random_policy import RandomPolicy -from ray.rllib.offline.dataset_reader import DatasetReader, get_dataset_and_shards -from ray.rllib.offline.json_reader import JsonReader from ray.rllib.policy.policy import Policy, PolicySpec from ray.rllib.policy.sample_batch import ( DEFAULT_POLICY_ID, @@ -360,87 +356,6 @@ def test_action_normalization(self): self.assertLess(np.min(sample["actions"]), action_space.low[0]) ev.stop() - def test_action_normalization_offline_dataset(self): - with tempfile.TemporaryDirectory() as tmp_dir: - # create environment - env = gym.make("Pendulum-v1") - - # create temp data with actions at min and max - data = { - "type": "SampleBatch", - "actions": [[2.0], [-2.0]], - "terminateds": [0.0, 0.0], - "truncateds": [0.0, 0.0], - "rewards": [0.0, 0.0], - "obs": [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]], - "new_obs": [[0.0, 0.0, 0.0], [0.0, 0.0, 0.0]], - } - - data_file = os.path.join(tmp_dir, "data.json") - - with open(data_file, "w") as f: - json.dump(data, f) - - # create input reader functions - def dataset_reader_creator(ioctx): - config = AlgorithmConfig().offline_data( - input_="dataset", - input_config={"format": "json", "paths": data_file}, - ) - _, shards = get_dataset_and_shards(config, num_workers=0) - return DatasetReader(shards[0], ioctx) - - def json_reader_creator(ioctx): - return JsonReader(data_file, ioctx) - - input_creators = [dataset_reader_creator, json_reader_creator] - - # actions_in_input_normalized, normalize_actions - parameters = [ - (True, True), - (True, False), - (False, True), - (False, False), - ] - - # check that samples from dataset will be normalized if and only if - # actions_in_input_normalized == False and - # normalize_actions == True - for input_creator in input_creators: - for actions_in_input_normalized, normalize_actions in parameters: - ev = RolloutWorker( - env_creator=lambda _: env, - default_policy_class=MockPolicy, - config=AlgorithmConfig() - .env_runners( - num_env_runners=0, - rollout_fragment_length=1, - ) - .environment( - normalize_actions=normalize_actions, - clip_actions=False, - ) - .training(train_batch_size=1) - .offline_data( - offline_sampling=True, - actions_in_input_normalized=actions_in_input_normalized, - input_=input_creator, - ), - ) - - sample = ev.sample() - - if normalize_actions and not actions_in_input_normalized: - # check if the samples from dataset are normalized properly - self.assertLessEqual(np.max(sample["actions"]), 1.0) - self.assertGreaterEqual(np.min(sample["actions"]), -1.0) - else: - # check if the samples from 
dataset are not normalized - self.assertGreater(np.max(sample["actions"]), 1.5) - self.assertLess(np.min(sample["actions"]), -1.5) - - ev.stop() - def test_action_immutability(self): action_space = gym.spaces.Box(0.0001, 0.0002, (5,)) diff --git a/rllib/examples/fault_tolerance/crashing_and_stalling_env.py b/rllib/examples/fault_tolerance/crashing_and_stalling_env.py index 66eff0e86070..bdae74c6d4be 100644 --- a/rllib/examples/fault_tolerance/crashing_and_stalling_env.py +++ b/rllib/examples/fault_tolerance/crashing_and_stalling_env.py @@ -77,11 +77,15 @@ from gymnasium.wrappers import TimeLimit from ray import tune +from ray.rllib.core.rl_module.default_model_config import DefaultModelConfig from ray.rllib.examples.envs.classes.cartpole_crashing import ( CartPoleCrashing, MultiAgentCartPoleCrashing, ) -from ray.rllib.utils.test_utils import add_rllib_example_script_args +from ray.rllib.utils.test_utils import ( + add_rllib_example_script_args, + run_rllib_example_script_experiment, +) parser = add_rllib_example_script_args( default_reward=450.0, @@ -105,63 +109,69 @@ help="Whether to restart a failed environment (vs restarting the entire " "EnvRunner).", ) -args = parser.parse_args() - -# Register our environment with tune. -if args.num_agents > 0: - tune.register_env("env", lambda cfg: MultiAgentCartPoleCrashing(cfg)) -else: - tune.register_env( - "env", - lambda cfg: TimeLimit(CartPoleCrashing(cfg), max_episode_steps=500), - ) - -config = ( - tune.registry.get_trainable_cls(args.algo) - .get_default_config() - .api_stack( - enable_rl_module_and_learner=True, - enable_env_runner_and_connector_v2=True, - ) - .environment( - "env", - env_config={ - "num_agents": args.num_agents, - # Probability to crash during step(). - "p_crash": 0.0001, - # Probability to crash during reset(). - "p_crash_reset": 0.001, - "crash_on_worker_indices": [1, 2], - "init_time_s": 2.0, - # Probability to stall during step(). - "p_stall": 0.0005, - # Probability to stall during reset(). - "p_stall_reset": 0.001, - # Stall from 2 to 5sec (or 0.0 if --stall not set). - "stall_time_sec": (2, 5) if args.stall else 0.0, - # EnvRunner indices to stall on. - "stall_on_worker_indices": [2, 3], - }, - ) - # Switch on resiliency. - .fault_tolerance( - # Recreate any failed EnvRunners. - restart_failed_env_runners=True, - # Restart any failed environment (w/o recreating the EnvRunner). Note that this - # is the much faster option. - restart_failed_sub_environments=args.restart_failed_envs, - ) -) - -# Add a simple multi-agent setup. -if args.num_agents > 0: - config.multi_agent( - policies={f"p{i}" for i in range(args.num_agents)}, - policy_mapping_fn=lambda aid, *a, **kw: f"p{aid}", - ) if __name__ == "__main__": - from ray.rllib.utils.test_utils import run_rllib_example_script_experiment + args = parser.parse_args() + + # Register our environment with tune. + if args.num_agents > 0: + tune.register_env("env", lambda cfg: MultiAgentCartPoleCrashing(cfg)) + else: + tune.register_env( + "env", + lambda cfg: TimeLimit(CartPoleCrashing(cfg), max_episode_steps=500), + ) + + base_config = ( + tune.registry.get_trainable_cls(args.algo) + .get_default_config() + .environment( + "env", + env_config={ + "num_agents": args.num_agents, + # Probability to crash during step(). + "p_crash": 0.0001, + # Probability to crash during reset(). + "p_crash_reset": 0.001, + "crash_on_worker_indices": [1, 2], + "init_time_s": 2.0, + # Probability to stall during step(). + "p_stall": 0.0005, + # Probability to stall during reset(). 
+ "p_stall_reset": 0.001, + # Stall from 2 to 5sec (or 0.0 if --stall not set). + "stall_time_sec": (2, 5) if args.stall else 0.0, + # EnvRunner indices to stall on. + "stall_on_worker_indices": [2, 3], + }, + ) + # Switch on resiliency. + .fault_tolerance( + # Recreate any failed EnvRunners. + restart_failed_env_runners=True, + # Restart any failed environment (w/o recreating the EnvRunner). Note that + # this is the much faster option. + restart_failed_sub_environments=args.restart_failed_envs, + ) + ) - run_rllib_example_script_experiment(config, args=args) + # Use more stabilizing hyperparams for APPO. + if args.algo == "APPO": + base_config.training( + grad_clip=40.0, + entropy_coeff=0.0, + vf_loss_coeff=0.05, + ) + base_config.rl_module( + model_config=DefaultModelConfig(vf_share_layers=True), + ) + + # Add a simple multi-agent setup. + if args.num_agents > 0: + base_config.multi_agent( + policies={f"p{i}" for i in range(args.num_agents)}, + policy_mapping_fn=lambda aid, *a, **kw: f"p{aid}", + ) + + run_rllib_example_script_experiment(base_config, args=args) diff --git a/rllib/utils/metrics/__init__.py b/rllib/utils/metrics/__init__.py index dd1caef5c72e..d9a67933998c 100644 --- a/rllib/utils/metrics/__init__.py +++ b/rllib/utils/metrics/__init__.py @@ -19,6 +19,7 @@ NUM_AGENT_STEPS_SAMPLED_THIS_ITER = "num_agent_steps_sampled_this_iter" # @OldAPIStack NUM_ENV_STEPS_SAMPLED = "num_env_steps_sampled" NUM_ENV_STEPS_SAMPLED_LIFETIME = "num_env_steps_sampled_lifetime" +NUM_ENV_STEPS_SAMPLED_PER_SECOND = "num_env_steps_sampled_per_second" NUM_ENV_STEPS_SAMPLED_THIS_ITER = "num_env_steps_sampled_this_iter" # @OldAPIStack NUM_ENV_STEPS_SAMPLED_FOR_EVALUATION_THIS_ITER = ( "num_env_steps_sampled_for_evaluation_this_iter" diff --git a/rllib/utils/metrics/learner_info.py b/rllib/utils/metrics/learner_info.py index 457717568cf5..b653607cddf3 100644 --- a/rllib/utils/metrics/learner_info.py +++ b/rllib/utils/metrics/learner_info.py @@ -3,8 +3,8 @@ import tree # pip install dm_tree from typing import Dict -from ray.util.annotations import DeveloperAPI from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID +from ray.rllib.utils.annotations import OldAPIStack from ray.rllib.utils.typing import PolicyID # Instant metrics (keys for metrics.info). @@ -14,7 +14,7 @@ LEARNER_STATS_KEY = "learner_stats" -@DeveloperAPI +@OldAPIStack class LearnerInfoBuilder: def __init__(self, num_devices: int = 1): self.num_devices = num_devices @@ -90,6 +90,7 @@ def finalize(self): return info +@OldAPIStack def _all_tower_reduce(path, *tower_data): """Reduces stats across towers based on their stats-dict paths.""" # TD-errors: Need to stay per batch item in order to be able to update diff --git a/rllib/utils/metrics/metrics_logger.py b/rllib/utils/metrics/metrics_logger.py index 51d74cb405a8..f1f6f4cc12e8 100644 --- a/rllib/utils/metrics/metrics_logger.py +++ b/rllib/utils/metrics/metrics_logger.py @@ -34,16 +34,72 @@ class MetricsLogger: .. testcode:: + import time from ray.rllib.utils.metrics.metrics_logger import MetricsLogger + from ray.rllib.utils.test_utils import check logger = MetricsLogger() - # Log n simple float values under the "loss" key. By default, all logged values - # under that key are averaged over once `reduce()` is called. - logger.log_value("loss", 0.001) - logger.log_value("loss", 0.002) - logger.log_value("loss", 0.003) + # 1) Logging float values (mean over window): + # Log some loss under the "loss" key. 
By default, all logged values + # under that key are averaged and reported back, once `reduce()` is called. + logger.log_value("loss", 0.001, reduce="mean", window=10) + logger.log_value("loss", 0.002) # <- no need to repeat arg/options on same key # Peek at the current (reduced) value of "loss": + check(logger.peek("loss"), 0.0015) # <- expect average value + # Actually reduce the underlying Stats object(s). + results = logger.reduce() + check(results["loss"], 0.0015) + + # 2) Logging float values (minimum over window): + # Log the minimum of loss values under the "min_loss" key. + logger.log_value("min_loss", 0.1, reduce="min", window=2) + logger.log_value("min_loss", 0.01) + logger.log_value("min_loss", 0.1) + logger.log_value("min_loss", 0.02) + # Peek at the current (reduced) value of "min_loss": + check(logger.peek("min_loss"), 0.02) # <- expect min value (over window=2) + # Actually reduce the underlying Stats object(s). + results = logger.reduce() + check(results["min_loss"], 0.02) + + # 3) Log n counts in different (remote?) components and merge them on the + # controller side. + remote_logger_1 = MetricsLogger() + remote_logger_2 = MetricsLogger() + main_logger = MetricsLogger() + remote_logger_1.log_value("count", 2, reduce="sum", clear_on_reduce=True) + remote_logger_2.log_value("count", 3, reduce="sum", clear_on_reduce=True) + # Reduce the two remote loggers .. + remote_results_1 = remote_logger_1.reduce() + remote_results_2 = remote_logger_2.reduce() + # .. then merge the two results into the controller logger. + main_logger.merge_and_log_n_dicts([remote_results_1, remote_results_2]) + check(main_logger.peek("count"), 5) + + # 4) Time blocks of code using EMA (coeff=0.1). Note that the higher the coeff + # (the closer to 1.0), the more short term the EMA turns out. + logger = MetricsLogger() + + # First delta measurement: + with logger.log_time("my_block_to_be_timed", reduce="mean", ema_coeff=0.1): + time.sleep(1.0) + # EMA should be ~1sec. + assert 1.1 > logger.peek("my_block_to_be_timed") > 0.9 + # Second delta measurement (note that we don't have to repeat the args again, as + # the stats under that name have already been created above with the correct + # args). + with logger.log_time("my_block_to_be_timed"): + time.sleep(2.0) + # EMA should be ~1.1sec. + assert 1.15 > logger.peek("my_block_to_be_timed") > 1.05 + + # When calling `reduce()`, the internal values list gets cleaned up (reduced) + # and reduction results are returned. + results = logger.reduce() + # EMA should be ~1.1sec. + assert 1.15 > results["my_block_to_be_timed"] > 1.05 + """ @@ -137,7 +193,6 @@ def peek( def peek_results(results: Any) -> Any: """Performs `peek()` on any leaf element of an arbitrarily nested Stats struct. - Args: results: The nested structure of Stats-leafs to be peek'd and returned. @@ -149,141 +204,6 @@ def peek_results(results: Any) -> Any: lambda s: s.peek() if isinstance(s, Stats) else s, results ) - def reduce( - self, - key: Optional[Union[str, Tuple[str, ...]]] = None, - *, - return_stats_obj: bool = True, - ) -> Dict: - """Reduces all logged values based on their settings and returns a result dict. - - DO NOT CALL THIS METHOD under normal circumstances! RLlib's components call it - right before a distinct step has been completed and the (MetricsLogger-based) - results of that step need to be passed on to other components for further - processing. - - The returned result dict has the exact same structure as the logged keys (or - nested key sequences) combined. 
At the leafs of the returned structure are - either `Stats` objects (`return_stats_obj=True`, which is the default) or - primitive (non-Stats) values (`return_stats_obj=False`). In case of - `return_stats_obj=True`, the returned dict with Stats at the leafs can - conveniently be re-used downstream for further logging and reduction operations. - - For example, imagine component A (e.g. an Algorithm) containing a MetricsLogger - and n remote components (e.g. n EnvRunners), each with their own - MetricsLogger object. Component A calls its n remote components, each of - which returns an equivalent, reduced dict with `Stats` as leafs. - Component A can then further log these n result dicts through its own - MetricsLogger through: - `logger.merge_and_log_n_dicts([n returned result dicts from n subcomponents])`. - - .. testcode:: - - from ray.rllib.utils.metrics.metrics_logger import MetricsLogger - from ray.rllib.utils.test_utils import check - - # Log some (EMA reduced) values. - logger = MetricsLogger() - logger.log_value("a", 2.0) - logger.log_value("a", 3.0) - expected_reduced = (1.0 - 0.01) * 2.0 + 0.01 * 3.0 - # Reduce and return primitive values (not Stats objects). - results = logger.reduce(return_stats_obj=False) - check(results, {"a": expected_reduced}) - - # Log some values to be averaged with a sliding window. - logger = MetricsLogger() - logger.log_value("a", 2.0, window=2) - logger.log_value("a", 3.0) - logger.log_value("a", 4.0) - expected_reduced = (3.0 + 4.0) / 2 # <- win size is only 2; first logged - # item not used - # Reduce and return primitive values (not Stats objects). - results = logger.reduce(return_stats_obj=False) - check(results, {"a": expected_reduced}) - - # Assume we have 2 remote components, each one returning an equivalent - # reduced dict when called. We can simply use these results and log them - # to our own MetricsLogger, then reduce over these 2 logged results. - comp1_logger = MetricsLogger() - comp1_logger.log_value("a", 1.0, window=10) - comp1_logger.log_value("a", 2.0) - result1 = comp1_logger.reduce() # <- return Stats objects as leafs - - comp2_logger = MetricsLogger() - comp2_logger.log_value("a", 3.0, window=10) - comp2_logger.log_value("a", 4.0) - result2 = comp2_logger.reduce() # <- return Stats objects as leafs - - # Now combine the 2 equivalent results into 1 end result dict. - downstream_logger = MetricsLogger() - downstream_logger.merge_and_log_n_dicts([result1, result2]) - # What happens internally is that both values lists of the 2 components - # are merged (concat'd) and randomly shuffled, then clipped at 10 (window - # size). This is done such that no component has an "advantage" over the - # other as we don't know the exact time-order in which these parallelly - # running components logged their own "a"-values. - # We execute similarly useful merging strategies for other reduce settings, - # such as EMA, max/min/sum-reducing, etc.. - end_result = downstream_logger.reduce(return_stats_obj=False) - check(end_result, {"a": 2.5}) - - Args: - key: Optional key or key sequence (for nested location within self.stats), - limiting the reduce operation to that particular sub-structure of self. - If None, will reduce all of self's Stats. - return_stats_obj: Whether in the returned dict, the leafs should be Stats - objects. This is the default as it enables users to continue using - (and further logging) the results of this call inside another - (downstream) MetricsLogger object. 
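
The `key` argument documented here also accepts nested key sequences. A minimal sketch with hypothetical key names, assuming `peek()` and `reduce()` both take the same key/key-sequence form:

    from ray.rllib.utils.metrics.metrics_logger import MetricsLogger

    logger = MetricsLogger()
    # Log under nested key sequences (tuples of keys).
    logger.log_value(("learners", "policy_1", "loss"), 0.1, window=5)
    logger.log_value(("learners", "policy_1", "loss"), 0.2)
    logger.log_value(("env_runners", "num_episodes"), 4, reduce="sum")

    # Peek at a single nested leaf ...
    print(logger.peek(("learners", "policy_1", "loss")))  # ~0.15

    # ... or reduce only the "learners" sub-structure (primitive leafs here).
    print(logger.reduce("learners", return_stats_obj=False))
    # -> {"policy_1": {"loss": ~0.15}}
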
- - Returns: - A (nested) dict matching the structure of `self.stats` (contains all ever - logged keys to this MetricsLogger) with the leafs being (reduced) Stats - objects if `return_stats_obj=True` or primitive values, carrying no - reduction and history information, if `return_stats_obj=False`. - """ - # For better error message, catch the last key-path (reducing of which might - # throw an error). - PATH = None - - def _reduce(path, stats): - nonlocal PATH - PATH = path - return stats.reduce() - - # Create a shallow copy of `self.stats` in case we need to reset some of our - # stats due to this `reduce()` call (and the Stat having self.clear_on_reduce - # set to True). In case we clear the Stats upon `reduce`, we receive a - # new empty `Stats` object from `stat.reduce()` with the same settings as - # existing one and can now re-assign it to `self.stats[key]` (while we return - # from this method the properly reduced, but not cleared/emptied new `Stats`). - try: - if key is not None: - stats_to_return = self._get_key(key).copy() - self._set_key( - key, tree.map_structure_with_path(_reduce, stats_to_return) - ) - else: - stats_to_return = self.stats.copy() - self.stats = tree.map_structure_with_path(_reduce, stats_to_return) - # Provide proper error message if reduction fails due to bad data. - except Exception as e: - raise ValueError( - "There was an error while reducing the Stats object under key=" - f"{PATH}! Check, whether you logged invalid or incompatible " - "values into this key over time in your custom code." - f"\nThe values under this key are: {self._get_key(PATH).values}." - f"\nThe original error was {str(e)}" - ) - - # Return (reduced) `Stats` objects as leafs. - if return_stats_obj: - return stats_to_return - # Return actual (reduced) values (not reduced `Stats` objects) as leafs. - else: - return tree.map_structure(lambda s: s.peek(), stats_to_return) - def log_value( self, key: Union[str, Tuple[str, ...]], @@ -396,6 +316,7 @@ def log_value( self._check_tensor(key, value) + # `key` doesn't exist -> Automatically create it. if not self._key_in_stats(key): self._set_key( key, @@ -411,10 +332,10 @@ def log_value( ) ), ) - # If value itself is a stat, we merge it on time axis into `self`. + # If value itself is a `Stats`, we merge it on time axis into self's `Stats`. elif isinstance(value, Stats): self._get_key(key).merge_on_time_axis(value) - # Otherwise, we just push the value into `self`. + # Otherwise, we just push the value into self's `Stats`. else: self._get_key(key).push(value) @@ -523,6 +444,8 @@ def merge_and_log_n_dicts( stats_dicts: List[Dict[str, Any]], *, key: Optional[Union[str, Tuple[str, ...]]] = None, + # TODO (sven): Maybe remove these args. They don't seem to make sense in this + # method. If we do so, values in the dicts must be Stats instances, though. reduce: Optional[str] = "mean", window: Optional[Union[int, float]] = None, ema_coeff: Optional[float] = None, @@ -680,7 +603,9 @@ def merge_and_log_n_dicts( for key in all_keys: extended_key = prefix_key + key available_stats = [ - self._get_key(key, s) for s in stats_dicts if self._key_in_stats(key, s) + self._get_key(key, stats=s) + for s in stats_dicts + if self._key_in_stats(key, stats=s) ] base_stats = None more_stats = [] @@ -706,6 +631,18 @@ def merge_and_log_n_dicts( else: more_stats.append(stat_or_value) + # Special case: `base_stats` is a lifetime sum (reduce=sum, + # clear_on_reduce=False) -> We only(!) 
use `base_stats`'s values, not + # our own (b/c the sum over `base_stats` already contains older values from + # before). + if ( + base_stats._reduce_method == "sum" + and base_stats._window is None + and base_stats._clear_on_reduce is False + ): + for stat in [base_stats] + more_stats: + stat.push(-stat.peek(previous=True)) + # There are more than one incoming parallel others -> Merge all of them # first in parallel. if len(more_stats) > 0: @@ -728,18 +665,44 @@ def log_time( window: Optional[Union[int, float]] = None, ema_coeff: Optional[float] = None, clear_on_reduce: bool = False, - # throughput_key: Optional[Union[str, Tuple[str, ...]]] = None, - # throughput_key_of_unit_count: Optional[Union[str, Tuple[str, ...]]] = None, + key_for_throughput: Optional[Union[str, Tuple[str, ...]]] = None, + key_for_unit_count: Optional[Union[str, Tuple[str, ...]]] = None, ) -> None: """Measures and logs a time delta value under `key` when used with a with-block. - Additionally measures and logs the throughput for the timed code, iff - `log_throughput=True` and `throughput_key_for_unit_count` is provided. + Additionally, measures and logs the throughput for the timed code, iff + `key_for_throughput` and `key_for_unit_count` are provided. .. testcode:: + import time from ray.rllib.utils.metrics.metrics_logger import MetricsLogger - # TODO (sven): finish test case + from ray.rllib.utils.test_utils import check + + logger = MetricsLogger() + + # First delta measurement: + with logger.log_time("my_block_to_be_timed", reduce="mean", ema_coeff=0.1): + time.sleep(1.0) + + # EMA should be ~1sec. + assert 1.1 > logger.peek("my_block_to_be_timed") > 0.9 + + # Second delta measurement (note that we don't have to repeat the args + # again, as the stats under that name have already been created above with + # the correct args). + with logger.log_time("my_block_to_be_timed"): + time.sleep(2.0) + + # EMA should be ~1.1sec. + assert 1.15 > logger.peek("my_block_to_be_timed") > 1.05 + + # When calling `reduce()`, the internal values list gets cleaned up. + check(len(logger.stats["my_block_to_be_timed"].values), 2) # still 2 deltas + results = logger.reduce() + check(len(logger.stats["my_block_to_be_timed"].values), 1) # reduced to 1 + # EMA should be ~1.1sec. + assert 1.15 > results["my_block_to_be_timed"] > 1.05 Args: key: The key (or tuple of keys) to log the measured time delta under. @@ -770,12 +733,10 @@ def log_time( clear_on_reduce = True if not self._key_in_stats(key): - # TODO (sven): Figure out how to best implement an additional throughput - # measurement. 
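
A sketch of how the new `key_for_throughput` / `key_for_unit_count` arguments are meant to be used, based on the `on_exit` wiring added further below (the key names, the pre-logged count, and the sleep are illustrative only):

    import time
    from ray.rllib.utils.metrics.metrics_logger import MetricsLogger

    logger = MetricsLogger()
    # The unit count that the throughput is derived from must be available
    # under its own key when the timed block exits.
    logger.log_value("num_env_steps_sampled", 200, reduce="sum")

    with logger.log_time(
        "sample_timer",
        # If omitted, the throughput key defaults to "<key>_throughput_per_s".
        key_for_throughput="num_env_steps_sampled_per_second",
        key_for_unit_count="num_env_steps_sampled",
    ):
        time.sleep(0.1)  # <- stands in for the actual (sampling) work

    # On exit, peek(key_for_unit_count) / measured_time_delta is logged under
    # the throughput key.
    print(logger.peek("num_env_steps_sampled_per_second"))  # roughly 2000.0
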
- # measure_throughput = None - # if throughput_key_of_unit_count is not None: - # measure_throughput = True - # throughput_key = throughput_key or (key + "_throughput_per_s") + measure_throughput = None + if key_for_unit_count is not None: + measure_throughput = True + key_for_throughput = key_for_throughput or (key + "_throughput_per_s") self._set_key( key, @@ -784,24 +745,177 @@ def log_time( window=window, ema_coeff=ema_coeff, clear_on_reduce=clear_on_reduce, - # on_exit=( - # lambda stats: ( - # self.log_value( - # throughput_key, - # self.peek(throughput_key_of_unit_count), - # reduce=reduce, - # window=window, - # ema_coeff=ema_coeff, - # clear_on_reduce=clear_on_reduce, - # ) - # ), - # ), + on_exit=( + lambda time_delta_s, kt=key_for_throughput, ku=key_for_unit_count, r=reduce, w=window, e=ema_coeff, c=clear_on_reduce: ( # noqa + self.log_value( + kt, + value=self.peek(ku) / time_delta_s, + reduce=r, + window=w, + ema_coeff=e, + clear_on_reduce=c, + ) + ) + ) + if measure_throughput + else None, ), ) # Return the Stats object, so a `with` clause can enter and exit it. return self._get_key(key) + def reduce( + self, + key: Optional[Union[str, Tuple[str, ...]]] = None, + *, + return_stats_obj: bool = True, + ) -> Dict: + """Reduces all logged values based on their settings and returns a result dict. + + DO NOT CALL THIS METHOD under normal circumstances! RLlib's components call it + right before a distinct step has been completed and the (MetricsLogger-based) + results of that step need to be passed upstream to other components for further + processing. + + The returned result dict has the exact same structure as the logged keys (or + nested key sequences) combined. At the leafs of the returned structure are + either `Stats` objects (`return_stats_obj=True`, which is the default) or + primitive (non-Stats) values (`return_stats_obj=False`). In case of + `return_stats_obj=True`, the returned dict with `Stats` at the leafs can + conveniently be re-used upstream for further logging and reduction operations. + + For example, imagine component A (e.g. an Algorithm) containing a MetricsLogger + and n remote components (e.g. n EnvRunners), each with their own + MetricsLogger object. Component A calls its n remote components, each of + which returns an equivalent, reduced dict with `Stats` as leafs. + Component A can then further log these n result dicts through its own + MetricsLogger through: + `logger.merge_and_log_n_dicts([n returned result dicts from n subcomponents])`. + + The returned result dict has the exact same structure as the logged keys (or + nested key sequences) combined. At the leafs of the returned structure are + either `Stats` objects (`return_stats_obj=True`, which is the default) or + primitive (non-Stats) values (`return_stats_obj=False`). In case of + `return_stats_obj=True`, the returned dict with Stats at the leafs can be + reused conveniently downstream for further logging and reduction operations. + + For example, imagine component A (e.g. an Algorithm) containing a MetricsLogger + and n remote components (e.g. n EnvRunner workers), each with their own + MetricsLogger object. Component A calls its n remote components, each of + which returns an equivalent, reduced dict with `Stats` instances as leafs. + Component A can now further log these n result dicts through its own + MetricsLogger: + `logger.merge_and_log_n_dicts([n returned result dicts from the remote + components])`. + + .. 
testcode:: + + from ray.rllib.utils.metrics.metrics_logger import MetricsLogger + from ray.rllib.utils.test_utils import check + + # Log some (EMA reduced) values. + logger = MetricsLogger() + logger.log_value("a", 2.0) + logger.log_value("a", 3.0) + expected_reduced = (1.0 - 0.01) * 2.0 + 0.01 * 3.0 + # Reduce and return primitive values (not Stats objects). + results = logger.reduce(return_stats_obj=False) + check(results, {"a": expected_reduced}) + + # Log some values to be averaged with a sliding window. + logger = MetricsLogger() + logger.log_value("a", 2.0, window=2) + logger.log_value("a", 3.0) + logger.log_value("a", 4.0) + expected_reduced = (3.0 + 4.0) / 2 # <- win size is only 2; first logged + # item not used + # Reduce and return primitive values (not Stats objects). + results = logger.reduce(return_stats_obj=False) + check(results, {"a": expected_reduced}) + + # Assume we have 2 remote components, each one returning an equivalent + # reduced dict when called. We can simply use these results and log them + # to our own MetricsLogger, then reduce over these 2 logged results. + comp1_logger = MetricsLogger() + comp1_logger.log_value("a", 1.0, window=10) + comp1_logger.log_value("a", 2.0) + result1 = comp1_logger.reduce() # <- return Stats objects as leafs + + comp2_logger = MetricsLogger() + comp2_logger.log_value("a", 3.0, window=10) + comp2_logger.log_value("a", 4.0) + result2 = comp2_logger.reduce() # <- return Stats objects as leafs + + # Now combine the 2 equivalent results into 1 end result dict. + downstream_logger = MetricsLogger() + downstream_logger.merge_and_log_n_dicts([result1, result2]) + # What happens internally is that both values lists of the 2 components + # are merged (concat'd) and randomly shuffled, then clipped at 10 (window + # size). This is done such that no component has an "advantage" over the + # other as we don't know the exact time-order in which these parallelly + # running components logged their own "a"-values. + # We execute similarly useful merging strategies for other reduce settings, + # such as EMA, max/min/sum-reducing, etc.. + end_result = downstream_logger.reduce(return_stats_obj=False) + check(end_result, {"a": 2.5}) + + Args: + key: Optional key or key sequence (for nested location within self.stats), + limiting the reduce operation to that particular sub-structure of self. + If None, will reduce all of self's Stats. + return_stats_obj: Whether in the returned dict, the leafs should be Stats + objects. This is the default as it enables users to continue using + (and further logging) the results of this call inside another + (downstream) MetricsLogger object. + + Returns: + A (nested) dict matching the structure of `self.stats` (contains all ever + logged keys to this MetricsLogger) with the leafs being (reduced) Stats + objects if `return_stats_obj=True` or primitive values, carrying no + reduction and history information, if `return_stats_obj=False`. + """ + # For better error message, catch the last key-path (reducing of which might + # throw an error). + PATH = None + + def _reduce(path, stats): + nonlocal PATH + PATH = path + return stats.reduce() + + # Create a shallow copy of `self.stats` in case we need to reset some of our + # stats due to this `reduce()` call (and the Stat having self.clear_on_reduce + # set to True). 
In case we clear the Stats upon `reduce`, we receive a + # new empty `Stats` object from `stat.reduce()` with the same settings as + # existing one and can now re-assign it to `self.stats[key]` (while we return + # from this method the properly reduced, but not cleared/emptied new `Stats`). + try: + if key is not None: + stats_to_return = self._get_key(key, key_error=False).copy() + self._set_key( + key, tree.map_structure_with_path(_reduce, stats_to_return) + ) + else: + stats_to_return = self.stats.copy() + self.stats = tree.map_structure_with_path(_reduce, stats_to_return) + # Provide proper error message if reduction fails due to bad data. + except Exception as e: + raise ValueError( + "There was an error while reducing the Stats object under key=" + f"{PATH}! Check, whether you logged invalid or incompatible " + "values into this key over time in your custom code." + f"\nThe values under this key are: {self._get_key(PATH).values}." + f"\nThe original error was {str(e)}" + ) + + # Return (reduced) `Stats` objects as leafs. + if return_stats_obj: + return stats_to_return + # Return actual (reduced) values (not reduced `Stats` objects) as leafs. + else: + return tree.map_structure(lambda s: s.peek(), stats_to_return) + def activate_tensor_mode(self): """Switches to tensor-mode, in which in-graph tensors can be logged. @@ -959,7 +1073,7 @@ def _check_tensor(self, key: Tuple[str], value) -> None: ): self._tensor_keys.add(key) - def _key_in_stats(self, flat_key, stats=None): + def _key_in_stats(self, flat_key, *, stats=None): flat_key = force_tuple(tree.flatten(flat_key)) _dict = stats if stats is not None else self.stats for key in flat_key: @@ -968,11 +1082,17 @@ def _key_in_stats(self, flat_key, stats=None): _dict = _dict[key] return True - def _get_key(self, flat_key, stats=None): + def _get_key(self, flat_key, *, stats=None, key_error=True): flat_key = force_tuple(tree.flatten(flat_key)) _dict = stats if stats is not None else self.stats for key in flat_key: - _dict = _dict[key] + try: + _dict = _dict[key] + except KeyError as e: + if key_error: + raise e + else: + return {} return _dict def _set_key(self, flat_key, stats): diff --git a/rllib/utils/metrics/stats.py b/rllib/utils/metrics/stats.py index 41d7d6f5dbd0..1929cec2b063 100644 --- a/rllib/utils/metrics/stats.py +++ b/rllib/utils/metrics/stats.py @@ -216,6 +216,9 @@ def __init__( # Code to execute when exiting a with-context. self._on_exit = on_exit + # On each `.reduce()` call, we store the result of this call in + self._hist = (0, 0) + def push(self, value) -> None: """Appends a new value into the internal values list. @@ -247,25 +250,32 @@ def __exit__(self, exc_type, exc_value, tb) -> None: """Called when exiting a context (with which users can measure a time delta).""" thread_id = threading.get_ident() assert self._start_times[thread_id] is not None - time_delta = time.perf_counter() - self._start_times[thread_id] - self.push(time_delta) + time_delta_s = time.perf_counter() - self._start_times[thread_id] + self.push(time_delta_s) # Call the on_exit handler. if self._on_exit: - self._on_exit(time_delta) + self._on_exit(time_delta_s) del self._start_times[thread_id] - def peek(self) -> Any: + def peek(self, *, previous: bool = False) -> Any: """Returns the result of reducing the internal values list. Note that this method does NOT alter the internal values list in this process. Thus, users can call this method to get an accurate look at the reduced value given the current internal values list. 
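
A small sketch of the peek-vs-reduce contract at the `Stats` level (constructor arguments as assumed from `similar_to()` in this file; not code from the PR itself):

    from ray.rllib.utils.metrics.stats import Stats

    # A windowed mean: `peek()` reports the reduced value without committing it.
    s1 = Stats(reduce="mean", window=2)
    s1.push(1.0)
    s1.push(2.0)
    s1.push(3.0)
    print(s1.peek())          # 2.5 <- only the last 2 values count; list unchanged
    assert s1.reduce() is s1  # clear_on_reduce=False -> the same object is returned

    # A per-iteration sum that starts fresh after each `reduce()`.
    s2 = Stats(reduce="sum", clear_on_reduce=True)
    s2.push(10)
    s2.push(5)
    print(s2.peek())              # 15
    assert s2.reduce() is not s2  # a new, empty Stats with the same settings
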
+ Args: + previous: If True, returns the previous (reduced) result of this `Stats` + object. + Returns: - The result of reducing the internal values list. + The result of reducing the internal values list (or the previously computed + reduced result, if `previous` is True). """ + if previous: + return self._hist[1] return self._reduced_values()[0] def reduce(self) -> "Stats": @@ -281,10 +291,16 @@ class for details on the reduction logic applied to the values list, based on Returns a new `Stats` object with an empty internal values list, but otherwise the same constructor settings (window, reduce, etc..) as `self`. """ + reduced, values = self._reduced_values() + # Reduce everything to a single (init) value. - self.values = self._reduced_values()[1] - # `clear_on_reduce` -> Return an empty new Stats object with the same option as - # `self`. + self.values = values + + # Shift historic reduced valued by one in our hist-tuple. + self._hist = (reduced, self._hist[0]) + + # `clear_on_reduce` -> Return an empty new Stats object with the same settings + # as `self`. if self._clear_on_reduce: return Stats.similar_to(self) # No reset required upon `reduce()` -> Return `self`. @@ -549,7 +565,7 @@ def __format__(self, fmt): def get_state(self) -> Dict[str, Any]: return { - "values": self.values, + "values": convert_to_numpy(self.values), "reduce": self._reduce_method, "window": self._window, "ema_coeff": self._ema_coeff, @@ -567,14 +583,35 @@ def from_state(state: Dict[str, Any]) -> "Stats": ) @staticmethod - def similar_to(other: "Stats", init_value: Optional[Any] = None): - return Stats( + def similar_to( + other: "Stats", + init_value: Optional[Any] = None, + prev_values: Optional[Tuple[Any, Any]] = None, + ) -> "Stats": + """Returns a new Stats object that's similar to `other`. + + "Similar" here means it has the exact same settings (reduce, window, ema_coeff, + etc..). The initial values of the returned `Stats` are empty by default, but + can be set as well. + + Args: + other: The other Stats object to return a similar new Stats equivalent for. + init_value: The initial value to already push into the returned Stats. If + None (default), the returned Stats object will have no values in it. + + Returns: + A new Stats object similar to `other`, with the exact same settings and + maybe a custom initial value (if provided; otherwise empty). + """ + stats = Stats( init_value=init_value, reduce=other._reduce_method, window=other._window, ema_coeff=other._ema_coeff, clear_on_reduce=other._clear_on_reduce, ) + stats._hist = other._hist + return stats def _reduced_values(self, values=None, window=None) -> Tuple[Any, Any]: """Runs a non-commited reduction procedure on given values (or `self.values`). diff --git a/rllib/utils/metrics/window_stat.py b/rllib/utils/metrics/window_stat.py index 83f86dd9b374..1ea4fe35c956 100644 --- a/rllib/utils/metrics/window_stat.py +++ b/rllib/utils/metrics/window_stat.py @@ -1,6 +1,9 @@ import numpy as np +from ray.rllib.utils.annotations import OldAPIStack + +@OldAPIStack class WindowStat: """Handles/stores incoming dataset and provides window-based statistics. diff --git a/rllib/utils/test_utils.py b/rllib/utils/test_utils.py index 6e76ff8aca15..0973188d8848 100644 --- a/rllib/utils/test_utils.py +++ b/rllib/utils/test_utils.py @@ -422,6 +422,14 @@ def check(x, y, decimals=5, atol=None, rtol=None, false=False): if isinstance(y, torch.Tensor): y = y.detach().cpu().numpy() + # Stats objects. 
+ from ray.rllib.utils.metrics.stats import Stats + + if isinstance(x, Stats): + x = x.peek() + if isinstance(y, Stats): + y = y.peek() + # Using decimals. if atol is None and rtol is None: # Assert equality of both values. @@ -800,8 +808,6 @@ def check_train_results_new_api_stack(train_results: ResultDict) -> None: ENV_RUNNER_RESULTS, FAULT_TOLERANCE_STATS, LEARNER_RESULTS, - NUM_AGENT_STEPS_SAMPLED_LIFETIME, - NUM_ENV_STEPS_SAMPLED_LIFETIME, TIMERS, ) @@ -810,8 +816,6 @@ def check_train_results_new_api_stack(train_results: ResultDict) -> None: ENV_RUNNER_RESULTS, FAULT_TOLERANCE_STATS, LEARNER_RESULTS, - NUM_AGENT_STEPS_SAMPLED_LIFETIME, - NUM_ENV_STEPS_SAMPLED_LIFETIME, TIMERS, TRAINING_ITERATION, "config", @@ -1058,7 +1062,9 @@ def run_rllib_example_script_experiment( if stop is None: stop = { f"{ENV_RUNNER_RESULTS}/{EPISODE_RETURN_MEAN}": args.stop_reward, - f"{NUM_ENV_STEPS_SAMPLED_LIFETIME}": args.stop_timesteps, + f"{ENV_RUNNER_RESULTS}/{NUM_ENV_STEPS_SAMPLED_LIFETIME}": ( + args.stop_timesteps + ), TRAINING_ITERATION: args.stop_iters, }
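
Given the stop-dict change above (lifetime env-step counts now live under `ENV_RUNNER_RESULTS`), user-defined Tune stop criteria should use the nested result path as well. A sketch with placeholder thresholds:

    from ray.rllib.utils.metrics import (
        ENV_RUNNER_RESULTS,
        EPISODE_RETURN_MEAN,
        NUM_ENV_STEPS_SAMPLED_LIFETIME,
    )

    stop = {
        # New API stack: both metrics are reported under the "env_runners" sub-dict.
        f"{ENV_RUNNER_RESULTS}/{EPISODE_RETURN_MEAN}": 450.0,
        f"{ENV_RUNNER_RESULTS}/{NUM_ENV_STEPS_SAMPLED_LIFETIME}": 200_000,
    }
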