Commit
Try formatting mlflow save folder after INIT
Make MLFlow experiment and run ID available on all ranks

Fix path issue

Format mlflow placeholders in remote filenames
jerrychen109 committed Jan 2, 2024
1 parent 1165ad1 commit cc51073
Showing 7 changed files with 123 additions and 81 deletions.
30 changes: 28 additions & 2 deletions composer/callbacks/checkpoint_saver.py
@@ -15,12 +15,14 @@
from typing import Callable, List, Optional, Union

from composer.core import Callback, Event, State, Time
from composer.loggers import Logger
from composer.loggers import Logger, MLFlowLogger
from composer.utils import (FORMAT_NAME_WITH_DIST_AND_TIME_TABLE, FORMAT_NAME_WITH_DIST_TABLE, PartialFilePath,
checkpoint, create_interval_scheduler, create_symlink_file, dist,
ensure_folder_has_no_conflicting_files, format_name_with_dist,
format_name_with_dist_and_time, is_model_deepspeed, reproducibility, using_torch_2)
format_name_with_dist_and_time, is_model_deepspeed, partial_format, reproducibility,
using_torch_2)
from composer.utils.checkpoint import _TORCH_DISTRIBUTED_CHECKPOINTS_FILENAME
from composer.utils.object_store.mlflow_object_store import MLFLOW_EXPERIMENT_ID_FORMAT_KEY, MLFLOW_RUN_ID_FORMAT_KEY

log = logging.getLogger(__name__)

@@ -270,6 +272,30 @@ def __init__(
self.start_batch = None

def init(self, state: State, logger: Logger) -> None:
# If MLFlowLogger is being used, format MLFlow-specific placeholders in the save folder and paths.
# Assumes that MLFlowLogger comes before CheckpointSaver in the list of loggers.
for destination in logger.destinations:
if isinstance(destination, MLFlowLogger):
mlflow_format_kwargs = {
MLFLOW_EXPERIMENT_ID_FORMAT_KEY: destination._experiment_id,
MLFLOW_RUN_ID_FORMAT_KEY: destination._run_id
}
self.folder = partial_format(self.folder, **mlflow_format_kwargs)

self.filename.folder = self.folder
if self.latest_filename is not None:
self.latest_filename.folder = self.folder

# The remote paths have the placeholders in their filename rather than folder
if self.remote_file_name is not None:
self.remote_file_name.filename = partial_format(self.remote_file_name.filename,
**mlflow_format_kwargs)
if self.latest_remote_file_name is not None:
self.latest_remote_file_name.filename = partial_format(self.latest_remote_file_name.filename,
**mlflow_format_kwargs)

break

folder = format_name_with_dist(self.folder, state.run_name)
os.makedirs(folder, exist_ok=True)

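For reference, a minimal sketch of what the formatting above does to a save folder containing MLFlow placeholders. The folder string and IDs below are assumed values for illustration, not taken from this commit; at runtime the IDs come from MLFlowLogger._experiment_id and MLFlowLogger._run_id after Event.INIT.

from composer.utils import partial_format

# Hypothetical save folder with the MLFlow placeholders still unresolved.
folder = 'dbfs:/databricks/mlflow-tracking/{mlflow_experiment_id}/{mlflow_run_id}/artifacts/checkpoints'

# Fill only the MLFlow keys; any other placeholders would be left intact.
print(partial_format(folder, mlflow_experiment_id='123', mlflow_run_id='abc'))
# dbfs:/databricks/mlflow-tracking/123/abc/artifacts/checkpoints
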
11 changes: 11 additions & 0 deletions composer/loggers/mlflow_logger.py
@@ -93,6 +93,10 @@ def __init__(
self._rank_zero_only = rank_zero_only
self._last_flush_time = time.time()
self._flush_interval = flush_interval

self._experiment_id = None
self._run_id = None

if self._enabled:
self.tracking_uri = str(tracking_uri or mlflow.get_tracking_uri())
mlflow.set_tracking_uri(self.tracking_uri)
@@ -155,6 +159,13 @@ def init(self, state: State, logger: Logger) -> None:
log_system_metrics=self.log_system_metrics,
)

# If rank_zero_only, broadcast the MLFlow experiment and run IDs from rank 0 so the MLFlow run info is
# available on all ranks during runtime.
if self._rank_zero_only:
mlflow_ids_list = [self._experiment_id, self._run_id]
dist.broadcast_object_list(mlflow_ids_list, src=0)
self._experiment_id, self._run_id = mlflow_ids_list

def log_table(self, columns: List[str], rows: List[List[Any]], name: str = 'Table') -> None:
if self._enabled:
try:
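A standalone sketch of the broadcast pattern used above, written directly against torch.distributed (Composer's dist module wraps these collectives). The helper name and the availability guard are illustrative, not part of this commit.

from typing import Optional, Tuple

import torch.distributed as torch_dist


def broadcast_mlflow_ids(experiment_id: Optional[str],
                         run_id: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
    # Rank 0 passes the real IDs; the other ranks pass None and receive the
    # values in place via the object broadcast, so every rank can format paths.
    ids = [experiment_id, run_id]
    if torch_dist.is_available() and torch_dist.is_initialized():
        torch_dist.broadcast_object_list(ids, src=0)
    return ids[0], ids[1]
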
20 changes: 18 additions & 2 deletions composer/trainer/trainer.py
@@ -39,7 +39,7 @@
PyTorchScheduler, State, Time, Timestamp, TimeUnit, TrainerMode, ensure_data_spec,
ensure_evaluator, ensure_time, get_precision_context, validate_eval_automicrobatching)
from composer.devices import Device, DeviceCPU, DeviceGPU, DeviceMPS, DeviceTPU
from composer.loggers import (ConsoleLogger, Logger, LoggerDestination, MosaicMLLogger, ProgressBarLogger,
from composer.loggers import (ConsoleLogger, Logger, LoggerDestination, MLFlowLogger, MosaicMLLogger, ProgressBarLogger,
RemoteUploaderDownloader, WandBLogger)
from composer.loggers.mosaicml_logger import MOSAICML_ACCESS_TOKEN_ENV_VAR, MOSAICML_PLATFORM_ENV_VAR
from composer.models import ComposerModel
@@ -54,8 +54,9 @@
ensure_tuple, export_with_logger, extract_hparams, format_name_with_dist,
get_composer_env_dict, get_device, get_file, is_tpu_installed, map_collection,
maybe_create_object_store_from_uri, maybe_create_remote_uploader_downloader_from_uri,
model_eval_mode, parse_uri, reproducibility, using_torch_2)
model_eval_mode, parse_uri, partial_format, reproducibility, using_torch_2)
from composer.utils.misc import is_model_deepspeed
from composer.utils.object_store.mlflow_object_store import MLFLOW_EXPERIMENT_ID_FORMAT_KEY, MLFLOW_RUN_ID_FORMAT_KEY

if is_tpu_installed():
import torch_xla.core.xla_model as xm
@@ -1163,6 +1164,21 @@ def __init__(
# Run Event.INIT
self.engine.run_event(Event.INIT)

# If the experiment is being tracked with an `MLFlowLogger`, then the `save_folder` and
# related paths/filenames may have placeholders for the MLFlow experiment and run IDs that must be populated
# after running Event.INIT.
if save_folder is not None:
for destination in self.logger.destinations:
if isinstance(destination, MLFlowLogger):
mlflow_format_kwargs = {
MLFLOW_EXPERIMENT_ID_FORMAT_KEY: destination._experiment_id,
MLFLOW_RUN_ID_FORMAT_KEY: destination._run_id
}

save_folder = partial_format(save_folder, **mlflow_format_kwargs)
if latest_remote_file_name is not None:
latest_remote_file_name = partial_format(latest_remote_file_name, **mlflow_format_kwargs)

# Log hparams.
if self.auto_log_hparams:
self.local_hparams = extract_hparams(locals())
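Illustrative only: partial_format fills just the keys it is given and leaves the remaining placeholders, such as {run_name} or {rank}, intact for the later format_name_with_dist / format_name_with_dist_and_time passes. The filename below is a made-up example.

from composer.utils import partial_format

latest = '{run_name}/checkpoints/{mlflow_run_id}/latest-rank{rank}.pt'
print(partial_format(latest, mlflow_run_id='abc'))
# {run_name}/checkpoints/abc/latest-rank{rank}.pt
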
76 changes: 14 additions & 62 deletions composer/utils/__init__.py
@@ -26,68 +26,20 @@
UCObjectStore)
from composer.utils.retrying import retry
from composer.utils.string_enum import StringEnum
from composer.utils.string_helpers import partial_format

__all__ = [
'ensure_tuple',
'get_free_tcp_port',
'map_collection',
'IteratorFileStream',
'FORMAT_NAME_WITH_DIST_AND_TIME_TABLE',
'FORMAT_NAME_WITH_DIST_TABLE',
'get_file',
'PartialFilePath',
'create_symlink_file',
'ObjectStore',
'ObjectStoreTransientError',
'LibcloudObjectStore',
'S3ObjectStore',
'SFTPObjectStore',
'OCIObjectStore',
'GCSObjectStore',
'UCObjectStore',
'MLFlowObjectStore',
'MissingConditionalImportError',
'import_object',
'is_model_deepspeed',
'is_model_fsdp',
'is_notebook',
'StringEnum',
'load_checkpoint',
'save_checkpoint',
'safe_torch_load',
'ensure_folder_is_empty',
'ensure_folder_has_no_conflicting_files',
'export_for_inference',
'export_with_logger',
'quantize_dynamic',
'format_name_with_dist',
'format_name_with_dist_and_time',
'is_tar',
'maybe_create_object_store_from_uri',
'maybe_create_remote_uploader_downloader_from_uri',
'parse_uri',
'batch_get',
'batch_set',
'configure_excepthook',
'disable_env_report',
'enable_env_report',
'print_env',
'get_composer_env_dict',
'retry',
'model_eval_mode',
'get_device',
'is_tpu_installed',
'is_hpu_installed',
'ExportFormat',
'Transform',
'export_with_logger',
'extract_hparams',
'convert_nested_dict_to_flat_dict',
'convert_flat_dict_to_nested_dict',
'using_torch_2',
'create_interval_scheduler',
'EvalClient',
'LambdaEvalClient',
'LocalEvalClient',
'MosaicMLLambdaEvalClient',
'ensure_tuple', 'get_free_tcp_port', 'map_collection', 'IteratorFileStream', 'FORMAT_NAME_WITH_DIST_AND_TIME_TABLE',
'FORMAT_NAME_WITH_DIST_TABLE', 'get_file', 'PartialFilePath', 'create_symlink_file', 'ObjectStore',
'ObjectStoreTransientError', 'LibcloudObjectStore', 'S3ObjectStore', 'SFTPObjectStore', 'OCIObjectStore',
'GCSObjectStore', 'UCObjectStore', 'MLFlowObjectStore', 'MissingConditionalImportError', 'import_object',
'is_model_deepspeed', 'is_model_fsdp', 'is_notebook', 'StringEnum', 'load_checkpoint', 'save_checkpoint',
'safe_torch_load', 'ensure_folder_is_empty', 'ensure_folder_has_no_conflicting_files', 'export_for_inference',
'export_with_logger', 'quantize_dynamic', 'format_name_with_dist', 'format_name_with_dist_and_time', 'is_tar',
'maybe_create_object_store_from_uri', 'maybe_create_remote_uploader_downloader_from_uri', 'parse_uri', 'batch_get',
'batch_set', 'configure_excepthook', 'disable_env_report', 'enable_env_report', 'print_env',
'get_composer_env_dict', 'retry', 'model_eval_mode', 'get_device', 'is_tpu_installed', 'is_hpu_installed',
'ExportFormat', 'Transform', 'export_with_logger', 'extract_hparams', 'convert_nested_dict_to_flat_dict',
'convert_flat_dict_to_nested_dict', 'using_torch_2', 'create_interval_scheduler', 'EvalClient', 'LambdaEvalClient',
'LocalEvalClient', 'MosaicMLLambdaEvalClient', 'partial_format'
]
15 changes: 9 additions & 6 deletions composer/utils/object_store/mlflow_object_store.py
@@ -22,8 +22,11 @@

DEFAULT_MLFLOW_EXPERIMENT_NAME = 'mlflow-object-store'

PLACEHOLDER_EXPERIMENT_ID = 'MLFLOW_EXPERIMENT_ID'
PLACEHOLDER_RUN_ID = 'MLFLOW_RUN_ID'
MLFLOW_EXPERIMENT_ID_FORMAT_KEY = 'mlflow_experiment_id'
MLFLOW_RUN_ID_FORMAT_KEY = 'mlflow_run_id'

MLFLOW_EXPERIMENT_ID_PLACEHOLDER = '{' + MLFLOW_EXPERIMENT_ID_FORMAT_KEY + '}'
MLFLOW_RUN_ID_PLACEHOLDER = '{' + MLFLOW_RUN_ID_FORMAT_KEY + '}'

log = logging.getLogger(__name__)
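
A small, hypothetical snippet showing what the placeholder constants above expand to and how such a templated DBFS path is later resolved; the path shape mirrors the TEST_PATH_FORMAT fixture in the tests further down.

from composer.utils import partial_format
from composer.utils.object_store.mlflow_object_store import (MLFLOW_EXPERIMENT_ID_PLACEHOLDER,
                                                              MLFLOW_RUN_ID_PLACEHOLDER)

template = f'databricks/mlflow-tracking/{MLFLOW_EXPERIMENT_ID_PLACEHOLDER}/{MLFLOW_RUN_ID_PLACEHOLDER}/artifacts/'
# 'databricks/mlflow-tracking/{mlflow_experiment_id}/{mlflow_run_id}/artifacts/'

print(partial_format(template, mlflow_experiment_id='123', mlflow_run_id='456'))
# 'databricks/mlflow-tracking/123/456/artifacts/'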

@@ -131,9 +134,9 @@ def __init__(self, path: str, multipart_upload_chunk_size: int = 100 * 1024 * 10
mlflow.environment_variables.MLFLOW_MULTIPART_UPLOAD_CHUNK_SIZE.set(multipart_upload_chunk_size)

experiment_id, run_id, _ = MLFlowObjectStore.parse_dbfs_path(path)
if experiment_id == PLACEHOLDER_EXPERIMENT_ID:
if experiment_id == MLFLOW_EXPERIMENT_ID_PLACEHOLDER:
experiment_id = None
if run_id == PLACEHOLDER_RUN_ID:
if run_id == MLFLOW_RUN_ID_PLACEHOLDER:
run_id = None

# Construct the `experiment_id` and `run_id` depending on whether format placeholders were provided.
@@ -244,10 +247,10 @@ def get_artifact_path(self, object_name: str) -> str:
"""
if object_name.startswith(MLFLOW_DBFS_PATH_PREFIX):
experiment_id, run_id, object_name = self.parse_dbfs_path(object_name)
if (experiment_id != self.experiment_id and experiment_id != PLACEHOLDER_EXPERIMENT_ID):
if (experiment_id != self.experiment_id and experiment_id != MLFLOW_EXPERIMENT_ID_PLACEHOLDER):
raise ValueError(f'Object {object_name} belongs to experiment with id={experiment_id}, '
f'but MLFlowObjectStore is associated with experiment {self.experiment_id}.')
if (run_id != self.run_id and run_id != PLACEHOLDER_RUN_ID):
if (run_id != self.run_id and run_id != MLFLOW_RUN_ID_PLACEHOLDER):
raise ValueError(f'Object {object_name} belongs to run with id={run_id}, '
f'but MLFlowObjectStore is associated with run {self.run_id}.')
return object_name
31 changes: 31 additions & 0 deletions composer/utils/string_helpers.py
@@ -0,0 +1,31 @@
# Copyright 2022 MosaicML Composer authors
# SPDX-License-Identifier: Apache-2.0

"""Utilities for string manipulation."""


def partial_format(s, *args, **kwargs):
"""Format a string with a partial set of arguments.
Since `str.format()` raises a `KeyError` if a format key is missing from the arguments, this
function allows for a partial set of arguments to be provided.
For example:
>>> partial_format('{foo} {bar}', foo='Hello')
'Hello {bar}'
>>> partial_format('{foo} {bar}', foo='Hello', bar='World')
'Hello World'
"""
result = s
done = False
while not done:
try:
result = s.format(*args, **kwargs)
done = True
except KeyError as e:
key = e.args[0]
kwargs[key] = '{' + key + '}'

return result
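
As a further illustration (hypothetical session), each missing key is re-inserted as its own literal placeholder by the retry loop above, so a template can be filled incrementally across passes:

>>> s = '{mlflow_experiment_id}/{mlflow_run_id}/ep{epoch}-ba{batch}.pt'
>>> s = partial_format(s, mlflow_experiment_id='123', mlflow_run_id='456')
>>> s
'123/456/ep{epoch}-ba{batch}.pt'
>>> s.format(epoch=2, batch=100)
'123/456/ep2-ba100.pt'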
21 changes: 12 additions & 9 deletions tests/utils/object_store/test_mlflow_object_store.py
@@ -8,7 +8,7 @@
import pytest

from composer.utils import MLFlowObjectStore
from composer.utils.object_store.mlflow_object_store import PLACEHOLDER_EXPERIMENT_ID, PLACEHOLDER_RUN_ID
from composer.utils.object_store.mlflow_object_store import MLFLOW_EXPERIMENT_ID_PLACEHOLDER, MLFLOW_RUN_ID_PLACEHOLDER

TEST_PATH_FORMAT = 'databricks/mlflow-tracking/{experiment_id}/{run_id}/artifacts/'
EXPERIMENT_ID = '123'
@@ -66,7 +66,7 @@ def test_init_with_experiment_and_no_run(monkeypatch):
mock_mlflow_client.return_value.create_run.return_value = MagicMock(
info=MagicMock(run_id=RUN_ID, run_name='test-run'))

store = MLFlowObjectStore(TEST_PATH_FORMAT.format(experiment_id=EXPERIMENT_ID, run_id=PLACEHOLDER_RUN_ID))
store = MLFlowObjectStore(TEST_PATH_FORMAT.format(experiment_id=EXPERIMENT_ID, run_id=MLFLOW_RUN_ID_PLACEHOLDER))
assert store.experiment_id == EXPERIMENT_ID
assert store.run_id == RUN_ID

@@ -76,7 +76,7 @@ def test_init_with_run_and_no_experiment(monkeypatch):
monkeypatch.setattr(dbx_sdk, 'WorkspaceClient', MagicMock())

with pytest.raises(ValueError):
MLFlowObjectStore(TEST_PATH_FORMAT.format(experiment_id=PLACEHOLDER_EXPERIMENT_ID, run_id=RUN_ID))
MLFlowObjectStore(TEST_PATH_FORMAT.format(experiment_id=MLFLOW_EXPERIMENT_ID_PLACEHOLDER, run_id=RUN_ID))


def test_init_with_active_run(monkeypatch):
@@ -91,7 +91,7 @@ def test_init_with_active_run(monkeypatch):
mock_active_run.return_value = MagicMock(info=MagicMock(experiment_id=EXPERIMENT_ID, run_id=RUN_ID))

store = MLFlowObjectStore(
TEST_PATH_FORMAT.format(experiment_id=PLACEHOLDER_EXPERIMENT_ID, run_id=PLACEHOLDER_RUN_ID))
TEST_PATH_FORMAT.format(experiment_id=MLFLOW_EXPERIMENT_ID_PLACEHOLDER, run_id=MLFLOW_RUN_ID_PLACEHOLDER))
assert store.experiment_id == EXPERIMENT_ID
assert store.run_id == RUN_ID

@@ -109,7 +109,7 @@ def test_init_with_existing_experiment_and_no_run(monkeypatch):
info=MagicMock(run_id=RUN_ID, run_name='test-run'))

store = MLFlowObjectStore(
TEST_PATH_FORMAT.format(experiment_id=PLACEHOLDER_EXPERIMENT_ID, run_id=PLACEHOLDER_RUN_ID))
TEST_PATH_FORMAT.format(experiment_id=MLFLOW_EXPERIMENT_ID_PLACEHOLDER, run_id=MLFLOW_RUN_ID_PLACEHOLDER))
assert store.experiment_id == EXPERIMENT_ID
assert store.run_id == RUN_ID

@@ -128,7 +128,7 @@ def test_init_with_no_experiment_and_no_run(monkeypatch):
info=MagicMock(run_id=RUN_ID, run_name='test-run'))

store = MLFlowObjectStore(
TEST_PATH_FORMAT.format(experiment_id=PLACEHOLDER_EXPERIMENT_ID, run_id=PLACEHOLDER_RUN_ID))
TEST_PATH_FORMAT.format(experiment_id=MLFLOW_EXPERIMENT_ID_PLACEHOLDER, run_id=MLFLOW_RUN_ID_PLACEHOLDER))
assert store.experiment_id == EXPERIMENT_ID
assert store.run_id == RUN_ID

@@ -190,16 +190,19 @@ def test_get_artifact_path(mlflow_object_store):
assert mlflow_object_store.get_artifact_path(DEFAULT_PATH + ARTIFACT_PATH) == ARTIFACT_PATH

# Absolute DBFS path with placeholders
path = TEST_PATH_FORMAT.format(experiment_id=PLACEHOLDER_EXPERIMENT_ID, run_id=PLACEHOLDER_RUN_ID) + ARTIFACT_PATH
path = TEST_PATH_FORMAT.format(experiment_id=MLFLOW_EXPERIMENT_ID_PLACEHOLDER,
run_id=MLFLOW_RUN_ID_PLACEHOLDER) + ARTIFACT_PATH
assert mlflow_object_store.get_artifact_path(path) == ARTIFACT_PATH

# Raises ValueError for different experiment ID
path = TEST_PATH_FORMAT.format(experiment_id='different-experiment', run_id=PLACEHOLDER_RUN_ID) + ARTIFACT_PATH
path = TEST_PATH_FORMAT.format(experiment_id='different-experiment',
run_id=MLFLOW_RUN_ID_PLACEHOLDER) + ARTIFACT_PATH
with pytest.raises(ValueError):
mlflow_object_store.get_artifact_path(path)

# Raises ValueError for different run ID
path = TEST_PATH_FORMAT.format(experiment_id=PLACEHOLDER_EXPERIMENT_ID, run_id='different-run') + ARTIFACT_PATH
path = TEST_PATH_FORMAT.format(experiment_id=MLFLOW_EXPERIMENT_ID_PLACEHOLDER,
run_id='different-run') + ARTIFACT_PATH
with pytest.raises(ValueError):
mlflow_object_store.get_artifact_path(path)

