diff --git a/rllib/utils/metrics/metrics_logger.py b/rllib/utils/metrics/metrics_logger.py
index 773fc737510f..d662ed5dc6f6 100644
--- a/rllib/utils/metrics/metrics_logger.py
+++ b/rllib/utils/metrics/metrics_logger.py
@@ -4,7 +4,6 @@
 import tree  # pip install dm_tree
 
 from ray.rllib.utils import force_tuple
-from ray.rllib.utils.deprecation import Deprecated
 from ray.rllib.utils.metrics.stats import Stats
 from ray.rllib.utils.framework import try_import_tf, try_import_torch
 from ray.util.annotations import PublicAPI
@@ -54,6 +53,183 @@ def __init__(self):
         self._tensor_mode = False
         self._tensor_keys = set()
 
+    def peek(
+        self,
+        key: Union[str, Tuple[str, ...]],
+        *,
+        default: Optional[Any] = None,
+    ) -> Any:
+        """Returns the (reduced) value(s) found under the given key or key sequence.
+
+        If `key` only reaches to a nested dict deeper in `self`, that
+        sub-dictionary's entire values are returned as a (nested) dict with its leafs
+        being the reduced peek values.
+
+        Note that calling this method does NOT cause an actual underlying value list
+        reduction, even though reduced values are being returned. It'll keep all
+        internal structures as-is.
+
+        .. testcode::
+            from ray.rllib.utils.metrics.metrics_logger import MetricsLogger
+            from ray.rllib.utils.test_utils import check
+
+            logger = MetricsLogger()
+            ema = 0.01
+
+            # Log some (EMA reduced) values.
+            key = ("some", "nested", "key", "sequence")
+            logger.log_value(key, 2.0, ema_coeff=ema)
+            logger.log_value(key, 3.0)
+
+            # Expected reduced value:
+            expected_reduced = (1.0 - ema) * 2.0 + ema * 3.0
+
+            # Peek at the (reduced) value under `key`.
+            check(logger.peek(key), expected_reduced)
+
+            # Peek at the (reduced) nested struct under ("some", "nested").
+            check(
+                logger.peek(("some", "nested")),
+                {"key": {"sequence": expected_reduced}},
+            )
+
+            # Log some more, check again.
+            logger.log_value(key, 4.0)
+            expected_reduced = (1.0 - ema) * expected_reduced + ema * 4.0
+            check(logger.peek(key), expected_reduced)
+
+        Args:
+            key: The key/key sequence of the sub-structure of `self`, whose (reduced)
+                values to return.
+            default: An optional default value in case `key` cannot be found in `self`.
+                If default is not provided and `key` cannot be found, throws a KeyError.
+
+        Returns:
+            The (reduced) values of the (possibly nested) sub-structure found under
+            the given `key` or key sequence.
+
+        Raises:
+            KeyError: If `key` cannot be found AND `default` is not provided.
+        """
+        # Use default value, b/c `key` cannot be found in our stats.
+        if not self._key_in_stats(key) and default is not None:
+            return default
+
+        # Create a reduced view of the requested sub-structure or leaf (Stats object).
+        ret = tree.map_structure(lambda s: s.peek(), self._get_key(key))
+
+        # Otherwise, return the reduced Stats' (peek) value.
+        return ret
+
+    def reduce(
+        self,
+        key: Optional[Union[str, Tuple[str, ...]]] = None,
+        *,
+        return_stats_obj: bool = True,
+    ) -> Dict:
+        """Reduces all logged values based on their settings and returns a result dict.
+
+        The returned result dict has the exact same structure as the logged keys (or
+        nested key sequences) combined. At the leafs of the returned structure are
+        either `Stats` objects (return_stats_obj=True, which is the default) or
+        primitive (non-Stats) values. In case of `return_stats_obj=True`, the returned
+        dict with Stats at the leafs can conveniently be re-used downstream for further
+        logging and reduction operations.
+
+        For example, imagine component A (e.g. an Algorithm) containing a MetricsLogger
+        and n remote components (e.g. EnvRunner workers), each with their own
+        MetricsLogger object. Component A now calls its n remote components, each of
+        which returns an equivalent, reduced dict with `Stats` as leafs.
+        Component A can then further log these n result dicts via its own MetricsLogger:
+        `logger.merge_and_log_n_dicts([n returned result dicts from the remote
+        components])`.
+
+        .. testcode::
+
+            from ray.rllib.utils.metrics.metrics_logger import MetricsLogger
+            from ray.rllib.utils.test_utils import check
+
+            # Log some (EMA reduced) values.
+            logger = MetricsLogger()
+            logger.log_value("a", 2.0)
+            logger.log_value("a", 3.0)
+            expected_reduced = (1.0 - 0.01) * 2.0 + 0.01 * 3.0
+            # Reduce and return primitive values (not Stats objects).
+            results = logger.reduce(return_stats_obj=False)
+            check(results, {"a": expected_reduced})
+
+            # Log some values to be averaged with a sliding window.
+            logger = MetricsLogger()
+            logger.log_value("a", 2.0, window=2)
+            logger.log_value("a", 3.0)
+            logger.log_value("a", 4.0)
+            expected_reduced = (3.0 + 4.0) / 2  # <- win size is only 2; first logged
+            # item not used
+            # Reduce and return primitive values (not Stats objects).
+            results = logger.reduce(return_stats_obj=False)
+            check(results, {"a": expected_reduced})
+
+            # Assume we have 2 remote components, each one returning an equivalent
+            # reduced dict when called. We can simply use these results and log them
+            # to our own MetricsLogger, then reduce over these 2 logged results.
+            comp1_logger = MetricsLogger()
+            comp1_logger.log_value("a", 1.0, window=10)
+            comp1_logger.log_value("a", 2.0)
+            result1 = comp1_logger.reduce()  # <- return Stats objects as leafs
+
+            comp2_logger = MetricsLogger()
+            comp2_logger.log_value("a", 3.0, window=10)
+            comp2_logger.log_value("a", 4.0)
+            result2 = comp2_logger.reduce()  # <- return Stats objects as leafs
+
+            # Now combine the 2 equivalent results into 1 end result dict.
+            downstream_logger = MetricsLogger()
+            downstream_logger.merge_and_log_n_dicts([result1, result2])
+            # What happens internally is that both values lists of the 2 components
+            # are merged (concat'd) and randomly shuffled, then clipped at 10 (window
+            # size). This is done such that no component has an "advantage" over the
+            # other as we don't know the exact time-order in which these parallelly
+            # running components logged their own "a"-values.
+            # We execute similarly useful merging strategies for other reduce settings,
+            # such as EMA, max/min/sum-reducing, etc..
+            end_result = downstream_logger.reduce(return_stats_obj=False)
+            check(end_result, {"a": 2.5})
+
+        Args:
+            key: Optional key or key sequence (for nested location within self.stats),
+                limiting the reduce operation to that particular sub-structure of self.
+                If None, will reduce all of self's Stats.
+            return_stats_obj: Whether in the returned dict, the leafs should be Stats
+                objects. This is the default as it enables users to continue using
+                (and further logging) the results of this call inside another
+                (downstream) MetricsLogger object.
+
+        Returns:
+            A (nested) dict matching the structure of `self.stats` (contains all ever
+            logged keys to this MetricsLogger) with the leafs being (reduced) Stats
+            objects if `return_stats_obj=True` or primitive values, carrying no
+            reduction and history information, if `return_stats_obj=False`.
+        """
+        # Create a shallow copy of `self.stats` in case we need to reset some of our
+        # stats due to this `reduce()` call (and the Stat having self.clear_on_reduce
+        # set to True). In case we clear the Stats upon `reduce`, we get returned a
+        # new empty `Stats` object from `stat.reduce()` with the same settings as
+        # existing one and can now re-assign it to `self.stats[key]` (while we return
+        # from this method the properly reduced, but not cleared/emptied new `Stats`).
+        if key is not None:
+            stats_to_return = self._get_key(key).copy()
+            self._set_key(
+                key, tree.map_structure(lambda s: s.reduce(), stats_to_return)
+            )
+        else:
+            stats_to_return = self.stats.copy()
+            self.stats = tree.map_structure(lambda s: s.reduce(), stats_to_return)
+
+        if return_stats_obj:
+            return stats_to_return
+        else:
+            return tree.map_structure(lambda s: s.peek(), stats_to_return)
+
     def log_value(
         self,
         key: Union[str, Tuple[str, ...]],
@@ -608,74 +784,6 @@ def tensors_to_numpy(self, tensor_metrics):
     def tensor_mode(self):
         return self._tensor_mode
 
-    def peek(
-        self,
-        key: Union[str, Tuple[str, ...]],
-        *,
-        default: Optional[Any] = None,
-    ) -> Any:
-        """Returns the (reduced) value(s) found under the given key or key sequence.
-
-        If `key` only reaches to a nested dict deeper in `self`, that
-        sub-dictionary's entire values are returned as a (nested) dict with its leafs
-        being the reduced peek values.
-
-        Note that calling this method does NOT cause an actual underlying value list
-        reduction, even though reduced values are being returned. It'll keep all
-        internal structures as-is.
-
-        .. testcode::
-            from ray.rllib.utils.metrics.metrics_logger import MetricsLogger
-            from ray.rllib.utils.test_utils import check
-
-            logger = MetricsLogger()
-            ema = 0.01
-
-            # Log some (EMA reduced) values.
-            key = ("some", "nested", "key", "sequence")
-            logger.log_value(key, 2.0, ema_coeff=ema)
-            logger.log_value(key, 3.0)
-
-            # Expected reduced value:
-            expected_reduced = (1.0 - ema) * 2.0 + ema * 3.0
-
-            # Peek at the (reduced) value under `key`.
-            check(logger.peek(key), expected_reduced)
-
-            # Peek at the (reduced) nested struct under ("some", "nested").
-            check(
-                logger.peek(("some", "nested")),
-                {"key": {"sequence": expected_reduced}},
-            )
-
-            # Log some more, check again.
-            logger.log_value(key, 4.0)
-            expected_reduced = (1.0 - ema) * expected_reduced + ema * 4.0
-            check(logger.peek(key), expected_reduced)
-
-        Args:
-            key: The key/key sequence of the sub-structure of `self`, whose (reduced)
-                values to return.
-            default: An optional default value in case `key` cannot be found in `self`.
-                If default is not provided and `key` cannot be found, throws a KeyError.
-
-        Returns:
-            The (reduced) values of the (possibly nested) sub-structure found under
-            the given `key` or key sequence.
-
-        Raises:
-            KeyError: If `key` cannot be found AND `default` is not provided.
-        """
-        # Use default value, b/c `key` cannot be found in our stats.
-        if not self._key_in_stats(key) and default is not None:
-            return default
-
-        # Create a reduced view of the requested sub-structure or leaf (Stats object).
-        ret = tree.map_structure(lambda s: s.peek(), self._get_key(key))
-
-        # Otherwise, return the reduced Stats' (peek) value.
-        return ret
-
     def set_value(
         self,
         key: Union[str, Tuple[str, ...]],
@@ -736,127 +844,35 @@ def set_value(
             clear_on_reduce=clear_on_reduce,
         )
 
-    def delete(self, *key: Tuple[str, ...], key_error: bool = True) -> None:
-        """Deletes th egiven `key` from this metrics logger's stats.
-
-        Args:
-            key: The key or key sequence (for nested location within self.stats),
-                to delete from this MetricsLogger's stats.
-            key_error: Whether to throw a KeyError if `key` cannot be found in `self`.
-
-        Raises:
-            KeyError: If `key` cannot be found in `self` AND `key_error` is True.
-        """
-        self._del_key(key, key_error)
-
-    def reduce(
-        self,
-        key: Optional[Union[str, Tuple[str, ...]]] = None,
-        *,
-        return_stats_obj: bool = True,
-    ) -> Dict:
-        """Reduces all logged values based on their settings and returns a result dict.
-
-        The returned result dict has the exact same structure as the logged keys (or
-        nested key sequences) combined. At the leafs of the returned structure are
-        either `Stats` objects (return_stats_obj=True, which is the default) or
-        primitive (non-Stats) values. In case of `return_stats_obj=True`, the returned
-        dict with Stats at the leafs can conveniently be re-used downstream for further
-        logging and reduction operations.
-
-        For example, imagine component A (e.g. an Algorithm) containing a MetricsLogger
-        and n remote components (e.g. EnvRunner workers), each with their own
-        MetricsLogger object. Component A now calls its n remote components, each of
-        which returns an equivalent, reduced dict with `Stats` as leafs.
-        Component A can then further log these n result dicts via its own MetricsLogger:
-        `logger.merge_and_log_n_dicts([n returned result dicts from the remote
-        components])`.
+    def reset(self) -> None:
+        """Resets all data stored in this MetricsLogger.
 
         .. testcode::
 
             from ray.rllib.utils.metrics.metrics_logger import MetricsLogger
            from ray.rllib.utils.test_utils import check
 
-            # Log some (EMA reduced) values.
             logger = MetricsLogger()
-            logger.log_value("a", 2.0)
-            logger.log_value("a", 3.0)
-            expected_reduced = (1.0 - 0.01) * 2.0 + 0.01 * 3.0
-            # Reduce and return primitive values (not Stats objects).
-            results = logger.reduce(return_stats_obj=False)
-            check(results, {"a": expected_reduced})
-
-            # Log some values to be averaged with a sliding window.
-            logger = MetricsLogger()
-            logger.log_value("a", 2.0, window=2)
-            logger.log_value("a", 3.0)
-            logger.log_value("a", 4.0)
-            expected_reduced = (3.0 + 4.0) / 2  # <- win size is only 2; first logged
-            # item not used
-            # Reduce and return primitive values (not Stats objects).
-            results = logger.reduce(return_stats_obj=False)
-            check(results, {"a": expected_reduced})
-
-            # Assume we have 2 remote components, each one returning an equivalent
-            # reduced dict when called. We can simply use these results and log them
-            # to our own MetricsLogger, then reduce over these 2 logged results.
-            comp1_logger = MetricsLogger()
-            comp1_logger.log_value("a", 1.0, window=10)
-            comp1_logger.log_value("a", 2.0)
-            result1 = comp1_logger.reduce()  # <- return Stats objects as leafs
-
-            comp2_logger = MetricsLogger()
-            comp2_logger.log_value("a", 3.0, window=10)
-            comp2_logger.log_value("a", 4.0)
-            result2 = comp2_logger.reduce()  # <- return Stats objects as leafs
+            logger.log_value("a", 1.0)
+            check(logger.peek("a"), 1.0)
+            logger.reset()
+            check(logger.reduce(), {})
+        """
+        self.stats = {}
+        self._tensor_keys = set()
 
-            # Now combine the 2 equivalent results into 1 end result dict.
-            downstream_logger = MetricsLogger()
-            downstream_logger.merge_and_log_n_dicts([result1, result2])
-            # What happens internally is that both values lists of the 2 components
-            # are merged (concat'd) and randomly shuffled, then clipped at 10 (window
-            # size). This is done such that no component has an "advantage" over the
-            # other as we don't know the exact time-order in which these parallelly
-            # running components logged their own "a"-values.
-            # We execute similarly useful merging strategies for other reduce settings,
-            # such as EMA, max/min/sum-reducing, etc..
-            end_result = downstream_logger.reduce(return_stats_obj=False)
-            check(end_result, {"a": 2.5})
+    def delete(self, *key: Tuple[str, ...], key_error: bool = True) -> None:
+        """Deletes the given `key` from this metrics logger's stats.
 
         Args:
-            key: Optional key or key sequence (for nested location within self.stats),
-                limiting the reduce operation to that particular sub-structure of self.
-                If None, will reduce all of self's Stats.
-            return_stats_obj: Whether in the returned dict, the leafs should be Stats
-                objects. This is the default as it enables users to continue using
-                (and further logging) the results of this call inside another
-                (downstream) MetricsLogger object.
+            key: The key or key sequence (for nested location within self.stats),
+                to delete from this MetricsLogger's stats.
+            key_error: Whether to throw a KeyError if `key` cannot be found in `self`.
 
-        Returns:
-            A (nested) dict matching the structure of `self.stats` (contains all ever
-            logged keys to this MetricsLogger) with the leafs being (reduced) Stats
-            objects if `return_stats_obj=True` or primitive values, carrying no
-            reduction and history information, if `return_stats_obj=False`.
+        Raises:
+            KeyError: If `key` cannot be found in `self` AND `key_error` is True.
         """
-        # Create a shallow copy of `self.stats` in case we need to reset some of our
-        # stats due to this `reduce()` call (and the Stat having self.clear_on_reduce
-        # set to True). In case we clear the Stats upon `reduce`, we get returned a
-        # new empty `Stats` object from `stat.reduce()` with the same settings as
-        # existing one and can now re-assign it to `self.stats[key]` (while we return
-        # from this method the properly reduced, but not cleared/emptied new `Stats`).
-        if key is not None:
-            stats_to_return = self._get_key(key).copy()
-            self._set_key(
-                key, tree.map_structure(lambda s: s.reduce(), stats_to_return)
-            )
-        else:
-            stats_to_return = self.stats.copy()
-            self.stats = tree.map_structure(lambda s: s.reduce(), stats_to_return)
-
-        if return_stats_obj:
-            return stats_to_return
-        else:
-            return tree.map_structure(lambda s: s.peek(), stats_to_return)
+        self._del_key(key, key_error)
 
     def get_state(self) -> Dict[str, Any]:
         """Returns the current state of `self` as a dict.
@@ -932,7 +948,3 @@ def _del_key(self, flat_key, key_error=False):
         except KeyError as e:
             if key_error:
                 raise e
-
-    @Deprecated(new="MetricsLogger.merge_and_log_n_dicts()", error=True)
-    def log_n_dicts(self, *args, **kwargs):
-        pass
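Reviewer note (not part of the patch): below is a minimal usage sketch exercising the MetricsLogger APIs this diff touches (log_value, peek, reduce, reset). It relies only on behavior documented in the docstrings above: sliding-window averaging, the non-destructive peek() and its optional default argument, and reduce(return_stats_obj=False) returning primitive leaf values. The "loss" key and the concrete numbers are illustrative assumptions, not taken from the patch.

    from ray.rllib.utils.metrics.metrics_logger import MetricsLogger
    from ray.rllib.utils.test_utils import check

    logger = MetricsLogger()

    # Average over a sliding window of the last 2 logged values.
    logger.log_value("loss", 2.0, window=2)
    logger.log_value("loss", 3.0)
    logger.log_value("loss", 4.0)

    # peek() returns the reduced view (mean of [3.0, 4.0]) without mutating
    # any internal state ...
    check(logger.peek("loss"), 3.5)
    # ... so peeking again yields the same value.
    check(logger.peek("loss"), 3.5)

    # Unknown keys return the provided default instead of raising a KeyError.
    check(logger.peek(("never", "logged"), default=-1.0), -1.0)

    # reduce(return_stats_obj=False) strips the Stats wrappers and returns
    # plain values, suitable for final reporting.
    check(logger.reduce(return_stats_obj=False), {"loss": 3.5})

    # reset() drops all logged data; a subsequent reduce() returns an empty dict.
    logger.reset()
    check(logger.reduce(), {})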