This repository has been archived by the owner on Mar 21, 2024. It is now read-only.

Fix timeouts when downloading multiple checkpoint files #498

Merged (9 commits, Jun 22, 2021)
.flake8 (2 changes: 1 addition & 1 deletion)
@@ -2,4 +2,4 @@
ignore = E226,E302,E41,W391, E701, W291, E722, W503, E128, E126, E127, E731, E401
max-line-length = 160
max-complexity = 25
exclude = fastMRI/
exclude = fastMRI/ test_outputs/
CHANGELOG.md (2 changes: 2 additions & 0 deletions)
@@ -27,6 +27,8 @@ any large models anymore because data loaders ran out of memory.
- ([#472](https://github.com/microsoft/InnerEye-DeepLearning/pull/472)) Correct model path for moving ensemble models.
- ([#494](https://github.com/microsoft/InnerEye-DeepLearning/pull/494)) Fix an issue where multi-node jobs for
LightningContainer models can get stuck at test set inference.
- ([#498](https://github.com/microsoft/InnerEye-DeepLearning/pull/498)) Workaround for the problem that downloading
multiple large checkpoints can time out.

### Removed

InnerEye/Azure/azure_util.py (73 changes: 46 additions & 27 deletions)
@@ -337,34 +337,53 @@ def is_parent_run(run: Run) -> bool:
return PARENT_RUN_CONTEXT and run.id == PARENT_RUN_CONTEXT.id


def download_outputs_from_run(blobs_path: Path,
destination: Path,
run: Optional[Run] = None,
is_file: bool = False,
append_prefix: bool = False) -> Path:
"""
Download the blobs from the run's default output directory: DEFAULT_AML_UPLOAD_DIR.
Silently returns for offline runs.
:param blobs_path: Blobs path in DEFAULT_AML_UPLOAD_DIR to download from
:param run: Run to download from (default to current run if None)
:param destination: Local path to save the downloaded blobs to
:param is_file: Set to True if downloading a single file.
:param append_prefix: An optional flag whether to append the specified prefix from the final output file path.
If False then the prefix is removed from the output file path.
def download_run_output_file(blob_path: Path,
destination: Path,
run: Run) -> Path:
"""
Downloads a single file from the run's default output directory: DEFAULT_AML_UPLOAD_DIR ("outputs").
For example, if blob_path = "foo/bar.csv", then the run result file "outputs/foo/bar.csv" will be downloaded
to <destination>/bar.csv (the directory will be stripped off).
:param blob_path: The name of the file to download.
:param run: The AzureML run to download the file from.
:param destination: Local path to save the downloaded blob to.
:return: Destination path of the downloaded file.
"""
run = run or Run.get_context()
blobs_root_path = str(fixed_paths.DEFAULT_AML_UPLOAD_DIR / blobs_path)
if is_file:
destination = destination / blobs_path.name
logging.info(f"Downloading single file from run {run.id}: {blobs_root_path} -> {str(destination)}")
run.download_file(blobs_root_path, str(destination), _validate_checksum=True)
else:
logging.info(f"Downloading multiple files from run {run.id}: {blobs_root_path} -> {str(destination)}")
run.download_files(blobs_root_path, str(destination), append_prefix=append_prefix)
blobs_prefix = str((fixed_paths.DEFAULT_AML_UPLOAD_DIR / blob_path).as_posix())
destination = destination / blob_path.name
logging.info(f"Downloading single file from run {run.id}: {blobs_prefix} -> {str(destination)}")
run.download_file(blobs_prefix, str(destination), _validate_checksum=True)
return destination
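
A minimal usage sketch of this helper (not part of the diff; the workspace lookup and run ID are illustrative only), reusing the "foo/bar.csv" example from the docstring:

```python
from pathlib import Path

from azureml.core import Run, Workspace

from InnerEye.Azure.azure_util import download_run_output_file

# Hypothetical workspace and run ID, for illustration only.
workspace = Workspace.from_config()
run = Run.get(workspace, run_id="refs_pull_498_merge_1624292750_743430ab")

# Fetches "outputs/foo/bar.csv" from the run; the "foo/" folder part is
# stripped, so the file ends up at <destination>/bar.csv.
local_file = download_run_output_file(blob_path=Path("foo/bar.csv"),
                                       destination=Path("baseline_files"),
                                       run=run)
print(local_file)  # baseline_files/bar.csv
```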


def download_run_outputs_by_prefix(blobs_prefix: Path,
destination: Path,
run: Run) -> None:
"""
Download all the blobs from the run's default output directory: DEFAULT_AML_UPLOAD_DIR ("outputs") that
have a given prefix (folder structure). When saving, the prefix string will be stripped off. For example,
if blobs_prefix = "foo", and the run has a file "outputs/foo/bar.csv", it will be downloaded to destination/bar.csv.
A file "outputs/foo.txt", by contrast, would be skipped because the prefix does not match a complete folder name.
:param blobs_prefix: The prefix for all files in "outputs" that should be downloaded.
:param run: The AzureML run to download the files from.
:param destination: Local path to save the downloaded blobs to.
"""
prefix_str = str((fixed_paths.DEFAULT_AML_UPLOAD_DIR / blobs_prefix).as_posix())
logging.info(f"Downloading multiple files from run {run.id}: {prefix_str} -> {str(destination)}")
# There is a download_files function, but that can time out when downloading several large checkpoint files
# (120 sec timeout for all files combined).
for file in run.get_file_names():
    if file.startswith(prefix_str):
        target_path = file[len(prefix_str):]
        if target_path.startswith("/"):
            target_path = target_path[1:]
            logging.info(f"Downloading {file}")
            run.download_file(file, str(destination / target_path), _validate_checksum=True)
        else:
            logging.warning(f"Skipping file {file}, because the desired prefix {prefix_str} is not aligned with "
                            f"the folder structure")


def is_running_on_azure_agent() -> bool:
"""
Returns True if the code appears to be running on an Azure build agent, and False otherwise.
@@ -406,8 +425,8 @@ def get_comparison_baseline_paths(outputs_folder: Path,
# Look for dataset.csv inside epoch_NNN/Test, epoch_NNN/ and at top level
for blob_path_parent in step_up_directories(blob_path):
try:
comparison_dataset_path = download_outputs_from_run(
blob_path_parent / dataset_csv_file_name, destination_folder, run, True)
comparison_dataset_path = download_run_output_file(
blob_path_parent / dataset_csv_file_name, destination_folder, run)
break
except (ValueError, UserErrorException):
logging.warning(f"cannot find {dataset_csv_file_name} at {blob_path_parent} in {run_rec_id}")
Expand All @@ -418,8 +437,8 @@ def get_comparison_baseline_paths(outputs_folder: Path,
logging.warning(f"cannot find {dataset_csv_file_name} at or above {blob_path} in {run_rec_id}")
# Look for epoch_NNN/Test/metrics.csv
try:
comparison_metrics_path = download_outputs_from_run(
blob_path / SUBJECT_METRICS_FILE_NAME, destination_folder, run, True)
comparison_metrics_path = download_run_output_file(
blob_path / SUBJECT_METRICS_FILE_NAME, destination_folder, run)
except (ValueError, UserErrorException):
logging.warning(f"cannot find {SUBJECT_METRICS_FILE_NAME} at {blob_path} in {run_rec_id}")
return (comparison_dataset_path, comparison_metrics_path)
InnerEye/Azure/run_pytest.py (7 changes: 3 additions & 4 deletions)
@@ -11,7 +11,7 @@

# Test result file from running pytest inside the AzureML job. This file must have a prefix that
# matches the string in the build definition build-pr.yml, in the TrainInAzureML job.
from InnerEye.Azure.azure_util import download_outputs_from_run
from InnerEye.Azure.azure_util import download_run_output_file

PYTEST_RESULTS_FILE = Path("test-results-on-azure-ml.xml")

@@ -49,10 +49,9 @@ def download_pytest_result(run: Run, destination_folder: Path = Path.cwd()) -> P
"""
logging.info(f"Downloading pytest result file: {PYTEST_RESULTS_FILE}")
try:
return download_outputs_from_run(
return download_run_output_file(
PYTEST_RESULTS_FILE,
destination=destination_folder,
run=run,
is_file=True)
run=run)
except:
raise ValueError(f"No pytest result file {PYTEST_RESULTS_FILE} was found for run {run.id}")
InnerEye/ML/surface_distance_heatmaps.py (6 changes: 3 additions & 3 deletions)
@@ -11,7 +11,7 @@

from InnerEye.Azure.azure_config import AzureConfig
from InnerEye.Azure.azure_runner import create_runner_parser, parse_args_and_add_yaml_variables
from InnerEye.Azure.azure_util import download_outputs_from_run
from InnerEye.Azure.azure_util import download_run_outputs_by_prefix
from InnerEye.Common.metrics_constants import MetricsFileColumns
from InnerEye.ML.common import ModelExecutionMode
from InnerEye.ML.config import SegmentationModelBase
@@ -67,8 +67,8 @@ def load_predictions(run_type: SurfaceDistanceRunType, azure_config: AzureConfig
for (subject_id, structure_name, dice_score, _) in worst_performers:
subject_prefix = sd_util.get_subject_prefix(model_config, execution_mode, subject_id)
# if not already present, download data for subject
download_outputs_from_run(
blobs_path=subject_prefix,
download_run_outputs_by_prefix(
blobs_prefix=subject_prefix,
destination=output_dir,
run=first_child_run
)
InnerEye/ML/utils/run_recovery.py (14 changes: 7 additions & 7 deletions)
@@ -12,7 +12,8 @@

from azureml.core import Run

from InnerEye.Azure.azure_util import RUN_CONTEXT, download_outputs_from_run, fetch_child_runs, tag_values_all_distinct
from InnerEye.Azure.azure_util import RUN_CONTEXT, download_run_output_file, download_run_outputs_by_prefix, \
fetch_child_runs, tag_values_all_distinct
from InnerEye.Common.common_util import OTHER_RUNS_SUBDIR_NAME, check_properties_are_not_none
from InnerEye.ML.common import BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, get_best_checkpoint_path, \
get_recovery_checkpoint_path
@@ -52,11 +53,10 @@ def download_best_checkpoints_from_child_runs(config: OutputParams, run: Run) ->
else:
subdir = str(child.tags[tag_to_use] if can_use_split_indices else child.number)
child_dst = config.checkpoint_folder / OTHER_RUNS_SUBDIR_NAME / subdir
download_outputs_from_run(
blobs_path=Path(CHECKPOINT_FOLDER) / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX,
download_run_output_file(
blob_path=Path(CHECKPOINT_FOLDER) / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX,
destination=child_dst,
run=child,
is_file=True
run=child
)
child_runs_checkpoints_roots.append(child_dst)
return RunRecovery(checkpoints_roots=child_runs_checkpoints_roots)
@@ -82,8 +82,8 @@ def download_all_checkpoints_from_run(config: OutputParams, run: Run,
destination_folder = config.checkpoint_folder / subfolder if subfolder else config.checkpoint_folder

if not only_return_path:
download_outputs_from_run(
blobs_path=Path(CHECKPOINT_FOLDER),
download_run_outputs_by_prefix(
blobs_prefix=Path(CHECKPOINT_FOLDER),
destination=destination_folder,
run=run
)
InnerEye/ML/visualizers/plot_cross_validation.py (9 changes: 4 additions & 5 deletions)
@@ -29,7 +29,7 @@

import InnerEye.Common.Statistics.mann_whitney_test as mann_whitney
from InnerEye.Azure.azure_config import AzureConfig
from InnerEye.Azure.azure_util import CROSS_VALIDATION_SPLIT_INDEX_TAG_KEY, download_outputs_from_run, \
from InnerEye.Azure.azure_util import CROSS_VALIDATION_SPLIT_INDEX_TAG_KEY, download_run_output_file, \
fetch_child_runs, is_offline_run_context, is_parent_run
from InnerEye.Common import common_util, fixed_paths
from InnerEye.Common.Statistics.wilcoxon_signed_rank_test import WilcoxonTestConfig, wilcoxon_signed_rank_test
@@ -232,11 +232,10 @@ def download_or_get_local_file(self,
return None
else:
try:
return download_outputs_from_run(
blobs_path=blob_path,
return download_run_output_file(
blob_path=blob_path,
destination=destination,
run=run,
is_file=True
run=run
)
except Exception as ex:
logging.warning(f"File {blob_to_download} not found in output of run {run.id}: {ex}")
Tests/AfterTraining/test_after_training.py (54 changes: 43 additions & 11 deletions)
@@ -9,7 +9,6 @@
All of the tests in this file rely on previous InnerEye runs that submit an AzureML job. They pick
up the most recently run AzureML job from most_recent_run.txt
"""

import os
import shutil
import sys
@@ -24,20 +23,22 @@

from InnerEye.Azure.azure_config import AzureConfig
from InnerEye.Azure.azure_runner import RUN_RECOVERY_FILE
from InnerEye.Azure.azure_util import MODEL_ID_KEY_NAME, get_comparison_baseline_paths, \
from InnerEye.Azure.azure_util import MODEL_ID_KEY_NAME, download_run_outputs_by_prefix, \
get_comparison_baseline_paths, \
is_running_on_azure_agent, to_azure_friendly_string
from InnerEye.Common import common_util, fixed_paths, fixed_paths_for_tests
from InnerEye.Common.common_util import CROSSVAL_RESULTS_FOLDER, ENSEMBLE_SPLIT_NAME, get_best_epoch_results_path
from InnerEye.Common.fixed_paths import DEFAULT_AML_LOGS_DIR, DEFAULT_RESULT_IMAGE_NAME, \
DEFAULT_RESULT_ZIP_DICOM_NAME, \
PYTHON_ENVIRONMENT_NAME, repository_root_directory
from InnerEye.Common.common_util import BEST_EPOCH_FOLDER_NAME, CROSSVAL_RESULTS_FOLDER, ENSEMBLE_SPLIT_NAME, \
get_best_epoch_results_path
from InnerEye.Common.fixed_paths import (DEFAULT_AML_LOGS_DIR, DEFAULT_RESULT_IMAGE_NAME, DEFAULT_RESULT_ZIP_DICOM_NAME,
PYTHON_ENVIRONMENT_NAME, repository_root_directory)
from InnerEye.Common.fixed_paths_for_tests import full_ml_test_data_path
from InnerEye.Common.output_directories import OutputFolderForTests
from InnerEye.Common.spawn_subprocess import spawn_and_monitor_subprocess
from InnerEye.ML.common import DATASET_CSV_FILE_NAME, ModelExecutionMode
from InnerEye.ML.configs.segmentation.BasicModel2Epochs import BasicModel2Epochs
from InnerEye.ML.deep_learning_config import CHECKPOINT_FOLDER, ModelCategory
from InnerEye.ML.model_inference_config import read_model_inference_config
from InnerEye.ML.model_testing import THUMBNAILS_FOLDER
from InnerEye.ML.reports.notebook_report import get_html_report_name
from InnerEye.ML.runner import main
from InnerEye.ML.utils.config_loader import ModelConfigLoader
Expand All @@ -46,11 +47,11 @@
from InnerEye.Scripts import submit_for_inference
from Tests.ML.util import assert_nifti_content, get_default_azure_config, get_nifti_shape

FALLBACK_ENSEMBLE_RUN = "refs_pull_439_merge:HD_403627fe-c564-4e36-8ba3-c2915d64e220"
FALLBACK_SINGLE_RUN = "refs_pull_444_merge:refs_pull_444_merge_1620835799_e84ac017"
FALLBACK_2NODE_RUN = "refs_pull_439_merge:refs_pull_439_merge_1618850855_4d2356f9"
FALLBACK_CV_GLAUCOMA = "refs_pull_439_merge:HD_252cdfa3-bce4-49c5-bf53-995ee3bcab4c"
FALLBACK_HELLO_CONTAINER_RUN = "refs_pull_455_merge:refs_pull_455_merge_1620723534_e086c5c5"
FALLBACK_SINGLE_RUN = "refs_pull_498_merge:refs_pull_498_merge_1624292750_743430ab"
FALLBACK_ENSEMBLE_RUN = "refs_pull_498_merge:HD_4bf4efc3-182a-4596-8f93-76f128418142"
FALLBACK_2NODE_RUN = "refs_pull_498_merge:refs_pull_498_merge_1624292776_52b2f7e1"
FALLBACK_CV_GLAUCOMA = "refs_pull_498_merge:HD_cefb6e59-3929-43aa-8fc8-821b9a062219"
FALLBACK_HELLO_CONTAINER_RUN = "refs_pull_498_merge:refs_pull_498_merge_1624292748_45756bf8"


def get_most_recent_run_id(fallback_run_id_for_local_execution: str = FALLBACK_SINGLE_RUN) -> str:
@@ -402,3 +403,34 @@ def test_recovery_on_2_nodes(test_output_dirs: OutputFolderForTests) -> None:
assert "Downloading multiple files from run" not in log1_txt
assert "Loading checkpoint that was created at (epoch = 2, global_step = 2)" in log0_txt
assert "Loading checkpoint that was created at (epoch = 2, global_step = 2)" in log1_txt


@pytest.mark.after_training_single_run
def test_download_outputs(test_output_dirs: OutputFolderForTests) -> None:
"""
Test that downloading multiple files by prefix works as expected.
"""
run = get_most_recent_run(fallback_run_id_for_local_execution=FALLBACK_SINGLE_RUN)
prefix = Path(BEST_EPOCH_FOLDER_NAME) / ModelExecutionMode.TEST.value / THUMBNAILS_FOLDER
download_run_outputs_by_prefix(prefix, test_output_dirs.root_dir, run=run)
expected_files = ["005_lung_l_slice_053.png", "005_lung_r_slice_037.png", "005_spinalcord_slice_088.png"]
for file in expected_files:
expected = test_output_dirs.root_dir / file
assert expected.is_file(), f"File missing: {file}"
# Check that no more than the expected files were downloaded
all_files = [f for f in test_output_dirs.root_dir.rglob("*") if f.is_file()]
assert len(all_files) == len(expected_files)


@pytest.mark.after_training_single_run
def test_download_outputs_skipped(test_output_dirs: OutputFolderForTests) -> None:
"""
Test that downloading multiple files skips files where the prefix string does not match a complete folder.
"""
run = get_most_recent_run(fallback_run_id_for_local_execution=FALLBACK_SINGLE_RUN)
# There is a file outputs/Train/metrics.csv, but for that file the prefix is not a full folder, hence it should
# not be downloaded.
prefix = Path("Tra")
download_run_outputs_by_prefix(prefix, test_output_dirs.root_dir, run=run)
all_files = list(test_output_dirs.root_dir.rglob("*"))
assert len(all_files) == 0