Parametrize fit hook test with different precision plugins #8070

Merged on Jul 5, 2021 (47 commits).

Changes from 7 commits.

Commits:
a5f2e6b  Parametrize fit hook test with different precision plugins (carmocca, Jun 21, 2021)
0ce2295  Fix tests (carmocca, Jun 22, 2021)
72d5ee3  Comments (carmocca, Jun 22, 2021)
f34ee7e  Fix message (carmocca, Jun 22, 2021)
39c4a85  Test CI error (carmocca, Jun 22, 2021)
c3b458d  Revert "Test CI error" (carmocca, Jun 22, 2021)
c700cab  Add ddp training type teardown (carmocca, Jun 22, 2021)
e5602c9  Update CHANGELOG (carmocca, Jun 22, 2021)
52b2256  Adrian's fix (carmocca, Jun 22, 2021)
0b94b6c  Use destructor (carmocca, Jun 23, 2021)
aaf32ab  Update CHANGELOG.md (carmocca, Jun 23, 2021)
0444d54  RPC destructor (carmocca, Jun 23, 2021)
5d4f811  Update pytorch_lightning/plugins/training_type/ddp.py (carmocca, Jun 23, 2021)
bf8766d  Why do you not work :( (carmocca, Jun 23, 2021)
48bcb7e  Missing condition (carmocca, Jun 23, 2021)
5d6fa39  Merge branch 'master' into bug/teardown-ddp-process-group (carmocca, Jun 23, 2021)
21ad2d8  Fix deepspeed test (carmocca, Jun 24, 2021)
bbc489e  GC collect in conftest (carmocca, Jun 24, 2021)
5b06fd2  Do not show warnings for special tests (carmocca, Jun 24, 2021)
5e69ed8  Needs to run on 1.8 (carmocca, Jun 24, 2021)
1e0cf40  Merge branch 'master' into tests/parametrize-hooks-precision-plugins (awaelchli, Jun 24, 2021)
aed51a2  Run torch 1.8 (carmocca, Jun 24, 2021)
e0a3e87  Skip test due to 'Python bus error' (carmocca, Jun 24, 2021)
9ee2d19  Debug NCCL (carmocca, Jun 24, 2021)
3588aaa  shm size (carmocca, Jun 24, 2021)
067bf1a  Disable warnings for special tests (carmocca, Jun 24, 2021)
6060b05  Remove NCCL_DEBUG statement (carmocca, Jun 24, 2021)
f0fa1b7  Try smaller shm size (carmocca, Jun 24, 2021)
6dd7038  Revert "Skip test due to 'Python bus error'" (carmocca, Jun 24, 2021)
53082bf  Merge branch 'ci/gpu-tests-torch-1.8' into bug/teardown-ddp-process-g… (carmocca, Jun 24, 2021)
73e62f8  README and adjust versions (carmocca, Jun 24, 2021)
902ef02  Avoid self.on_gpu call (carmocca, Jun 24, 2021)
4ce0f9a  empty cache cleanup (carmocca, Jun 24, 2021)
990b2e9  Merge branch 'master' into bug/teardown-ddp-process-group (carmocca, Jun 24, 2021)
738daa5  More garbage collection (carmocca, Jun 24, 2021)
236aa97  Unroll parametrizations (awaelchli, Jun 24, 2021)
ffa532d  Do not reuse mock (carmocca, Jun 24, 2021)
5aa3790  Merge branch 'master' into tests/parametrize-hooks-precision-plugins (carmocca, Jun 24, 2021)
78baa5f  Merge branch 'bug/teardown-ddp-process-group' into tests/parametrize-… (carmocca, Jun 24, 2021)
e190089  Undo changes (carmocca, Jun 24, 2021)
261a166  Undo notebooks modification (carmocca, Jun 24, 2021)
acec7b0  Merge branch 'master' into tests/parametrize-hooks-precision-plugins (carmocca, Jul 3, 2021)
33a68d4  Undo (carmocca, Jul 3, 2021)
ac006c7  Fix test (carmocca, Jul 3, 2021)
a5becf4  Update test (carmocca, Jul 3, 2021)
15e7726  Merge branch 'master' into tests/parametrize-hooks-precision-plugins (carmocca, Jul 5, 2021)
88b3183  Fix merge (carmocca, Jul 5, 2021)
5 changes: 2 additions & 3 deletions pytorch_lightning/plugins/precision/deepspeed_precision.py
@@ -64,15 +64,14 @@ def backward(
    ) -> Tensor:
        if is_overridden('backward', model):
            warning_cache.warn(
-                "Overridden backward hook in the LightningModule will be ignored since DeepSpeed handles"
-                "backward logic outside of the LightningModule"
+                "You have overridden the `LightningModule.backward` hook but it will be ignored since DeepSpeed handles"
+                " the backward logic internally."
            )
        # todo: hack around for deepspeed engine to call backward
        deepspeed_engine = model.trainer.model
        deepspeed_engine.backward(closure_loss, *args, **kwargs)
        # once backward has been applied, release graph
        closure_loss = closure_loss.detach()
-
        return closure_loss

    def clip_gradients(
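Note: the `warning_cache.warn(...)` call above is what keeps this message from firing on every batch. A rough, self-contained sketch of that warn-once behavior (a hypothetical stand-in, not the actual pytorch_lightning implementation):

import warnings


class WarningCache:
    # Hypothetical warn-once helper mirroring the behavior relied on above.
    def __init__(self):
        self._seen = set()

    def warn(self, message: str) -> None:
        # Emit each distinct message at most once per process.
        if message not in self._seen:
            self._seen.add(message)
            warnings.warn(message)


warning_cache = WarningCache()
for _ in range(3):
    # Prints a single UserWarning even though backward() runs every batch.
    warning_cache.warn("`LightningModule.backward` is ignored: DeepSpeed handles backward internally.")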
9 changes: 9 additions & 0 deletions tests/conftest.py
@@ -18,6 +18,7 @@
 from http.server import SimpleHTTPRequestHandler

 import pytest
+import torch.distributed
 import torch.multiprocessing as mp


@@ -41,6 +42,14 @@ def restore_env_variables():
     os.environ.update(env_backup)


+@pytest.fixture(scope="function", autouse=True)
+def teardown_process_group():
+    """ Ensures that the distributed process group gets closed before the next test runs. """
+    yield
+    if torch.distributed.is_available() and torch.distributed.is_initialized():
+        torch.distributed.destroy_process_group()
+
+
 def pytest_configure(config):
     config.addinivalue_line("markers", "spawn: spawn test in a separate process using torch.multiprocessing.spawn")
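To illustrate what this autouse fixture guards against: a test may initialize a process group and fail (or forget) to destroy it, leaking distributed state into subsequent tests. A minimal usage sketch, assuming a CPU-only machine and pytest's built-in `tmp_path` fixture (the test name and file-based rendezvous are illustrative, not from this PR):

import torch.distributed


def test_inits_process_group_without_cleanup(tmp_path):
    # Single-rank gloo group; no GPU or launcher required.
    torch.distributed.init_process_group(
        backend="gloo",
        init_method=f"file://{tmp_path / 'rendezvous'}",
        rank=0,
        world_size=1,
    )
    assert torch.distributed.is_initialized()
    # No destroy_process_group() here: the autouse teardown_process_group
    # fixture above closes the group after this test finishes.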
68 changes: 50 additions & 18 deletions tests/models/test_hooks.py
@@ -265,6 +265,10 @@ def call(hook, fn, *args, **kwargs):
             d = {'name': hook}
             if args:
                 d['args'] = args
+            elif hook == 'train':
+                # DeepSpeed calls `train(mode)` but we do not. Standardize
+                # https://github.com/microsoft/DeepSpeed/pull/571
+                d['args'] = (True, )
             if kwargs:
                 d['kwargs'] = kwargs
             called.append(d)
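The standardization in the hunk above leans on the fact that `torch.nn.Module.train()` takes a `mode` argument that defaults to `True`, so a bare `train()` call and DeepSpeed's explicit `train(True)` are equivalent and can be recorded identically. A quick self-contained check of that default:

import torch

model = torch.nn.Linear(2, 2)
model.train()       # same as model.train(mode=True)
assert model.training
model.train(False)  # same as model.eval()
assert not model.training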
@@ -283,7 +287,8 @@ def test_epoch_end(self, *args, **kwargs):
         pass

     @staticmethod
-    def _train_batch(trainer, model, batches):
+    def _train_batch(trainer, model, batches, device=torch.device('cpu'), **kwargs):
+        using_native_amp = kwargs.get('amp_backend') == 'native'
         out = []
         for i in range(batches):
             out.extend([
@@ -292,7 +297,7 @@ def _train_batch(trainer, model, batches):
                 dict(name='Callback.on_train_batch_start', args=(trainer, model, ANY, i, 0)),
                 dict(name='on_train_batch_start', args=(ANY, i, 0)),
                 dict(name='on_before_batch_transfer', args=(ANY, None)),
-                dict(name='transfer_batch_to_device', args=(ANY, torch.device('cpu'), None)),
+                dict(name='transfer_batch_to_device', args=(ANY, device, None)),
                 dict(name='on_after_batch_transfer', args=(ANY, None)),
                 dict(name='forward', args=(ANY, )),
                 dict(name='training_step', args=(ANY, i)),
@@ -301,14 +306,15 @@ def _train_batch(trainer, model, batches):
                 dict(name='on_before_zero_grad', args=(ANY, )),
                 dict(name='optimizer_zero_grad', args=(0, i, ANY, 0)),
                 # TODO: `on_before_backward`
-                dict(name='backward', args=(ANY, ANY, 0)),
+                # DeepSpeed handles backward internally
+                *([dict(name='backward', args=(ANY, ANY, 0))] if kwargs.get('plugins') != 'deepspeed' else []),
                 dict(name='Callback.on_after_backward', args=(trainer, model)),
                 dict(name='on_after_backward'),
                 # TODO: `on_before_optimizer_step`
                 dict(
                     name='optimizer_step',
                     args=(0, i, ANY, 0, ANY),
-                    kwargs=dict(on_tpu=False, using_lbfgs=False, using_native_amp=False)
+                    kwargs=dict(on_tpu=False, using_lbfgs=False, using_native_amp=using_native_amp)
                 ),
                 dict(name='Callback.on_train_batch_end', args=(trainer, model, dict(loss=ANY), ANY, i, 0)),
                 dict(name='on_train_batch_end', args=(dict(loss=ANY), ANY, i, 0)),
@@ -317,14 +323,14 @@
         return out

     @staticmethod
-    def _eval_epoch(fn, trainer, model, batches, key):
+    def _eval_epoch(fn, trainer, model, batches, key, device=torch.device('cpu')):
         outputs = {key: ANY}
         return [
             dict(name='Callback.on_epoch_start', args=(trainer, model)),
             dict(name='on_epoch_start'),
             dict(name=f'Callback.on_{fn}_epoch_start', args=(trainer, model)),
             dict(name=f'on_{fn}_epoch_start'),
-            *HookedModel._eval_batch(fn, trainer, model, batches, key),
+            *HookedModel._eval_batch(fn, trainer, model, batches, key, device=device),
             dict(name=f'{fn}_epoch_end', args=([outputs] * batches, )),
             dict(name=f'Callback.on_{fn}_epoch_end', args=(trainer, model)),
             dict(name=f'on_{fn}_epoch_end'),
@@ -333,7 +339,7 @@ def _eval_epoch(fn, trainer, model, batches, key):
         ]

     @staticmethod
-    def _eval_batch(fn, trainer, model, batches, key):
+    def _eval_batch(fn, trainer, model, batches, key, device=torch.device('cpu')):
         out = []
         outputs = {key: ANY}
         for i in range(batches):
@@ -342,7 +348,7 @@ def _eval_batch(fn, trainer, model, batches, key):
                 dict(name=f'Callback.on_{fn}_batch_start', args=(trainer, model, ANY, i, 0)),
                 dict(name=f'on_{fn}_batch_start', args=(ANY, i, 0)),
                 dict(name='on_before_batch_transfer', args=(ANY, None)),
-                dict(name='transfer_batch_to_device', args=(ANY, torch.device('cpu'), None)),
+                dict(name='transfer_batch_to_device', args=(ANY, device, None)),
                 dict(name='on_after_batch_transfer', args=(ANY, None)),
                 dict(name='forward', args=(ANY, )),
                 dict(name=f'{fn}_step', args=(ANY, i)),
@@ -372,7 +378,17 @@ def _predict_batch(trainer, model, batches):
         return out


-def test_trainer_model_hook_system_fit(tmpdir):
+@pytest.mark.parametrize(
+    'kwargs',
+    [
+        {},
+        # these precision plugins modify the optimization flow, so testing them explicitly
+        pytest.param(dict(gpus=1, precision=16, plugins='deepspeed'), marks=RunIf(deepspeed=True, min_gpus=1)),
+        pytest.param(dict(gpus=1, precision=16, amp_backend='native'), marks=RunIf(amp_native=True, min_gpus=1)),
+        pytest.param(dict(gpus=1, precision=16, amp_backend='apex'), marks=RunIf(amp_apex=True, min_gpus=1)),
+    ]
+)
+def test_trainer_model_hook_system_fit(tmpdir, kwargs):
     called = []
     model = HookedModel(called)
     callback = HookedCallback(called)
@@ -385,13 +401,17 @@ def test_trainer_model_hook_system_fit(tmpdir):
         limit_val_batches=val_batches,
         progress_bar_refresh_rate=0,
         weights_summary=None,
-        callbacks=[callback]
+        callbacks=[callback],
+        **kwargs,
     )
+
     assert called == [
         dict(name='Callback.on_init_start', args=(trainer, )),
         dict(name='Callback.on_init_end', args=(trainer, )),
     ]
+
     trainer.fit(model)
+
     saved_ckpt = {
         'callbacks': ANY,
         'epoch': 1,
@@ -401,19 +421,31 @@ def test_trainer_model_hook_system_fit(tmpdir):
         'pytorch-lightning_version': __version__,
         'state_dict': ANY,
     }
+    if kwargs.get('amp_backend') == 'native':
+        saved_ckpt['native_amp_scaling_state'] = ANY
+    elif kwargs.get('amp_backend') == 'apex':
+        saved_ckpt['amp_scaling_state'] = ANY
+    device = torch.device('cuda:0' if 'gpus' in kwargs else 'cpu')
+
     expected = [
         dict(name='Callback.on_init_start', args=(trainer, )),
         dict(name='Callback.on_init_end', args=(trainer, )),
         dict(name='prepare_data'),
         dict(name='configure_callbacks'),
         dict(name='Callback.on_before_accelerator_backend_setup', args=(trainer, model)),
+        # DeepSpeed needs the batch size to figure out throughput logging
+        *([dict(name='train_dataloader')] if kwargs.get('plugins') == 'deepspeed' else []),
         dict(name='Callback.setup', args=(trainer, model), kwargs=dict(stage='fit')),
         dict(name='setup', kwargs=dict(stage='fit')),
         dict(name='configure_sharded_model'),
         dict(name='Callback.on_configure_sharded_model', args=(trainer, model)),
-        dict(name='configure_optimizers'),
+        # DeepSpeed skips initializing optimizers here as they are handled via config
+        *([dict(name='configure_optimizers')] if kwargs.get('plugins') != 'deepspeed' else []),
         dict(name='Callback.on_fit_start', args=(trainer, model)),
         dict(name='on_fit_start'),
+        # TODO: explore whether DeepSpeed can have the same flow for optimizers
+        # DeepSpeed did not find any optimizer in the config so they are loaded here
+        *([dict(name='configure_optimizers')] if kwargs.get('plugins') == 'deepspeed' else []),
         dict(name='Callback.on_pretrain_routine_start', args=(trainer, model)),
         dict(name='on_pretrain_routine_start'),
         dict(name='Callback.on_pretrain_routine_end', args=(trainer, model)),
@@ -426,14 +458,14 @@ def test_trainer_model_hook_system_fit(tmpdir):
         dict(name='zero_grad'),
         dict(name='Callback.on_validation_start', args=(trainer, model)),
         dict(name='on_validation_start'),
-        *model._eval_epoch('validation', trainer, model, val_batches, 'x'),
+        *model._eval_epoch('validation', trainer, model, val_batches, 'x', device=device),
         dict(name='Callback.on_validation_end', args=(trainer, model)),
         dict(name='on_validation_end'),
-        dict(name='train'),
+        dict(name='train', args=(True, )),
         dict(name='on_validation_model_train'),
         dict(name='Callback.on_sanity_check_end', args=(trainer, model)),
         # duplicate `train` because `_run_train` calls it again in case validation wasn't run
-        dict(name='train'),
+        dict(name='train', args=(True, )),
         dict(name='on_train_dataloader'),
         dict(name='train_dataloader'),
         dict(name='Callback.on_train_start', args=(trainer, model)),
@@ -442,19 +474,19 @@ def test_trainer_model_hook_system_fit(tmpdir):
         dict(name='on_epoch_start'),
         dict(name='Callback.on_train_epoch_start', args=(trainer, model)),
         dict(name='on_train_epoch_start'),
-        *model._train_batch(trainer, model, train_batches),
+        *model._train_batch(trainer, model, train_batches, device=device, **kwargs),
         dict(name='train', args=(False, )),
         dict(name='on_validation_model_eval'),
         dict(name='zero_grad'),
         dict(name='Callback.on_validation_start', args=(trainer, model)),
         dict(name='on_validation_start'),
-        *model._eval_epoch('validation', trainer, model, val_batches, 'x'),
+        *model._eval_epoch('validation', trainer, model, val_batches, 'x', device=device),
         dict(name='Callback.on_validation_end', args=(trainer, model)),
         # `ModelCheckpoint.save_checkpoint` is called here from `Callback.on_validation_end`
         dict(name='Callback.on_save_checkpoint', args=(trainer, model, saved_ckpt)),
         dict(name='on_save_checkpoint', args=(saved_ckpt, )),
         dict(name='on_validation_end'),
-        dict(name='train'),
+        dict(name='train', args=(True, )),
         dict(name='on_validation_model_train'),
         dict(name='training_epoch_end', args=([dict(loss=ANY)] * train_batches, )),
         dict(name='Callback.on_train_epoch_end', args=(trainer, model, [dict(loss=ANY)] * train_batches)),
@@ -567,7 +599,7 @@ def test_trainer_model_hook_system_eval(tmpdir, batches, verb, noun, dataloader,
         *model._eval_epoch(noun, trainer, model, batches, key),
         dict(name=f'Callback.on_{noun}_end', args=(trainer, model)),
         dict(name=f'on_{noun}_end'),
-        dict(name='train'),
+        dict(name='train', args=(True, )),
         dict(name=f'on_{noun}_model_train'),
     ]
     expected = [
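For context on the parametrization introduced above: `pytest.param(..., marks=...)` lets each hardware-dependent case carry its own skip condition, so the plain `{}` case runs everywhere while the GPU cases are skipped on machines lacking the requirements. A minimal sketch of the same pattern, using a plain `skipif` in place of Lightning's `RunIf` helper (which lives in the test suite but is not shown in this diff; the test name here is illustrative):

import pytest
import torch


@pytest.mark.parametrize(
    'kwargs',
    [
        {},
        # skipped automatically when no GPU is present, mirroring RunIf(min_gpus=1)
        pytest.param(
            dict(gpus=1, precision=16),
            marks=pytest.mark.skipif(not torch.cuda.is_available(), reason="requires a GPU"),
        ),
    ],
)
def test_hook_flow(kwargs):
    # each parametrization runs the same test body with different Trainer kwargs
    assert isinstance(kwargs, dict)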