[tune] Add test for multi-tenancy workaround and documentation to FAQ #32560

Merged
merged 20 commits on Feb 16, 2023
Changes from 5 commits

60 changes: 59 additions & 1 deletion doc/source/tune/faq.rst
@@ -773,4 +773,62 @@ API should be used to get the path for saving trial-specific outputs.
The `TUNE_ORIG_WORKING_DIR` environment variable was the original workaround for
accessing paths relative to the original working directory. This environment
variable is deprecated, and the `chdir_to_trial_dir` flag described above should be
used instead.


.. _tune-multi-tenancy:

How can I run multiple Ray Tune jobs on the same cluster at the same time (multi-tenancy)?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

First, be aware that running multiple Ray Tune runs on the same cluster at the same
time is not officially supported. We do not test this workflow, and we recommend
using a separate cluster for every tuning job.

The reasons for this are:

1. When multiple Ray Tune jobs run at the same time, they compete for resources.
   One job could run all of its trials at once, while the other job waits
   a long time until it gets the resources to run even a single trial.
2. Concurrent jobs are harder to debug. If a trial of job A fills the disk,
   trials from job B on the same node will suffer from it. In practice, it is hard
   to reason about these conditions from the logs if something goes wrong.
3. Some internal implementations in Ray Tune assume that only one job is
   running at a time. This can lead to conflicts.

The third point in particular is a common problem when running concurrent tuning jobs. A typical
symptom is that trials from job A use parameters specified in job B, leading to unexpected
results.

The technical reason for this is that Ray Tune uses a cluster-global key-value store to store trainables.
If two tuning jobs use a trainable (or Ray AIR trainer) with the same name, the second job will overwrite
this trainable. Trials from the first job that are started after the second job has started will then use
the overwritten trainable, leading to problems if captured variables are used (as is the case, e.g., in
most Ray AIR trainers).
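
To make this concrete, here is a minimal conceptual sketch of the failure mode, using a plain
Python dict to stand in for the cluster-global registry (this is not Tune's actual registry API,
and the ``"TorchTrainer"`` key and values are illustrative only):

.. code-block:: python

    # Conceptually, the cluster-global store behaves like a dict keyed by the
    # trainable's name, so two jobs using the same class name share one key.
    registry = {}

    # Job A registers its trainer; the captured configuration is baked in.
    registry["TorchTrainer"] = {"captured_lr": 0.01}

    # Job B registers a trainer under the same name, silently overwriting
    # job A's entry.
    registry["TorchTrainer"] = {"captured_lr": 0.1}

    # A trial of job A that starts after this point looks up "TorchTrainer"
    # and receives job B's captured configuration.
    assert registry["TorchTrainer"] == {"captured_lr": 0.1}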

Workaround
''''''''''

We are working on resolving the issue of conflicting trainables in concurrent tuning jobs. Until this
is resolved, the main workaround you can use is to give each trainable class a unique name.

For Ray AIR trainers this could look like this:

.. code-block:: python

    import uuid
    from ray.train.torch import TorchTrainer

    TorchTrainer.__name__ = "TorchTrainer_" + uuid.uuid4().hex[:8]

    trainer = TorchTrainer(
        # ...
    )

Contributor: Python question - at this point is TorchTrainer an isolated reference? I.e. will this name change leak if other places in the code import TorchTrainer?

Contributor Author: I believe it will update the name attribute globally.
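
Regarding the question above: ``__name__`` is an attribute of the class object itself, so the
rename is visible through every reference to that class, no matter where it was imported. A quick
plain-Python sketch (independent of Ray) of this behavior:

.. code-block:: python

    class Trainer:
        pass

    # Simulate another module holding a reference to the same class object.
    imported_elsewhere = Trainer

    Trainer.__name__ = "Trainer_deadbeef"

    # The rename is visible through every reference to the class.
    assert imported_elsewhere.__name__ == "Trainer_deadbeef"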

Then the separate Ray Tune jobs will all have their own trainable, which will not conflict with
each other.

The disadvantage of this approach is that resuming is a bit more complicated. If you want to resume
a run, you will have to use the same unique trainable name again before calling ``Trainer.restore()``.
This means you will likely have to store the trainable name somewhere, as we do not update it automatically
(yet).
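
A minimal sketch of how this could look in practice. The file used to persist the name is a
hypothetical choice for illustration, not a Tune API:

.. code-block:: python

    import os
    import uuid

    from ray.train.torch import TorchTrainer

    # Hypothetical location where the unique trainable name is persisted.
    NAME_FILE = os.path.expanduser("~/my_tune_job_trainable_name.txt")

    if os.path.exists(NAME_FILE):
        # Resuming: reuse the stored name so the restored run resolves the
        # same trainable as the original run.
        with open(NAME_FILE) as f:
            TorchTrainer.__name__ = f.read().strip()
    else:
        # Fresh run: generate a unique name and persist it for later resumes.
        TorchTrainer.__name__ = "TorchTrainer_" + uuid.uuid4().hex[:8]
        with open(NAME_FILE, "w") as f:
            f.write(TorchTrainer.__name__)
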
8 changes: 8 additions & 0 deletions python/ray/tune/BUILD
@@ -168,6 +168,14 @@ py_test(
    tags = ["team:ml"],
)

py_test(
    name = "test_multi_tenancy",
    size = "medium",
    srcs = ["tests/test_multi_tenancy.py"],
    deps = [":tune_lib"],
    tags = ["team:ml"],
)

py_test(
    name = "test_multinode_sync",
    size = "large",
99 changes: 99 additions & 0 deletions python/ray/tune/tests/_test_multi_tenancy_run.py
@@ -0,0 +1,99 @@
from pathlib import Path
import os
import time

from ray import air, tune
from ray.air import session
from ray.train.data_parallel_trainer import DataParallelTrainer

from ray.tune.search import BasicVariantGenerator

# Hang full script until this marker is deleted
HANG_RUN_MARKER = os.environ.get("HANG_RUN_MARKER", "")

# Delete this marker when a trial is started
DELETE_TRIAL_MARKER = os.environ.get("DELETE_TRIAL_MARKER", "")

# Hang in trial until this marker is deleted
HANG_TRIAL_MARKER = os.environ.get("HANG_TRIAL_MARKER", "")

# Delete this marker after tuning finished
DELETE_RUN_MARKER = os.environ.get("DELETE_RUN_MARKER", "")

# Hang at end of run until this marker is deleted
HANG_END_MARKER = os.environ.get("HANG_END_MARKER", "")

# Report this val as the "fixed" metric in the trial.
# This value is captured in the trainer and will conflict when a trainable
# is overwritten!
FIXED_VAL = int(os.environ["FIXED_VAL"])

# Grid search over these vals and report as "param" metric in the trial.
# Even with conflicting trainables, these will be reported correctly as they
# are tracked by the driver, not the trainable.
VALS = [int(os.environ["VAL_1"]), int(os.environ["VAL_2"])]

# If 1, use workaround, if 0, just run (and fail in job 1).
USE_WORKAROUND = bool(int(os.environ["WORKAROUND"]))

# Wait for HANG_RUN_MARKER
while HANG_RUN_MARKER and Path(HANG_RUN_MARKER).exists():
    time.sleep(0.1)


def train_func(config):
    # Delete DELETE_TRIAL_MARKER
    delete_marker = config["delete_marker"]
    if delete_marker and Path(delete_marker).exists():
        Path(delete_marker).unlink()

    # Wait for HANG_TRIAL_MARKER
    hang_marker = config["hang_marker"]
    while hang_marker and Path(hang_marker).exists():
        time.sleep(0.1)

    # Finish trial
    session.report({"param": config["param"], "fixed": config["fixed"]})


# Workaround: Just use a unique name per trainer/trainable
if USE_WORKAROUND:
    import uuid

    DataParallelTrainer.__name__ = "DataParallelTrainer_" + uuid.uuid4().hex[:8]


trainer = DataParallelTrainer(
    train_loop_per_worker=train_func,
    train_loop_config={
        "fixed": FIXED_VAL,
    },
    scaling_config=air.ScalingConfig(
        num_workers=1, trainer_resources={"CPU": 0}, resources_per_worker={"CPU": 2}
    ),
)

tuner = tune.Tuner(
    trainer,
    param_space={
        "train_loop_config": {
            "param": tune.grid_search(VALS),
            "delete_marker": DELETE_TRIAL_MARKER,
            "hang_marker": HANG_TRIAL_MARKER,
        }
    },
    tune_config=tune.TuneConfig(search_alg=BasicVariantGenerator(max_concurrent=1)),
)
results = tuner.fit()

# Delete DELETE_RUN_MARKER
if DELETE_RUN_MARKER and Path(DELETE_RUN_MARKER).exists():
    Path(DELETE_RUN_MARKER).unlink()

# Wait for HANG_END_MARKER
while HANG_END_MARKER and Path(HANG_END_MARKER).exists():
    time.sleep(0.1)

# Put assertions last, so we don't finish early because of failures
assert sorted([result.metrics["param"] for result in results]) == VALS
assert [result.metrics["fixed"] for result in results] == [FIXED_VAL, FIXED_VAL]
134 changes: 134 additions & 0 deletions python/ray/tune/tests/test_multi_tenancy.py
@@ -0,0 +1,134 @@
import pytest
import subprocess
import sys

from pathlib import Path

import ray


@pytest.fixture
def ray_start_4_cpus():
    address_info = ray.init(num_cpus=4)
    yield address_info
    # The code after the yield will run as teardown code.
    ray.shutdown()


@pytest.mark.parametrize("use_workaround", [False, True])
@pytest.mark.parametrize("exit_same", [False, True])
def test_registry_conflict(ray_start_4_cpus, tmpdir, use_workaround, exit_same):
"""Two concurrent Tune runs can conflict with each other when they
use a trainable with the same name.

This test starts two runs in parallel and asserts that a workaround used
in the docs can alleviate the problem.

This is how we schedule the runs:

- We have two runs. Every run starts two trials.
- Run 1 will start 1 trial immediately. This trial will be started with
krfricke marked this conversation as resolved.
Show resolved Hide resolved
the correct parameters for the script. The trial hangs until the file
``run_2_finished`` is deleted.
- Run 2 will start as soon as the 1st trial of Run 1 is running (by waiting
krfricke marked this conversation as resolved.
Show resolved Hide resolved
until ``run_1_running`` file is deleted by that trial). It will overwrite
krfricke marked this conversation as resolved.
Show resolved Hide resolved
the global registry trainable with the same name!
krfricke marked this conversation as resolved.
Show resolved Hide resolved
- Run 2 will finish both trials. The script should finish with the expected
krfricke marked this conversation as resolved.
Show resolved Hide resolved
parameters.
- Run 2 will then delete the ``run_2_finished`` marker, allowing Run 1 trial 1
krfricke marked this conversation as resolved.
Show resolved Hide resolved
to continue training. Training will finish, and the second trial will be launched.
krfricke marked this conversation as resolved.
Show resolved Hide resolved
THIS TRIAL will then use the overwritten trainable, i.e. wrong parameters! At
krfricke marked this conversation as resolved.
Show resolved Hide resolved
least unless the workaround is used.
krfricke marked this conversation as resolved.
Show resolved Hide resolved
- Run 1 finally finishes, and we compare the expected results with the actual
results.

When no workaround is used, we expect an assertion error (if ``exit_same=True``,
krfricke marked this conversation as resolved.
Show resolved Hide resolved
see below), otherwise a KeyError (because a trial failed).
When the workaround is used, we expect everything to run fine.
krfricke marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
When the workaround is used, we expect everything to run fine.
When you use the workaround, expect no errors.


NOTE: There are two errors that can come up with registry conflicts: First,
krfricke marked this conversation as resolved.
Show resolved Hide resolved
the trainable can be overwritten which will be captured e.g. when a fixed value
krfricke marked this conversation as resolved.
Show resolved Hide resolved
is included in the trainable. The second trial of run 1 will then have a wrong
krfricke marked this conversation as resolved.
Show resolved Hide resolved
parameter and report a wrong metric (from run 2).
krfricke marked this conversation as resolved.
Show resolved Hide resolved

The second error comes up when the second run finishes fully and its objects
krfricke marked this conversation as resolved.
Show resolved Hide resolved
are garbage collected. In this case, the first run tries to find the trainable
registered by run 2, but this will fail lookup because the objects have been
krfricke marked this conversation as resolved.
Show resolved Hide resolved
removed already. Note that these objects are the ones registered with
krfricke marked this conversation as resolved.
Show resolved Hide resolved
``tune.with_parameters()`` (not the global registry store).
We test both scenarios using the ``exit_same`` parameter.

NOTE: If we get around to resolving the registry issue (e.g. with unique keys)
krfricke marked this conversation as resolved.
Show resolved Hide resolved
the test where we expect the assertion error can be removed! I.e. we can remove
krfricke marked this conversation as resolved.
Show resolved Hide resolved
the parametrization and the workaround and just assert that no conflict comes up!
krfricke marked this conversation as resolved.
Show resolved Hide resolved
"""
    # Create file markers
    run_1_running = tmpdir / "run_1_running"
    run_1_finished = tmpdir / "run_1_finished"
    run_2_finished = tmpdir / "run_2_finished"

    run_1_running.write_text("", encoding="utf-8")
    run_1_finished.write_text("", encoding="utf-8")
    run_2_finished.write_text("", encoding="utf-8")

    ray_address = ray_start_4_cpus.address_info["address"]

    run_1_env = {
        "RAY_ADDRESS": ray_address,
        "WORKAROUND": str(int(use_workaround)),
        "FIXED_VAL": str(1),
        "VAL_1": str(2),
        "VAL_2": str(3),
        # Run 1 can start immediately
        "HANG_RUN_MARKER": "",
        # Allow second run to start once first trial of first run is started
        "DELETE_TRIAL_MARKER": str(run_1_running),
        # Hang in first trial until the second run finished
        "HANG_TRIAL_MARKER": str(run_2_finished),
        # Mark run 1 as completed
        "DELETE_RUN_MARKER": str(run_1_finished),
        # Do not wait at end
        "HANG_END_MARKER": "",
    }

    run_2_env = {
        "RAY_ADDRESS": ray_address,
        "WORKAROUND": str(int(use_workaround)),
        "FIXED_VAL": str(4),
        "VAL_1": str(5),
        "VAL_2": str(6),
        # Wait until first trial of first run is running
        "HANG_RUN_MARKER": str(run_1_running),
        # Don't delete during run
        "DELETE_TRIAL_MARKER": "",
        # No need to hang in trial
        "HANG_TRIAL_MARKER": "",
        # After full run finished, allow first run to continue
        "DELETE_RUN_MARKER": str(run_2_finished),
        # Wait until first run finished
        # If we don't do this, we actually don't die because of parameter conflict
        # but because of "The object's owner has exited" - so we test this
        # separately
        "HANG_END_MARKER": str(run_1_finished) if exit_same else "",
    }

    script_path = Path(__file__).parent / "_test_multi_tenancy_run.py"

    run_1 = subprocess.Popen([sys.executable, script_path], env=run_1_env)
    print("Started run 1:", run_1.pid)

    run_2 = subprocess.Popen([sys.executable, script_path], env=run_2_env)
    print("Started run 2:", run_2.pid)

    assert run_2.wait() == 0

    if use_workaround:
        assert run_1.wait() == 0
    else:
        assert run_1.wait() != 0


if __name__ == "__main__":
    import pytest

    sys.exit(pytest.main(["-v", __file__]))