forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[tune] Add test for multi-tenancy workaround and documentation to FAQ (…
…ray-project#32560) Ray Tune does not officially support multi-tenancy, but we see some users still using it. They then run into problems with the cluster-global trainable registry, which will overwrite trainables with the same name from different tuning jobs. The workaround here is to use a unique name for every trainable. This is currently undocumented. This PR adds a section to the Ray Tune FAQ explaining the workaround (with a big disclaimer on why multi-tenancy might still be a bad idea). It also adds a unit test that constructs a conflict situation and tests that the workaround mitigates the problem. Signed-off-by: Kai Fricke <kai@anyscale.com> Signed-off-by: Kai Fricke <krfricke@users.noreply.github.com> Co-authored-by: angelinalg <122562471+angelinalg@users.noreply.github.com> Co-authored-by: Richard Liaw <rliaw@berkeley.edu> Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
- Loading branch information
Showing
5 changed files
with
308 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
from pathlib import Path | ||
import os | ||
import time | ||
|
||
from ray import air, tune | ||
from ray.air import session | ||
from ray.train.data_parallel_trainer import DataParallelTrainer | ||
|
||
from ray.tune.search import BasicVariantGenerator | ||
|
||
# Hang full script until this marker is deleted | ||
HANG_RUN_MARKER = os.environ.get("HANG_RUN_MARKER", "") | ||
|
||
# Delete this marker when a trial is started | ||
DELETE_TRIAL_MARKER = os.environ.get("DELETE_TRIAL_MARKER", "") | ||
|
||
# Hang in trial until this marker is deleted | ||
HANG_TRIAL_MARKER = os.environ.get("HANG_TRIAL_MARKER", "") | ||
|
||
# Delete this marker after tuning finished | ||
DELETE_RUN_MARKER = os.environ.get("DELETE_RUN_MARKER", "") | ||
|
||
# Hang at end of run until this marker is deleted | ||
HANG_END_MARKER = os.environ.get("HANG_END_MARKER", "") | ||
|
||
# Report this val as the "fixed" metric in the trial. | ||
# This value is captured in the trainer and will conflict when a trainable | ||
# is overwritten! | ||
FIXED_VAL = int(os.environ["FIXED_VAL"]) | ||
|
||
# Grid search over these vals and report as "param" metric in the trial. | ||
# Even with conflicting trainables, these will be reported correctly as they | ||
# are tracked by the driver, not the trainable. | ||
VALS = [int(os.environ["VAL_1"]), int(os.environ["VAL_2"])] | ||
|
||
# If 1, use workaround, if 0, just run (and fail in job 1). | ||
USE_WORKAROUND = bool(int(os.environ["WORKAROUND"])) | ||
|
||
# Wait for HANG_RUN_MARKER | ||
while HANG_RUN_MARKER and Path(HANG_RUN_MARKER).exists(): | ||
time.sleep(0.1) | ||
|
||
|
||
def train_func(config): | ||
# Delete DELETE_TRIAL_MARKER | ||
delete_marker = config["delete_marker"] | ||
if delete_marker and Path(delete_marker).exists(): | ||
Path(delete_marker).unlink() | ||
|
||
# Wait for HANG_TRIAL_MARKER | ||
hang_marker = config["hang_marker"] | ||
while hang_marker and Path(hang_marker).exists(): | ||
time.sleep(0.1) | ||
|
||
# Finish trial | ||
session.report({"param": config["param"], "fixed": config["fixed"]}) | ||
|
||
|
||
# Workaround: Just use a unique name per trainer/trainable | ||
if USE_WORKAROUND: | ||
import uuid | ||
|
||
DataParallelTrainer.__name__ = "DataParallelTrainer_" + uuid.uuid4().hex[:8] | ||
|
||
|
||
trainer = DataParallelTrainer( | ||
train_loop_per_worker=train_func, | ||
train_loop_config={ | ||
"fixed": FIXED_VAL, | ||
}, | ||
scaling_config=air.ScalingConfig( | ||
num_workers=1, trainer_resources={"CPU": 0}, resources_per_worker={"CPU": 2} | ||
), | ||
) | ||
|
||
tuner = tune.Tuner( | ||
trainer, | ||
param_space={ | ||
"train_loop_config": { | ||
"param": tune.grid_search(VALS), | ||
"delete_marker": DELETE_TRIAL_MARKER, | ||
"hang_marker": HANG_TRIAL_MARKER, | ||
} | ||
}, | ||
tune_config=tune.TuneConfig(search_alg=BasicVariantGenerator(max_concurrent=1)), | ||
) | ||
results = tuner.fit() | ||
|
||
# Delete DELETE_RUN_MARKER | ||
if DELETE_RUN_MARKER and Path(DELETE_RUN_MARKER).exists(): | ||
Path(DELETE_RUN_MARKER).unlink() | ||
|
||
# Wait for HANG_END_MARKER | ||
while HANG_END_MARKER and Path(HANG_END_MARKER).exists(): | ||
time.sleep(0.1) | ||
|
||
# Put assertions last, so we don't finish early because of failures | ||
assert sorted([result.metrics["param"] for result in results]) == VALS | ||
assert [result.metrics["fixed"] for result in results] == [FIXED_VAL, FIXED_VAL] | ||
|
||
if USE_WORKAROUND: | ||
from ray.experimental.internal_kv import _internal_kv_del | ||
from ray.tune.registry import _make_key, TRAINABLE_CLASS | ||
|
||
_internal_kv_del(_make_key("global", TRAINABLE_CLASS, DataParallelTrainer.__name__)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
import pytest | ||
import subprocess | ||
import sys | ||
|
||
from pathlib import Path | ||
|
||
import ray | ||
|
||
|
||
@pytest.fixture | ||
def ray_start_4_cpus(): | ||
address_info = ray.init(num_cpus=4) | ||
yield address_info | ||
# The code after the yield will run as teardown code. | ||
ray.shutdown() | ||
|
||
|
||
@pytest.mark.parametrize("use_workaround", [False, True]) | ||
@pytest.mark.parametrize("exit_same", [False, True]) | ||
def test_registry_conflict(ray_start_4_cpus, tmpdir, use_workaround, exit_same): | ||
"""Two concurrent Tune runs can conflict with each other when they | ||
use a trainable with the same name. | ||
This test starts two runs in parallel and asserts that a workaround used | ||
in the docs can alleviate the problem. | ||
This is how we schedule the runs: | ||
- We have two runs. Every run starts two trials. | ||
- Run 1 starts 1 trial immediately. This trial starts with | ||
the correct parameters for the script. The trial hangs until the file | ||
``run_2_finished`` is deleted. | ||
- Run 2 starts as soon as the first trial of Run 1 runs (by waiting | ||
until the ``run_1_running`` file is deleted by that trial). It will overwrite | ||
the global registry trainable with the same name. | ||
- Run 2 finishes both trials. The script finishes with the expected | ||
parameters. | ||
- Run 2 then deletes the ``run_2_finished`` marker, allowing Run 1 trial 1 | ||
to continue training. When training finishes, the second trial launches. | ||
This second trial then uses the overwritten trainable, that is, the wrong | ||
parameters unless you use the workaround. | ||
- Run 1 finally finishes, and we compare the expected results with the actual | ||
results. | ||
When you don't use the workaround, expect an assertion error (if ``exit_same=True``, | ||
see below), otherwise a KeyError (because a trial failed). | ||
When the workaround is used, we expect everything to run without error. | ||
NOTE: Two errors can occur with registry conflicts. First, | ||
the trainable can be overwritten and captured, for example, when a fixed value | ||
is included in the trainable. The second trial of run 1 then has a wrong | ||
parameter and reports a wrong metric (from run 2). | ||
The second error occurs when the second run finishes fully and its objects | ||
are garbage collected. In this case, the first run tries to find the trainable | ||
registered by run 2, but fails lookup because the objects have been | ||
removed already. Note that these objects are registered with | ||
``tune.with_parameters()`` (not the global registry store). | ||
We test both scenarios using the ``exit_same`` parameter. | ||
NOTE: If we resolve the registry issue (for example, with unique keys) | ||
you can remove the test that expects the assertion error. We can remove | ||
the parametrization and the workaround and assert that no conflict occurs. | ||
""" | ||
# Create file markers | ||
run_1_running = tmpdir / "run_1_running" | ||
run_1_finished = tmpdir / "run_1_finished" | ||
run_2_finished = tmpdir / "run_2_finished" | ||
|
||
run_1_running.write_text("", encoding="utf-8") | ||
run_1_finished.write_text("", encoding="utf-8") | ||
run_2_finished.write_text("", encoding="utf-8") | ||
|
||
ray_address = ray_start_4_cpus.address_info["address"] | ||
|
||
run_1_env = { | ||
"RAY_ADDRESS": ray_address, | ||
"WORKAROUND": str(int(use_workaround)), | ||
"FIXED_VAL": str(1), | ||
"VAL_1": str(2), | ||
"VAL_2": str(3), | ||
# Run 1 can start immediately | ||
"HANG_RUN_MARKER": "", | ||
# Allow second run to start once first trial of first run is started | ||
"DELETE_TRIAL_MARKER": str(run_1_running), | ||
# Hang in first trial until the second run finished | ||
"HANG_TRIAL_MARKER": str(run_2_finished), | ||
# Mark run 1 as completed | ||
"DELETE_RUN_MARKER": str(run_1_finished), | ||
# Do not wait at end | ||
"HANG_END_MARKER": "", | ||
} | ||
|
||
run_2_env = { | ||
"RAY_ADDRESS": ray_address, | ||
"WORKAROUND": str(int(use_workaround)), | ||
"FIXED_VAL": str(4), | ||
"VAL_1": str(5), | ||
"VAL_2": str(6), | ||
# Wait until first trial of first run is running | ||
"HANG_RUN_MARKER": str(run_1_running), | ||
# Don't delete during run | ||
"DELETE_TRIAL_MARKER": "", | ||
# No need to hang in trial | ||
"HANG_TRIAL_MARKER": "", | ||
# After full run finished, allow first run to continue | ||
"DELETE_RUN_MARKER": str(run_2_finished), | ||
# Wait until first run finished | ||
# If we don't do this, we actually don't die because of parameter conflict | ||
# but because of "The object's owner has exited" - so we test this | ||
# separately | ||
"HANG_END_MARKER": str(run_1_finished) if exit_same else "", | ||
} | ||
|
||
script_path = Path(__file__).parent / "_test_multi_tenancy_run.py" | ||
|
||
run_1 = subprocess.Popen( | ||
[sys.executable, script_path], env=run_1_env, stderr=subprocess.PIPE | ||
) | ||
print("Started run 1:", run_1.pid) | ||
|
||
run_2 = subprocess.Popen([sys.executable, script_path], env=run_2_env) | ||
print("Started run 2:", run_2.pid) | ||
|
||
assert run_2.wait() == 0 | ||
|
||
if use_workaround: | ||
assert run_1.wait() == 0 | ||
else: | ||
assert run_1.wait() != 0 | ||
|
||
stderr = run_1.stderr.read().decode() | ||
|
||
if not exit_same: | ||
assert "OwnerDiedError" in stderr, stderr | ||
else: | ||
assert "AssertionError" in stderr, stderr | ||
|
||
|
||
if __name__ == "__main__": | ||
import pytest | ||
|
||
sys.exit(pytest.main(["-v", __file__])) |