diff --git a/ding/utils/lock_helper.py b/ding/utils/lock_helper.py index 1023517abf..67586b0767 100644 --- a/ding/utils/lock_helper.py +++ b/ding/utils/lock_helper.py @@ -14,7 +14,8 @@ @unique class LockContextType(Enum): """ - Enum to express the type of the lock + Overview: + Enum to express the type of the lock. """ THREAD_LOCK = 1 PROCESS_LOCK = 2 @@ -32,7 +33,7 @@ class LockContext(object): Generate a LockContext in order to make sure the thread safety. Interfaces: - ``__init__``, ``__enter__``, ``__exit__`` + ``__init__``, ``__enter__``, ``__exit__``. Example: >>> with LockContext() as lock: @@ -40,29 +41,40 @@ class LockContext(object): """ def __init__(self, type_: LockContextType = LockContextType.THREAD_LOCK): - r""" + """ Overview: - Init the lock according to given type + Init the lock according to the given type. + + Arguments: + type_ (:obj:`LockContextType`): The type of lock to be used. Defaults to LockContextType.THREAD_LOCK. """ self.lock = _LOCK_TYPE_MAPPING[type_]() def acquire(self): + """ + Overview: + Acquires the lock. + """ self.lock.acquire() def release(self): + """ + Overview: + Releases the lock. + """ self.lock.release() def __enter__(self): """ Overview: - Entering the context and acquire lock + Enters the context and acquires the lock. """ self.lock.acquire() def __exit__(self, *args, **kwargs): """ Overview: - Quiting the context and release lock + Exits the context and releases the lock. """ self.lock.release() @@ -71,15 +83,15 @@ def __exit__(self, *args, **kwargs): def get_rw_file_lock(name: str, op: str): - r''' + """ Overview: Get generated file lock with name and operator Arguments: - - name (:obj:`str`) Lock's name. - - op (:obj:`str`) Assigned operator, i.e. ``read`` or ``write``. + - name (:obj:`str`): Lock's name. + - op (:obj:`str`): Assigned operator, i.e. ``read`` or ``write``. Returns: - - (:obj:`RWLockFairD`) Generated rwlock - ''' + - (:obj:`RWLockFairD`): Generated rwlock + """ assert op in ['read', 'write'] try: from readerwriterlock import rwlock @@ -98,22 +110,60 @@ def get_rw_file_lock(name: str, op: str): class FcntlContext: + """ + Overview: + A context manager that acquires an exclusive lock on a file using fcntl. \ + This is useful for preventing multiple processes from running the same code. + + Interfaces: + ``__init__``, ``__enter__``, ``__exit__``. + + Example: + >>> lock_path = "/path/to/lock/file" + >>>with FcntlContext(lock_path) as lock: + >>> # Perform operations while the lock is held + + """ def __init__(self, lock_path: str) -> None: + """ + Overview: + Initialize the LockHelper object. + + Arguments: + - lock_path (:obj:`str`): The path to the lock file. + """ self.lock_path = lock_path self.f = None def __enter__(self) -> None: + """ + Overview: + Acquires the lock and opens the lock file in write mode. \ + If the lock file does not exist, it is created. + """ assert self.f is None, self.lock_path self.f = open(self.lock_path, 'w') fcntl.flock(self.f.fileno(), fcntl.LOCK_EX) def __exit__(self, *args, **kwargs) -> None: + """ + Overview: + Closes the file and releases any resources used by the lock_helper object. + """ self.f.close() self.f = None -def get_file_lock(name: str, op: str) -> None: +def get_file_lock(name: str, op: str) -> FcntlContext: + """ + Overview: + Acquires a file lock for the specified file. \ + + Arguments: + - name (:obj:`str`): The name of the file. + - op (:obj:`str`): The operation to perform on the file lock. + """ if fcntl is None: return get_rw_file_lock(name, op) else: diff --git a/ding/utils/normalizer_helper.py b/ding/utils/normalizer_helper.py index 5d2e1aff38..ad968a365e 100755 --- a/ding/utils/normalizer_helper.py +++ b/ding/utils/normalizer_helper.py @@ -2,14 +2,32 @@ class DatasetNormalizer: + """ + Overview: + The `DatasetNormalizer` class provides functionality to normalize and unnormalize data in a dataset. + It takes a dataset as input and applies a normalizer function to each key in the dataset. + + Interface: + ``__init__``, ``__repr__``, ``normalize``, ``unnormalize``. + """ def __init__(self, dataset: np.ndarray, normalizer: str, path_lengths: int = None): + """ + Overview: + Initialize the NormalizerHelper object. + + Arguments: + - dataset (:obj:`np.ndarray`): The dataset to be normalized. + - normalizer (:obj:`str`): The type of normalizer to be used. Can be a string representing the name of \ + the normalizer class. + - path_lengths (:obj:`int`): The length of the paths in the dataset. Defaults to None. + """ dataset = flatten(dataset, path_lengths) self.observation_dim = dataset['observations'].shape[1] self.action_dim = dataset['actions'].shape[1] - if type(normalizer) == str: + if isinstance(normalizer, str): normalizer = eval(normalizer) self.normalizers = {} @@ -21,23 +39,61 @@ def __init__(self, dataset: np.ndarray, normalizer: str, path_lengths: int = Non # key: normalizer(val) # for key, val in dataset.items() - def __repr__(self): + def __repr__(self) -> str: + """ + Overview: + Returns a string representation of the NormalizerHelper object. \ + The string representation includes the key-value pairs of the normalizers \ + stored in the NormalizerHelper object. + Returns: + - ret (:obj:`str`):A string representation of the NormalizerHelper object. + """ string = '' for key, normalizer in self.normalizers.items(): string += f'{key}: {normalizer}]\n' return string - def normalize(self, x, key): + def normalize(self, x: np.ndarray, key: str) -> np.ndarray: + """ + Overview: + Normalize the input data using the specified key. + + Arguments: + - x (:obj:`np.ndarray`): The input data to be normalized. + - key (:obj`str`): The key to identify the normalizer. + + Returns: + - ret (:obj:`np.ndarray`): The normalized value of the input data. + """ return self.normalizers[key].normalize(x) - def unnormalize(self, x, key): + def unnormalize(self, x: np.ndarray, key: str) -> np.ndarray: + """ + Overview: + Unnormalizes the given value `x` using the specified `key`. + + Arguments: + - x (:obj:`np.ndarray`): The value to be unnormalized. + - key (:obj`str`): The key to identify the normalizer. + + Returns: + - ret (:obj:`np.ndarray`): The unnormalized value. + """ return self.normalizers[key].unnormalize(x) -def flatten(dataset, path_lengths): +def flatten(dataset: dict, path_lengths: list) -> dict: """ - flattens dataset of { key: [ n_episodes x max_path_lenth x dim ] } - to { key : [ (n_episodes * sum(path_lengths)) x dim ]} + Overview: + Flattens dataset of { key: [ n_episodes x max_path_length x dim ] } \ + to { key : [ (n_episodes * sum(path_lengths)) x dim ] } + + Arguments: + - dataset (:obj:`dict`): The dataset to be flattened. + - path_lengths (:obj:`list`): A list of path lengths for each episode. + + Returns: + - flattened (:obj:`dict`): The flattened dataset. """ flattened = {} for key, xs in dataset.items(): @@ -50,7 +106,11 @@ def flatten(dataset, path_lengths): class Normalizer: """ - parent class, subclass by defining the `normalize` and `unnormalize` methods + Overview: + Parent class, subclass by defining the `normalize` and `unnormalize` methods + + Interface: + ``__init__``, ``__repr__``, ``normalize``, ``unnormalize``. """ def __init__(self, X): @@ -60,8 +120,8 @@ def __init__(self, X): def __repr__(self): return ( - f'''[ Normalizer ] dim: {self.mins.size}\n -: ''' - f'''{np.round(self.mins, 2)}\n +: {np.round(self.maxs, 2)}\n''' + f"""[ Normalizer ] dim: {self.mins.size}\n -: """ + f"""{np.round(self.mins, 2)}\n +: {np.round(self.maxs, 2)}\n""" ) def normalize(self, *args, **kwargs): @@ -73,7 +133,11 @@ def unnormalize(self, *args, **kwargs): class GaussianNormalizer(Normalizer): """ - normalizes to zero mean and unit variance + Overview: + A class that normalizes data to zero mean and unit variance. + + Interface: + ``__init__``, ``__repr__``, ``normalize``, ``unnormalize``. """ def __init__(self, *args, **kwargs): @@ -84,21 +148,45 @@ def __init__(self, *args, **kwargs): def __repr__(self): return ( - f'''[ Normalizer ] dim: {self.mins.size}\n ''' - f'''means: {np.round(self.means, 2)}\n ''' - f'''stds: {np.round(self.z * self.stds, 2)}\n''' + f"""[ Normalizer ] dim: {self.mins.size}\n """ + f"""means: {np.round(self.means, 2)}\n """ + f"""stds: {np.round(self.z * self.stds, 2)}\n""" ) - def normalize(self, x): + def normalize(self, x: np.ndarray) -> np.ndarray: + """ + Overview: + Normalize the input data. + + Arguments: + - x (:obj:`np.ndarray`): The input data to be normalized. + + Returns: + - ret (:obj:`np.ndarray`): The normalized data. + """ return (x - self.means) / self.stds - def unnormalize(self, x): + def unnormalize(self, x: np.ndarray) -> np.ndarray: + """ + Overview: + Unnormalize the input data. + + Arguments: + - x (:obj:`np.ndarray`): The input data to be unnormalized. + + Returns: + - ret (:obj:`np.ndarray`): The unnormalized data. + """ return x * self.stds + self.means class CDFNormalizer(Normalizer): """ - makes training data uniform (over each dimension) by transforming it with marginal CDFs + Overview: + A class that makes training data uniform (over each dimension) by transforming it with marginal CDFs. + + Interface: + ``__init__``, ``__repr__``, ``normalize``, ``unnormalize``. """ def __init__(self, X): @@ -111,7 +199,18 @@ def __repr__(self): f'{i:3d}: {cdf}' for i, cdf in enumerate(self.cdfs) ) - def wrap(self, fn_name, x): + def wrap(self, fn_name: str, x: np.ndarray) -> np.ndarray: + """ + Overview: + Wraps the given function name and applies it to the input data. + + Arguments: + - fn_name (:obj:`str`): The name of the function to be applied. + - x (:obj:`np.ndarray`): The input data. + + Returns: + - ret: The output of the function applied to the input data. + """ shape = x.shape # reshape to 2d x = x.reshape(-1, self.dim) @@ -121,19 +220,43 @@ def wrap(self, fn_name, x): out[:, i] = fn(x[:, i]) return out.reshape(shape) - def normalize(self, x): + def normalize(self, x: np.ndarray) -> np.ndarray: + """ + Overview: + Normalizes the input data. + + Arguments: + - x (:obj:`np.ndarray`): The input data. + + Returns: + - ret (:obj:`np.ndarray`): The normalized data. + """ return self.wrap('normalize', x) - def unnormalize(self, x): + def unnormalize(self, x: np.ndarray) -> np.ndarray: + """ + Overview: + Unnormalizes the input data. + + Arguments: + - x (:obj:`np.ndarray`): The input data. + + Returns: + - ret (:obj:`np.ndarray`):: The unnormalized data. + """ return self.wrap('unnormalize', x) class CDFNormalizer1d: """ - CDF normalizer for a single dimension + Overview: + CDF normalizer for a single dimension. This class provides methods to normalize and unnormalize data \ + using the Cumulative Distribution Function (CDF) approach. + Interface: + ``__init__``, ``__repr__``, ``normalize``, ``unnormalize``. """ - def __init__(self, X): + def __init__(self, X: np.ndarray): import scipy.interpolate as interpolate assert X.ndim == 1 self.X = X.astype(np.float32) @@ -148,10 +271,20 @@ def __init__(self, X): self.xmin, self.xmax = quantiles.min(), quantiles.max() self.ymin, self.ymax = cumprob.min(), cumprob.max() - def __repr__(self): + def __repr__(self) -> str: return (f'[{np.round(self.xmin, 2):.4f}, {np.round(self.xmax, 2):.4f}') - def normalize(self, x): + def normalize(self, x: np.ndarray) -> np.ndarray: + """ + Overview: + Normalize the input data. + + Arguments: + - x (:obj:`np.ndarray`): The data to be normalized. + + Returns: + - ret (:obj:`np.ndarray`): The normalized data. + """ if self.constant: return x @@ -162,9 +295,17 @@ def normalize(self, x): y = 2 * y - 1 return y - def unnormalize(self, x, eps=1e-4): + def unnormalize(self, x: np.ndarray, eps: float = 1e-4) -> np.ndarray: """ - X : [ -1, 1 ] + Overview: + Unnormalize the input data. + + Arguments: + - x (:obj:`np.ndarray`): The data to be unnormalized. + - eps (:obj:`float`): A small value used for numerical stability. Defaults to 1e-4. + + Returns: + - ret (:obj:`np.ndarray`): The unnormalized data. """ # [ -1, 1 ] --> [ 0, 1 ] if self.constant: @@ -174,10 +315,10 @@ def unnormalize(self, x, eps=1e-4): if (x < self.ymin - eps).any() or (x > self.ymax + eps).any(): print( - f'''[ dataset/normalization ] Warning: out of range in unnormalize: ''' - f'''[{x.min()}, {x.max()}] | ''' - f'''x : [{self.xmin}, {self.xmax}] | ''' - f'''y: [{self.ymin}, {self.ymax}]''' + f"""[ dataset/normalization ] Warning: out of range in unnormalize: """ + f"""[{x.min()}, {x.max()}] | """ + f"""x : [{self.xmin}, {self.xmax}] | """ + f"""y: [{self.ymin}, {self.ymax}]""" ) x = np.clip(x, self.ymin, self.ymax) @@ -186,8 +327,21 @@ def unnormalize(self, x, eps=1e-4): return y -def empirical_cdf(sample): - # https://stackoverflow.com/a/33346366 +def empirical_cdf(sample: np.ndarray) -> (np.ndarray, np.ndarray): + """ + Overview: + Compute the empirical cumulative distribution function (CDF) of a given sample. + + Arguments: + - sample (:obj:`np.ndarray`): The input sample for which to compute the empirical CDF. + + Returns: + - quantiles (:obj:`np.ndarray`): The unique values in the sample. + - cumprob (:obj:`np.ndarray`): The cumulative probabilities corresponding to the quantiles. + + References: + - Stack Overflow: https://stackoverflow.com/a/33346366 + """ # find the unique values and their corresponding counts quantiles, counts = np.unique(sample, return_counts=True) @@ -199,28 +353,63 @@ def empirical_cdf(sample): return quantiles, cumprob -def atleast_2d(x): +def atleast_2d(x: np.ndarray) -> np.ndarray: + """ + Overview: + Ensure that the input array has at least two dimensions. + + Arguments: + - x (:obj:`np.ndarray`): The input array. + + Returns: + - ret (:obj:`np.ndarray`): The input array with at least two dimensions. + """ if x.ndim < 2: x = x[:, None] return x class LimitsNormalizer(Normalizer): - ''' - maps [ xmin, xmax ] to [ -1, 1 ] - ''' + """ + Overview: + A class that normalizes and unnormalizes values within specified limits. \ + This class maps values within the range [xmin, xmax] to the range [-1, 1]. + + Interface: + ``__init__``, ``__repr__``, ``normalize``, ``unnormalize``. + """ + + def normalize(self, x: np.ndarray) -> np.ndarray: + """ + Overview: + Normalizes the input values. - def normalize(self, x): + Argments: + - x (:obj:`np.ndarray`): The input values to be normalized. + + Returns: + - ret (:obj:`np.ndarray`): The normalized values. + + """ # [ 0, 1 ] x = (x - self.mins) / (self.maxs - self.mins) # [ -1, 1 ] x = 2 * x - 1 return x - def unnormalize(self, x, eps=1e-4): - ''' - x : [ -1, 1 ] - ''' + def unnormalize(self, x: np.ndarray, eps: float = 1e-4) -> np.ndarray: + """ + Overview: + Unnormalizes the input values. + + Arguments: + - x (:obj:`np.ndarray`): The input values to be unnormalized. + - eps (:obj:`float`): A small value used for clipping. Defaults to 1e-4. + + Returns: + - ret (:obj:`np.ndarray`): The unnormalized values. + + """ if x.max() > 1 + eps or x.min() < -1 - eps: # print(f'[ datasets/mujoco ] Warning: sample out of range | ({x.min():.4f}, {x.max():.4f})') x = np.clip(x, -1, 1)