diff --git a/.flake8 b/.flake8 index c10e964ed..290d0ea9a 100644 --- a/.flake8 +++ b/.flake8 @@ -2,4 +2,4 @@ ignore = E226,E302,E41,W391, E701, W291, E722, W503, E128, E126, E127, E731, E401 max-line-length = 160 max-complexity = 25 -exclude = fastMRI/ +exclude = fastMRI/ test_outputs/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 921117d48..6f5a26278 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,8 @@ any large models anymore because data loaders ran out of memory. - ([#472](https://github.com/microsoft/InnerEye-DeepLearning/pull/472)) Correct model path for moving ensemble models. - ([#494](https://github.com/microsoft/InnerEye-DeepLearning/pull/494)) Fix an issue where multi-node jobs for LightningContainer models can get stuck at test set inference. +- ([#498](https://github.com/microsoft/InnerEye-DeepLearning/pull/498)) Workaround for the problem that downloading +multiple large checkpoints can time out. ### Removed diff --git a/InnerEye/Azure/azure_util.py b/InnerEye/Azure/azure_util.py index 48c3bfed2..0f9937ee2 100644 --- a/InnerEye/Azure/azure_util.py +++ b/InnerEye/Azure/azure_util.py @@ -337,34 +337,53 @@ def is_parent_run(run: Run) -> bool: return PARENT_RUN_CONTEXT and run.id == PARENT_RUN_CONTEXT.id -def download_outputs_from_run(blobs_path: Path, - destination: Path, - run: Optional[Run] = None, - is_file: bool = False, - append_prefix: bool = False) -> Path: - """ - Download the blobs from the run's default output directory: DEFAULT_AML_UPLOAD_DIR. - Silently returns for offline runs. - :param blobs_path: Blobs path in DEFAULT_AML_UPLOAD_DIR to download from - :param run: Run to download from (default to current run if None) - :param destination: Local path to save the downloaded blobs to - :param is_file: Set to True if downloading a single file. - :param append_prefix: An optional flag whether to append the specified prefix from the final output file path. - If False then the prefix is removed from the output file path. 
+def download_run_output_file(blob_path: Path, + destination: Path, + run: Run) -> Path: + """ + Downloads a single file from the run's default output directory: DEFAULT_AML_UPLOAD_DIR ("outputs"). + For example, if blobs_path = "foo/bar.csv", then the run result file "outputs/foo/bar.csv" will be downloaded + to /bar.csv (the directory will be stripped off). + :param blob_path: The name of the file to download. + :param run: The AzureML run to download the files from + :param destination: Local path to save the downloaded blob to. :return: Destination path to the downloaded file(s) """ - run = run or Run.get_context() - blobs_root_path = str(fixed_paths.DEFAULT_AML_UPLOAD_DIR / blobs_path) - if is_file: - destination = destination / blobs_path.name - logging.info(f"Downloading single file from run {run.id}: {blobs_root_path} -> {str(destination)}") - run.download_file(blobs_root_path, str(destination), _validate_checksum=True) - else: - logging.info(f"Downloading multiple files from run {run.id}: {blobs_root_path} -> {str(destination)}") - run.download_files(blobs_root_path, str(destination), append_prefix=append_prefix) + blobs_prefix = str((fixed_paths.DEFAULT_AML_UPLOAD_DIR / blob_path).as_posix()) + destination = destination / blob_path.name + logging.info(f"Downloading single file from run {run.id}: {blobs_prefix} -> {str(destination)}") + run.download_file(blobs_prefix, str(destination), _validate_checksum=True) return destination +def download_run_outputs_by_prefix(blobs_prefix: Path, + destination: Path, + run: Run) -> None: + """ + Download all the blobs from the run's default output directory: DEFAULT_AML_UPLOAD_DIR ("outputs") that + have a given prefix (folder structure). When saving, the prefix string will be stripped off. For example, + if blobs_prefix = "foo", and the run has a file "outputs/foo/bar.csv", it will be downloaded to destination/bar.csv. + If there is in addition a file "foo.txt", that file will be skipped. 
+ :param blobs_prefix: The prefix for all files in "outputs" that should be downloaded. + :param run: The AzureML run to download the files from. + :param destination: Local path to save the downloaded blobs to. + """ + prefix_str = str((fixed_paths.DEFAULT_AML_UPLOAD_DIR / blobs_prefix).as_posix()) + logging.info(f"Downloading multiple files from run {run.id}: {prefix_str} -> {str(destination)}") + # There is a download_files function, but that can time out when downloading several large checkpoints file + # (120sec timeout for all files). + for file in run.get_file_names(): + if file.startswith(prefix_str): + target_path = file[len(prefix_str):] + if target_path.startswith("/"): + target_path = target_path[1:] + logging.info(f"Downloading {file}") + run.download_file(file, str(destination / target_path), _validate_checksum=True) + else: + logging.warning(f"Skipping file {file}, because the desired prefix {prefix_str} is not aligned with " + f"the folder structure") + + def is_running_on_azure_agent() -> bool: """ Returns True if the code appears to be running on an Azure build agent, and False otherwise. 
@@ -406,8 +425,8 @@ def get_comparison_baseline_paths(outputs_folder: Path, # Look for dataset.csv inside epoch_NNN/Test, epoch_NNN/ and at top level for blob_path_parent in step_up_directories(blob_path): try: - comparison_dataset_path = download_outputs_from_run( - blob_path_parent / dataset_csv_file_name, destination_folder, run, True) + comparison_dataset_path = download_run_output_file( + blob_path_parent / dataset_csv_file_name, destination_folder, run) break except (ValueError, UserErrorException): logging.warning(f"cannot find {dataset_csv_file_name} at {blob_path_parent} in {run_rec_id}") @@ -418,8 +437,8 @@ def get_comparison_baseline_paths(outputs_folder: Path, logging.warning(f"cannot find {dataset_csv_file_name} at or above {blob_path} in {run_rec_id}") # Look for epoch_NNN/Test/metrics.csv try: - comparison_metrics_path = download_outputs_from_run( - blob_path / SUBJECT_METRICS_FILE_NAME, destination_folder, run, True) + comparison_metrics_path = download_run_output_file( + blob_path / SUBJECT_METRICS_FILE_NAME, destination_folder, run) except (ValueError, UserErrorException): logging.warning(f"cannot find {SUBJECT_METRICS_FILE_NAME} at {blob_path} in {run_rec_id}") return (comparison_dataset_path, comparison_metrics_path) diff --git a/InnerEye/Azure/run_pytest.py b/InnerEye/Azure/run_pytest.py index 025364acb..be8ac24f8 100644 --- a/InnerEye/Azure/run_pytest.py +++ b/InnerEye/Azure/run_pytest.py @@ -11,7 +11,7 @@ # Test result file from running pytest inside the AzureML job. This file must have a prefix that # matches the string in the build definition build-pr.yml, in the TrainInAzureML job. 
-from InnerEye.Azure.azure_util import download_outputs_from_run +from InnerEye.Azure.azure_util import download_run_output_file PYTEST_RESULTS_FILE = Path("test-results-on-azure-ml.xml") @@ -49,10 +49,9 @@ def download_pytest_result(run: Run, destination_folder: Path = Path.cwd()) -> P """ logging.info(f"Downloading pytest result file: {PYTEST_RESULTS_FILE}") try: - return download_outputs_from_run( + return download_run_output_file( PYTEST_RESULTS_FILE, destination=destination_folder, - run=run, - is_file=True) + run=run) except: raise ValueError(f"No pytest result file {PYTEST_RESULTS_FILE} was found for run {run.id}") diff --git a/InnerEye/ML/surface_distance_heatmaps.py b/InnerEye/ML/surface_distance_heatmaps.py index da6fd15a4..4db34a7ed 100644 --- a/InnerEye/ML/surface_distance_heatmaps.py +++ b/InnerEye/ML/surface_distance_heatmaps.py @@ -11,7 +11,7 @@ from InnerEye.Azure.azure_config import AzureConfig from InnerEye.Azure.azure_runner import create_runner_parser, parse_args_and_add_yaml_variables -from InnerEye.Azure.azure_util import download_outputs_from_run +from InnerEye.Azure.azure_util import download_run_outputs_by_prefix from InnerEye.Common.metrics_constants import MetricsFileColumns from InnerEye.ML.common import ModelExecutionMode from InnerEye.ML.config import SegmentationModelBase @@ -67,8 +67,8 @@ def load_predictions(run_type: SurfaceDistanceRunType, azure_config: AzureConfig for (subject_id, structure_name, dice_score, _) in worst_performers: subject_prefix = sd_util.get_subject_prefix(model_config, execution_mode, subject_id) # if not already present, download data for subject - download_outputs_from_run( - blobs_path=subject_prefix, + download_run_outputs_by_prefix( + blobs_prefix=subject_prefix, destination=output_dir, run=first_child_run ) diff --git a/InnerEye/ML/utils/run_recovery.py b/InnerEye/ML/utils/run_recovery.py index bb92cdabe..620f05485 100644 --- a/InnerEye/ML/utils/run_recovery.py +++ b/InnerEye/ML/utils/run_recovery.py @@ 
-12,7 +12,8 @@ from azureml.core import Run -from InnerEye.Azure.azure_util import RUN_CONTEXT, download_outputs_from_run, fetch_child_runs, tag_values_all_distinct +from InnerEye.Azure.azure_util import RUN_CONTEXT, download_run_output_file, download_run_outputs_by_prefix, \ + fetch_child_runs, tag_values_all_distinct from InnerEye.Common.common_util import OTHER_RUNS_SUBDIR_NAME, check_properties_are_not_none from InnerEye.ML.common import BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, get_best_checkpoint_path, \ get_recovery_checkpoint_path @@ -52,11 +53,10 @@ def download_best_checkpoints_from_child_runs(config: OutputParams, run: Run) -> else: subdir = str(child.tags[tag_to_use] if can_use_split_indices else child.number) child_dst = config.checkpoint_folder / OTHER_RUNS_SUBDIR_NAME / subdir - download_outputs_from_run( - blobs_path=Path(CHECKPOINT_FOLDER) / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, + download_run_output_file( + blob_path=Path(CHECKPOINT_FOLDER) / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, destination=child_dst, - run=child, - is_file=True + run=child ) child_runs_checkpoints_roots.append(child_dst) return RunRecovery(checkpoints_roots=child_runs_checkpoints_roots) @@ -82,8 +82,8 @@ def download_all_checkpoints_from_run(config: OutputParams, run: Run, destination_folder = config.checkpoint_folder / subfolder if subfolder else config.checkpoint_folder if not only_return_path: - download_outputs_from_run( - blobs_path=Path(CHECKPOINT_FOLDER), + download_run_outputs_by_prefix( + blobs_prefix=Path(CHECKPOINT_FOLDER), destination=destination_folder, run=run ) diff --git a/InnerEye/ML/visualizers/plot_cross_validation.py b/InnerEye/ML/visualizers/plot_cross_validation.py index 877004816..143d05925 100644 --- a/InnerEye/ML/visualizers/plot_cross_validation.py +++ b/InnerEye/ML/visualizers/plot_cross_validation.py @@ -29,7 +29,7 @@ import InnerEye.Common.Statistics.mann_whitney_test as mann_whitney from InnerEye.Azure.azure_config import AzureConfig -from 
InnerEye.Azure.azure_util import CROSS_VALIDATION_SPLIT_INDEX_TAG_KEY, download_outputs_from_run, \ +from InnerEye.Azure.azure_util import CROSS_VALIDATION_SPLIT_INDEX_TAG_KEY, download_run_output_file, \ fetch_child_runs, is_offline_run_context, is_parent_run from InnerEye.Common import common_util, fixed_paths from InnerEye.Common.Statistics.wilcoxon_signed_rank_test import WilcoxonTestConfig, wilcoxon_signed_rank_test @@ -232,11 +232,10 @@ def download_or_get_local_file(self, return None else: try: - return download_outputs_from_run( - blobs_path=blob_path, + return download_run_output_file( + blob_path=blob_path, destination=destination, - run=run, - is_file=True + run=run ) except Exception as ex: logging.warning(f"File {blob_to_download} not found in output of run {run.id}: {ex}") diff --git a/Tests/AfterTraining/test_after_training.py b/Tests/AfterTraining/test_after_training.py index 58c2b6320..9b99b6a3b 100644 --- a/Tests/AfterTraining/test_after_training.py +++ b/Tests/AfterTraining/test_after_training.py @@ -9,7 +9,6 @@ All of the tests in this file rely on previous InnerEye runs that submit an AzureML job. 
They pick up the most recently run AzureML job from most_recent_run.txt """ - import os import shutil import sys @@ -24,13 +23,14 @@ from InnerEye.Azure.azure_config import AzureConfig from InnerEye.Azure.azure_runner import RUN_RECOVERY_FILE -from InnerEye.Azure.azure_util import MODEL_ID_KEY_NAME, get_comparison_baseline_paths, \ +from InnerEye.Azure.azure_util import MODEL_ID_KEY_NAME, download_run_outputs_by_prefix, \ + get_comparison_baseline_paths, \ is_running_on_azure_agent, to_azure_friendly_string from InnerEye.Common import common_util, fixed_paths, fixed_paths_for_tests -from InnerEye.Common.common_util import CROSSVAL_RESULTS_FOLDER, ENSEMBLE_SPLIT_NAME, get_best_epoch_results_path -from InnerEye.Common.fixed_paths import DEFAULT_AML_LOGS_DIR, DEFAULT_RESULT_IMAGE_NAME, \ - DEFAULT_RESULT_ZIP_DICOM_NAME, \ - PYTHON_ENVIRONMENT_NAME, repository_root_directory +from InnerEye.Common.common_util import BEST_EPOCH_FOLDER_NAME, CROSSVAL_RESULTS_FOLDER, ENSEMBLE_SPLIT_NAME, \ + get_best_epoch_results_path +from InnerEye.Common.fixed_paths import (DEFAULT_AML_LOGS_DIR, DEFAULT_RESULT_IMAGE_NAME, DEFAULT_RESULT_ZIP_DICOM_NAME, + PYTHON_ENVIRONMENT_NAME, repository_root_directory) from InnerEye.Common.fixed_paths_for_tests import full_ml_test_data_path from InnerEye.Common.output_directories import OutputFolderForTests from InnerEye.Common.spawn_subprocess import spawn_and_monitor_subprocess @@ -38,6 +38,7 @@ from InnerEye.ML.configs.segmentation.BasicModel2Epochs import BasicModel2Epochs from InnerEye.ML.deep_learning_config import CHECKPOINT_FOLDER, ModelCategory from InnerEye.ML.model_inference_config import read_model_inference_config +from InnerEye.ML.model_testing import THUMBNAILS_FOLDER from InnerEye.ML.reports.notebook_report import get_html_report_name from InnerEye.ML.runner import main from InnerEye.ML.utils.config_loader import ModelConfigLoader @@ -46,11 +47,11 @@ from InnerEye.Scripts import submit_for_inference from Tests.ML.util import 
assert_nifti_content, get_default_azure_config, get_nifti_shape -FALLBACK_ENSEMBLE_RUN = "refs_pull_439_merge:HD_403627fe-c564-4e36-8ba3-c2915d64e220" -FALLBACK_SINGLE_RUN = "refs_pull_444_merge:refs_pull_444_merge_1620835799_e84ac017" -FALLBACK_2NODE_RUN = "refs_pull_439_merge:refs_pull_439_merge_1618850855_4d2356f9" -FALLBACK_CV_GLAUCOMA = "refs_pull_439_merge:HD_252cdfa3-bce4-49c5-bf53-995ee3bcab4c" -FALLBACK_HELLO_CONTAINER_RUN = "refs_pull_455_merge:refs_pull_455_merge_1620723534_e086c5c5" +FALLBACK_SINGLE_RUN = "refs_pull_498_merge:refs_pull_498_merge_1624292750_743430ab" +FALLBACK_ENSEMBLE_RUN = "refs_pull_498_merge:HD_4bf4efc3-182a-4596-8f93-76f128418142" +FALLBACK_2NODE_RUN = "refs_pull_498_merge:refs_pull_498_merge_1624292776_52b2f7e1" +FALLBACK_CV_GLAUCOMA = "refs_pull_498_merge:HD_cefb6e59-3929-43aa-8fc8-821b9a062219" +FALLBACK_HELLO_CONTAINER_RUN = "refs_pull_498_merge:refs_pull_498_merge_1624292748_45756bf8" def get_most_recent_run_id(fallback_run_id_for_local_execution: str = FALLBACK_SINGLE_RUN) -> str: @@ -402,3 +403,34 @@ def test_recovery_on_2_nodes(test_output_dirs: OutputFolderForTests) -> None: assert "Downloading multiple files from run" not in log1_txt assert "Loading checkpoint that was created at (epoch = 2, global_step = 2)" in log0_txt assert "Loading checkpoint that was created at (epoch = 2, global_step = 2)" in log1_txt + + +@pytest.mark.after_training_single_run +def test_download_outputs(test_output_dirs: OutputFolderForTests) -> None: + """ + Test if downloading multiple files works as expected + """ + run = get_most_recent_run(fallback_run_id_for_local_execution=FALLBACK_SINGLE_RUN) + prefix = Path(BEST_EPOCH_FOLDER_NAME) / ModelExecutionMode.TEST.value / THUMBNAILS_FOLDER + download_run_outputs_by_prefix(prefix, test_output_dirs.root_dir, run=run) + expected_files = ["005_lung_l_slice_053.png", "005_lung_r_slice_037.png", "005_spinalcord_slice_088.png"] + for file in expected_files: + expected = test_output_dirs.root_dir / file + 
assert expected.is_file(), f"File missing: {file}" + # Check that no more than the expected files were downloaded + all_files = [f for f in test_output_dirs.root_dir.rglob("*") if f.is_file()] + assert len(all_files) == len(expected_files) + + +@pytest.mark.after_training_single_run +def test_download_outputs_skipped(test_output_dirs: OutputFolderForTests) -> None: + """ + Test that downloading multiple files skips those where the prefix string is not a full folder name. + """ + run = get_most_recent_run(fallback_run_id_for_local_execution=FALLBACK_SINGLE_RUN) + # There is a file outputs/Train/metrics.csv, but for that file the prefix is not the full folder, hence should + # not be downloaded. + prefix = Path("Tra") + download_run_outputs_by_prefix(prefix, test_output_dirs.root_dir, run=run) + all_files = list(test_output_dirs.root_dir.rglob("*")) + assert len(all_files) == 0