diff --git a/.yamato/com.unity.ml-agents-pack.yml b/.yamato/com.unity.ml-agents-pack.yml index 5c399a2edd..ac3221a807 100644 --- a/.yamato/com.unity.ml-agents-pack.yml +++ b/.yamato/com.unity.ml-agents-pack.yml @@ -5,7 +5,7 @@ pack: image: package-ci/ubuntu:stable flavor: b1.large commands: - - npm install upm-ci-utils@stable -g --registry https://api.bintray.com/npm/unity/unity-npm + - npm install upm-ci-utils@stable -g --registry https://artifactory.prd.cds.internal.unity3d.com/artifactory/api/npm/upm-npm - upm-ci package pack --package-path com.unity.ml-agents artifacts: packages: diff --git a/.yamato/com.unity.ml-agents-test.yml b/.yamato/com.unity.ml-agents-test.yml index 0658219163..f9720b9110 100644 --- a/.yamato/com.unity.ml-agents-test.yml +++ b/.yamato/com.unity.ml-agents-test.yml @@ -33,7 +33,7 @@ test_{{ platform.name }}_{{ editor.version }}: image: {{ platform.image }} flavor: {{ platform.flavor}} commands: - - npm install upm-ci-utils@stable -g --registry https://api.bintray.com/npm/unity/unity-npm + - npm install upm-ci-utils@stable -g --registry https://artifactory.prd.cds.internal.unity3d.com/artifactory/api/npm/upm-npm - upm-ci package test -u {{ editor.version }} --package-path com.unity.ml-agents {{ editor.coverageOptions }} - python ml-agents/tests/yamato/check_coverage_percent.py upm-ci~/test-results/ {{ editor.minCoveragePct }} artifacts: diff --git a/com.unity.ml-agents/CHANGELOG.md b/com.unity.ml-agents/CHANGELOG.md index 25a00b2e01..9c0501b00c 100755 --- a/com.unity.ml-agents/CHANGELOG.md +++ b/com.unity.ml-agents/CHANGELOG.md @@ -64,6 +64,8 @@ and this project adheres to overwrite the existing files. (#3705) - `StackingSensor` was changed from `internal` visibility to `public` - Updated Barracuda to 0.6.3-preview. + - Model updates can now happen asynchronously with environment steps for better performance. (#3690) + - `num_updates` and `train_interval` for SAC were replaced with `steps_per_update`. 
(#3690) ### Bug Fixes diff --git a/config/sac_trainer_config.yaml b/config/sac_trainer_config.yaml index 64c02acc02..53a09af73d 100644 --- a/config/sac_trainer_config.yaml +++ b/config/sac_trainer_config.yaml @@ -10,8 +10,7 @@ default: max_steps: 5.0e5 memory_size: 128 normalize: false - num_update: 1 - train_interval: 1 + steps_per_update: 10 num_layers: 2 time_horizon: 64 sequence_length: 64 @@ -30,11 +29,10 @@ FoodCollector: buffer_size: 500000 max_steps: 2.0e6 init_entcoef: 0.05 - train_interval: 1 Bouncer: normalize: true - max_steps: 2.0e6 + max_steps: 1.0e6 num_layers: 2 hidden_units: 64 summary_freq: 20000 @@ -43,7 +41,7 @@ PushBlock: max_steps: 2e6 init_entcoef: 0.05 hidden_units: 256 - summary_freq: 60000 + summary_freq: 100000 time_horizon: 64 num_layers: 2 @@ -159,10 +157,10 @@ CrawlerStatic: normalize: true time_horizon: 1000 batch_size: 256 - train_interval: 2 + steps_per_update: 20 buffer_size: 500000 buffer_init_steps: 2000 - max_steps: 5e6 + max_steps: 3e6 summary_freq: 30000 init_entcoef: 1.0 num_layers: 3 @@ -178,9 +176,9 @@ CrawlerDynamic: batch_size: 256 buffer_size: 500000 summary_freq: 30000 - train_interval: 2 + steps_per_update: 20 num_layers: 3 - max_steps: 1e7 + max_steps: 5e6 hidden_units: 512 reward_signals: extrinsic: @@ -195,7 +193,7 @@ Walker: max_steps: 2e7 summary_freq: 30000 num_layers: 4 - train_interval: 2 + steps_per_update: 30 hidden_units: 512 reward_signals: extrinsic: @@ -208,6 +206,7 @@ Reacher: batch_size: 128 buffer_size: 500000 max_steps: 2e7 + steps_per_update: 20 summary_freq: 60000 Hallway: @@ -216,7 +215,7 @@ Hallway: hidden_units: 128 memory_size: 128 init_entcoef: 0.1 - max_steps: 1.0e7 + max_steps: 5.0e6 summary_freq: 10000 time_horizon: 64 use_recurrent: true diff --git a/docs/Migrating.md b/docs/Migrating.md index e2cdba355c..8bf409182f 100644 --- a/docs/Migrating.md +++ b/docs/Migrating.md @@ -33,6 +33,8 @@ double-check that the versions are in the same. The versions can be found in - The signature of `Agent.Heuristic()` was changed to take a `float[]` as a parameter, instead of returning the array. This was done to prevent a common source of error where users would return arrays of the wrong size. +- `num_updates` and `train_interval` for SAC have been replaced with `steps_per_update`. + ### Steps to Migrate @@ -54,6 +56,8 @@ double-check that the versions are in the same. The versions can be found in - If your Agent class overrides `Heuristic()`, change the signature to `public override void Heuristic(float[] actionsOut)` and assign values to `actionsOut` instead of returning an array. +- Set `steps_per_update` to be around equal to the number of agents in your environment, + times `num_updates` and divided by `train_interval`. ## Migrating from 0.14 to 0.15 diff --git a/docs/Python-API.md b/docs/Python-API.md index 1da4abdf13..751d298ac0 100644 --- a/docs/Python-API.md +++ b/docs/Python-API.md @@ -149,8 +149,6 @@ A `DecisionSteps` has the following fields : `env.step()`). - `reward` is a float vector of length batch size. Corresponds to the rewards collected by each agent since the last simulation step. - - `done` is an array of booleans of length batch size. Is true if the - associated Agent was terminated during the last simulation step. - `agent_id` is an int vector of length batch size containing unique identifier for the corresponding Agent. This is used to track Agents across simulation steps. 
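Note on the `done` removal in the hunks above and below: termination is now signaled by an agent appearing in `TerminalSteps` rather than by a per-agent `done` flag. A minimal stepping-loop sketch under that assumption (the action size is illustrative; method names follow the rest of this guide):

```python
import numpy as np
from mlagents_envs.environment import UnityEnvironment

env = UnityEnvironment(file_name=None)  # None connects to a running Editor instance
env.reset()
behavior_name = env.get_behavior_names()[0]

for _ in range(100):
    decision_steps, terminal_steps = env.get_steps(behavior_name)
    # Agents that terminated this step appear in terminal_steps; there is no `done` field to check.
    for agent_id in terminal_steps.agent_id:
        step = terminal_steps[agent_id]
        print(f"Agent {agent_id} terminated: reward={step.reward}, max_step={step.max_step}")
    # Act for the agents that requested a decision (an action size of 2 is an assumption).
    n_agents = len(decision_steps.agent_id)
    env.set_actions(behavior_name, np.zeros((n_agents, 2), dtype=np.float32))
    env.step()

env.close()
```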
@@ -174,8 +172,6 @@ A `DecisionStep` has the following fields:
   (Each array has one less dimension than the arrays in `DecisionSteps`)
 - `reward` is a float. Corresponds to the rewards collected by the agent since
   the last simulation step.
- - `done` is a bool. Is true if the Agent was terminated during the last
-   simulation step.
 - `agent_id` is an int and an unique identifier for the corresponding Agent.
 - `action_mask` is an optional list of one dimensional array of booleans. Only
   available in multi-discrete action space type.
@@ -197,8 +193,6 @@ A `TerminalSteps` has the following fields :
   `env.step()`).
 - `reward` is a float vector of length batch size. Corresponds to the
   rewards collected by each agent since the last simulation step.
- - `done` is an array of booleans of length batch size. Is true if the
-   associated Agent was terminated during the last simulation step.
 - `agent_id` is an int vector of length batch size containing unique
   identifier for the corresponding Agent. This is used to track Agents
   across simulation steps.
@@ -219,8 +213,6 @@ A `TerminalStep` has the following fields:
   (Each array has one less dimension than the arrays in `TerminalSteps`)
 - `reward` is a float. Corresponds to the rewards collected by the agent since
   the last simulation step.
- - `done` is a bool. Is true if the Agent was terminated during the last
-   simulation step.
 - `agent_id` is an int and an unique identifier for the corresponding Agent.
 - `max_step` is a bool. Is true if the Agent reached its maximum number of
   steps during the last simulation step.
diff --git a/docs/Training-ML-Agents.md b/docs/Training-ML-Agents.md
index 6fa92aba25..cf34a60d25 100644
--- a/docs/Training-ML-Agents.md
+++ b/docs/Training-ML-Agents.md
@@ -158,10 +158,10 @@ Cloning (Imitation), GAIL = Generative Adversarial Imitation Learning
 | tau | How aggressively to update the target network used for bootstrapping value estimation in SAC. | SAC |
 | time_horizon | How many steps of experience to collect per-agent before adding it to the experience buffer. | PPO, SAC |
 | trainer | The type of training to perform: "ppo", "sac", "offline_bc" or "online_bc". | PPO, SAC |
-| train_interval | How often to update the agent. | SAC |
-| num_update | Number of mini-batches to update the agent with during each update. | SAC |
+| steps_per_update | Average number of agent steps (actions) taken per mini-batch update of the policy. | SAC |
 | use_recurrent | Train using a recurrent neural network. See [Using Recurrent Neural Networks](Feature-Memory.md). | PPO, SAC |
 | init_path | Initialize trainer from a previously saved model. | PPO, SAC |
+| threaded | Run the trainer update loop in a thread that runs in parallel with environment stepping. (Default: true) | PPO, SAC |
 
 For specific advice on setting hyperparameters based on the type of training you
 are conducting, see:
diff --git a/docs/Training-PPO.md b/docs/Training-PPO.md
index 9fae31066b..5e306a7fb9 100644
--- a/docs/Training-PPO.md
+++ b/docs/Training-PPO.md
@@ -300,6 +300,15 @@ This option is provided in case you want to initialize different behaviors from
 in most cases, it is sufficient to use the `--initialize-from` CLI parameter to initialize
 all models from the same run.
 
+### (Optional) Advanced: Disable Threading
+
+By default, PPO model updates can happen while the environment is being stepped. This slightly violates the
+[on-policy](https://spinningup.openai.com/en/latest/user/algorithms.html#the-on-policy-algorithms)
+assumption of PPO in exchange for a 10-20% training speedup. To keep PPO strictly on-policy,
+you can disable parallel updates by setting `threaded` to `false`.
+
+Default Value: `true`
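+
+For reference, a trainer configuration that disables the update thread might look like the following
+(the behavior name and the other values here are only illustrative, not recommendations):
+
+```yaml
+PushBlock:
+    trainer: ppo
+    batch_size: 128
+    buffer_size: 2048
+    # Keep updates strictly on-policy by disabling the parallel trainer thread.
+    threaded: false
+```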
+
 ## Training Statistics
 
 To view training statistics, use TensorBoard. For information on launching and
diff --git a/docs/Training-SAC.md b/docs/Training-SAC.md
index 043cc11cd8..54d1e921fe 100644
--- a/docs/Training-SAC.md
+++ b/docs/Training-SAC.md
@@ -40,19 +40,18 @@ ML-Agents provides two reward signals by default, the Extrinsic (environment) reward signal and the
 Curiosity reward, which can be used to encourage exploration in sparse extrinsic reward
 environments.
 
-#### Number of Updates for Reward Signal (Optional)
+#### Steps Per Update for Reward Signal (Optional)
 
-`reward_signal_num_update` for the reward signals corresponds to the number of mini batches sampled
-and used for updating the reward signals during each
-update. By default, we update the reward signals once every time the main policy is updated.
+`reward_signal_steps_per_update` corresponds to the average number of agent steps (actions) taken per
+mini-batch update of the reward signals. By default, we update the reward signals once every time the main policy is updated.
 However, to imitate the training procedure in certain imitation learning papers (e.g.
 [Kostrikov et. al](http://arxiv.org/abs/1809.02925), [Blondé et. al](http://arxiv.org/abs/1809.02064)),
-we may want to update the policy N times, then update the reward signal (GAIL) M times.
-We can change `train_interval` and `num_update` of SAC to N, as well as `reward_signal_num_update`
-under `reward_signals` to M to accomplish this. By default, `reward_signal_num_update` is set to
-`num_update`.
+we may want to update the reward signal (GAIL) M times for every update of the policy.
+We can change `steps_per_update` of SAC to N, as well as `reward_signal_steps_per_update`
+under `reward_signals` to N / M to accomplish this. By default, `reward_signal_steps_per_update` is set to
+`steps_per_update`.
 
-Typical Range: `num_update`
+Typical Range: equal to `steps_per_update`
 
 ### Buffer Size
 
@@ -106,17 +105,22 @@ there may not be any new interesting information between steps, and `train_inter
 
 Typical Range: `1` - `5`
 
-### Number of Updates
+### Steps Per Update
 
-`num_update` corresponds to the number of mini batches sampled and used for training during each
-training event. In SAC, a single "update" corresponds to grabbing a batch of size `batch_size` from the experience
-replay buffer, and using this mini batch to update the models. Typically, this can be left at 1.
-However, to imitate the training procedure in certain papers (e.g.
-[Kostrikov et. al](http://arxiv.org/abs/1809.02925), [Blondé et. al](http://arxiv.org/abs/1809.02064)),
-we may want to update N times with different mini batches before grabbing additional samples.
-We can change `train_interval` and `num_update` to N to accomplish this.
+`steps_per_update` corresponds to the average ratio of agent steps (actions) taken to updates made to the agent's
+policy. In SAC, a single "update" corresponds to grabbing a batch of size `batch_size` from the experience
+replay buffer, and using this mini batch to update the models. Note that an update is not guaranteed to happen after
+exactly `steps_per_update` steps; the trainer only maintains this ratio on average over many steps.
+
+Typically, `steps_per_update` should be greater than or equal to 1. A lower `steps_per_update` improves sample
+efficiency (fewer environment steps are required to train) but increases the CPU time spent performing updates.
+For most environments where steps are fairly fast (e.g. our example environments), setting `steps_per_update`
+equal to the number of agents in the scene is a good balance.
+For slow environments (steps that take 0.1 seconds or more), reducing `steps_per_update` may improve training speed.
+You can also set `steps_per_update` below 1 to update more than once per step, though this will usually slow
+training down unless the environment itself is very slow.
 
-Typical Range: `1`
+Typical Range: `1` - `20`
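+
+As a concrete illustration of the guidance above (the behavior name and values are hypothetical): a scene with
+roughly 10 agents that previously used `num_update: 1` and `train_interval: 1` would set `steps_per_update`
+to about 10:
+
+```yaml
+MyBehavior:
+    trainer: sac
+    batch_size: 128
+    buffer_size: 50000
+    # Roughly (number of agents) * num_update / train_interval under the old settings.
+    steps_per_update: 10
+    reward_signals:
+        # Optional: update the reward signals at a different rate than the policy (see above).
+        reward_signal_steps_per_update: 10
+        extrinsic:
+            strength: 1.0
+            gamma: 0.99
+```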
 
 ### Tau
 
diff --git a/ml-agents/mlagents/trainers/agent_processor.py b/ml-agents/mlagents/trainers/agent_processor.py
index edacc4da32..40fefac3be 100644
--- a/ml-agents/mlagents/trainers/agent_processor.py
+++ b/ml-agents/mlagents/trainers/agent_processor.py
@@ -1,6 +1,7 @@
 import sys
-from typing import List, Dict, Deque, TypeVar, Generic, Tuple, Any, Union
-from collections import defaultdict, Counter, deque
+from typing import List, Dict, TypeVar, Generic, Tuple, Any, Union
+from collections import defaultdict, Counter
+import queue
 
 from mlagents_envs.base_env import (
     DecisionSteps,
@@ -229,26 +230,53 @@ class Empty(Exception):
         pass
 
-    def __init__(self, behavior_id: str, maxlen: int = 1000):
+    def __init__(self, behavior_id: str, maxlen: int = 20):
         """
         Initializes an AgentManagerQueue. Note that we can give it a behavior_id so that it can be identified
         separately from an AgentManager.
         """
-        self.maxlen: int = maxlen
-        self.queue: Deque[T] = deque(maxlen=self.maxlen)
-        self.behavior_id = behavior_id
+        self._maxlen: int = maxlen
+        self._queue: queue.Queue = queue.Queue(maxsize=maxlen)
+        self._behavior_id = behavior_id
+
+    @property
+    def maxlen(self):
+        """
+        The maximum length of the queue.
+        :return: Maximum length of the queue.
+        """
+        return self._maxlen
+
+    @property
+    def behavior_id(self):
+        """
+        The Behavior ID of this queue.
+        :return: Behavior ID associated with the queue.
+        """
+        return self._behavior_id
+
+    def qsize(self) -> int:
+        """
+        Returns the approximate size of the queue. Note that values may differ
+        depending on the underlying queue implementation.
+        """
+        return self._queue.qsize()
 
     def empty(self) -> bool:
-        return len(self.queue) == 0
+        return self._queue.empty()
 
     def get_nowait(self) -> T:
+        """
+        Gets the next item from the queue, throwing an AgentManagerQueue.Empty exception
+        if the queue is empty.
+        """
         try:
-            return self.queue.popleft()
-        except IndexError:
+            return self._queue.get_nowait()
+        except queue.Empty:
             raise self.Empty("The AgentManagerQueue is empty.")
 
     def put(self, item: T) -> None:
-        self.queue.append(item)
+        self._queue.put(item)
 
 
 class AgentManager(AgentProcessor):
@@ -268,8 +296,10 @@ def __init__(
         self.trajectory_queue: AgentManagerQueue[Trajectory] = AgentManagerQueue(
             self.behavior_id
         )
+        # NOTE: we make policy queues of infinite length to avoid lockups of the trainers.
+        # In the environment manager, we make sure to empty the policy queue before continuing to produce steps.
self.policy_queue: AgentManagerQueue[Policy] = AgentManagerQueue( - self.behavior_id + self.behavior_id, maxlen=0 ) self.publish_trajectory_queue(self.trajectory_queue) diff --git a/ml-agents/mlagents/trainers/env_manager.py b/ml-agents/mlagents/trainers/env_manager.py index b8f6771e29..bd0958279c 100644 --- a/ml-agents/mlagents/trainers/env_manager.py +++ b/ml-agents/mlagents/trainers/env_manager.py @@ -88,13 +88,17 @@ def advance(self): if self.first_step_infos is not None: self._process_step_infos(self.first_step_infos) self.first_step_infos = None - # Get new policies if found + # Get new policies if found. Always get the latest policy. for brain_name in self.external_brains: + _policy = None try: - _policy = self.agent_managers[brain_name].policy_queue.get_nowait() - self.set_policy(brain_name, _policy) + # We make sure to empty the policy queue before continuing to produce steps. + # This halts the trainers until the policy queue is empty. + while True: + _policy = self.agent_managers[brain_name].policy_queue.get_nowait() except AgentManagerQueue.Empty: - pass + if _policy is not None: + self.set_policy(brain_name, _policy) # Step the environment new_step_infos = self._step() # Add to AgentProcessor diff --git a/ml-agents/mlagents/trainers/ghost/trainer.py b/ml-agents/mlagents/trainers/ghost/trainer.py index efb0423798..5de4d22832 100644 --- a/ml-agents/mlagents/trainers/ghost/trainer.py +++ b/ml-agents/mlagents/trainers/ghost/trainer.py @@ -223,7 +223,7 @@ def advance(self) -> None: # We grab at most the maximum length of the queue. # This ensures that even if the queue is being filled faster than it is # being emptied, the trajectories in the queue are on-policy. - for _ in range(trajectory_queue.maxlen): + for _ in range(trajectory_queue.qsize()): t = trajectory_queue.get_nowait() # adds to wrapped trainers queue internal_trajectory_queue.put(t) @@ -233,7 +233,7 @@ def advance(self) -> None: else: # Dump trajectories from non-learning policy try: - for _ in range(trajectory_queue.maxlen): + for _ in range(trajectory_queue.qsize()): t = trajectory_queue.get_nowait() # count ghost steps self.ghost_step += len(t.steps) diff --git a/ml-agents/mlagents/trainers/ppo/trainer.py b/ml-agents/mlagents/trainers/ppo/trainer.py index 3538de231e..81ed6ffab7 100644 --- a/ml-agents/mlagents/trainers/ppo/trainer.py +++ b/ml-agents/mlagents/trainers/ppo/trainer.py @@ -219,6 +219,7 @@ def _update_policy(self): for stat, val in update_stats.items(): self._stats_reporter.add_stat(stat, val) self._clear_update_buffer() + return True def create_policy( self, parsed_behavior_id: BehaviorIdentifiers, brain_parameters: BrainParameters diff --git a/ml-agents/mlagents/trainers/sac/trainer.py b/ml-agents/mlagents/trainers/sac/trainer.py index 286d5be4a4..7c6fcfd777 100644 --- a/ml-agents/mlagents/trainers/sac/trainer.py +++ b/ml-agents/mlagents/trainers/sac/trainer.py @@ -24,6 +24,7 @@ logger = get_logger(__name__) BUFFER_TRUNCATE_PERCENT = 0.8 +DEFAULT_STEPS_PER_UPDATE = 1 class SACTrainer(RLTrainer): @@ -64,9 +65,9 @@ def __init__( "init_entcoef", "max_steps", "normalize", - "num_update", "num_layers", "time_horizon", + "steps_per_update", "sequence_length", "summary_freq", "tau", @@ -84,15 +85,22 @@ def __init__( self.optimizer: SACOptimizer = None # type: ignore self.step = 0 - self.train_interval = ( - trainer_parameters["train_interval"] - if "train_interval" in trainer_parameters - else 1 + + # Don't count buffer_init_steps in steps_per_update ratio, but also don't divide-by-0 + self.update_steps = 
max(1, self.trainer_parameters["buffer_init_steps"]) + self.reward_signal_update_steps = max( + 1, self.trainer_parameters["buffer_init_steps"] + ) + + self.steps_per_update = ( + trainer_parameters["steps_per_update"] + if "steps_per_update" in trainer_parameters + else DEFAULT_STEPS_PER_UPDATE ) - self.reward_signal_updates_per_train = ( - trainer_parameters["reward_signals"]["reward_signal_num_update"] - if "reward_signal_num_update" in trainer_parameters["reward_signals"] - else trainer_parameters["num_update"] + self.reward_signal_steps_per_update = ( + trainer_parameters["reward_signals"]["reward_signal_steps_per_update"] + if "reward_signal_steps_per_update" in trainer_parameters["reward_signals"] + else self.steps_per_update ) self.checkpoint_replay_buffer = ( @@ -207,7 +215,7 @@ def _process_trajectory(self, trajectory: Trajectory) -> None: def _is_ready_update(self) -> bool: """ Returns whether or not the trainer has enough elements to run update model - :return: A boolean corresponding to whether or not update_model() can be run + :return: A boolean corresponding to whether or not _update_policy() can be run """ return ( self.update_buffer.num_experiences >= self.trainer_parameters["batch_size"] @@ -215,14 +223,16 @@ def _is_ready_update(self) -> bool: ) @timed - def _update_policy(self) -> None: + def _update_policy(self) -> bool: """ - If train_interval is met, update the SAC policy given the current reward signals. - If reward_signal_train_interval is met, update the reward signals from the buffer. + Update the SAC policy and reward signals. The reward signal generators are updated using different mini batches. + By default we imitate http://arxiv.org/abs/1809.02925 and similar papers, where the policy is updated + N times, then the reward signals are updated N times. + :return: Whether or not the policy was updated. """ - if self.step % self.train_interval == 0: - self.update_sac_policy() - self.update_reward_signals() + policy_was_updated = self._update_sac_policy() + self._update_reward_signals() + return policy_was_updated def create_policy( self, parsed_behavior_id: BehaviorIdentifiers, brain_parameters: BrainParameters @@ -253,23 +263,19 @@ def create_policy( return policy - def update_sac_policy(self) -> None: + def _update_sac_policy(self) -> bool: """ - Uses demonstration_buffer to update the policy. - The reward signal generators are updated using different mini batches. - If we want to imitate http://arxiv.org/abs/1809.02925 and similar papers, where the policy is updated - N times, then the reward signals are updated N times, then reward_signal_updates_per_train - is greater than 1 and the reward signals are not updated in parallel. + Uses update_buffer to update the policy. We sample the update_buffer and update + until the steps_per_update ratio is met. 
""" - + has_updated = False self.cumulative_returns_since_policy_update.clear() n_sequences = max( int(self.trainer_parameters["batch_size"] / self.policy.sequence_length), 1 ) - num_updates = self.trainer_parameters["num_update"] batch_update_stats: Dict[str, list] = defaultdict(list) - for _ in range(num_updates): + while self.step / self.update_steps > self.steps_per_update: logger.debug("Updating SAC policy at step {}".format(self.step)) buffer = self.update_buffer if ( @@ -290,22 +296,26 @@ def update_sac_policy(self) -> None: for stat_name, value in update_stats.items(): batch_update_stats[stat_name].append(value) + self.update_steps += 1 + + for stat, stat_list in batch_update_stats.items(): + self._stats_reporter.add_stat(stat, np.mean(stat_list)) + has_updated = True + + if self.optimizer.bc_module: + update_stats = self.optimizer.bc_module.update() + for stat, val in update_stats.items(): + self._stats_reporter.add_stat(stat, val) + # Truncate update buffer if neccessary. Truncate more than we need to to avoid truncating # a large buffer at each update. if self.update_buffer.num_experiences > self.trainer_parameters["buffer_size"]: self.update_buffer.truncate( int(self.trainer_parameters["buffer_size"] * BUFFER_TRUNCATE_PERCENT) ) + return has_updated - for stat, stat_list in batch_update_stats.items(): - self._stats_reporter.add_stat(stat, np.mean(stat_list)) - - if self.optimizer.bc_module: - update_stats = self.optimizer.bc_module.update() - for stat, val in update_stats.items(): - self._stats_reporter.add_stat(stat, val) - - def update_reward_signals(self) -> None: + def _update_reward_signals(self) -> None: """ Iterate through the reward signals and update them. Unlike in PPO, do it separate from the policy so that it can be done at a different @@ -316,12 +326,14 @@ def update_reward_signals(self) -> None: and policy are updated in parallel. 
""" buffer = self.update_buffer - num_updates = self.reward_signal_updates_per_train n_sequences = max( int(self.trainer_parameters["batch_size"] / self.policy.sequence_length), 1 ) batch_update_stats: Dict[str, list] = defaultdict(list) - for _ in range(num_updates): + while ( + self.step / self.reward_signal_update_steps + > self.reward_signal_steps_per_update + ): # Get minibatches for reward signal update if needed reward_signal_minibatches = {} for name, signal in self.optimizer.reward_signals.items(): @@ -337,8 +349,10 @@ def update_reward_signals(self) -> None: ) for stat_name, value in update_stats.items(): batch_update_stats[stat_name].append(value) - for stat, stat_list in batch_update_stats.items(): - self._stats_reporter.add_stat(stat, np.mean(stat_list)) + self.reward_signal_update_steps += 1 + + for stat, stat_list in batch_update_stats.items(): + self._stats_reporter.add_stat(stat, np.mean(stat_list)) def add_policy( self, parsed_behavior_id: BehaviorIdentifiers, policy: TFPolicy diff --git a/ml-agents/mlagents/trainers/tests/test_reward_signals.py b/ml-agents/mlagents/trainers/tests/test_reward_signals.py index 0c07125003..db6203dbcb 100644 --- a/ml-agents/mlagents/trainers/tests/test_reward_signals.py +++ b/ml-agents/mlagents/trainers/tests/test_reward_signals.py @@ -48,8 +48,7 @@ def sac_dummy_config(): max_steps: 5.0e4 memory_size: 256 normalize: false - num_update: 1 - train_interval: 1 + steps_per_update: 1 num_layers: 2 time_horizon: 64 sequence_length: 64 diff --git a/ml-agents/mlagents/trainers/tests/test_rl_trainer.py b/ml-agents/mlagents/trainers/tests/test_rl_trainer.py index 0767d69760..5484976743 100644 --- a/ml-agents/mlagents/trainers/tests/test_rl_trainer.py +++ b/ml-agents/mlagents/trainers/tests/test_rl_trainer.py @@ -1,5 +1,6 @@ import yaml from unittest import mock +import pytest import mlagents.trainers.tests.mock_brain as mb from mlagents.trainers.trainer.rl_trainer import RLTrainer from mlagents.trainers.tests.test_buffer import construct_fake_buffer @@ -32,6 +33,9 @@ def create_mock_brain(): # Add concrete implementations of abstract methods class FakeTrainer(RLTrainer): + def set_is_policy_updating(self, is_updating): + self.update_policy = is_updating + def get_policy(self, name_behavior_id): return mock.Mock() @@ -39,7 +43,7 @@ def _is_ready_update(self): return True def _update_policy(self): - pass + return self.update_policy def add_policy(self): pass @@ -54,6 +58,7 @@ def _process_trajectory(self, trajectory): def create_rl_trainer(): mock_brainparams = create_mock_brain() trainer = FakeTrainer(mock_brainparams, dummy_config(), True, 0) + trainer.set_is_policy_updating(True) return trainer @@ -80,8 +85,10 @@ def test_clear_update_buffer(): def test_advance(mocked_clear_update_buffer): trainer = create_rl_trainer() trajectory_queue = AgentManagerQueue("testbrain") + policy_queue = AgentManagerQueue("testbrain") trainer.subscribe_trajectory_queue(trajectory_queue) - time_horizon = 15 + trainer.publish_policy_queue(policy_queue) + time_horizon = 10 trajectory = mb.make_fake_trajectory( length=time_horizon, max_step_complete=True, @@ -92,12 +99,24 @@ def test_advance(mocked_clear_update_buffer): trajectory_queue.put(trajectory) trainer.advance() + policy_queue.get_nowait() # Check that get_step is correct assert trainer.get_step == time_horizon # Check that we can turn off the trainer and that the buffer is cleared + for _ in range(0, 5): + trajectory_queue.put(trajectory) + trainer.advance() + # Check that there is stuff in the policy queue + 
policy_queue.get_nowait() + + # Check that if the policy doesn't update, we don't push it to the queue + trainer.set_is_policy_updating(False) for _ in range(0, 10): trajectory_queue.put(trajectory) trainer.advance() + # Check that there nothing in the policy queue + with pytest.raises(AgentManagerQueue.Empty): + policy_queue.get_nowait() # Check that the buffer has been cleared assert not trainer.should_still_train diff --git a/ml-agents/mlagents/trainers/tests/test_sac.py b/ml-agents/mlagents/trainers/tests/test_sac.py index 2c24a6b2ae..2f35a8c0bd 100644 --- a/ml-agents/mlagents/trainers/tests/test_sac.py +++ b/ml-agents/mlagents/trainers/tests/test_sac.py @@ -20,7 +20,7 @@ def dummy_config(): return yaml.safe_load( """ trainer: sac - batch_size: 32 + batch_size: 8 buffer_size: 10240 buffer_init_steps: 0 hidden_units: 32 @@ -29,8 +29,7 @@ def dummy_config(): max_steps: 1024 memory_size: 10 normalize: true - num_update: 1 - train_interval: 1 + steps_per_update: 1 num_layers: 1 time_horizon: 64 sequence_length: 16 @@ -175,18 +174,21 @@ def test_add_get_policy(sac_optimizer, dummy_config): trainer.add_policy(brain_params, policy) -def test_process_trajectory(dummy_config): +def test_advance(dummy_config): brain_params = make_brain_parameters( discrete_action=False, visual_inputs=0, vec_obs_size=6 ) dummy_config["summary_path"] = "./summaries/test_trainer_summary" dummy_config["model_path"] = "./models/test_trainer_models/TestModel" + dummy_config["steps_per_update"] = 20 trainer = SACTrainer(brain_params, 0, dummy_config, True, False, 0, "0") policy = trainer.create_policy(brain_params.brain_name, brain_params) trainer.add_policy(brain_params.brain_name, policy) trajectory_queue = AgentManagerQueue("testbrain") + policy_queue = AgentManagerQueue("testbrain") trainer.subscribe_trajectory_queue(trajectory_queue) + trainer.publish_policy_queue(policy_queue) trajectory = make_fake_trajectory( length=15, @@ -194,6 +196,7 @@ def test_process_trajectory(dummy_config): vec_obs_size=6, num_vis_obs=0, action_space=[2], + is_discrete=False, ) trajectory_queue.put(trajectory) trainer.advance() @@ -208,11 +211,12 @@ def test_process_trajectory(dummy_config): # Add a terminal trajectory trajectory = make_fake_trajectory( - length=15, + length=6, max_step_complete=False, vec_obs_size=6, num_vis_obs=0, action_space=[2], + is_discrete=False, ) trajectory_queue.put(trajectory) trainer.advance() @@ -227,6 +231,24 @@ def test_process_trajectory(dummy_config): trainer.stats_reporter.get_stats_summaries("Policy/Extrinsic Reward").mean > 0 ) + # Make sure there is a policy on the queue + policy_queue.get_nowait() + + # Add another trajectory. Since this is less than 20 steps total (enough for) + # two updates, there should NOT be a policy on the queue. 
+ trajectory = make_fake_trajectory( + length=5, + max_step_complete=False, + vec_obs_size=6, + num_vis_obs=0, + action_space=[2], + is_discrete=False, + ) + trajectory_queue.put(trajectory) + trainer.advance() + with pytest.raises(AgentManagerQueue.Empty): + policy_queue.get_nowait() + def test_bad_config(dummy_config): brain_params = make_brain_parameters( diff --git a/ml-agents/mlagents/trainers/tests/test_simple_rl.py b/ml-agents/mlagents/trainers/tests/test_simple_rl.py index 9b73be1196..0714e061ad 100644 --- a/ml-agents/mlagents/trainers/tests/test_simple_rl.py +++ b/ml-agents/mlagents/trainers/tests/test_simple_rl.py @@ -45,6 +45,7 @@ sequence_length: 64 summary_freq: 500 use_recurrent: false + threaded: false reward_signals: extrinsic: strength: 1.0 @@ -55,7 +56,7 @@ {BRAIN_NAME}: trainer: sac batch_size: 8 - buffer_size: 500 + buffer_size: 5000 buffer_init_steps: 100 hidden_units: 16 init_entcoef: 0.01 @@ -63,8 +64,7 @@ max_steps: 1000 memory_size: 16 normalize: false - num_update: 1 - train_interval: 1 + steps_per_update: 1 num_layers: 1 time_horizon: 64 sequence_length: 32 @@ -74,6 +74,7 @@ curiosity_enc_size: 128 demo_path: None vis_encode_type: simple + threaded: false reward_signals: extrinsic: strength: 1.0 @@ -138,6 +139,8 @@ def _check_environment_trains( StatsReporter.writers.clear() # Clear StatsReporters so we don't write to file debug_writer = DebugWriter() StatsReporter.add_writer(debug_writer) + # Make sure threading is turned off for determinism + trainer_config["threading"] = False if env_manager is None: env_manager = SimpleEnvManager(env, FloatPropertiesChannel()) trainer_factory = TrainerFactory( @@ -306,9 +309,10 @@ def test_recurrent_sac(use_discrete): override_vals = { "batch_size": 64, "use_recurrent": True, - "max_steps": 3000, + "max_steps": 5000, "learning_rate": 1e-3, "buffer_init_steps": 500, + "steps_per_update": 2, } config = generate_config(SAC_CONFIG, override_vals) _check_environment_trains(env, config) diff --git a/ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py b/ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py index 3d4492530a..c66c9343e2 100644 --- a/ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py +++ b/ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py @@ -17,6 +17,7 @@ from mlagents_envs.exception import UnityEnvironmentException from mlagents.trainers.tests.simple_test_envs import SimpleEnvironment from mlagents.trainers.stats import StatsReporter +from mlagents.trainers.agent_processor import AgentManagerQueue from mlagents.trainers.tests.test_simple_rl import ( _check_environment_trains, PPO_CONFIG, @@ -151,6 +152,12 @@ def test_advance(self, mock_create_worker, external_brains_mock, step_mock): ) external_brains_mock.return_value = [brain_name] agent_manager_mock = mock.Mock() + mock_policy = mock.Mock() + agent_manager_mock.policy_queue.get_nowait.side_effect = [ + mock_policy, + mock_policy, + AgentManagerQueue.Empty(), + ] env_manager.set_agent_manager(brain_name, agent_manager_mock) step_info_dict = {brain_name: (Mock(), Mock())} @@ -173,9 +180,6 @@ def test_advance(self, mock_create_worker, external_brains_mock, step_mock): ) # Test policy queue - mock_policy = mock.Mock() - agent_manager_mock.policy_queue.get_nowait.return_value = mock_policy - env_manager.advance() assert env_manager.policies[brain_name] == mock_policy assert agent_manager_mock.policy == mock_policy @@ -189,7 +193,7 @@ def simple_env_factory(worker_id, config): env_manager = SubprocessEnvManager( 
         simple_env_factory, EngineConfig.default_config(), num_envs
     )
-    trainer_config = generate_config(PPO_CONFIG)
+    trainer_config = generate_config(PPO_CONFIG, override_vals={"max_steps": 5000})
     # Run PPO using env_manager
     _check_environment_trains(
         simple_env_factory(0, []),
diff --git a/ml-agents/mlagents/trainers/tests/test_trainer_controller.py b/ml-agents/mlagents/trainers/tests/test_trainer_controller.py
index c151ed99e0..bb253d4860 100644
--- a/ml-agents/mlagents/trainers/tests/test_trainer_controller.py
+++ b/ml-agents/mlagents/trainers/tests/test_trainer_controller.py
@@ -147,4 +147,5 @@ def test_advance_adds_experiences_to_trainer_and_trains(
     env_mock.reset.assert_not_called()
     env_mock.advance.assert_called_once()
-    trainer_mock.advance.assert_called_once()
+    # May have been called many times due to the trainer thread.
+    assert trainer_mock.advance.call_count > 0
diff --git a/ml-agents/mlagents/trainers/trainer/rl_trainer.py b/ml-agents/mlagents/trainers/trainer/rl_trainer.py
index a4e6d599ce..6419f22fea 100644
--- a/ml-agents/mlagents/trainers/trainer/rl_trainer.py
+++ b/ml-agents/mlagents/trainers/trainer/rl_trainer.py
@@ -83,9 +83,10 @@ def _is_ready_update(self):
         return False
 
     @abc.abstractmethod
-    def _update_policy(self):
+    def _update_policy(self) -> bool:
         """
         Uses demonstration_buffer to update model.
+        :return: Whether or not the policy was updated.
         """
         pass
 
@@ -134,13 +135,14 @@ def _maybe_write_summary(self, step_after_process: int) -> None:
     def advance(self) -> None:
         """
         Steps the trainer, taking in trajectories and updates if ready.
+        Will block and wait briefly if there are no trajectories.
         """
         with hierarchical_timer("process_trajectory"):
             for traj_queue in self.trajectory_queues:
                 # We grab at most the maximum length of the queue.
                 # This ensures that even if the queue is being filled faster than it is
                 # being emptied, the trajectories in the queue are on-policy.
-                for _ in range(traj_queue.maxlen):
+                for _ in range(traj_queue.qsize()):
                     try:
                         t = traj_queue.get_nowait()
                         self._process_trajectory(t)
@@ -149,9 +151,9 @@
         if self.should_still_train:
             if self._is_ready_update():
                 with hierarchical_timer("_update_policy"):
-                    self._update_policy()
-                    for q in self.policy_queues:
-                        # Get policies that correspond to the policy queue in question
-                        q.put(self.get_policy(q.behavior_id))
+                    if self._update_policy():
+                        for q in self.policy_queues:
+                            # Get policies that correspond to the policy queue in question
+                            q.put(self.get_policy(q.behavior_id))
         else:
             self._clear_update_buffer()
diff --git a/ml-agents/mlagents/trainers/trainer/trainer.py b/ml-agents/mlagents/trainers/trainer/trainer.py
index f5d97969fa..3128aee081 100644
--- a/ml-agents/mlagents/trainers/trainer/trainer.py
+++ b/ml-agents/mlagents/trainers/trainer/trainer.py
@@ -43,6 +43,7 @@ def __init__(
         self.run_id = run_id
         self.trainer_parameters = trainer_parameters
         self.summary_path = trainer_parameters["summary_path"]
+        self._threaded = trainer_parameters.get("threaded", True)
         self._stats_reporter = StatsReporter(self.summary_path)
         self.is_training = training
         self._reward_buffer: Deque[float] = deque(maxlen=reward_buff_cap)
@@ -90,6 +91,15 @@ def get_step(self) -> int:
         """
         return self.step
 
+    @property
+    def threaded(self) -> bool:
+        """
+        Whether or not to run the trainer in a thread. True allows the trainer to
+        update the policy while the environment is taking steps. Set to False to
+        enforce strict on-policy updates (i.e. don't update the policy while the environment is stepping).
+ """ + return self._threaded + @property def should_still_train(self) -> bool: """ diff --git a/ml-agents/mlagents/trainers/trainer_controller.py b/ml-agents/mlagents/trainers/trainer_controller.py index 19b8671c3f..70609ca34c 100644 --- a/ml-agents/mlagents/trainers/trainer_controller.py +++ b/ml-agents/mlagents/trainers/trainer_controller.py @@ -4,7 +4,8 @@ import os import sys -from typing import Dict, Optional, Set +import threading +from typing import Dict, Optional, Set, List from collections import defaultdict import numpy as np @@ -49,6 +50,7 @@ def __init__( :param training_seed: Seed to use for Numpy and Tensorflow random number generation. :param sampler_manager: SamplerManager object handles samplers for resampling the reset parameters. :param resampling_interval: Specifies number of simulation steps after which reset parameters are resampled. + :param threaded: Whether or not to run trainers in a separate thread. Disable for testing/debugging. """ self.trainers: Dict[str, Trainer] = {} self.brain_name_to_identifier: Dict[str, Set] = defaultdict(set) @@ -62,6 +64,9 @@ def __init__( self.meta_curriculum = meta_curriculum self.sampler_manager = sampler_manager self.resampling_interval = resampling_interval + + self.trainer_threads: List[threading.Thread] = [] + self.kill_trainers = False np.random.seed(training_seed) tf.set_random_seed(training_seed) @@ -181,6 +186,13 @@ def _create_trainer_and_manager( trainer.publish_policy_queue(agent_manager.policy_queue) trainer.subscribe_trajectory_queue(agent_manager.trajectory_queue) + if trainer.threaded: + # Start trainer thread + trainerthread = threading.Thread( + target=self.trainer_update_func, args=(trainer,), daemon=True + ) + trainerthread.start() + self.trainer_threads.append(trainerthread) def _create_trainers_and_managers( self, env_manager: EnvManager, behavior_ids: Set[str] @@ -208,7 +220,8 @@ def start_learning(self, env_manager: EnvManager) -> None: self.reset_env_if_ready(env_manager, global_step) if self._should_save_model(global_step): self._save_model() - + # Stop advancing trainers + self.kill_trainers = True # Final save Tensorflow model if global_step != 0 and self.train_model: self._save_model() @@ -217,6 +230,7 @@ def start_learning(self, env_manager: EnvManager) -> None: UnityCommunicationException, UnityEnvironmentException, ) as ex: + self.kill_trainers = True if self.train_model: self._save_model_when_interrupted() @@ -282,9 +296,14 @@ def advance(self, env: EnvManager) -> int: "Environment/Lesson", curr.lesson_num ) - # Advance trainers. This can be done in a separate loop in the future. - with hierarchical_timer("trainer_advance"): - for trainer in self.trainers.values(): - trainer.advance() + for trainer in self.trainers.values(): + if not trainer.threaded: + with hierarchical_timer("trainer_advance"): + trainer.advance() return num_steps + + def trainer_update_func(self, trainer: Trainer) -> None: + while not self.kill_trainers: + with hierarchical_timer("trainer_advance"): + trainer.advance()