diff --git a/.azure/gpu-tests.yml b/.azure/gpu-tests.yml index b5dbd9e3340c7..8e8e2edb91d85 100644 --- a/.azure/gpu-tests.yml +++ b/.azure/gpu-tests.yml @@ -116,6 +116,15 @@ jobs: timeoutInMinutes: "35" condition: eq(variables['continue'], '1') + - bash: bash run_standalone_tasks.sh + workingDirectory: tests/tests_pytorch + env: + PL_USE_MOCKED_MNIST: "1" + PL_RUN_CUDA_TESTS: "1" + displayName: 'Testing: PyTorch standalone tasks' + timeoutInMinutes: "10" + condition: eq(variables['continue'], '1') + - bash: | python -m coverage report python -m coverage xml diff --git a/.circleci/config.yml b/.circleci/config.yml index 91d57cd707b97..7ac10195c75a9 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -81,6 +81,8 @@ references: job_name=$(jsonnet -J ml-testing-accelerators/ dockers/tpu-tests/tpu_test_cases.jsonnet | kubectl create -f -) && \ job_name=${job_name#job.batch/} job_name=${job_name% created} + pod_name=$(kubectl get po -l controller-uid=`kubectl get job $job_name -o "jsonpath={.metadata.labels.controller-uid}"` | awk 'match($0,!/NAME/) {print $1}') + echo "GKE pod name: $pod_name" echo "Waiting on kubernetes job: $job_name" i=0 && \ # N checks spaced 30s apart = 900s total. @@ -92,8 +94,6 @@ references: printf "Waiting for job to finish: " && \ while [ $i -lt $MAX_CHECKS ]; do ((i++)); if kubectl get jobs $job_name -o jsonpath='Failed:{.status.failed}' | grep "Failed:1"; then status_code=1 && break; elif kubectl get jobs $job_name -o jsonpath='Succeeded:{.status.succeeded}' | grep "Succeeded:1" ; then status_code=0 && break; else printf "."; fi; sleep $CHECK_SPEEP; done && \ echo "Done waiting. Job status code: $status_code" && \ - pod_name=$(kubectl get po -l controller-uid=`kubectl get job $job_name -o "jsonpath={.metadata.labels.controller-uid}"` | awk 'match($0,!/NAME/) {print $1}') && \ - echo "GKE pod name: $pod_name" && \ kubectl logs -f $pod_name --container=train > /tmp/full_output.txt if grep -q '' /tmp/full_output.txt ; then csplit /tmp/full_output.txt '//'; else mv /tmp/full_output.txt xx00; fi && \ # First portion is the test logs. Print these to Github Action stdout. @@ -106,10 +106,6 @@ references: name: Statistics command: | mv ./xx01 coverage.xml - # TODO: add human readable report - cat coverage.xml - sudo pip install pycobertura - pycobertura show coverage.xml jobs: @@ -119,7 +115,7 @@ jobs: environment: - XLA_VER: 1.9 - PYTHON_VER: 3.7 - - MAX_CHECKS: 240 + - MAX_CHECKS: 1000 - CHECK_SPEEP: 5 steps: - checkout diff --git a/dockers/tpu-tests/tpu_test_cases.jsonnet b/dockers/tpu-tests/tpu_test_cases.jsonnet index e3f5f1d98802a..18a0c894c31a2 100644 --- a/dockers/tpu-tests/tpu_test_cases.jsonnet +++ b/dockers/tpu-tests/tpu_test_cases.jsonnet @@ -8,7 +8,7 @@ local tputests = base.BaseTest { mode: 'postsubmit', configMaps: [], - timeout: 1200, # 20 minutes, in seconds. + timeout: 6000, # 100 minutes, in seconds. image: 'pytorchlightning/pytorch_lightning', imageTag: 'base-xla-py{PYTHON_VERSION}-torch{PYTORCH_VERSION}', @@ -34,16 +34,11 @@ local tputests = base.BaseTest { pip install -e .[test] echo $KUBE_GOOGLE_CLOUD_TPU_ENDPOINTS export XRT_TPU_CONFIG="tpu_worker;0;${KUBE_GOOGLE_CLOUD_TPU_ENDPOINTS:7}" + export PL_RUN_TPU_TESTS=1 cd tests/tests_pytorch - echo $PWD - # TODO (@kaushikb11): Add device stats tests here - coverage run --source pytorch_lightning -m pytest -v --capture=no \ - strategies/test_tpu_spawn.py \ - profilers/test_xla_profiler.py \ - accelerators/test_tpu.py \ - models/test_tpu.py \ - plugins/environments/test_xla_environment.py \ - utilities/test_xla_device_utils.py + coverage run --source=pytorch_lightning -m pytest -vv --durations=0 ./ + echo "\n||| Running standalone tests |||\n" + bash run_standalone_tests.sh -b 1 test_exit_code=$? echo "\n||| END PYTEST LOGS |||\n" coverage xml diff --git a/src/pytorch_lightning/plugins/training_type/single_tpu.py b/src/pytorch_lightning/plugins/training_type/single_tpu.py index 51713fa4f0ee2..5d305a51c497a 100644 --- a/src/pytorch_lightning/plugins/training_type/single_tpu.py +++ b/src/pytorch_lightning/plugins/training_type/single_tpu.py @@ -18,7 +18,7 @@ class SingleTPUPlugin(SingleTPUStrategy): def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] rank_zero_deprecation( - "The `pl.plugins.training_type.single_tpu.SingleTPUPlugin` is deprecated in v1.6 and will be removed in." + "The `pl.plugins.training_type.single_tpu.SingleTPUPlugin` is deprecated in v1.6 and will be removed in" " v1.8. Use `pl.strategies.single_tpu.SingleTPUStrategy` instead." ) super().__init__(*args, **kwargs) diff --git a/src/pytorch_lightning/strategies/launchers/xla.py b/src/pytorch_lightning/strategies/launchers/xla.py index 699f92bed72e7..037ec027bfd7d 100644 --- a/src/pytorch_lightning/strategies/launchers/xla.py +++ b/src/pytorch_lightning/strategies/launchers/xla.py @@ -13,10 +13,12 @@ # limitations under the License. import os import time +from functools import wraps from multiprocessing.queues import SimpleQueue -from typing import Any, Callable, Optional, TYPE_CHECKING +from typing import Any, Callable, Optional, Tuple, TYPE_CHECKING import torch.multiprocessing as mp +from torch.multiprocessing import ProcessContext import pytorch_lightning as pl from pytorch_lightning.strategies.launchers.multiprocessing import _FakeQueue, _MultiProcessingLauncher, _WorkerOutput @@ -26,9 +28,10 @@ from pytorch_lightning.utilities.rank_zero import rank_zero_debug if _TPU_AVAILABLE: + import torch_xla.core.xla_model as xm import torch_xla.distributed.xla_multiprocessing as xmp else: - xm, xmp, MpDeviceLoader, rendezvous = [None] * 4 + xm, xmp = None, None if TYPE_CHECKING: from pytorch_lightning.strategies import Strategy @@ -72,7 +75,7 @@ def launch(self, function: Callable, *args: Any, trainer: Optional["pl.Trainer"] """ context = mp.get_context(self._start_method) return_queue = context.SimpleQueue() - xmp.spawn( + _save_spawn( self._wrapping_function, args=(trainer, function, args, kwargs, return_queue), nprocs=len(self._strategy.parallel_devices), @@ -103,14 +106,6 @@ def _wrapping_function( if self._strategy.local_rank == 0: return_queue.put(move_data_to_device(results, "cpu")) - # https://github.com/pytorch/xla/issues/1801#issuecomment-602799542 - self._strategy.barrier("end-process") - - # Ensure that the rank 0 process is the one exiting last - # https://github.com/pytorch/xla/issues/2190#issuecomment-641665358 - if self._strategy.local_rank == 0: - time.sleep(2) - def _collect_rank_zero_results(self, trainer: "pl.Trainer", results: Any) -> Optional["_WorkerOutput"]: rank_zero_debug("Collecting results from rank 0 process.") checkpoint_callback = trainer.checkpoint_callback @@ -138,3 +133,30 @@ def _collect_rank_zero_results(self, trainer: "pl.Trainer", results: Any) -> Opt self.add_to_queue(trainer, extra) return _WorkerOutput(best_model_path, weights_path, trainer.state, results, extra) + + +def _save_spawn( + fn: Callable, + args: Tuple = (), + nprocs: Optional[int] = None, + join: bool = True, + daemon: bool = False, + start_method: str = "spawn", +) -> Optional[ProcessContext]: + """Wraps the :func:`torch_xla.distributed.xla_multiprocessing.spawn` with added teardown logic for the worker + processes.""" + + @wraps(fn) + def wrapped(rank: int, *_args: Any) -> None: + fn(rank, *_args) + + # Make all processes wait for each other before joining + # https://github.com/pytorch/xla/issues/1801#issuecomment-602799542 + xm.rendezvous("end-process") + + # Ensure that the rank 0 process is the one exiting last + # https://github.com/pytorch/xla/issues/2190#issuecomment-641665358 + if rank == 0: + time.sleep(1) + + return xmp.spawn(wrapped, args=args, nprocs=nprocs, join=join, daemon=daemon, start_method=start_method) diff --git a/src/pytorch_lightning/strategies/tpu_spawn.py b/src/pytorch_lightning/strategies/tpu_spawn.py index f4953a9f64baa..2d474fafe51b1 100644 --- a/src/pytorch_lightning/strategies/tpu_spawn.py +++ b/src/pytorch_lightning/strategies/tpu_spawn.py @@ -74,6 +74,7 @@ def __init__( start_method="fork", ) self.debug = debug + self._launched = False @property def checkpoint_io(self) -> CheckpointIO: @@ -90,6 +91,8 @@ def checkpoint_io(self, io: Optional[CheckpointIO]) -> None: @property def root_device(self) -> torch.device: + if not self._launched: + raise RuntimeError("Accessing the XLA device before processes have spawned is not allowed.") return xm.xla_device() @staticmethod @@ -130,7 +133,7 @@ def setup(self, trainer: "pl.Trainer") -> None: self.accelerator.setup(trainer) if self.debug: - os.environ["PT_XLA_DEBUG"] = str(1) + os.environ["PT_XLA_DEBUG"] = "1" shared_params = find_shared_parameters(self.model) self.model_to_device() @@ -150,8 +153,8 @@ def distributed_sampler_kwargs(self) -> Dict[str, int]: @property def is_distributed(self) -> bool: - # HOST_WORLD_SIZE is None outside the xmp.spawn process - return os.getenv(xenv.HOST_WORLD_SIZE, None) and self.world_size != 1 + # HOST_WORLD_SIZE is not set outside the xmp.spawn process + return (xenv.HOST_WORLD_SIZE in os.environ) and self.world_size != 1 def process_dataloader(self, dataloader: DataLoader) -> MpDeviceLoader: TPUSpawnStrategy._validate_dataloader(dataloader) @@ -189,8 +192,9 @@ def reduce(self, output, group: Optional[Any] = None, reduce_op: Optional[Union[ invalid_reduce_op = isinstance(reduce_op, ReduceOp) and reduce_op != ReduceOp.SUM invalid_reduce_op_str = isinstance(reduce_op, str) and reduce_op.lower() not in ("sum", "mean", "avg") if invalid_reduce_op or invalid_reduce_op_str: - raise MisconfigurationException( - "Currently, TPUSpawn Strategy only support `sum`, `mean`, `avg` reduce operation." + raise ValueError( + "Currently, the TPUSpawnStrategy only supports `sum`, `mean`, `avg` for the reduce operation, got:" + f" {reduce_op}" ) output = xm.mesh_reduce("reduce", output, sum) @@ -201,6 +205,7 @@ def reduce(self, output, group: Optional[Any] = None, reduce_op: Optional[Union[ return output def _worker_setup(self, process_idx: int): + self._launched = True reset_seed() self.set_world_ranks(process_idx) rank_zero_only.rank = self.global_rank diff --git a/tests/tests_pytorch/accelerators/test_accelerator_connector.py b/tests/tests_pytorch/accelerators/test_accelerator_connector.py index 06f088e87ea4d..dc53fb5e36588 100644 --- a/tests/tests_pytorch/accelerators/test_accelerator_connector.py +++ b/tests/tests_pytorch/accelerators/test_accelerator_connector.py @@ -671,7 +671,7 @@ def test_devices_auto_choice_mps(): @pytest.mark.parametrize( ["parallel_devices", "accelerator"], - [([torch.device("cpu")], "cuda"), ([torch.device("cuda", i) for i in range(8)], ("tpu"))], + [([torch.device("cpu")], "cuda"), ([torch.device("cuda", i) for i in range(8)], "tpu")], ) def test_parallel_devices_in_strategy_confilict_with_accelerator(parallel_devices, accelerator): with pytest.raises(MisconfigurationException, match=r"parallel_devices set through"): diff --git a/tests/tests_pytorch/accelerators/test_ipu.py b/tests/tests_pytorch/accelerators/test_ipu.py index 589ec7b29dd5b..248ac0dbb1818 100644 --- a/tests/tests_pytorch/accelerators/test_ipu.py +++ b/tests/tests_pytorch/accelerators/test_ipu.py @@ -602,7 +602,7 @@ def test_strategy_choice_ipu_plugin(tmpdir): @RunIf(ipu=True) -def test_device_type_when_training_plugin_ipu_passed(tmpdir): +def test_device_type_when_ipu_strategy_passed(tmpdir): trainer = Trainer(strategy=IPUStrategy(), accelerator="ipu", devices=8) assert isinstance(trainer.strategy, IPUStrategy) assert isinstance(trainer.accelerator, IPUAccelerator) diff --git a/tests/tests_pytorch/accelerators/test_tpu.py b/tests/tests_pytorch/accelerators/test_tpu.py index 8e0eb52a9a424..bad6c2801f94f 100644 --- a/tests/tests_pytorch/accelerators/test_tpu.py +++ b/tests/tests_pytorch/accelerators/test_tpu.py @@ -28,7 +28,6 @@ from pytorch_lightning.strategies import DDPStrategy, TPUSpawnStrategy from pytorch_lightning.utilities import find_shared_parameters from tests_pytorch.helpers.runif import RunIf -from tests_pytorch.helpers.utils import pl_multi_process_test class WeightSharingModule(BoringModel): @@ -46,8 +45,7 @@ def forward(self, x): return x -@RunIf(tpu=True) -@pl_multi_process_test +@RunIf(tpu=True, standalone=True) def test_resume_training_on_cpu(tmpdir): """Checks if training can be resumed from a saved checkpoint on CPU.""" # Train a model on TPU @@ -65,11 +63,9 @@ def test_resume_training_on_cpu(tmpdir): # Verify that training is resumed on CPU trainer = Trainer(max_epochs=1, default_root_dir=tmpdir) trainer.fit(model, ckpt_path=model_path) - assert trainer.state.finished, f"Training failed with {trainer.state}" @RunIf(tpu=True) -@pl_multi_process_test def test_if_test_works_after_train(tmpdir): """Ensure that .test() works after .fit()""" @@ -293,12 +289,14 @@ def test_xla_checkpoint_plugin_being_default(): assert isinstance(trainer.strategy.checkpoint_io, XLACheckpointIO) -@RunIf(tpu=True) -@patch("pytorch_lightning.strategies.tpu_spawn.xm") -def test_mp_device_dataloader_attribute(_): +@patch("pytorch_lightning.strategies.tpu_spawn.MpDeviceLoader") +@patch("pytorch_lightning.strategies.tpu_spawn.TPUSpawnStrategy.root_device") +def test_mp_device_dataloader_attribute(root_device_mock, mp_loader_mock): dataset = RandomDataset(32, 64) - dataloader = TPUSpawnStrategy().process_dataloader(DataLoader(dataset)) - assert dataloader.dataset == dataset + dataloader = DataLoader(dataset) + processed_dataloader = TPUSpawnStrategy().process_dataloader(dataloader) + mp_loader_mock.assert_called_with(dataloader, root_device_mock) + assert processed_dataloader.dataset == processed_dataloader._loader.dataset @RunIf(tpu=True) @@ -307,8 +305,7 @@ def test_warning_if_tpus_not_used(): Trainer() -@RunIf(tpu=True) -@pl_multi_process_test +@RunIf(tpu=True, standalone=True) @pytest.mark.parametrize( ["devices", "expected_device_ids"], [ diff --git a/tests/tests_pytorch/callbacks/test_device_stats_monitor.py b/tests/tests_pytorch/callbacks/test_device_stats_monitor.py index 0da6e5c32b9c4..2a2bae8a2e5a4 100644 --- a/tests/tests_pytorch/callbacks/test_device_stats_monitor.py +++ b/tests/tests_pytorch/callbacks/test_device_stats_monitor.py @@ -96,7 +96,6 @@ def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) -> assert cpu_stats_mock.call_count == expected -@pytest.mark.skipif(True, reason="TODO (@kaushikb11): fix this test, timeout") @RunIf(tpu=True) def test_device_stats_monitor_tpu(tmpdir): """Test TPU stats are logged using a logger.""" @@ -106,24 +105,23 @@ def test_device_stats_monitor_tpu(tmpdir): class DebugLogger(CSVLogger): @rank_zero_only - def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) -> None: + def log_metrics(self, metrics, step=None) -> None: fields = ["avg. free memory (MB)", "avg. peak memory (MB)"] for f in fields: assert any(f in h for h in metrics) trainer = Trainer( default_root_dir=tmpdir, - max_epochs=1, - limit_train_batches=2, + max_epochs=2, + limit_train_batches=5, accelerator="tpu", - devices=1, + devices=8, log_every_n_steps=1, callbacks=[device_stats], logger=DebugLogger(tmpdir), enable_checkpointing=False, enable_progress_bar=False, ) - trainer.fit(model) @@ -146,7 +144,7 @@ def test_device_stats_monitor_no_logger(tmpdir): trainer.fit(model) -def test_prefix_metric_keys(tmpdir): +def test_prefix_metric_keys(): """Test that metric key names are converted correctly.""" metrics = {"1": 1.0, "2": 2.0, "3": 3.0} prefix = "foo" diff --git a/tests/tests_pytorch/conftest.py b/tests/tests_pytorch/conftest.py index e41a236486a93..745067cc2f9f1 100644 --- a/tests/tests_pytorch/conftest.py +++ b/tests/tests_pytorch/conftest.py @@ -180,6 +180,7 @@ def pytest_collection_modifyitems(items: List[pytest.Function], config: pytest.C min_cuda_gpus="PL_RUN_CUDA_TESTS", slow="PL_RUN_SLOW_TESTS", ipu="PL_RUN_IPU_TESTS", + tpu="PL_RUN_TPU_TESTS", ) if os.getenv(options["standalone"], "0") == "1" and os.getenv(options["min_cuda_gpus"], "0") == "1": # special case: we don't have a CPU job for standalone tests, so we shouldn't run only cuda tests. diff --git a/tests/tests_pytorch/deprecated_api/test_remove_1-8.py b/tests/tests_pytorch/deprecated_api/test_remove_1-8.py index 12aca123eacc1..6da335383e11e 100644 --- a/tests/tests_pytorch/deprecated_api/test_remove_1-8.py +++ b/tests/tests_pytorch/deprecated_api/test_remove_1-8.py @@ -360,12 +360,10 @@ def test_v1_8_0_deprecated_single_device_plugin_class(): SingleDevicePlugin("cpu") -@RunIf(tpu=True) +@RunIf(tpu=True, standalone=True) def test_v1_8_0_deprecated_single_tpu_plugin_class(): with pytest.deprecated_call( - match=( - "SingleTPUPlugin` is deprecated in v1.6 and will be removed in v1.8." " Use `.*SingleTPUStrategy` instead." - ) + match="SingleTPUPlugin` is deprecated in v1.6 and will be removed in v1.8. Use `.*SingleTPUStrategy` instead." ): SingleTPUPlugin(0) diff --git a/tests/tests_pytorch/helpers/runif.py b/tests/tests_pytorch/helpers/runif.py index d8e38e7101fe0..abbca75f626ad 100644 --- a/tests/tests_pytorch/helpers/runif.py +++ b/tests/tests_pytorch/helpers/runif.py @@ -177,6 +177,8 @@ def __new__( if tpu: conditions.append(not _TPU_AVAILABLE) reasons.append("TPU") + # used in conftest.py::pytest_collection_modifyitems + kwargs["tpu"] = True if ipu: conditions.append(not _IPU_AVAILABLE) diff --git a/tests/tests_pytorch/helpers/utils.py b/tests/tests_pytorch/helpers/utils.py index 6da53e7b54b20..a9efd7f178f2b 100644 --- a/tests/tests_pytorch/helpers/utils.py +++ b/tests/tests_pytorch/helpers/utils.py @@ -11,10 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import functools import os import re -import traceback from contextlib import contextmanager from typing import Optional, Type @@ -80,40 +78,6 @@ def init_checkpoint_callback(logger): return checkpoint -def pl_multi_process_test(func): - """Wrapper for running multi-processing tests_pytorch.""" - - @functools.wraps(func) - def wrapper(*args, **kwargs): - - from multiprocessing import Process, Queue - - queue = Queue() - - def inner_f(queue, **kwargs): - try: - func(**kwargs) - queue.put(1) - except Exception: - _trace = traceback.format_exc() - print(_trace) - # code 17 means RuntimeError: tensorflow/compiler/xla/xla_client/mesh_service.cc:364 : - # Failed to meet rendezvous 'torch_xla.core.xla_model.save': Socket closed (14) - if "terminated with exit code 17" in _trace: - queue.put(1) - else: - queue.put(-1) - - proc = Process(target=inner_f, args=(queue,), kwargs=kwargs) - proc.start() - proc.join() - - result = queue.get() - assert result == 1, "expected 1, but returned %s" % result - - return wrapper - - @contextmanager def no_warning_call(expected_warning: Type[Warning] = UserWarning, match: Optional[str] = None): with pytest.warns(None) as record: diff --git a/tests/tests_pytorch/lite/test_lite.py b/tests/tests_pytorch/lite/test_lite.py index 3652613526549..ca2b06b6d695b 100644 --- a/tests/tests_pytorch/lite/test_lite.py +++ b/tests/tests_pytorch/lite/test_lite.py @@ -318,31 +318,35 @@ def test_setup_dataloaders_replace_standard_sampler(shuffle, strategy): ("cpu", "cpu"), pytest.param("cuda", "cuda:0", marks=RunIf(min_cuda_gpus=1)), pytest.param("gpu", "cuda:0", marks=RunIf(min_cuda_gpus=1)), - pytest.param("tpu", "xla:0", marks=RunIf(tpu=True)), + pytest.param("tpu", "xla:0", marks=RunIf(tpu=True, standalone=True)), pytest.param("mps", "mps:0", marks=RunIf(mps=True)), pytest.param("gpu", "mps:0", marks=RunIf(mps=True)), ], ) def test_to_device(accelerator, expected): """Test that the to_device method can move various objects to the device determined by the accelerator.""" - lite = EmptyLite(accelerator=accelerator, devices=1) - expected_device = torch.device(expected) + class Lite(LightningLite): + def run(self): + expected_device = torch.device(expected) + + # module + module = torch.nn.Linear(2, 3) + module = lite.to_device(module) + assert all(param.device == expected_device for param in module.parameters()) - # module - module = torch.nn.Linear(2, 3) - module = lite.to_device(module) - assert all(param.device == expected_device for param in module.parameters()) + # tensor + tensor = torch.rand(2, 2) + tensor = lite.to_device(tensor) + assert tensor.device == expected_device - # tensor - tensor = torch.rand(2, 2) - tensor = lite.to_device(tensor) - assert tensor.device == expected_device + # collection + collection = {"data": torch.rand(2, 2), "int": 1} + collection = lite.to_device(collection) + assert collection["data"].device == expected_device - # collection - collection = {"data": torch.rand(2, 2), "int": 1} - collection = lite.to_device(collection) - assert collection["data"].device == expected_device + lite = Lite(accelerator=accelerator, devices=1) + lite.run() def test_rank_properties(): diff --git a/tests/tests_pytorch/models/test_horovod.py b/tests/tests_pytorch/models/test_horovod.py index 244a3e3d885c3..6cd354ef22cfe 100644 --- a/tests/tests_pytorch/models/test_horovod.py +++ b/tests/tests_pytorch/models/test_horovod.py @@ -390,7 +390,7 @@ def _compute_batch(): trainer = Trainer(fast_dev_run=True, strategy="horovod", logger=False) assert isinstance(trainer.accelerator, CPUAccelerator) - # TODO: test that we selected the correct training_type_plugin based on horovod flags + # TODO: test that we selected the correct strategy based on horovod flags metric = Accuracy( compute_on_step=True, diff --git a/tests/tests_pytorch/models/test_tpu.py b/tests/tests_pytorch/models/test_tpu.py index b6829d444701d..a41ba7429c0e9 100644 --- a/tests/tests_pytorch/models/test_tpu.py +++ b/tests/tests_pytorch/models/test_tpu.py @@ -26,18 +26,15 @@ from pytorch_lightning.callbacks import EarlyStopping from pytorch_lightning.demos.boring_classes import BoringModel, RandomDataset from pytorch_lightning.strategies import TPUSpawnStrategy +from pytorch_lightning.strategies.launchers.xla import _save_spawn from pytorch_lightning.trainer.connectors.logger_connector.result import _Sync from pytorch_lightning.utilities import _TPU_AVAILABLE from pytorch_lightning.utilities.distributed import ReduceOp from pytorch_lightning.utilities.exceptions import MisconfigurationException from tests_pytorch.helpers.runif import RunIf -from tests_pytorch.helpers.utils import pl_multi_process_test if _TPU_AVAILABLE: import torch_xla - import torch_xla.distributed.xla_multiprocessing as xmp - - SERIAL_EXEC = xmp.MpSerialExecutor() class SerialLoaderBoringModel(BoringModel): @@ -48,8 +45,7 @@ def val_dataloader(self): return DataLoader(RandomDataset(32, 2000), batch_size=32) -@RunIf(tpu=True) -@pl_multi_process_test +@RunIf(tpu=True, standalone=True) def test_model_tpu_devices_1(tmpdir): """Make sure model trains on TPU.""" tutils.reset_seed() @@ -68,8 +64,7 @@ def test_model_tpu_devices_1(tmpdir): @pytest.mark.parametrize("tpu_core", [1, 5]) -@RunIf(tpu=True) -@pl_multi_process_test +@RunIf(tpu=True, standalone=True) def test_model_tpu_index(tmpdir, tpu_core): """Make sure model trains on TPU.""" tutils.reset_seed() @@ -89,7 +84,6 @@ def test_model_tpu_index(tmpdir, tpu_core): @RunIf(tpu=True) -@pl_multi_process_test def test_model_tpu_devices_8(tmpdir): """Make sure model trains on TPU.""" tutils.reset_seed() @@ -108,8 +102,7 @@ def test_model_tpu_devices_8(tmpdir): tpipes.run_model_test(trainer_options, model, with_hpc=False, min_acc=0.05) -@RunIf(tpu=True) -@pl_multi_process_test +@RunIf(tpu=True, standalone=True) def test_model_16bit_tpu_devices_1(tmpdir): """Make sure model trains on TPU.""" tutils.reset_seed() @@ -129,8 +122,7 @@ def test_model_16bit_tpu_devices_1(tmpdir): @pytest.mark.parametrize("tpu_core", [1, 5]) -@RunIf(tpu=True) -@pl_multi_process_test +@RunIf(tpu=True, standalone=True) def test_model_16bit_tpu_index(tmpdir, tpu_core): """Make sure model trains on TPU.""" tutils.reset_seed() @@ -151,7 +143,6 @@ def test_model_16bit_tpu_index(tmpdir, tpu_core): @RunIf(tpu=True) -@pl_multi_process_test def test_model_16bit_tpu_devices_8(tmpdir): """Make sure model trains on TPU.""" tutils.reset_seed() @@ -172,7 +163,6 @@ def test_model_16bit_tpu_devices_8(tmpdir): @RunIf(tpu=True) -@pl_multi_process_test def test_model_tpu_early_stop(tmpdir): """Test if single TPU core training works.""" @@ -198,8 +188,7 @@ def validation_step(self, *args, **kwargs): trainer.test(dataloaders=DataLoader(RandomDataset(32, 2000), batch_size=32)) -@RunIf(tpu=True) -@pl_multi_process_test +@RunIf(tpu=True, standalone=True) def test_tpu_grad_norm(tmpdir): """Test if grad_norm works on TPU.""" tutils.reset_seed() @@ -218,8 +207,7 @@ def test_tpu_grad_norm(tmpdir): tpipes.run_model_test(trainer_options, model, with_hpc=False) -@RunIf(tpu=True) -@pl_multi_process_test +@RunIf(tpu=True, standalone=True) def test_tpu_clip_grad_by_value(tmpdir): """Test if clip_gradients by value works on TPU.""" tutils.reset_seed() @@ -240,7 +228,6 @@ def test_tpu_clip_grad_by_value(tmpdir): @RunIf(tpu=True) -@pl_multi_process_test def test_dataloaders_passed_to_fit(tmpdir): """Test if dataloaders passed to trainer works on TPU.""" tutils.reset_seed() @@ -248,7 +235,6 @@ def test_dataloaders_passed_to_fit(tmpdir): trainer = Trainer(default_root_dir=tmpdir, max_epochs=1, accelerator="tpu", devices=8) trainer.fit(model, train_dataloaders=model.train_dataloader(), val_dataloaders=model.val_dataloader()) - assert trainer.state.finished, f"Training failed with {trainer.state}" @RunIf(tpu=True) @@ -267,14 +253,13 @@ def test_exception_when_no_tpu_found(): @pytest.mark.parametrize("tpu_cores", [1, 8, [1]]) -@RunIf(tpu=True) +@RunIf(tpu=True, standalone=True) def test_accelerator_set_when_using_tpu(tpu_cores): """Test if the accelerator is set to `tpu` when tpu_cores is not None.""" assert isinstance(Trainer(accelerator="tpu", devices=tpu_cores).accelerator, TPUAccelerator) @RunIf(tpu=True) -@pl_multi_process_test def test_broadcast_on_tpu(): """Checks if an object from the main process is broadcasted to other processes correctly.""" @@ -282,19 +267,19 @@ def test_broadcast(rank): trainer = Trainer(accelerator="tpu", devices=8) assert isinstance(trainer.accelerator, TPUAccelerator) assert isinstance(trainer.strategy, TPUSpawnStrategy) + trainer.strategy._launched = True obj = ("ver_0.5", "logger_name", rank) result = trainer.strategy.broadcast(obj) assert result == ("ver_0.5", "logger_name", 0) - xmp.spawn(test_broadcast, nprocs=8, start_method="fork") + _save_spawn(test_broadcast, nprocs=8, start_method="fork") @pytest.mark.parametrize( ["cli_args", "expected"], [("--tpu_cores=8", {"tpu_cores": 8}), ("--tpu_cores=1,", {"tpu_cores": "1,"})], ) -@RunIf(tpu=True) -@pl_multi_process_test +@RunIf(tpu=True, standalone=True) def test_tpu_cores_with_argparse(cli_args, expected): """Test passing tpu_cores in command line.""" cli_args = cli_args.split(" ") if cli_args else [] @@ -310,30 +295,31 @@ def test_tpu_cores_with_argparse(cli_args, expected): @RunIf(tpu=True) -@pl_multi_process_test def test_tpu_reduce(): """Test tpu spawn reduce operation.""" def test_reduce(rank): trainer = Trainer(accelerator="tpu", devices=8) - # faster this way - reduce_ops = ["mean", "AVG", "undefined", "sum", ReduceOp.SUM, ReduceOp.MAX] - for reduce_op in reduce_ops: - if reduce_op == "undefined" or reduce_op == ReduceOp.MAX: - with pytest.raises(MisconfigurationException, match="TPUSpawn Strategy only support"): - result = trainer.strategy.reduce(1, reduce_op) - else: - result = trainer.strategy.reduce(1, reduce_op) + trainer.strategy._launched = True + + with pytest.raises(ValueError, match="TPUSpawnStrategy only supports"): + trainer.strategy.reduce(1, reduce_op="undefined") + + with pytest.raises(ValueError, match="TPUSpawnStrategy only supports"): + trainer.strategy.reduce(1, reduce_op=ReduceOp.MAX) + + # it is faster to loop over here than to parameterize the test + for reduce_op in ("mean", "AVG", "sum", ReduceOp.SUM): + result = trainer.strategy.reduce(1, reduce_op=reduce_op) if isinstance(reduce_op, str) and reduce_op.lower() in ("mean", "avg"): assert result.item() == 1 else: assert result.item() == 8 - xmp.spawn(test_reduce, nprocs=8, start_method="fork") + _save_spawn(test_reduce, nprocs=8, start_method="fork") -@RunIf(tpu=True) -@pl_multi_process_test +@RunIf(tpu=True, standalone=True) @pytest.mark.parametrize("clip_val", [10]) @mock.patch("torch.nn.utils.clip_grad_norm_") def test_tpu_precision_16_clip_gradients(mock_clip_grad_norm, clip_val, tmpdir): @@ -363,7 +349,6 @@ def test_tpu_precision_16_clip_gradients(mock_clip_grad_norm, clip_val, tmpdir): @RunIf(tpu=True) -@pl_multi_process_test def test_if_test_works_with_checkpoint_false(tmpdir): """Ensure that model trains properly when `enable_checkpointing` is set to False.""" @@ -382,21 +367,22 @@ def test_if_test_works_with_checkpoint_false(tmpdir): @RunIf(tpu=True) -@pl_multi_process_test def test_tpu_sync_dist(): """Test tpu spawn sync dist operation.""" - def test_sync_dist(_): - sync = _Sync(TPUSpawnStrategy().reduce, should=True, _op=torch.distributed.ReduceOp.SUM) + def test_sync_dist(rank): + trainer = Trainer(accelerator="tpu", devices=8) + trainer.strategy._launched = True + + sync = _Sync(trainer.strategy.reduce, _should=True, _op=torch.distributed.ReduceOp.SUM) value = torch.tensor([1.0]) - value = (sync(value),) + value = sync(value) assert value.item() == 8 - xmp.spawn(test_sync_dist, nprocs=8, start_method="fork") + _save_spawn(test_sync_dist, nprocs=8, start_method="fork") @RunIf(tpu=True) -@pl_multi_process_test def test_tpu_debug_mode(tmpdir): """Test if debug mode works on TPU.""" @@ -424,7 +410,6 @@ def teardown(self, stage): @RunIf(tpu=True) -@pl_multi_process_test def test_tpu_host_world_size(tmpdir): """Test Host World size env setup on TPU.""" @@ -432,9 +417,6 @@ class DebugModel(BoringModel): def on_train_start(self): assert os.environ.get("XRT_HOST_WORLD_SIZE") == str(1) - def teardown(self, stage): - assert "XRT_HOST_WORLD_SIZE" not in os.environ - tutils.reset_seed() trainer_options = dict( default_root_dir=tmpdir, @@ -447,12 +429,13 @@ def teardown(self, stage): ) model = DebugModel() + assert "XRT_HOST_WORLD_SIZE" not in os.environ tpipes.run_model_test(trainer_options, model, with_hpc=False) + assert "XRT_HOST_WORLD_SIZE" not in os.environ @RunIf(tpu=True) -@pl_multi_process_test -def test_device_type_when_training_plugin_tpu_passed(tmpdir): - trainer = Trainer(strategy=TPUSpawnStrategy(), accelerator="tpu", devices=8) +def test_device_type_when_tpu_strategy_passed(tmpdir): + trainer = Trainer(default_root_dir=tmpdir, strategy=TPUSpawnStrategy(), accelerator="tpu", devices=8) assert isinstance(trainer.strategy, TPUSpawnStrategy) assert isinstance(trainer.accelerator, TPUAccelerator) diff --git a/tests/tests_pytorch/plugins/environments/test_xla_environment.py b/tests/tests_pytorch/plugins/environments/test_xla_environment.py index 8c6bae204ed17..ac1f17bc2dde0 100644 --- a/tests/tests_pytorch/plugins/environments/test_xla_environment.py +++ b/tests/tests_pytorch/plugins/environments/test_xla_environment.py @@ -15,6 +15,7 @@ from unittest import mock import pytest +import torch import pytorch_lightning as pl from pytorch_lightning.plugins.environments import XLAEnvironment @@ -23,7 +24,8 @@ @RunIf(tpu=True) @mock.patch.dict(os.environ, {}, clear=True) -def test_default_attributes(): +@mock.patch("torch_xla._XLAC._xla_get_default_device", return_value=torch.device("xla:0")) +def test_default_attributes(*_): """Test the default attributes when no environment variables are set.""" env = XLAEnvironment() assert not env.creates_processes_externally diff --git a/tests/tests_pytorch/profilers/test_xla_profiler.py b/tests/tests_pytorch/profilers/test_xla_profiler.py index 7f5b0ecdd7740..694d978905177 100644 --- a/tests/tests_pytorch/profilers/test_xla_profiler.py +++ b/tests/tests_pytorch/profilers/test_xla_profiler.py @@ -35,7 +35,6 @@ def test_xla_profiler_instance(tmpdir): assert isinstance(trainer.profiler, XLAProfiler) trainer.fit(model) - assert trainer.state.finished, f"Training failed with {trainer.state}" @pytest.mark.skipif(True, reason="XLA Profiler doesn't support Prog. capture yet") diff --git a/tests/tests_pytorch/run_standalone_tasks.sh b/tests/tests_pytorch/run_standalone_tasks.sh new file mode 100644 index 0000000000000..4d433399e5736 --- /dev/null +++ b/tests/tests_pytorch/run_standalone_tasks.sh @@ -0,0 +1,48 @@ +#!/bin/bash +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +set -e +# THIS FILE ASSUMES IT IS RUN INSIDE THE tests/tests_pytorch DIRECTORY + +report='' + +if nvcc --version; then + nvprof --profile-from-start off -o trace_name.prof -- python -m coverage run --source pytorch_lightning --append -m pytest --no-header profilers/test_profiler.py::test_pytorch_profiler_nested_emit_nvtx +fi + +# needs to run outside of `pytest` +python utilities/test_warnings.py +if [ $? -eq 0 ]; then + report+="Ran\tutilities/test_warnings.py\n" +fi + +# test deadlock is properly handled with TorchElastic. +LOGS=$(PL_RUN_STANDALONE_TESTS=1 PL_RECONCILE_PROCESS=1 python -m torch.distributed.run --nproc_per_node=2 --max_restarts 0 -m coverage run --source pytorch_lightning -a plugins/environments/torch_elastic_deadlock.py | grep "SUCCEEDED") +if [ -z "$LOGS" ]; then + exit 1 +fi +report+="Ran\tplugins/environments/torch_elastic_deadlock.py\n" + +# test that a user can manually launch individual processes +export PYTHONPATH="${PYTHONPATH}:$(pwd)" +args="--trainer.accelerator gpu --trainer.devices 2 --trainer.strategy ddp --trainer.max_epochs=1 --trainer.limit_train_batches=1 --trainer.limit_val_batches=1 --trainer.limit_test_batches=1" +MASTER_ADDR="localhost" MASTER_PORT=1234 LOCAL_RANK=1 python ../../examples/convert_from_pt_to_pl/image_classifier_5_lightning_datamodule.py ${args} & +MASTER_ADDR="localhost" MASTER_PORT=1234 LOCAL_RANK=0 python ../../examples/convert_from_pt_to_pl/image_classifier_5_lightning_datamodule.py ${args} +report+="Ran\tmanual ddp launch test\n" + +# echo test report +printf '=%.s' {1..80} +printf "\n$report" +printf '=%.s' {1..80} +printf '\n' diff --git a/tests/tests_pytorch/run_standalone_tests.sh b/tests/tests_pytorch/run_standalone_tests.sh index 5297cbd033347..55a0d330f6188 100644 --- a/tests/tests_pytorch/run_standalone_tests.sh +++ b/tests/tests_pytorch/run_standalone_tests.sh @@ -93,31 +93,7 @@ done # wait for leftover tests for pid in ${pids[*]}; do wait $pid; done show_batched_output -echo "Batched mode finished. Continuing with the rest of standalone tests." - -if nvcc --version; then - nvprof --profile-from-start off -o trace_name.prof -- python ${defaults} profilers/test_profiler.py::test_pytorch_profiler_nested_emit_nvtx -fi - -# needs to run outside of `pytest` -python utilities/test_warnings.py -if [ $? -eq 0 ]; then - report+="Ran\tutilities/test_warnings.py\n" -fi - -# test deadlock is properly handled with TorchElastic. -LOGS=$(PL_RUN_STANDALONE_TESTS=1 PL_RECONCILE_PROCESS=1 python -m torch.distributed.run --nproc_per_node=2 --max_restarts 0 -m coverage run --source pytorch_lightning -a plugins/environments/torch_elastic_deadlock.py | grep "SUCCEEDED") -if [ -z "$LOGS" ]; then - exit 1 -fi -report+="Ran\tplugins/environments/torch_elastic_deadlock.py\n" - -# test that a user can manually launch individual processes -export PYTHONPATH="${PYTHONPATH}:$(pwd)" -args="--trainer.accelerator gpu --trainer.devices 2 --trainer.strategy ddp --trainer.max_epochs=1 --trainer.limit_train_batches=1 --trainer.limit_val_batches=1 --trainer.limit_test_batches=1" -MASTER_ADDR="localhost" MASTER_PORT=1234 LOCAL_RANK=1 python ../../examples/convert_from_pt_to_pl/image_classifier_5_lightning_datamodule.py ${args} & -MASTER_ADDR="localhost" MASTER_PORT=1234 LOCAL_RANK=0 python ../../examples/convert_from_pt_to_pl/image_classifier_5_lightning_datamodule.py ${args} -report+="Ran\tmanual ddp launch test\n" +echo "Batched mode finished. End of standalone tests." # echo test report printf '=%.s' {1..80} diff --git a/tests/tests_pytorch/strategies/test_tpu_spawn.py b/tests/tests_pytorch/strategies/test_tpu_spawn.py index 246df92c45e46..967e44a42c9de 100644 --- a/tests/tests_pytorch/strategies/test_tpu_spawn.py +++ b/tests/tests_pytorch/strategies/test_tpu_spawn.py @@ -25,7 +25,6 @@ from pytorch_lightning.utilities.exceptions import MisconfigurationException from tests_pytorch.helpers.dataloaders import CustomNotImplementedErrorDataloader from tests_pytorch.helpers.runif import RunIf -from tests_pytorch.helpers.utils import pl_multi_process_test class BoringModelNoDataloaders(BoringModel): @@ -85,18 +84,16 @@ def test_error_process_iterable_dataloader(_): class BoringModelTPU(BoringModel): def on_train_start(self) -> None: + # assert strategy attributes for device setting assert self.device == torch.device("xla", index=1) assert os.environ.get("PT_XLA_DEBUG") == "1" -@RunIf(tpu=True) -@pl_multi_process_test +@RunIf(tpu=True, standalone=True) def test_model_tpu_one_core(): """Tests if device/debug flag is set correctly when training and after teardown for TPUSpawnStrategy.""" + model = BoringModelTPU() trainer = Trainer(accelerator="tpu", devices=1, fast_dev_run=True, strategy=TPUSpawnStrategy(debug=True)) - # assert training strategy attributes for device setting assert isinstance(trainer.strategy, TPUSpawnStrategy) - assert trainer.strategy.root_device == torch.device("xla", index=1) - model = BoringModelTPU() trainer.fit(model) assert "PT_XLA_DEBUG" not in os.environ diff --git a/tests/tests_pytorch/trainer/properties/test_estimated_stepping_batches.py b/tests/tests_pytorch/trainer/properties/test_estimated_stepping_batches.py index 35a8a0a8d5789..8d8fb7f3a8c21 100644 --- a/tests/tests_pytorch/trainer/properties/test_estimated_stepping_batches.py +++ b/tests/tests_pytorch/trainer/properties/test_estimated_stepping_batches.py @@ -14,8 +14,10 @@ import logging from unittest import mock +from unittest.mock import PropertyMock import pytest +import torch from torch.utils.data import DataLoader from pytorch_lightning import Trainer @@ -26,7 +28,6 @@ from pytorch_lightning.utilities.exceptions import MisconfigurationException from tests_pytorch.helpers.datasets import RandomIterableDataset from tests_pytorch.helpers.runif import RunIf -from tests_pytorch.helpers.utils import pl_multi_process_test def test_num_stepping_batches_basic(): @@ -135,16 +136,29 @@ def test_num_stepping_batches_gpu(trainer_kwargs, estimated_steps, monkeypatch): assert trainer.estimated_stepping_batches == estimated_steps +@RunIf(tpu=True, standalone=True) +def test_num_stepping_batches_with_tpu_single(): + """Test stepping batches with the single-core TPU strategy.""" + trainer = Trainer(accelerator="tpu", devices=1, max_epochs=1) + model = BoringModel() + trainer._data_connector.attach_data(model) + trainer.strategy.connect(model) + assert trainer.estimated_stepping_batches == len(model.train_dataloader()) + + @RunIf(tpu=True) -@pl_multi_process_test -@pytest.mark.parametrize("devices,estimated_steps", [([1], 64), (8, 8)]) -def test_num_stepping_batches_with_tpu(devices, estimated_steps): - """Test stepping batches with TPU training which acts like DDP.""" - trainer = Trainer(accelerator="tpu", devices=devices, max_epochs=1) +@mock.patch( + "pytorch_lightning.strategies.tpu_spawn.TPUSpawnStrategy.root_device", + new_callable=PropertyMock, + return_value=torch.device("xla:0"), +) +def test_num_stepping_batches_with_tpu_multi(_): + """Test stepping batches with the TPU strategy across multiple devices.""" + trainer = Trainer(accelerator="tpu", devices=8, max_epochs=1) model = BoringModel() trainer._data_connector.attach_data(model) trainer.strategy.connect(model) - assert trainer.estimated_stepping_batches == estimated_steps + assert trainer.estimated_stepping_batches == len(model.train_dataloader()) // 8 @mock.patch("pytorch_lightning.accelerators.ipu.IPUAccelerator.is_available", return_value=True)