[RLlib] Add "shuffle batch per epoch" option. #47458

Merged

Changes from 9 commits
26 changes: 26 additions & 0 deletions rllib/algorithms/algorithm_config.py
@@ -382,6 +382,13 @@ def __init__(self, algo_class: Optional[type] = None):
# Simple logic for now: If None, use `train_batch_size`.
self.train_batch_size_per_learner = None
self.train_batch_size = 32 # @OldAPIStack

# These settings have been adopted from the original PPO batch settings:
# num_sgd_iter, minibatch_size, and shuffle_sequences.
self.num_epochs = 1
self.minibatch_size = None
self.shuffle_batch_per_epoch = False

# TODO (sven): Unsolved problem with RLModules sometimes requiring settings from
# the main AlgorithmConfig. We should not require the user to provide those
# settings in both, the AlgorithmConfig (as property) AND the model config
@@ -2045,6 +2052,9 @@ def training(
grad_clip_by: Optional[str] = NotProvided,
train_batch_size: Optional[int] = NotProvided,
train_batch_size_per_learner: Optional[int] = NotProvided,
num_epochs: Optional[int] = NotProvided,
minibatch_size: Optional[int] = NotProvided,
shuffle_batch_per_epoch: Optional[bool] = NotProvided,
model: Optional[dict] = NotProvided,
optimizer: Optional[dict] = NotProvided,
max_requests_in_flight_per_sampler_worker: Optional[int] = NotProvided,
@@ -2103,6 +2113,15 @@ def training(
stack, this setting should no longer be used. Instead, use
`train_batch_size_per_learner` (in combination with
`num_learners`).
num_epochs: The number of complete passes over the entire train batch (per
Learner). Each pass might be further split into n minibatches (if
`minibatch_size` provided).

Collaborator: Awesome! For Offline RL we might want to add here that an epoch might loop over the entire dataset?

Contributor Author: Will add!
minibatch_size: The size of the minibatches into which the train batch
is further split.
shuffle_batch_per_epoch: Whether to shuffle the train batch once per epoch.
If the train batch has a time rank (axis=1), shuffling will only take
place along the batch axis to not disturb any intact (episode)
trajectories.
model: Arguments passed into the policy model. See models/catalog.py for a
full list of the available model options.
TODO: Provide ModelConfig objects instead of dicts.
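
For orientation, here is a minimal sketch of how the three new settings could be passed through the config builder API once this change is in; the algorithm, environment, and numbers below are purely illustrative and not taken from this PR:

    from ray.rllib.algorithms.ppo import PPOConfig

    config = (
        PPOConfig()
        .environment("CartPole-v1")
        .training(
            train_batch_size_per_learner=4000,
            num_epochs=10,                 # ten complete passes over each train batch
            minibatch_size=500,            # each pass split into minibatches of 500
            shuffle_batch_per_epoch=True,  # reshuffle along the batch axis each epoch
        )
    )
    algo = config.build()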
@@ -2166,6 +2185,13 @@ def training(
self.train_batch_size_per_learner = train_batch_size_per_learner
if train_batch_size is not NotProvided:
self.train_batch_size = train_batch_size
if num_epochs is not NotProvided:
self.num_epochs = num_epochs
if minibatch_size is not NotProvided:
self.minibatch_size = minibatch_size
if shuffle_batch_per_epoch is not NotProvided:
self.shuffle_batch_per_epoch = shuffle_batch_per_epoch

if model is not NotProvided:
self.model.update(model)
if (
23 changes: 13 additions & 10 deletions rllib/algorithms/appo/appo.py
@@ -98,22 +98,19 @@ def __init__(self, algo_class=None):
self.use_kl_loss = False
self.kl_coeff = 1.0
self.kl_target = 0.01
# TODO (sven): Activate once v-trace sequences in non-RNN batch are solved.
# If we switch this on right now, the shuffling would destroy the rollout
# sequences (non-zero-padded!) needed in the batch for v-trace.
# self.shuffle_batch_per_epoch = True

# Override some of IMPALAConfig's default values with APPO-specific values.
self.num_env_runners = 2
self.min_time_s_per_iteration = 10
self.num_gpus = 0
self.num_multi_gpu_tower_stacks = 1
self.minibatch_buffer_size = 1
self.num_sgd_iter = 1
self.target_network_update_freq = 1
self.replay_proportion = 0.0
self.replay_buffer_num_slots = 100
self.learner_queue_size = 16
self.learner_queue_timeout = 300
self.max_sample_requests_in_flight_per_worker = 2
self.broadcast_interval = 1

self.grad_clip = 40.0
# Note: Only when using enable_rl_module_and_learner=True can the clipping mode
# be configured by the user. On the old API stack, RLlib will always clip by
@@ -140,6 +137,12 @@ def __init__(self, algo_class=None):
# Add constructor kwargs here (if any).
}

self.num_gpus = 0 # @OldAPIStack
self.num_multi_gpu_tower_stacks = 1 # @OldAPIStack
self.minibatch_buffer_size = 1 # @OldAPIStack
self.replay_proportion = 0.0 # @OldAPIStack
self.replay_buffer_num_slots = 100 # @OldAPIStack

# __sphinx_doc_end__
# fmt: on

@@ -185,7 +188,7 @@ def training(
target_network_update_freq: The frequency to update the target policy and
tune the kl loss coefficients that are used during training. After
setting this parameter, the algorithm waits for at least
`target_network_update_freq * minibatch_size * num_sgd_iter` number of
`target_network_update_freq * minibatch_size * num_epochs` number of
samples to be trained on by the learner group before updating the target
networks and tuning the kl loss coefficients that are used during
training.

Collaborator: I might not completely understand this, but isn't `minibatch_size` the size of a minibatch and not necessarily the number of minibatches per epoch?

Collaborator: Ah, maybe this means the number of samples that have been trained on until we update the target networks, correct?

Contributor Author: Oh wait, great catch. I think this comment here is incorrect. When we update e.g. PPO with a batch of 4000, the `num_env_steps_trained_lifetime` counter only(!) gets increased by that 4000, and NOT by `num_epochs * 4000`. So for APPO here, this is also wrong. Will fix the comment and clarify.

Contributor Author: fixed
@@ -292,7 +295,7 @@ def training_step(self) -> ResultDict:

# Update the target network and the KL coefficient for the APPO-loss.
# The target network update frequency is calculated automatically by the product
# of `num_sgd_iter` setting (usually 1 for APPO) and `minibatch_buffer_size`.
# of `num_epochs` setting (usually 1 for APPO) and `minibatch_buffer_size`.
if self.config.enable_rl_module_and_learner:
if NUM_TARGET_UPDATES in train_results:
self._counters[NUM_TARGET_UPDATES] += train_results[NUM_TARGET_UPDATES]
Expand All @@ -309,7 +312,7 @@ def training_step(self) -> ResultDict:
)
]
target_update_freq = (
self.config.num_sgd_iter * self.config.minibatch_buffer_size
self.config.num_epochs * self.config.minibatch_buffer_size
)
if cur_ts - last_update > target_update_freq:
self._counters[NUM_TARGET_UPDATES] += 1
6 changes: 3 additions & 3 deletions rllib/algorithms/appo/appo_learner.py
@@ -90,14 +90,14 @@ def after_gradient_based_update(self, *, timesteps: Dict[str, Any]) -> None:
# TODO (avnish) Using steps trained here instead of sampled ... I'm not sure
# why the other implementation uses sampled.
# The difference in steps sampled/trained is pretty
# much always going to be larger than self.config.num_sgd_iter *
# much always going to be larger than self.config.num_epochs *
# self.config.minibatch_buffer_size unless the number of steps collected
# is really small. The thing is that the default rollout fragment length
# is 50, so the minibatch buffer size * num_sgd_iter is going to be
# is 50, so the minibatch buffer size * num_epochs is going to have
# to be 50 to even meet the threshold of having delayed target
# updates.
# We should instead have the target / kl threshold update be based off
# of the train_batch_size * some target update frequency * num_sgd_iter.
# of the train_batch_size * some target update frequency * num_epochs.

last_update_ts_key = (module_id, LAST_TARGET_UPDATE_TS)
if timestep - self.metrics.peek(
51 changes: 14 additions & 37 deletions rllib/algorithms/impala/impala.py
@@ -134,7 +134,6 @@ def __init__(self, algo_class=None):
self.vtrace_clip_pg_rho_threshold = 1.0
self.num_multi_gpu_tower_stacks = 1 # @OldAPIstack
self.minibatch_buffer_size = 1 # @OldAPIstack
self.num_sgd_iter = 1
self.replay_proportion = 0.0 # @OldAPIstack
self.replay_buffer_num_slots = 0 # @OldAPIstack
self.learner_queue_size = 3
@@ -168,10 +167,10 @@ def __init__(self, algo_class=None):
self._lr_vf = 0.0005 # @OldAPIstack

# Override some of AlgorithmConfig's default values with IMPALA-specific values.
self.num_learners = 1
self.rollout_fragment_length = 50
self.train_batch_size = 500 # @OldAPIstack
self.train_batch_size_per_learner = 500
self._minibatch_size = "auto"
self.num_env_runners = 2
self.num_gpus = 1 # @OldAPIstack
self.lr = 0.0005
@@ -200,8 +199,6 @@ def training(
num_gpu_loader_threads: Optional[int] = NotProvided,
num_multi_gpu_tower_stacks: Optional[int] = NotProvided,
minibatch_buffer_size: Optional[int] = NotProvided,
minibatch_size: Optional[Union[int, str]] = NotProvided,
num_sgd_iter: Optional[int] = NotProvided,
replay_proportion: Optional[float] = NotProvided,
replay_buffer_num_slots: Optional[int] = NotProvided,
learner_queue_size: Optional[int] = NotProvided,
@@ -252,15 +249,7 @@ def training(
- This enables us to preload data into these stacks while another stack
is performing gradient calculations.
minibatch_buffer_size: How many train batches should be retained for
minibatching. This conf only has an effect if `num_sgd_iter > 1`.
minibatch_size: The size of minibatches that are trained over during
each SGD iteration. If "auto", will use the same value as
`train_batch_size`.
Note that this setting only has an effect if
`enable_rl_module_and_learner=True` and it must be a multiple of
`rollout_fragment_length` or `sequence_length` and smaller than or equal
to `train_batch_size`.
num_sgd_iter: Number of passes to make over each train batch.
minibatching. This conf only has an effect if `num_epochs > 1`.
replay_proportion: Set >0 to enable experience replay. Saved samples will
be replayed with a p:1 proportion to new data samples.
replay_buffer_num_slots: Number of sample batches to store for replay.
@@ -330,8 +319,6 @@ def training(
self.num_multi_gpu_tower_stacks = num_multi_gpu_tower_stacks
if minibatch_buffer_size is not NotProvided:
self.minibatch_buffer_size = minibatch_buffer_size
if num_sgd_iter is not NotProvided:
self.num_sgd_iter = num_sgd_iter
if replay_proportion is not NotProvided:
self.replay_proportion = replay_proportion
if replay_buffer_num_slots is not NotProvided:
Expand Down Expand Up @@ -374,8 +361,6 @@ def training(
self._separate_vf_optimizer = _separate_vf_optimizer
if _lr_vf is not NotProvided:
self._lr_vf = _lr_vf
if minibatch_size is not NotProvided:
self._minibatch_size = minibatch_size

return self

@@ -452,14 +437,14 @@ def validate(self) -> None:
# Learner API specific checks.
if (
self.enable_rl_module_and_learner
and self._minibatch_size != "auto"
and self.minibatch_size is not None
and not (
(self.minibatch_size % self.rollout_fragment_length == 0)
and self.minibatch_size <= self.total_train_batch_size
)
):
raise ValueError(
f"`minibatch_size` ({self._minibatch_size}) must either be 'auto' "
f"`minibatch_size` ({self._minibatch_size}) must either be None "
"or a multiple of `rollout_fragment_length` "
f"({self.rollout_fragment_length}) while at the same time smaller "
"than or equal to `total_train_batch_size` "
@@ -474,20 +459,6 @@ def replay_ratio(self) -> float:
"""
return (1 / self.replay_proportion) if self.replay_proportion > 0 else 0.0

@property
def minibatch_size(self):
# If 'auto', use the train_batch_size (meaning each SGD iter is a single pass
# through the entire train batch). Otherwise, use user provided setting.
return (
(
self.train_batch_size_per_learner
if self.enable_env_runner_and_connector_v2
else self.train_batch_size
)
if self._minibatch_size == "auto"
else self._minibatch_size
)

@override(AlgorithmConfig)
def get_default_learner_class(self):
if self.framework_str == "torch":
@@ -539,7 +510,7 @@ class IMPALA(Algorithm):
2. If enabled, the replay buffer stores and produces batches of size
`rollout_fragment_length * num_envs_per_env_runner`.
3. If enabled, the minibatch ring buffer stores and replays batches of
size `train_batch_size` up to `num_sgd_iter` times per batch.
size `train_batch_size` up to `num_epochs` times per batch.
4. The learner thread executes data parallel SGD across `num_gpus` GPUs
on batches of size `train_batch_size`.
"""
@@ -734,6 +705,9 @@ def training_step(self) -> ResultDict:
NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0
),
},
num_epochs=self.config.num_epochs,
minibatch_size=self.config.minibatch_size,
shuffle_batch_per_epoch=self.config.shuffle_batch_per_epoch,
)
else:
learner_results = self.learner_group.update_from_episodes(
Collaborator: I wonder: isn't it possible to just turn over a ray.data.DataIterator to the learner via update_from_iterator and then iterate over the train batch (as a materialized dataset) in minibatch_size batches?

Collaborator: We could run all of this (in the new stack) through the PreLearner to prefetch and make the learner connector run.

@@ -745,6 +719,9 @@
NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0
),
},
num_epochs=self.config.num_epochs,
minibatch_size=self.config.minibatch_size,
shuffle_batch_per_epoch=self.config.shuffle_batch_per_epoch,
)
if not do_async_updates:
learner_results = [learner_results]
@@ -1292,7 +1269,7 @@ def _learn_on_processed_samples(self) -> ResultDict:
),
},
async_update=async_update,
num_iters=self.config.num_sgd_iter,
num_epochs=self.config.num_epochs,
minibatch_size=self.config.minibatch_size,
)
if not async_update:
@@ -1531,7 +1508,7 @@ def make_learner_thread(local_worker, config):
lr=config["lr"],
train_batch_size=config["train_batch_size"],
num_multi_gpu_tower_stacks=config["num_multi_gpu_tower_stacks"],
num_sgd_iter=config["num_sgd_iter"],
num_sgd_iter=config["num_epochs"],
learner_queue_size=config["learner_queue_size"],
learner_queue_timeout=config["learner_queue_timeout"],
num_data_load_threads=config["num_gpu_loader_threads"],
@@ -1540,7 +1517,7 @@
learner_thread = LearnerThread(
local_worker,
minibatch_buffer_size=config["minibatch_buffer_size"],
num_sgd_iter=config["num_sgd_iter"],
num_sgd_iter=config["num_epochs"],
learner_queue_size=config["learner_queue_size"],
learner_queue_timeout=config["learner_queue_timeout"],
)
27 changes: 24 additions & 3 deletions rllib/algorithms/impala/impala_learner.py
@@ -96,6 +96,9 @@ def build(self) -> None:
in_queue=self._learner_thread_in_queue,
out_queue=self._learner_thread_out_queue,
metrics_logger=self.metrics,
num_epochs=self.config.num_epochs,
minibatch_size=self.config.minibatch_size,
shuffle_batch_per_epoch=self.config.shuffle_batch_per_epoch,
)
self._learner_thread.start()

@@ -108,8 +111,9 @@ def update_from_episodes(
# TODO (sven): Deprecate these in favor of config attributes for only those
# algos that actually need (and know how) to do minibatching.
minibatch_size: Optional[int] = None,
num_iters: int = 1,
num_total_mini_batches: int = 0,
num_epochs: int = 1,
shuffle_batch_per_epoch: bool = False,
num_total_minibatches: int = 0,
reduce_fn=None, # Deprecated args.
**kwargs,
) -> ResultDict:
@@ -228,7 +232,17 @@ def _step(self) -> None:


class _LearnerThread(threading.Thread):
def __init__(self, *, update_method, in_queue, out_queue, metrics_logger):
def __init__(
self,
*,
update_method,
in_queue,
out_queue,
metrics_logger,
num_epochs,
minibatch_size,
shuffle_batch_per_epoch,
):
super().__init__()
self.daemon = True
self.metrics: MetricsLogger = metrics_logger
Expand All @@ -238,6 +252,10 @@ def __init__(self, *, update_method, in_queue, out_queue, metrics_logger):
self._in_queue: deque = in_queue
self._out_queue: Queue = out_queue

self._num_epochs = num_epochs
self._minibatch_size = minibatch_size
self._shuffle_batch_per_epoch = shuffle_batch_per_epoch

def run(self) -> None:
while not self.stopped:
self.step()
@@ -263,6 +281,9 @@ def step(self):
NUM_ENV_STEPS_SAMPLED_LIFETIME, default=0
)
},
num_epochs=self._num_epochs,
minibatch_size=self._minibatch_size,
shuffle_batch_per_epoch=self._shuffle_batch_per_epoch,
)
# We have to deepcopy the results dict, b/c we must avoid having a returned
# Stats object sit in the queue and getting a new (possibly even tensor)
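
Conceptually, the three values the thread now forwards into every update call drive a loop like the following framework-free sketch (based on the semantics described in the AlgorithmConfig docstring above; `iterate_minibatches` is a hypothetical helper, not the actual RLlib minibatch iterator):

    import numpy as np

    def iterate_minibatches(batch, *, minibatch_size=None, num_epochs=1, shuffle=False):
        """Yields minibatches for `num_epochs` passes over `batch` (a dict of arrays).

        Shuffling permutes only the batch axis (axis 0), so a possible time rank
        (axis 1) inside each row stays intact.
        """
        n = len(next(iter(batch.values())))
        size = minibatch_size or n  # None -> one minibatch == the whole batch
        for _ in range(num_epochs):
            idx = np.random.permutation(n) if shuffle else np.arange(n)
            for start in range(0, n, size):
                sel = idx[start:start + size]
                yield {k: v[sel] for k, v in batch.items()}

    # Example: a batch of 8 rows, each carrying a time rank of 4.
    batch = {"obs": np.arange(32).reshape(8, 4)}
    for mb in iterate_minibatches(batch, minibatch_size=4, num_epochs=2, shuffle=True):
        print(mb["obs"].shape)  # (4, 4): rows permuted, time axis untouched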
2 changes: 1 addition & 1 deletion rllib/algorithms/marwil/marwil.py
@@ -398,7 +398,7 @@ class (multi-/single-learner setup) and evaluation on
learner_results = self.learner_group.update_from_batch(
batch,
minibatch_size=self.config.train_batch_size_per_learner,
num_iters=self.config.dataset_num_iters_per_learner,
num_epochs=self.config.dataset_num_iters_per_learner,
)

# Log training results.

Collaborator: Here, for example, this is a bit confusing: dataset_num_iters_per_learner is irrelevant here, because we pass in a batch that is trained on as a whole if this is a single learner. In the multi-learner setup we pass an iterator, and dataset_num_iters_per_learner defines how many batches should be pulled from it in a single RLlib training iteration (this is set to None by default, which means it runs over the entire dataset once - so only a single epoch - during a single RLlib training iteration).

I know this is still somewhat messy, but that's due to the different entry points of the Learner API not yet being fully aligned with offline RL.

Contributor Author: Great catch. Clarified the arg names and added this protection to Learner.update_from_iterator for now:

        if "num_epochs" in kwargs:
            raise ValueError(
                "`num_epochs` arg NOT supported by Learner.update_from_iterator! Use "
                "`num_iters` instead."
            )

such that it cannot be confused with num_epochs passed in by accident.
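
To illustrate the guard's effect, here is a runnable stub that mimics the quoted check (the function below is a stand-in, not the real Learner.update_from_iterator):

    def update_from_iterator(iterator, *, num_iters=None, **kwargs):
        # Stand-in for the real method; only the kwargs guard is modeled here.
        if "num_epochs" in kwargs:
            raise ValueError(
                "`num_epochs` arg NOT supported by Learner.update_from_iterator! Use "
                "`num_iters` instead."
            )
        return {"num_iters_used": num_iters}

    print(update_from_iterator(iter([]), num_iters=10))  # OK: {'num_iters_used': 10}
    try:
        update_from_iterator(iter([]), num_epochs=5)      # rejected
    except ValueError as e:
        print(e)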