diff --git a/composer/callbacks/checkpoint_saver.py b/composer/callbacks/checkpoint_saver.py index c876343f21d..bbbc69d9eca 100644 --- a/composer/callbacks/checkpoint_saver.py +++ b/composer/callbacks/checkpoint_saver.py @@ -15,12 +15,14 @@ from typing import Callable, List, Optional, Union from composer.core import Callback, Event, State, Time -from composer.loggers import Logger +from composer.loggers import Logger, MLFlowLogger from composer.utils import (FORMAT_NAME_WITH_DIST_AND_TIME_TABLE, FORMAT_NAME_WITH_DIST_TABLE, PartialFilePath, checkpoint, create_interval_scheduler, create_symlink_file, dist, ensure_folder_has_no_conflicting_files, format_name_with_dist, - format_name_with_dist_and_time, is_model_deepspeed, reproducibility, using_torch_2) + format_name_with_dist_and_time, is_model_deepspeed, partial_format, reproducibility, + using_torch_2) from composer.utils.checkpoint import _TORCH_DISTRIBUTED_CHECKPOINTS_FILENAME +from composer.utils.object_store.mlflow_object_store import MLFLOW_EXPERIMENT_ID_FORMAT_KEY, MLFLOW_RUN_ID_FORMAT_KEY log = logging.getLogger(__name__) @@ -270,6 +272,30 @@ def __init__( self.start_batch = None def init(self, state: State, logger: Logger) -> None: + # If MLFlowLogger is being used, format MLFlow-specific placeholders in the save folder and paths. + # Assumes that MLFlowLogger comes before CheckpointSaver in the list of loggers. + for destination in logger.destinations: + if isinstance(destination, MLFlowLogger): + mlflow_format_kwargs = { + MLFLOW_EXPERIMENT_ID_FORMAT_KEY: destination._experiment_id, + MLFLOW_RUN_ID_FORMAT_KEY: destination._run_id + } + self.folder = partial_format(self.folder, **mlflow_format_kwargs) + + self.filename.folder = self.folder + if self.latest_filename is not None: + self.latest_filename.folder = self.folder + + # The remote paths have the placeholders in their filename rather than folder + if self.remote_file_name is not None: + self.remote_file_name.filename = partial_format(self.remote_file_name.filename, + **mlflow_format_kwargs) + if self.latest_remote_file_name is not None: + self.latest_remote_file_name.filename = partial_format(self.latest_remote_file_name.filename, + **mlflow_format_kwargs) + + break + folder = format_name_with_dist(self.folder, state.run_name) os.makedirs(folder, exist_ok=True) diff --git a/composer/loggers/mlflow_logger.py b/composer/loggers/mlflow_logger.py index 48daa7d1cb6..dd60b93f25d 100644 --- a/composer/loggers/mlflow_logger.py +++ b/composer/loggers/mlflow_logger.py @@ -93,6 +93,10 @@ def __init__( self._rank_zero_only = rank_zero_only self._last_flush_time = time.time() self._flush_interval = flush_interval + + self._experiment_id = None + self._run_id = None + if self._enabled: self.tracking_uri = str(tracking_uri or mlflow.get_tracking_uri()) mlflow.set_tracking_uri(self.tracking_uri) @@ -155,6 +159,13 @@ def init(self, state: State, logger: Logger) -> None: log_system_metrics=self.log_system_metrics, ) + # If rank zero only, broadcast the MLFlow experiment and run IDs to other ranks, so the MLFlow run info is + # available to other ranks during runtime. + if self._rank_zero_only: + mlflow_ids_list = [self._experiment_id, self._run_id] + dist.broadcast_object_list(mlflow_ids_list, src=0) + self._experiment_id, self._run_id = mlflow_ids_list + def log_table(self, columns: List[str], rows: List[List[Any]], name: str = 'Table') -> None: if self._enabled: try: diff --git a/composer/trainer/trainer.py b/composer/trainer/trainer.py index fe0fa99e15e..286b6a7a0c8 100644 --- a/composer/trainer/trainer.py +++ b/composer/trainer/trainer.py @@ -39,7 +39,7 @@ PyTorchScheduler, State, Time, Timestamp, TimeUnit, TrainerMode, ensure_data_spec, ensure_evaluator, ensure_time, get_precision_context, validate_eval_automicrobatching) from composer.devices import Device, DeviceCPU, DeviceGPU, DeviceMPS, DeviceTPU -from composer.loggers import (ConsoleLogger, Logger, LoggerDestination, MosaicMLLogger, ProgressBarLogger, +from composer.loggers import (ConsoleLogger, Logger, LoggerDestination, MLFlowLogger, MosaicMLLogger, ProgressBarLogger, RemoteUploaderDownloader, WandBLogger) from composer.loggers.mosaicml_logger import MOSAICML_ACCESS_TOKEN_ENV_VAR, MOSAICML_PLATFORM_ENV_VAR from composer.models import ComposerModel @@ -54,8 +54,9 @@ ensure_tuple, export_with_logger, extract_hparams, format_name_with_dist, get_composer_env_dict, get_device, get_file, is_tpu_installed, map_collection, maybe_create_object_store_from_uri, maybe_create_remote_uploader_downloader_from_uri, - model_eval_mode, parse_uri, reproducibility, using_torch_2) + model_eval_mode, parse_uri, partial_format, reproducibility, using_torch_2) from composer.utils.misc import is_model_deepspeed +from composer.utils.object_store.mlflow_object_store import MLFLOW_EXPERIMENT_ID_FORMAT_KEY, MLFLOW_RUN_ID_FORMAT_KEY if is_tpu_installed(): import torch_xla.core.xla_model as xm @@ -1163,6 +1164,21 @@ def __init__( # Run Event.INIT self.engine.run_event(Event.INIT) + # If the experiment is being tracked with an `MLFlowLogger`, then the `save_folder` and + # related paths/filenames may have placeholders or the MLFlow experiment and run IDs that must be populated + # after running Event.INIT. + if save_folder is not None: + for destination in self.logger.destinations: + if isinstance(destination, MLFlowLogger): + mlflow_format_kwargs = { + MLFLOW_EXPERIMENT_ID_FORMAT_KEY: destination._experiment_id, + MLFLOW_RUN_ID_FORMAT_KEY: destination._run_id + } + + save_folder = partial_format(save_folder, **mlflow_format_kwargs) + if latest_remote_file_name is not None: + latest_remote_file_name = partial_format(latest_remote_file_name, **mlflow_format_kwargs) + # Log hparams. if self.auto_log_hparams: self.local_hparams = extract_hparams(locals()) diff --git a/composer/utils/__init__.py b/composer/utils/__init__.py index 30930250d97..81fabb4e451 100644 --- a/composer/utils/__init__.py +++ b/composer/utils/__init__.py @@ -26,68 +26,20 @@ UCObjectStore) from composer.utils.retrying import retry from composer.utils.string_enum import StringEnum +from composer.utils.string_helpers import partial_format __all__ = [ - 'ensure_tuple', - 'get_free_tcp_port', - 'map_collection', - 'IteratorFileStream', - 'FORMAT_NAME_WITH_DIST_AND_TIME_TABLE', - 'FORMAT_NAME_WITH_DIST_TABLE', - 'get_file', - 'PartialFilePath', - 'create_symlink_file', - 'ObjectStore', - 'ObjectStoreTransientError', - 'LibcloudObjectStore', - 'S3ObjectStore', - 'SFTPObjectStore', - 'OCIObjectStore', - 'GCSObjectStore', - 'UCObjectStore', - 'MLFlowObjectStore', - 'MissingConditionalImportError', - 'import_object', - 'is_model_deepspeed', - 'is_model_fsdp', - 'is_notebook', - 'StringEnum', - 'load_checkpoint', - 'save_checkpoint', - 'safe_torch_load', - 'ensure_folder_is_empty', - 'ensure_folder_has_no_conflicting_files', - 'export_for_inference', - 'export_with_logger', - 'quantize_dynamic', - 'format_name_with_dist', - 'format_name_with_dist_and_time', - 'is_tar', - 'maybe_create_object_store_from_uri', - 'maybe_create_remote_uploader_downloader_from_uri', - 'parse_uri', - 'batch_get', - 'batch_set', - 'configure_excepthook', - 'disable_env_report', - 'enable_env_report', - 'print_env', - 'get_composer_env_dict', - 'retry', - 'model_eval_mode', - 'get_device', - 'is_tpu_installed', - 'is_hpu_installed', - 'ExportFormat', - 'Transform', - 'export_with_logger', - 'extract_hparams', - 'convert_nested_dict_to_flat_dict', - 'convert_flat_dict_to_nested_dict', - 'using_torch_2', - 'create_interval_scheduler', - 'EvalClient', - 'LambdaEvalClient', - 'LocalEvalClient', - 'MosaicMLLambdaEvalClient', + 'ensure_tuple', 'get_free_tcp_port', 'map_collection', 'IteratorFileStream', 'FORMAT_NAME_WITH_DIST_AND_TIME_TABLE', + 'FORMAT_NAME_WITH_DIST_TABLE', 'get_file', 'PartialFilePath', 'create_symlink_file', 'ObjectStore', + 'ObjectStoreTransientError', 'LibcloudObjectStore', 'S3ObjectStore', 'SFTPObjectStore', 'OCIObjectStore', + 'GCSObjectStore', 'UCObjectStore', 'MLFlowObjectStore', 'MissingConditionalImportError', 'import_object', + 'is_model_deepspeed', 'is_model_fsdp', 'is_notebook', 'StringEnum', 'load_checkpoint', 'save_checkpoint', + 'safe_torch_load', 'ensure_folder_is_empty', 'ensure_folder_has_no_conflicting_files', 'export_for_inference', + 'export_with_logger', 'quantize_dynamic', 'format_name_with_dist', 'format_name_with_dist_and_time', 'is_tar', + 'maybe_create_object_store_from_uri', 'maybe_create_remote_uploader_downloader_from_uri', 'parse_uri', 'batch_get', + 'batch_set', 'configure_excepthook', 'disable_env_report', 'enable_env_report', 'print_env', + 'get_composer_env_dict', 'retry', 'model_eval_mode', 'get_device', 'is_tpu_installed', 'is_hpu_installed', + 'ExportFormat', 'Transform', 'export_with_logger', 'extract_hparams', 'convert_nested_dict_to_flat_dict', + 'convert_flat_dict_to_nested_dict', 'using_torch_2', 'create_interval_scheduler', 'EvalClient', 'LambdaEvalClient', + 'LocalEvalClient', 'MosaicMLLambdaEvalClient', 'partial_format' ] diff --git a/composer/utils/object_store/mlflow_object_store.py b/composer/utils/object_store/mlflow_object_store.py index 52cfc9646e2..4c6e44ba63e 100644 --- a/composer/utils/object_store/mlflow_object_store.py +++ b/composer/utils/object_store/mlflow_object_store.py @@ -22,8 +22,11 @@ DEFAULT_MLFLOW_EXPERIMENT_NAME = 'mlflow-object-store' -PLACEHOLDER_EXPERIMENT_ID = 'MLFLOW_EXPERIMENT_ID' -PLACEHOLDER_RUN_ID = 'MLFLOW_RUN_ID' +MLFLOW_EXPERIMENT_ID_FORMAT_KEY = 'mlflow_experiment_id' +MLFLOW_RUN_ID_FORMAT_KEY = 'mlflow_run_id' + +MLFLOW_EXPERIMENT_ID_PLACEHOLDER = '{' + MLFLOW_EXPERIMENT_ID_FORMAT_KEY + '}' +MLFLOW_RUN_ID_PLACEHOLDER = '{' + MLFLOW_RUN_ID_FORMAT_KEY + '}' log = logging.getLogger(__name__) @@ -131,9 +134,9 @@ def __init__(self, path: str, multipart_upload_chunk_size: int = 100 * 1024 * 10 mlflow.environment_variables.MLFLOW_MULTIPART_UPLOAD_CHUNK_SIZE.set(multipart_upload_chunk_size) experiment_id, run_id, _ = MLFlowObjectStore.parse_dbfs_path(path) - if experiment_id == PLACEHOLDER_EXPERIMENT_ID: + if experiment_id == MLFLOW_EXPERIMENT_ID_PLACEHOLDER: experiment_id = None - if run_id == PLACEHOLDER_RUN_ID: + if run_id == MLFLOW_RUN_ID_PLACEHOLDER: run_id = None # Construct the `experiment_id` and `run_id` depending on whether format placeholders were provided. @@ -244,10 +247,10 @@ def get_artifact_path(self, object_name: str) -> str: """ if object_name.startswith(MLFLOW_DBFS_PATH_PREFIX): experiment_id, run_id, object_name = self.parse_dbfs_path(object_name) - if (experiment_id != self.experiment_id and experiment_id != PLACEHOLDER_EXPERIMENT_ID): + if (experiment_id != self.experiment_id and experiment_id != MLFLOW_EXPERIMENT_ID_PLACEHOLDER): raise ValueError(f'Object {object_name} belongs to experiment with id={experiment_id}, ' f'but MLFlowObjectStore is associated with experiment {self.experiment_id}.') - if (run_id != self.run_id and run_id != PLACEHOLDER_RUN_ID): + if (run_id != self.run_id and run_id != MLFLOW_RUN_ID_PLACEHOLDER): raise ValueError(f'Object {object_name} belongs to run with id={run_id}, ' f'but MLFlowObjectStore is associated with run {self.run_id}.') return object_name diff --git a/composer/utils/string_helpers.py b/composer/utils/string_helpers.py new file mode 100644 index 00000000000..1db1d7620dc --- /dev/null +++ b/composer/utils/string_helpers.py @@ -0,0 +1,31 @@ +# Copyright 2022 MosaicML Composer authors +# SPDX-License-Identifier: Apache-2.0 + +"""Utilities for string manipulation.""" + + +def partial_format(s, *args, **kwargs): + """Format a string with a partial set of arguments. + + Since `str.format()` raises a `KeyError` if a format key is missing from the arguments, this + function allows for a partial set of arguments to be provided. + + For example: + + >>> partial_format('{foo} {bar}', foo='Hello') + 'Hello {bar}' + + >>> partial_format('{foo} {bar}', foo='Hello', bar='World') + 'Hello World' + """ + result = s + done = False + while not done: + try: + result = s.format(*args, **kwargs) + done = True + except KeyError as e: + key = e.args[0] + kwargs[key] = '{' + key + '}' + + return result diff --git a/tests/utils/object_store/test_mlflow_object_store.py b/tests/utils/object_store/test_mlflow_object_store.py index d46fc493a4f..ecbedd2e500 100644 --- a/tests/utils/object_store/test_mlflow_object_store.py +++ b/tests/utils/object_store/test_mlflow_object_store.py @@ -8,7 +8,7 @@ import pytest from composer.utils import MLFlowObjectStore -from composer.utils.object_store.mlflow_object_store import PLACEHOLDER_EXPERIMENT_ID, PLACEHOLDER_RUN_ID +from composer.utils.object_store.mlflow_object_store import MLFLOW_EXPERIMENT_ID_PLACEHOLDER, MLFLOW_RUN_ID_PLACEHOLDER TEST_PATH_FORMAT = 'databricks/mlflow-tracking/{experiment_id}/{run_id}/artifacts/' EXPERIMENT_ID = '123' @@ -66,7 +66,7 @@ def test_init_with_experiment_and_no_run(monkeypatch): mock_mlflow_client.return_value.create_run.return_value = MagicMock( info=MagicMock(run_id=RUN_ID, run_name='test-run')) - store = MLFlowObjectStore(TEST_PATH_FORMAT.format(experiment_id=EXPERIMENT_ID, run_id=PLACEHOLDER_RUN_ID)) + store = MLFlowObjectStore(TEST_PATH_FORMAT.format(experiment_id=EXPERIMENT_ID, run_id=MLFLOW_RUN_ID_PLACEHOLDER)) assert store.experiment_id == EXPERIMENT_ID assert store.run_id == RUN_ID @@ -76,7 +76,7 @@ def test_init_with_run_and_no_experiment(monkeypatch): monkeypatch.setattr(dbx_sdk, 'WorkspaceClient', MagicMock()) with pytest.raises(ValueError): - MLFlowObjectStore(TEST_PATH_FORMAT.format(experiment_id=PLACEHOLDER_EXPERIMENT_ID, run_id=RUN_ID)) + MLFlowObjectStore(TEST_PATH_FORMAT.format(experiment_id=MLFLOW_EXPERIMENT_ID_PLACEHOLDER, run_id=RUN_ID)) def test_init_with_active_run(monkeypatch): @@ -91,7 +91,7 @@ def test_init_with_active_run(monkeypatch): mock_active_run.return_value = MagicMock(info=MagicMock(experiment_id=EXPERIMENT_ID, run_id=RUN_ID)) store = MLFlowObjectStore( - TEST_PATH_FORMAT.format(experiment_id=PLACEHOLDER_EXPERIMENT_ID, run_id=PLACEHOLDER_RUN_ID)) + TEST_PATH_FORMAT.format(experiment_id=MLFLOW_EXPERIMENT_ID_PLACEHOLDER, run_id=MLFLOW_RUN_ID_PLACEHOLDER)) assert store.experiment_id == EXPERIMENT_ID assert store.run_id == RUN_ID @@ -109,7 +109,7 @@ def test_init_with_existing_experiment_and_no_run(monkeypatch): info=MagicMock(run_id=RUN_ID, run_name='test-run')) store = MLFlowObjectStore( - TEST_PATH_FORMAT.format(experiment_id=PLACEHOLDER_EXPERIMENT_ID, run_id=PLACEHOLDER_RUN_ID)) + TEST_PATH_FORMAT.format(experiment_id=MLFLOW_EXPERIMENT_ID_PLACEHOLDER, run_id=MLFLOW_RUN_ID_PLACEHOLDER)) assert store.experiment_id == EXPERIMENT_ID assert store.run_id == RUN_ID @@ -128,7 +128,7 @@ def test_init_with_no_experiment_and_no_run(monkeypatch): info=MagicMock(run_id=RUN_ID, run_name='test-run')) store = MLFlowObjectStore( - TEST_PATH_FORMAT.format(experiment_id=PLACEHOLDER_EXPERIMENT_ID, run_id=PLACEHOLDER_RUN_ID)) + TEST_PATH_FORMAT.format(experiment_id=MLFLOW_EXPERIMENT_ID_PLACEHOLDER, run_id=MLFLOW_RUN_ID_PLACEHOLDER)) assert store.experiment_id == EXPERIMENT_ID assert store.run_id == RUN_ID @@ -190,16 +190,19 @@ def test_get_artifact_path(mlflow_object_store): assert mlflow_object_store.get_artifact_path(DEFAULT_PATH + ARTIFACT_PATH) == ARTIFACT_PATH # Absolute DBFS path with placeholders - path = TEST_PATH_FORMAT.format(experiment_id=PLACEHOLDER_EXPERIMENT_ID, run_id=PLACEHOLDER_RUN_ID) + ARTIFACT_PATH + path = TEST_PATH_FORMAT.format(experiment_id=MLFLOW_EXPERIMENT_ID_PLACEHOLDER, + run_id=MLFLOW_RUN_ID_PLACEHOLDER) + ARTIFACT_PATH assert mlflow_object_store.get_artifact_path(path) == ARTIFACT_PATH # Raises ValueError for different experiment ID - path = TEST_PATH_FORMAT.format(experiment_id='different-experiment', run_id=PLACEHOLDER_RUN_ID) + ARTIFACT_PATH + path = TEST_PATH_FORMAT.format(experiment_id='different-experiment', + run_id=MLFLOW_RUN_ID_PLACEHOLDER) + ARTIFACT_PATH with pytest.raises(ValueError): mlflow_object_store.get_artifact_path(path) # Raises ValueError for different run ID - path = TEST_PATH_FORMAT.format(experiment_id=PLACEHOLDER_EXPERIMENT_ID, run_id='different-run') + ARTIFACT_PATH + path = TEST_PATH_FORMAT.format(experiment_id=MLFLOW_EXPERIMENT_ID_PLACEHOLDER, + run_id='different-run') + ARTIFACT_PATH with pytest.raises(ValueError): mlflow_object_store.get_artifact_path(path)