add distributed test
linshokaku committed Dec 21, 2023
1 parent 2619cf8 commit 01708d9
Showing 1 changed file with 293 additions and 8 deletions.
@@ -12,6 +12,9 @@
AccumulateBase,
)
from pytorch_pfn_extras.training.trigger import Trigger, get_trigger
from pytorch_pfn_extras import distributed
import torch
import torch.distributed

ITERATION_LENGTH = 2 * 3 * 5
NUM_EPOCH = 5
@@ -33,13 +36,27 @@
"trigger": (7, "iteration"),
}

def _init_distributed(use_cuda):
    # Bootstrap torch.distributed from the Open MPI environment; skip the
    # test when it is not launched via an MPI launcher.
    if "OMPI_COMM_WORLD_SIZE" in os.environ:
        size, rank, local_rank = distributed.initialize_ompi_environment(
            backend="nccl", init_method="env"
        )
    else:
        pytest.skip("This test requires MPI to run")

    device = torch.device("cuda:{}".format(local_rank) if use_cuda else "cpu")
    if use_cuda:
        # set_device rejects CPU devices, so only call it on the CUDA path.
        torch.cuda.set_device(device)

    return size, rank, local_rank, device
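
For reference, initialize_ompi_environment bootstraps torch.distributed from the Open MPI environment. A hedged sketch of what the call above is expected to do (illustration only; the real helper lives in pytorch_pfn_extras.distributed and may differ in detail):

    # Assumed behavior of initialize_ompi_environment(backend="nccl", init_method="env").
    import os

    import torch.distributed

    size = int(os.environ["OMPI_COMM_WORLD_SIZE"])  # total number of ranks
    rank = int(os.environ["OMPI_COMM_WORLD_RANK"])  # global rank of this process
    local_rank = int(os.environ["OMPI_COMM_WORLD_LOCAL_RANK"])  # rank within this node
    # init_method="env" defers to MASTER_ADDR/MASTER_PORT from the environment.
    torch.distributed.init_process_group(
        backend="nccl", init_method="env://", world_size=size, rank=rank
    )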


def check_accumulate_extension(
extension: AccumulateBase,
expected_fn: Callable[..., float],
value_list: List[float],
trigger: Trigger,
allow_nan: bool = False,
distributed: bool = False,
) -> None:
manager = ppe.training.ExtensionsManager(
{}, [], NUM_EPOCH, iters_per_epoch=NUM_ITERATION_PER_EPOCH
@@ -59,17 +76,22 @@ def check_accumulate_extension
manager.observation["value/accumulated"]
)
            try:
-                expcted_value = expected_fn(epoch_value_list)
+                if distributed:
+                    world_size = torch.distributed.get_world_size()
+                    epoch_value_list_list = [None] * world_size
+                    torch.distributed.all_gather_object(
+                        epoch_value_list_list, epoch_value_list
+                    )
+                    epoch_value_list = sum(epoch_value_list_list, [])
+                expected_value = expected_fn(epoch_value_list)
            except Exception:
                if allow_nan:
-                    expcted_value = float("nan")
+                    expected_value = float("nan")
                else:
                    raise RuntimeError
            epoch_value_list.clear()
            assert (
-                isnan(accumulated_value) and isnan(expcted_value)
+                isnan(accumulated_value) and isnan(expected_value)
            ) or isclose(
-                accumulated_value, expcted_value, rel_tol=1e-9, abs_tol=1e-6
+                accumulated_value, expected_value, rel_tol=1e-9, abs_tol=1e-6
            )
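
The gather step added here collects every rank's raw values before computing the expected statistic on the concatenated list. A minimal runnable sketch of the same all_gather_object pattern (single process, gloo backend assumed available):

    import os

    import torch.distributed

    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")
    torch.distributed.init_process_group("gloo", rank=0, world_size=1)

    local_values = [1.0, 2.0, 3.0]
    gathered = [None] * torch.distributed.get_world_size()
    torch.distributed.all_gather_object(gathered, local_values)
    flat = sum(gathered, [])  # concatenate the per-rank lists in rank order
    assert flat == local_values  # with a single rank, gathering is a no-op
    torch.distributed.destroy_process_group()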


@@ -183,13 +205,139 @@ def test_max_accumulate(case: Dict[str, Any]):
trigger=trigger,
)

@pytest.mark.mpi
@pytest.mark.gpu
@pytest.mark.parametrize(
"case",
[
epoch_trigger_case,
float_epoch_trigger_case,
epoch_iteration_trigger_case,
primary_iteration_trigger_case,
],
)
def test_average_accumulate_distributed(case: Dict[str, Any]):
_init_distributed(use_cuda=True)
trigger = get_trigger(case["trigger"])
    extension = ppe.training.extensions.AverageAccumulate(
        conversion_key_pair=("value", "value/accumulated"),
        trigger=trigger,
        distributed=True,
    )
check_accumulate_extension(
extension=extension,
expected_fn=mean,
value_list=case["value"],
trigger=trigger,
distributed=True,
)

@pytest.mark.mpi
@pytest.mark.gpu
@pytest.mark.parametrize(
"case",
[
epoch_trigger_case,
float_epoch_trigger_case,
epoch_iteration_trigger_case,
primary_iteration_trigger_case,
],
)
def test_standard_deviation_accumulate_distributed(case: Dict[str, Any]):
_init_distributed(use_cuda=True)
trigger = get_trigger(case["trigger"])
    extension = ppe.training.extensions.StandardDeviationAccumulate(
        conversion_key_pair=("value", "value/accumulated"),
        trigger=trigger,
        distributed=True,
    )
check_accumulate_extension(
extension=extension,
expected_fn=pstdev,
value_list=case["value"],
trigger=trigger,
distributed=True,
)

@pytest.mark.mpi
@pytest.mark.gpu
@pytest.mark.parametrize(
"case",
[
epoch_trigger_case,
float_epoch_trigger_case,
epoch_iteration_trigger_case,
primary_iteration_trigger_case,
],
)
def test_unbiased_standard_deviation_accumulate_distributed(
    case: Dict[str, Any]
):
_init_distributed(use_cuda=True)
trigger = get_trigger(case["trigger"])
    extension = ppe.training.extensions.UnbiasedStandardDeviationAccumulate(
        conversion_key_pair=("value", "value/accumulated"),
        trigger=trigger,
        distributed=True,
    )
check_accumulate_extension(
extension=extension,
expected_fn=stdev,
value_list=case["value"],
trigger=trigger,
allow_nan=True,
distributed=True,
)
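
Only the unbiased variant passes allow_nan=True: statistics.stdev requires at least two data points, so an accumulation window holding a single value makes expected_fn raise, and both sides of the comparison fall back to NaN. A toy illustration:

    from statistics import pstdev, stdev

    pstdev([1.0])  # population standard deviation of one sample is defined (0.0)
    try:
        stdev([1.0])  # raises StatisticsError: needs at least two data points
    except Exception:
        expected_value = float("nan")  # mirrors the fallback in the checker above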

@pytest.mark.mpi
@pytest.mark.gpu
@pytest.mark.parametrize(
"case",
[
epoch_trigger_case,
float_epoch_trigger_case,
epoch_iteration_trigger_case,
primary_iteration_trigger_case,
],
)
def test_min_accumulate_distributed(case: Dict[str, Any]):
_init_distributed(use_cuda=True)
trigger = get_trigger(case["trigger"])
    extension = ppe.training.extensions.MinAccumulate(
        conversion_key_pair=("value", "value/accumulated"),
        trigger=trigger,
        distributed=True,
    )
check_accumulate_extension(
extension=extension,
expected_fn=min,
value_list=case["value"],
trigger=trigger,
distributed=True,
)

@pytest.mark.mpi
@pytest.mark.gpu
@pytest.mark.parametrize(
"case",
[
epoch_trigger_case,
float_epoch_trigger_case,
epoch_iteration_trigger_case,
primary_iteration_trigger_case,
],
)
def test_max_accumulate_distributed(case: Dict[str, Any]):
_init_distributed(use_cuda=True)
trigger = get_trigger(case["trigger"])
    extension = ppe.training.extensions.MaxAccumulate(
        conversion_key_pair=("value", "value/accumulated"),
        trigger=trigger,
        distributed=True,
    )
check_accumulate_extension(
extension=extension,
expected_fn=max,
value_list=case["value"],
trigger=trigger,
distributed=True,
)
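
Note that check_accumulate_extension applies expected_fn to the concatenation of the gathered raw values rather than combining per-rank results. For mean this matters whenever ranks contribute different numbers of values (toy numbers, not the test data):

    from statistics import mean

    rank0_values = [1.0, 2.0, 3.0]
    rank1_values = [10.0]
    global_mean = mean(rank0_values + rank1_values)  # 4.0
    mean_of_means = mean([mean(rank0_values), mean(rank1_values)])  # 6.0
    assert global_mean != mean_of_means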

def check_accumulate_extension_with_log_report(
extension: AccumulateBase,
expected_fn: Callable[..., float],
value_list: List[float],
trigger: Trigger,
allow_nan: bool = False,
distributed: bool = False,
) -> None:
with tempfile.TemporaryDirectory() as tmp_dir:
manager = ppe.training.ExtensionsManager(
@@ -226,20 +374,25 @@ def check_accumulate_extension_with_log_report(
assert "value/accumulated" in last_value
accumulated_value = last_value["value/accumulated"]
            try:
-                expcted_value = expected_fn(epoch_value_list)
+                if distributed:
+                    world_size = torch.distributed.get_world_size()
+                    epoch_value_list_list = [None] * world_size
+                    torch.distributed.all_gather_object(
+                        epoch_value_list_list, epoch_value_list
+                    )
+                    epoch_value_list = sum(epoch_value_list_list, [])
+                expected_value = expected_fn(epoch_value_list)
            except Exception:
                if allow_nan:
-                    expcted_value = float("nan")
+                    expected_value = float("nan")
                else:
                    raise RuntimeError
            epoch_value_list.clear()
            assert (
                allow_nan
                and isnan(accumulated_value)
-                and isnan(expcted_value)
+                and isnan(expected_value)
            ) or isclose(
                accumulated_value,
-                expcted_value,
+                expected_value,
                rel_tol=1e-9,
                abs_tol=1e-6,
            )
@@ -356,3 +509,135 @@ def test_max_accumulate_with_log_report(case: Dict[str, Any]):
value_list=case["value"],
trigger=trigger,
)

@pytest.mark.mpi
@pytest.mark.gpu
@pytest.mark.parametrize(
"case",
[
epoch_trigger_case,
float_epoch_trigger_case,
epoch_iteration_trigger_case,
primary_iteration_trigger_case,
],
)
def test_average_accumulate_with_log_report_distributed(case: Dict[str, Any]):
_init_distributed(use_cuda=True)
trigger = get_trigger(case["trigger"])
    extension = ppe.training.extensions.AverageAccumulate(
        conversion_key_pair=("value", "value/accumulated"),
        trigger=trigger,
        distributed=True,
    )
check_accumulate_extension_with_log_report(
extension=extension,
expected_fn=mean,
value_list=case["value"],
trigger=trigger,
distributed=True,
)


@pytest.mark.mpi
@pytest.mark.gpu
@pytest.mark.parametrize(
"case",
[
epoch_trigger_case,
float_epoch_trigger_case,
epoch_iteration_trigger_case,
primary_iteration_trigger_case,
],
)
def test_standard_deviation_accumulate_with_log_report_distributed(
    case: Dict[str, Any]
):
_init_distributed(use_cuda=True)
trigger = get_trigger(case["trigger"])
    extension = ppe.training.extensions.StandardDeviationAccumulate(
        conversion_key_pair=("value", "value/accumulated"),
        trigger=trigger,
        distributed=True,
    )
check_accumulate_extension_with_log_report(
extension=extension,
expected_fn=pstdev,
value_list=case["value"],
trigger=trigger,
distributed=True,
)


@pytest.mark.mpi
@pytest.mark.gpu
@pytest.mark.parametrize(
"case",
[
epoch_trigger_case,
float_epoch_trigger_case,
epoch_iteration_trigger_case,
primary_iteration_trigger_case,
],
)
def test_unbiased_standard_deviation_accumulate_with_log_report_distributed(
case: Dict[str, Any]
):
_init_distributed(use_cuda=True)
trigger = get_trigger(case["trigger"])
    extension = ppe.training.extensions.UnbiasedStandardDeviationAccumulate(
        conversion_key_pair=("value", "value/accumulated"),
        trigger=trigger,
        distributed=True,
    )
check_accumulate_extension_with_log_report(
extension=extension,
expected_fn=stdev,
value_list=case["value"],
trigger=trigger,
allow_nan=True,
distributed=True,
)


@pytest.mark.mpi
@pytest.mark.gpu
@pytest.mark.parametrize(
"case",
[
epoch_trigger_case,
float_epoch_trigger_case,
epoch_iteration_trigger_case,
primary_iteration_trigger_case,
],
)
def test_min_accumulate_with_log_report_distributed(case: Dict[str, Any]):
_init_distributed(use_cuda=True)
trigger = get_trigger(case["trigger"])
    extension = ppe.training.extensions.MinAccumulate(
        conversion_key_pair=("value", "value/accumulated"),
        trigger=trigger,
        distributed=True,
    )
check_accumulate_extension_with_log_report(
extension=extension,
expected_fn=min,
value_list=case["value"],
trigger=trigger,
distributed=True,
)


@pytest.mark.mpi
@pytest.mark.gpu
@pytest.mark.parametrize(
"case",
[
epoch_trigger_case,
float_epoch_trigger_case,
epoch_iteration_trigger_case,
primary_iteration_trigger_case,
],
)
def test_max_accumulate_with_log_report_distributed(case: Dict[str, Any]):
_init_distributed(use_cuda=True)
trigger = get_trigger(case["trigger"])
    extension = ppe.training.extensions.MaxAccumulate(
        conversion_key_pair=("value", "value/accumulated"),
        trigger=trigger,
        distributed=True,
    )
check_accumulate_extension_with_log_report(
extension=extension,
expected_fn=max,
value_list=case["value"],
trigger=trigger,
distributed=True,
)
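
These tests skip themselves unless launched under Open MPI, e.g. via something like mpirun -n 2 python -m pytest -m mpi (the exact invocation depends on the CI setup). If the mpi and gpu markers are not already registered in the project's pytest configuration, a hypothetical conftest.py snippet such as the following would silence unknown-marker warnings (not part of this commit):

    def pytest_configure(config):
        config.addinivalue_line("markers", "mpi: requires an MPI launcher")
        config.addinivalue_line("markers", "gpu: requires a CUDA device")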
