From f0af5c9c437fb4afc39d248456cb365e76770d0f Mon Sep 17 00:00:00 2001
From: Justin Yu
Date: Wed, 15 Feb 2023 10:39:45 -0800
Subject: [PATCH] Revert "Revert "[build_base] [Tune] Add more comprehensive support for remote `upload_dir` w/ endpoint and params (#32479)" (#32571)"

This reverts commit 350fb13090b45f62531dd390b678ce706ae9ce36.
---
 python/ray/_private/test_utils.py | 56 ++++++------
 python/ray/air/_internal/uri_utils.py | 61 ++++++++++++++
 python/ray/tune/experiment/experiment.py | 11 +--
 python/ray/tune/experiment/trial.py | 8 +-
 python/ray/tune/impl/tuner_internal.py | 13 +--
 python/ray/tune/tests/test_experiment.py | 17 +++-
 python/ray/tune/tests/test_syncer.py | 103 ++++++++++++++++++-----
 python/ray/tune/trainable/util.py | 4 +-
 8 files changed, 194 insertions(+), 79 deletions(-)
 create mode 100644 python/ray/air/_internal/uri_utils.py

diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py
index da02cc8e3002e..8e5ccccb9ce4d 100644
--- a/python/ray/_private/test_utils.py
+++ b/python/ray/_private/test_utils.py
@@ -1572,7 +1572,20 @@ def no_resource_leaks_excluding_node_resources():
 
 
 @contextmanager
-def simulate_storage(storage_type, root=None):
+def simulate_storage(
+    storage_type: str,
+    root: Optional[str] = None,
+    port: int = 5002,
+    region: str = "us-west-2",
+):
+    """Context that simulates a given storage type and yields the URI.
+
+    Args:
+        storage_type: The storage type to simulate ("fs" or "s3")
+        root: Root directory of the URI to return (e.g., s3 bucket name)
+        port: The port of the localhost endpoint where s3 is being served (s3 only)
+        region: The s3 region (s3 only)
+    """
     if storage_type == "fs":
         if root is None:
             with tempfile.TemporaryDirectory() as d:
@@ -1580,38 +1593,17 @@ def simulate_storage(storage_type, root=None):
         else:
             yield "file://" + root
     elif storage_type == "s3":
-        import uuid
-
-        from moto import mock_s3
-
-        from ray.tests.mock_s3_server import start_service, stop_process
-
-        @contextmanager
-        def aws_credentials():
-            old_env = os.environ
-            os.environ["AWS_ACCESS_KEY_ID"] = "testing"
-            os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
-            os.environ["AWS_SECURITY_TOKEN"] = "testing"
-            os.environ["AWS_SESSION_TOKEN"] = "testing"
-            yield
-            os.environ = old_env
-
-        @contextmanager
-        def moto_s3_server():
-            host = "localhost"
-            port = 5002
-            url = f"http://{host}:{port}"
-            process = start_service("s3", host, port)
-            yield url
-            stop_process(process)
-
-        if root is None:
-            root = uuid.uuid4().hex
-        with moto_s3_server() as s3_server, aws_credentials(), mock_s3():
-            url = f"s3://{root}?region=us-west-2&endpoint_override={s3_server}"
-            yield url
+        from moto.server import ThreadedMotoServer
+
+        root = root or uuid.uuid4().hex
+        s3_server = f"http://localhost:{port}"
+        server = ThreadedMotoServer(port=port)
+        server.start()
+        url = f"s3://{root}?region={region}&endpoint_override={s3_server}"
+        yield url
+        server.stop()
     else:
-        raise ValueError(f"Unknown storage type: {storage_type}")
+        raise NotImplementedError(f"Unknown storage type: {storage_type}")
 
 
 def job_hook(**kwargs):
diff --git a/python/ray/air/_internal/uri_utils.py b/python/ray/air/_internal/uri_utils.py
new file mode 100644
index 0000000000000..c6222198b1378
--- /dev/null
+++ b/python/ray/air/_internal/uri_utils.py
@@ -0,0 +1,61 @@
+from pathlib import Path
+import urllib.parse
+import os
+from typing import Union
+
+
+class URI:
+    """Represents a URI, supporting path appending and retrieving parent URIs.
+
+    Example Usage:
+
+    >>> s3_uri = URI("s3://bucket/a?scheme=http&endpoint_override=localhost%3A900")
+    >>> s3_uri
+    URI<s3://bucket/a?scheme=http&endpoint_override=localhost%3A900>
+    >>> str(s3_uri / "b" / "c")
+    's3://bucket/a/b/c?scheme=http&endpoint_override=localhost%3A900'
+    >>> str(s3_uri.parent)
+    's3://bucket?scheme=http&endpoint_override=localhost%3A900'
+    >>> str(s3_uri)
+    's3://bucket/a?scheme=http&endpoint_override=localhost%3A900'
+    >>> s3_uri.parent.name, s3_uri.name
+    ('bucket', 'a')
+
+    Args:
+        uri: The URI to represent.
+            Ex: s3://bucket?scheme=http&endpoint_override=localhost%3A900
+            Ex: file:///a/b/c/d
+    """
+
+    def __init__(self, uri: str):
+        self._parsed = urllib.parse.urlparse(uri)
+        if not self._parsed.scheme:
+            raise ValueError(f"Invalid URI: {uri}")
+        self._path = Path(os.path.normpath(self._parsed.netloc + self._parsed.path))
+
+    @property
+    def name(self) -> str:
+        return self._path.name
+
+    @property
+    def parent(self) -> "URI":
+        assert self._path.parent != ".", f"{str(self)} has no valid parent URI"
+        return URI(self._get_str_representation(self._parsed, self._path.parent))
+
+    def __truediv__(self, path_to_append):
+        assert isinstance(path_to_append, str)
+        return URI(
+            self._get_str_representation(self._parsed, self._path / path_to_append)
+        )
+
+    @classmethod
+    def _get_str_representation(
+        cls, parsed_uri: urllib.parse.ParseResult, path: Union[str, Path]
+    ) -> str:
+        return parsed_uri._replace(netloc=str(path), path="").geturl()
+
+    def __repr__(self):
+        return f"URI<{str(self)}>"
+
+    def __str__(self):
+        return self._get_str_representation(self._parsed, self._path)
diff --git a/python/ray/tune/experiment/experiment.py b/python/ray/tune/experiment/experiment.py
index 6e52b8de0b2b6..7b9c6fe028d61 100644
--- a/python/ray/tune/experiment/experiment.py
+++ b/python/ray/tune/experiment/experiment.py
@@ -22,6 +22,7 @@
 )
 
 from ray.air import CheckpointConfig
+from ray.air._internal.uri_utils import URI
 from ray.tune.error import TuneError
 from ray.tune.registry import register_trainable, is_function_trainable
 from ray.tune.result import DEFAULT_RESULTS_DIR
@@ -443,14 +444,8 @@ def checkpoint_dir(self):
     def remote_checkpoint_dir(self) -> Optional[str]:
         if not self.sync_config.upload_dir or not self.dir_name:
             return None
-
-        # NOTE: `upload_dir` can contain query strings. For example:
-        # 's3://bucket?scheme=http&endpoint_override=localhost%3A9000'.
-        if "?" in self.sync_config.upload_dir:
-            path, query = self.sync_config.upload_dir.split("?")
-            return os.path.join(path, self.dir_name) + "?" + query
-
-        return os.path.join(self.sync_config.upload_dir, self.dir_name)
+        uri = URI(self.sync_config.upload_dir)
+        return str(uri / self.dir_name)
 
     @property
     def run_identifier(self):
diff --git a/python/ray/tune/experiment/trial.py b/python/ray/tune/experiment/trial.py
index 2697329981ba0..2b4edcf8c4c32 100644
--- a/python/ray/tune/experiment/trial.py
+++ b/python/ray/tune/experiment/trial.py
@@ -14,6 +14,7 @@
 
 import ray
 from ray.air import CheckpointConfig
+from ray.air._internal.uri_utils import URI
 from ray.air._internal.checkpoint_manager import _TrackedCheckpoint, CheckpointStorage
 import ray.cloudpickle as cloudpickle
 from ray.exceptions import RayActorError, RayTaskError
@@ -601,7 +602,7 @@ def generate_id(cls):
         return str(uuid.uuid4().hex)[:8]
 
     @property
-    def remote_checkpoint_dir(self):
+    def remote_checkpoint_dir(self) -> str:
         """This is the **per trial** remote checkpoint dir.
 
         This is different from **per experiment** remote checkpoint dir.
@@ -609,9 +610,8 @@
         assert self.logdir, "Trial {}: logdir not initialized.".format(self)
         if not self.sync_config.upload_dir or not self.experiment_dir_name:
             return None
-        return os.path.join(
-            self.sync_config.upload_dir, self.experiment_dir_name, self.relative_logdir
-        )
+        uri = URI(self.sync_config.upload_dir)
+        return str(uri / self.experiment_dir_name / self.relative_logdir)
 
     @property
     def uses_cloud_checkpointing(self):
diff --git a/python/ray/tune/impl/tuner_internal.py b/python/ray/tune/impl/tuner_internal.py
index f3ce25e59b8c7..e6857f7c848ef 100644
--- a/python/ray/tune/impl/tuner_internal.py
+++ b/python/ray/tune/impl/tuner_internal.py
@@ -7,11 +7,11 @@
 import tempfile
 from pathlib import Path
 from typing import Any, Callable, Dict, Optional, Type, Union, TYPE_CHECKING, Tuple
-import urllib.parse
 
 import ray
 import ray.cloudpickle as pickle
 from ray.util import inspect_serializability
+from ray.air._internal.uri_utils import URI
 from ray.air._internal.remote_storage import download_from_uri, is_non_local_path_uri
 from ray.air.config import RunConfig, ScalingConfig
 from ray.tune import Experiment, TuneError, ExperimentAnalysis
@@ -359,14 +359,9 @@ def _restore_from_path_or_uri(
             self._run_config.name = experiment_path.name
         else:
             # Set the experiment `name` and `upload_dir` according to the URI
-            parsed_uri = urllib.parse.urlparse(path_or_uri)
-            remote_path = Path(os.path.normpath(parsed_uri.netloc + parsed_uri.path))
-            upload_dir = parsed_uri._replace(
-                netloc="", path=str(remote_path.parent)
-            ).geturl()
-
-            self._run_config.name = remote_path.name
-            self._run_config.sync_config.upload_dir = upload_dir
+            uri = URI(path_or_uri)
+            self._run_config.name = uri.name
+            self._run_config.sync_config.upload_dir = str(uri.parent)
 
         # If we synced, `experiment_checkpoint_dir` will contain a temporary
         # directory. Create an experiment checkpoint dir instead and move
diff --git a/python/ray/tune/tests/test_experiment.py b/python/ray/tune/tests/test_experiment.py
index 729b1e3a9633c..be3774b9ec2a6 100644
--- a/python/ray/tune/tests/test_experiment.py
+++ b/python/ray/tune/tests/test_experiment.py
@@ -4,19 +4,30 @@
 import ray
 from ray.air import CheckpointConfig
 from ray.tune import register_trainable, SyncConfig
-from ray.tune.experiment import Experiment, _convert_to_experiment_list
+from ray.tune.experiment import Experiment, Trial, _convert_to_experiment_list
 from ray.tune.error import TuneError
 from ray.tune.utils import diagnose_serialization
 
 
-def test_remote_checkpoint_dir_with_query_string():
+def test_remote_checkpoint_dir_with_query_string(tmp_path):
+    sync_config = SyncConfig(syncer="auto", upload_dir="s3://bucket?scheme=http")
     experiment = Experiment(
         name="spam",
         run=lambda config: config,
-        sync_config=SyncConfig(syncer="auto", upload_dir="s3://bucket?scheme=http"),
+        sync_config=sync_config,
     )
     assert experiment.remote_checkpoint_dir == "s3://bucket/spam?scheme=http"
 
+    trial = Trial(
+        "mock",
+        stub=True,
+        sync_config=sync_config,
+        experiment_dir_name="spam",
+        local_dir=str(tmp_path),
+    )
+    trial.relative_logdir = "trial_dirname"
+    assert trial.remote_checkpoint_dir == "s3://bucket/spam/trial_dirname?scheme=http"
+
 
 class ExperimentTest(unittest.TestCase):
     def tearDown(self):
diff --git a/python/ray/tune/tests/test_syncer.py b/python/ray/tune/tests/test_syncer.py
index 83d03b285da4c..2e39dac28ecf5 100644
--- a/python/ray/tune/tests/test_syncer.py
+++ b/python/ray/tune/tests/test_syncer.py
@@ -4,12 +4,11 @@
 import subprocess
 import tempfile
 import time
-from pathlib import Path
 from typing import List, Optional
 from unittest.mock import patch
 
-import pytest
 import boto3
+import pytest
 from freezegun import freeze_time
 
 import ray
@@ -23,6 +22,14 @@
 from ray._private.test_utils import simulate_storage
 
 
+@pytest.fixture
+def ray_start_4_cpus():
+    address_info = ray.init(num_cpus=4, configure_logging=False)
+    yield address_info
+    # The code after the yield will run as teardown code.
+    ray.shutdown()
+
+
 @pytest.fixture
 def ray_start_2_cpus():
     address_info = ray.init(num_cpus=2, configure_logging=False)
@@ -65,6 +72,23 @@ def temp_data_dirs():
     shutil.rmtree(tmp_target)
 
 
+@pytest.fixture
+def mock_s3_bucket_uri():
+    bucket_name = "test_syncer_bucket"
+    port = 5002
+    region = "us-west-2"
+    with simulate_storage("s3", root=bucket_name, port=port, region=region) as s3_uri:
+        s3 = boto3.client(
+            "s3", region_name=region, endpoint_url=f"http://localhost:{port}"
+        )
+        s3.create_bucket(
+            Bucket=bucket_name,
+            CreateBucketConfiguration={"LocationConstraint": region},
+        )
+
+        yield s3_uri
+
+
 def assert_file(exists: bool, root: str, path: str):
     full_path = os.path.join(root, path)
 
@@ -620,7 +644,7 @@ def test_syncer_serialize(temp_data_dirs):
     pickle.dumps(syncer)
 
 
-def test_final_experiment_checkpoint_sync(tmpdir):
+def test_final_experiment_checkpoint_sync(ray_start_2_cpus, tmpdir):
     class SlowSyncer(_DefaultSyncer):
         def __init__(self, **kwargs):
             super(_DefaultSyncer, self).__init__(**kwargs)
@@ -676,28 +700,19 @@ def train_func(config):
     )
 
 
-def test_sync_folder_with_many_files_s3(tmpdir):
+def test_sync_folder_with_many_files_s3(mock_s3_bucket_uri, tmp_path):
+    source_dir = tmp_path / "source"
+    check_dir = tmp_path / "check"
+    source_dir.mkdir()
+    check_dir.mkdir()
+
     # Create 256 files to upload
     for i in range(256):
-        (tmpdir / str(i)).write_text("", encoding="utf-8")
+        (source_dir / str(i)).write_text("", encoding="utf-8")
 
-    root = "bucket_test_syncer/dir"
-    with simulate_storage("s3", root) as s3_uri:
-        # Upload to S3
-
-        s3 = boto3.client(
-            "s3", region_name="us-west-2", endpoint_url="http://localhost:5002"
-        )
-        s3.create_bucket(
-            Bucket="bucket_test_syncer",
-            CreateBucketConfiguration={"LocationConstraint": "us-west-2"},
-        )
-        upload_to_uri(tmpdir, s3_uri)
-
-        with tempfile.TemporaryDirectory() as download_dir:
-            download_from_uri(s3_uri, download_dir)
-
-            assert (Path(download_dir) / "255").exists()
+    upload_to_uri(source_dir, mock_s3_bucket_uri)
+    download_from_uri(mock_s3_bucket_uri, check_dir)
+    assert (check_dir / "255").exists()
 
 
 def test_sync_folder_with_many_files_fs(tmpdir):
@@ -713,6 +728,50 @@ def test_sync_folder_with_many_files_fs(tmpdir):
     assert (tmpdir / "255").exists()
 
 
+def test_e2e_sync_to_s3(ray_start_4_cpus, mock_s3_bucket_uri, tmp_path):
+    """Tests an end to end Tune run with syncing to a mock s3 bucket."""
+    download_dir = tmp_path / "upload_dir"
+    download_dir.mkdir()
+
+    local_dir = str(tmp_path / "local_dir")
+
+    exp_name = "test_e2e_sync_to_s3"
+
+    def train_fn(config):
+        session.report({"score": 1}, checkpoint=Checkpoint.from_dict({"data": 1}))
+
+    tuner = tune.Tuner(
+        train_fn,
+        param_space={"id": tune.grid_search([0, 1, 2, 3])},
+        run_config=RunConfig(
+            name=exp_name,
+            local_dir=local_dir,
+            sync_config=tune.SyncConfig(upload_dir=mock_s3_bucket_uri),
+        ),
+        tune_config=tune.TuneConfig(
+            trial_dirname_creator=lambda t: str(t.config.get("id"))
+        ),
+    )
+    result_grid = tuner.fit()
+
+    # Download remote dir to do some sanity checks
+    download_from_uri(uri=mock_s3_bucket_uri, local_path=str(download_dir))
+
+    assert not result_grid.errors
+
+    def get_remote_trial_dir(trial_id: int):
+        return os.path.join(download_dir, exp_name, str(trial_id))
+
+    # Check that each remote trial dir has a checkpoint
+    for result in result_grid:
+        trial_id = result.config["id"]
+        remote_dir = get_remote_trial_dir(trial_id)
+        num_checkpoints = len(
+            [file for file in os.listdir(remote_dir) if file.startswith("checkpoint_")]
+        )
+        assert num_checkpoints == 1
+
+
if __name__ == "__main__": import sys diff --git a/python/ray/tune/trainable/util.py b/python/ray/tune/trainable/util.py index 489faad4113e2..94d6e0261c1dc 100644 --- a/python/ray/tune/trainable/util.py +++ b/python/ray/tune/trainable/util.py @@ -14,6 +14,7 @@ PlacementGroupFactory, resource_dict_to_pg_factory, ) +from ray.air._internal.uri_utils import URI from ray.air.config import ScalingConfig from ray.tune.registry import _ParameterRegistry from ray.tune.resources import Resources @@ -194,7 +195,8 @@ def get_remote_storage_path( ``logdir`` is assumed to be a prefix of ``local_path``.""" rel_local_path = os.path.relpath(local_path, logdir) - return os.path.join(remote_checkpoint_dir, rel_local_path) + uri = URI(remote_checkpoint_dir) + return str(uri / rel_local_path) @DeveloperAPI
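Reviewer note (illustrative only, not part of the patch): the sketch below shows how the new URI helper added in python/ray/air/_internal/uri_utils.py composes remote checkpoint paths while preserving query parameters such as endpoint_override, mirroring the updated Experiment.remote_checkpoint_dir, Trial.remote_checkpoint_dir, and TunerInternal._restore_from_path_or_uri logic above. The bucket, experiment, and trial names are placeholders.

from ray.air._internal.uri_utils import URI

# A remote upload_dir that carries query parameters (placeholder values).
upload_dir = "s3://bucket?scheme=http&endpoint_override=localhost%3A9000"

# Experiment-level dir: upload_dir / experiment dir name, query string preserved.
experiment_uri = URI(upload_dir) / "my_experiment"
assert str(experiment_uri) == (
    "s3://bucket/my_experiment?scheme=http&endpoint_override=localhost%3A9000"
)

# Trial-level dir: experiment dir / the trial's relative logdir.
trial_uri = experiment_uri / "my_trial"
assert str(trial_uri) == (
    "s3://bucket/my_experiment/my_trial?scheme=http&endpoint_override=localhost%3A9000"
)

# Restoring from a URI recovers the experiment name and the original upload_dir.
assert experiment_uri.name == "my_experiment"
assert str(experiment_uri.parent) == upload_dir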