From 01708d942941018f41554c0c2a3ef5214c8b67ed Mon Sep 17 00:00:00 2001
From: Linsho Kaku
Date: Thu, 21 Dec 2023 20:19:19 +0900
Subject: [PATCH] add distributed test

---
 .../extensions_tests/test_accumulate.py | 301 +++++++++++++++++-
 1 file changed, 293 insertions(+), 8 deletions(-)

diff --git a/tests/pytorch_pfn_extras_tests/training_tests/extensions_tests/test_accumulate.py b/tests/pytorch_pfn_extras_tests/training_tests/extensions_tests/test_accumulate.py
index 75ae13e0..b24322d5 100644
--- a/tests/pytorch_pfn_extras_tests/training_tests/extensions_tests/test_accumulate.py
+++ b/tests/pytorch_pfn_extras_tests/training_tests/extensions_tests/test_accumulate.py
@@ -12,6 +12,9 @@
     AccumulateBase,
 )
 from pytorch_pfn_extras.training.trigger import Trigger, get_trigger
+from pytorch_pfn_extras import distributed
+import torch
+import torch.distributed
 
 ITERATION_LENGTH = 2 * 3 * 5
 NUM_EPOCH = 5
@@ -33,6 +36,19 @@
     "trigger": (7, "iteration"),
 }
 
+def _init_distributed(use_cuda):
+    if "OMPI_COMM_WORLD_SIZE" in os.environ:
+        size, rank, local_rank = distributed.initialize_ompi_environment(
+            backend="nccl" if use_cuda else "gloo", init_method="env"
+        )
+    else:
+        pytest.skip("This test requires MPI to run")
+
+    device = torch.device("cuda:{}".format(local_rank) if use_cuda else "cpu")
+    if use_cuda: torch.cuda.set_device(device)  # set_device is CUDA-only
+
+    return size, rank, local_rank, device
+
 
 def check_accumulate_extension(
     extension: AccumulateBase,
@@ -40,6 +56,7 @@ def check_accumulate_extension(
     value_list: List[float],
     trigger: Trigger,
     allow_nan: bool = False,
+    distributed: bool = False,
 ) -> None:
     manager = ppe.training.ExtensionsManager(
         {}, [], NUM_EPOCH, iters_per_epoch=NUM_ITERATION_PER_EPOCH
@@ -59,17 +76,22 @@ def check_accumulate_extension(
             manager.observation["value/accumulated"]
         )
         try:
-            expcted_value = expected_fn(epoch_value_list)
+            if distributed:  # gather values from all ranks
+                world_size = torch.distributed.get_world_size()
+                epoch_value_list_list = [None] * world_size
+                torch.distributed.all_gather_object(epoch_value_list_list, epoch_value_list)
+                epoch_value_list = sum(epoch_value_list_list, [])  # flatten
+            expected_value = expected_fn(epoch_value_list)
         except Exception:
             if allow_nan:
-                expcted_value = float("nan")
+                expected_value = float("nan")
             else:
                 raise RuntimeError
         epoch_value_list.clear()
         assert (
-            isnan(accumulated_value) and isnan(expcted_value)
+            isnan(accumulated_value) and isnan(expected_value)
         ) or isclose(
-            accumulated_value, expcted_value, rel_tol=1e-9, abs_tol=1e-6
+            accumulated_value, expected_value, rel_tol=1e-9, abs_tol=1e-6
         )
 
 
@@ -183,6 +205,131 @@ def test_max_accumulate(case: Dict[str, Any]):
         trigger=trigger,
     )
 
+@pytest.mark.mpi
+@pytest.mark.gpu
+@pytest.mark.parametrize(
+    "case",
+    [
+        epoch_trigger_case,
+        float_epoch_trigger_case,
+        epoch_iteration_trigger_case,
+        primary_iteration_trigger_case,
+    ],
+)
+def test_average_accumulate_distributed(case: Dict[str, Any]):
+    _init_distributed(use_cuda=True)
+    trigger = get_trigger(case["trigger"])
+    extension = ppe.training.extensions.AverageAccumulate(
+        conversion_key_pair=("value", "value/accumulated"), trigger=trigger, distributed=True
+    )
+    check_accumulate_extension(
+        extension=extension,
+        expected_fn=mean,
+        value_list=case["value"],
+        trigger=trigger,
+        distributed=True,
+    )
+
+@pytest.mark.mpi
+@pytest.mark.gpu
+@pytest.mark.parametrize(
+    "case",
+    [
+        epoch_trigger_case,
+        float_epoch_trigger_case,
+        epoch_iteration_trigger_case,
+        primary_iteration_trigger_case,
+    ],
+)
+def test_standard_deviation_accumulate_distributed(case: Dict[str, Any]):
+    _init_distributed(use_cuda=True)
+    trigger = get_trigger(case["trigger"])
+    extension = ppe.training.extensions.StandardDeviationAccumulate(
+        conversion_key_pair=("value", "value/accumulated"), trigger=trigger, distributed=True
+    )
+    check_accumulate_extension(
+        extension=extension,
+        expected_fn=pstdev,
+        value_list=case["value"],
+        trigger=trigger,
+        distributed=True,
+    )
+
+@pytest.mark.mpi
+@pytest.mark.gpu
+@pytest.mark.parametrize(
+    "case",
+    [
+        epoch_trigger_case,
+        float_epoch_trigger_case,
+        epoch_iteration_trigger_case,
+        primary_iteration_trigger_case,
+    ],
+)
+def test_unbiased_standard_deviation_accumulate_distributed(case: Dict[str, Any]):
+    _init_distributed(use_cuda=True)
+    trigger = get_trigger(case["trigger"])
+    extension = ppe.training.extensions.UnbiasedStandardDeviationAccumulate(
+        conversion_key_pair=("value", "value/accumulated"), trigger=trigger, distributed=True
+    )
+    check_accumulate_extension(
+        extension=extension,
+        expected_fn=stdev,
+        value_list=case["value"],
+        trigger=trigger,
+        allow_nan=True,
+        distributed=True,
+    )
+
+@pytest.mark.mpi
+@pytest.mark.gpu
+@pytest.mark.parametrize(
+    "case",
+    [
+        epoch_trigger_case,
+        float_epoch_trigger_case,
+        epoch_iteration_trigger_case,
+        primary_iteration_trigger_case,
+    ],
+)
+def test_min_accumulate_distributed(case: Dict[str, Any]):
+    _init_distributed(use_cuda=True)
+    trigger = get_trigger(case["trigger"])
+    extension = ppe.training.extensions.MinAccumulate(
+        conversion_key_pair=("value", "value/accumulated"), trigger=trigger, distributed=True
+    )
+    check_accumulate_extension(
+        extension=extension,
+        expected_fn=min,
+        value_list=case["value"],
+        trigger=trigger,
+        distributed=True,
+    )
+
+@pytest.mark.mpi
+@pytest.mark.gpu
+@pytest.mark.parametrize(
+    "case",
+    [
+        epoch_trigger_case,
+        float_epoch_trigger_case,
+        epoch_iteration_trigger_case,
+        primary_iteration_trigger_case,
+    ],
+)
+def test_max_accumulate_distributed(case: Dict[str, Any]):
+    _init_distributed(use_cuda=True)
+    trigger = get_trigger(case["trigger"])
+    extension = ppe.training.extensions.MaxAccumulate(
+        conversion_key_pair=("value", "value/accumulated"), trigger=trigger, distributed=True
+    )
+    check_accumulate_extension(
+        extension=extension,
+        expected_fn=max,
+        value_list=case["value"],
+        trigger=trigger,
+        distributed=True,
+    )
 
 def check_accumulate_extension_with_log_report(
     extension: AccumulateBase,
@@ -190,6 +337,7 @@
     value_list: List[float],
     trigger: Trigger,
     allow_nan: bool = False,
+    distributed: bool = False,
 ) -> None:
     with tempfile.TemporaryDirectory() as tmp_dir:
         manager = ppe.training.ExtensionsManager(
@@ -226,20 +374,25 @@
             assert "value/accumulated" in last_value
             accumulated_value = last_value["value/accumulated"]
             try:
-                expcted_value = expected_fn(epoch_value_list)
+                if distributed:  # gather values from all ranks
+                    world_size = torch.distributed.get_world_size()
+                    epoch_value_list_list = [None] * world_size
+                    torch.distributed.all_gather_object(epoch_value_list_list, epoch_value_list)
+                    epoch_value_list = sum(epoch_value_list_list, [])  # flatten
+                expected_value = expected_fn(epoch_value_list)
             except Exception:
                 if allow_nan:
-                    expcted_value = float("nan")
+                    expected_value = float("nan")
                 else:
                     raise RuntimeError
             epoch_value_list.clear()
             assert (
                 allow_nan
                 and isnan(accumulated_value)
-                and isnan(expcted_value)
+                and isnan(expected_value)
             ) or isclose(
                 accumulated_value,
-                expcted_value,
+                expected_value,
                 rel_tol=1e-9,
                 abs_tol=1e-6,
             )
@@ -356,3 +509,135 @@ def test_max_accumulate_with_log_report(case: Dict[str, Any]):
         value_list=case["value"],
         trigger=trigger,
     )
+
+@pytest.mark.mpi
+@pytest.mark.gpu
+@pytest.mark.parametrize(
+    "case",
+    [
+        epoch_trigger_case,
+        float_epoch_trigger_case,
+        epoch_iteration_trigger_case,
+        primary_iteration_trigger_case,
+    ],
+)
+def test_average_accumulate_with_log_report_distributed(case: Dict[str, Any]):
+    _init_distributed(use_cuda=True)
+    trigger = get_trigger(case["trigger"])
+    extension = ppe.training.extensions.AverageAccumulate(
+        conversion_key_pair=("value", "value/accumulated"), trigger=trigger, distributed=True
+    )
+    check_accumulate_extension_with_log_report(
+        extension=extension,
+        expected_fn=mean,
+        value_list=case["value"],
+        trigger=trigger,
+        distributed=True,
+    )
+
+
+@pytest.mark.mpi
+@pytest.mark.gpu
+@pytest.mark.parametrize(
+    "case",
+    [
+        epoch_trigger_case,
+        float_epoch_trigger_case,
+        epoch_iteration_trigger_case,
+        primary_iteration_trigger_case,
+    ],
+)
+def test_standard_deviation_accumulate_with_log_report_distributed(case: Dict[str, Any]):
+    _init_distributed(use_cuda=True)
+    trigger = get_trigger(case["trigger"])
+    extension = ppe.training.extensions.StandardDeviationAccumulate(
+        conversion_key_pair=("value", "value/accumulated"), trigger=trigger, distributed=True
+    )
+    check_accumulate_extension_with_log_report(
+        extension=extension,
+        expected_fn=pstdev,
+        value_list=case["value"],
+        trigger=trigger,
+        distributed=True,
+    )
+
+
+@pytest.mark.mpi
+@pytest.mark.gpu
+@pytest.mark.parametrize(
+    "case",
+    [
+        epoch_trigger_case,
+        float_epoch_trigger_case,
+        epoch_iteration_trigger_case,
+        primary_iteration_trigger_case,
+    ],
+)
+def test_unbiased_standard_deviation_accumulate_with_log_report_distributed(
+    case: Dict[str, Any]
+):
+    _init_distributed(use_cuda=True)
+    trigger = get_trigger(case["trigger"])
+    extension = ppe.training.extensions.UnbiasedStandardDeviationAccumulate(
+        conversion_key_pair=("value", "value/accumulated"), trigger=trigger, distributed=True
+    )
+    check_accumulate_extension_with_log_report(
+        extension=extension,
+        expected_fn=stdev,
+        value_list=case["value"],
+        trigger=trigger,
+        allow_nan=True,
+        distributed=True,
+    )
+
+
+@pytest.mark.mpi
+@pytest.mark.gpu
+@pytest.mark.parametrize(
+    "case",
+    [
+        epoch_trigger_case,
+        float_epoch_trigger_case,
+        epoch_iteration_trigger_case,
+        primary_iteration_trigger_case,
+    ],
+)
+def test_min_accumulate_with_log_report_distributed(case: Dict[str, Any]):
+    _init_distributed(use_cuda=True)
+    trigger = get_trigger(case["trigger"])
+    extension = ppe.training.extensions.MinAccumulate(
+        conversion_key_pair=("value", "value/accumulated"), trigger=trigger, distributed=True
+    )
+    check_accumulate_extension_with_log_report(
+        extension=extension,
+        expected_fn=min,
+        value_list=case["value"],
+        trigger=trigger,
+        distributed=True,
+    )
+
+
+@pytest.mark.mpi
+@pytest.mark.gpu
+@pytest.mark.parametrize(
+    "case",
+    [
+        epoch_trigger_case,
+        float_epoch_trigger_case,
+        epoch_iteration_trigger_case,
+        primary_iteration_trigger_case,
+    ],
+)
+def test_max_accumulate_with_log_report_distributed(case: Dict[str, Any]):
+    _init_distributed(use_cuda=True)
+    trigger = get_trigger(case["trigger"])
+    extension = ppe.training.extensions.MaxAccumulate(
+        conversion_key_pair=("value", "value/accumulated"), trigger=trigger, distributed=True
+    )
+    check_accumulate_extension_with_log_report(
+        extension=extension,
+        expected_fn=max,
+        value_list=case["value"],
+        trigger=trigger,
+        distributed=True,
+    )
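
Note: every distributed test above reduces to the same cross-rank idiom:
gather each rank's observed values with torch.distributed.all_gather_object,
flatten the per-rank lists, and apply the same reduction (mean, pstdev,
stdev, min, max) that the extension computes internally. A minimal
standalone sketch of that idiom, not part of the patch; it assumes the
process group is already initialized (e.g. via _init_distributed), and the
helper name gather_values is illustrative only:

    import torch.distributed

    def gather_values(local_values):
        # One slot per rank; all_gather_object accepts arbitrary picklable
        # objects, so the plain Python lists need no tensor conversion.
        world_size = torch.distributed.get_world_size()
        gathered = [None] * world_size
        torch.distributed.all_gather_object(gathered, local_values)
        # Flatten [[rank0 values], [rank1 values], ...] into a single list
        # so the reduction sees every rank's observations.
        return [v for rank_values in gathered for v in rank_values]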
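
Note: _init_distributed calls pytest.skip unless OMPI_COMM_WORLD_SIZE is
set, so the new tests only execute under an MPI launcher. With Open MPI, an
invocation along the lines of

    mpirun -n 2 python -m pytest -m mpi tests/pytorch_pfn_extras_tests/training_tests/extensions_tests/test_accumulate.py

should select them through the mpi marker; the exact flags depend on the
MPI installation and the number of visible GPUs, so treat this command as
an assumption rather than a documented one.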