Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Horovod] Fix Reduce for Horovod #6585

Closed
wants to merge 12 commits into from
2 changes: 1 addition & 1 deletion pytorch_lightning/plugins/training_type/horovod.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def reduce(self, tensor, group: Optional[Any] = None, reduce_op: Optional[Union[

if reduce_op in (None, "avg", "mean"):
reduce_op = hvd.Average
elif reduce_op == "sum":
elif reduce_op in ("sum", ReduceOp.SUM):
reduce_op = hvd.Sum
else:
raise ValueError(f"unrecognized `reduce_op`: {reduce_op}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def training_step(self, batch, batch_idx):
self.log('my_loss', batch_idx * (1 + local_rank), on_epoch=True)
return super().training_step(batch, batch_idx)

def training_epoch_end(self, outputs) -> None:
def training_epoch_end(self, outputs) -> None:
Borda marked this conversation as resolved.
Show resolved Hide resolved
data = str(self.global_rank)
obj = [[data], (data, ), set(data)]
out = self.trainer.training_type_plugin.broadcast(obj)
Expand Down
43 changes: 42 additions & 1 deletion tests/models/test_horovod.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import shlex
import subprocess
import sys
from unittest import mock
from unittest.mock import patch

import numpy as np
Expand All @@ -26,7 +27,7 @@

import tests.helpers.pipelines as tpipes
import tests.helpers.utils as tutils
from pytorch_lightning import Trainer
from pytorch_lightning import Trainer, callbacks
from pytorch_lightning.accelerators import CPUAccelerator
from pytorch_lightning.metrics.classification.accuracy import Accuracy
from pytorch_lightning.trainer.states import TrainerState
Expand Down Expand Up @@ -376,3 +377,43 @@ def configure_optimizers(self):

# Called every 3 steps, meaning for 1 epoch of 11 batches, it is called 3 times with gamma=0.1
assert pytest.approx(init_lr * 0.1) == adjusted_lr2

@mock.patch('torch.save')
@RunIf(special=True, horovod=True, skip_windows=True, min_gpus=2)
@pytest.mark.parametrize(['k', 'epochs', 'val_check_interval', 'expected'], [(1, 1, 1.0, 1), (2, 2, 0.3, 5)])
def test_top_k_horovod(save_mock, tmpdir, k, epochs, val_check_interval, expected):
    """This test mirrors test_checkpoint_callback_frequency::test_top_k_ddp.

    Checks that ``ModelCheckpoint(save_top_k=k)`` saves the expected number of
    checkpoints when training with the Horovod accelerator on 2 GPUs.
    ``torch.save`` is mocked so no files are written; only the call count is
    asserted.
    """

    class TestModel(BoringModel):

        def training_step(self, batch, batch_idx):
            # Horovod launches one process per device; LOCAL_RANK identifies
            # this process. Logging a rank-dependent value exercises the
            # cross-rank reduction of logged metrics.
            local_rank = int(os.getenv("LOCAL_RANK"))
            self.log('my_loss', batch_idx * (1 + local_rank), on_epoch=True)
            return super().training_step(batch, batch_idx)

        def training_epoch_end(self, outputs) -> None:
            # Round-trip list/tuple/set containers through the plugin's
            # broadcast: the input must be left untouched and every rank must
            # receive rank 0's payload.
            data = str(self.global_rank)
            obj = [[data], (data, ), set(data)]
            out = self.trainer.training_type_plugin.broadcast(obj)
            assert obj == [[str(self.global_rank)], (str(self.global_rank), ), set(str(self.global_rank))]
            assert out == [['0'], ('0', ), set('0')]

    model = TestModel()
    trainer = Trainer(
        callbacks=[callbacks.ModelCheckpoint(dirpath=tmpdir, monitor='my_loss_step', save_top_k=k, mode="max")],
        default_root_dir=tmpdir,
        max_epochs=epochs,
        weights_summary=None,
        val_check_interval=val_check_interval,
        accelerator="horovod",
        gpus=2,
        limit_train_batches=64,
        limit_val_batches=32,
    )
    if os.getenv("LOCAL_RANK") == "0":
        # `match` is interpreted as a regex (re.search), so the literal
        # brackets/dots in "[15.5, 31.0]" must be escaped — unescaped, the
        # bracketed part is a character class and the match succeeds for the
        # wrong reason.
        # NOTE(review): pytest.raises(UserWarning) only passes if warnings are
        # escalated to errors (e.g. via a filterwarnings=error config), as in
        # the mirrored test_top_k_ddp — confirm the project's pytest config.
        with pytest.raises(UserWarning, match=r"The value associated to the key my_loss_epoch: \[15\.5, 31\.0\]"):
            trainer.fit(model)
        assert save_mock.call_count == expected
    else:
        trainer.fit(model)