
[tune] Output insufficent resources warning msg when trials are in pending for extended amount of time. #17533

Merged (13 commits, Aug 13, 2021)
7 changes: 7 additions & 0 deletions doc/source/tune/user-guide.rst
@@ -846,6 +846,13 @@ These are the environment variables Ray Tune currently considers:
trial startups. After the grace period, Tune will block until a result from a running trial is received. Can
be disabled by setting this to 0 or lower.
* **TUNE_WARN_THRESHOLD_S**: Threshold for logging if a Tune event loop operation takes too long. Defaults to 0.5 (seconds).
* **TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S**: Threshold for emitting a warning when no active trial has been in the
  ``RUNNING`` state for this many seconds. If the Ray Tune job stays stuck in this state (most likely because of
  insufficient resources), the warning is repeated at this interval. Defaults to 1 (seconds).
* **TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER**: Like the above, but used when the autoscaler is enabled:
  a warning is emitted when no active trial has been in the ``RUNNING`` state for this many seconds.
  If the Ray Tune job stays stuck in this state (most likely because of insufficient resources), the warning is repeated
  at this interval. Defaults to 60 (seconds).
* **TUNE_STATE_REFRESH_PERIOD**: Frequency of updating the resource tracking from Ray. Defaults to 10 (seconds).


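For illustration, a minimal sketch of overriding these thresholds before launching Tune; the variable names come from
the diff above, while the values and the trivial trainable are placeholders chosen for the example:

import os

# Must be set in the driver process before Tune reads them (they are cached
# after the first lookup).
os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S"] = "5"
os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER"] = "30"

from ray import tune

def train(config):
    pass  # placeholder trainable

tune.run(train, resources_per_trial={"cpu": 1})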
53 changes: 53 additions & 0 deletions python/ray/tune/tests/test_ray_trial_executor.py
@@ -1,4 +1,6 @@
# coding: utf-8
from freezegun import freeze_time
from mock import patch
import os
import unittest

@@ -16,6 +18,57 @@
from ray.tune.utils.placement_groups import PlacementGroupFactory


class TrialExecutorInsufficientResourcesTest(unittest.TestCase):
def setUp(self):
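        # Pin the warning threshold to one second so the test does not depend
        # on the library default.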
        os.environ["TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S"] = "1"
self.cluster = Cluster(
initialize_head=True,
connect=True,
head_node_args={
"num_cpus": 4,
"num_gpus": 2,
"_system_config": {
"num_heartbeats_timeout": 10
}
})

def tearDown(self):
ray.shutdown()
self.cluster.shutdown()

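    # freeze_time with auto_tick_seconds=15 advances the frozen clock by 15
    # seconds on every lookup, so consecutive time.monotonic() readings in the
    # executor differ by more than the 1-second threshold and the warning is
    # expected to fire.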
@freeze_time("2021-08-03", auto_tick_seconds=15)
@patch.object(ray.tune.trial_executor.logger, "warning")
def testOutputWarningMessage(self, mocked_warn):
def train(config):
pass

tune.run(
train, resources_per_trial={
"cpu": 1,
"gpu": 1,
})
msg = ("Autoscaler is disabled. No trial is running and no new trial"
" has been started within at least the last 1.0 seconds. This "
"could be due to the cluster not having enough resources "
"available to start the next trial. Please check if the "
"requested resources can be fulfilled by your cluster, or will "
"be fulfilled eventually (when using the Ray autoscaler).")
mocked_warn.assert_called_with(msg)

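    # Without auto_tick_seconds the frozen clock never advances, so the
    # threshold is never exceeded and no warning should be logged.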
@freeze_time("2021-08-03")
@patch.object(ray.tune.trial_executor.logger, "warning")
def testNotOutputWarningMessage(self, mocked_warn):
def train(config):
pass

tune.run(
train, resources_per_trial={
"cpu": 1,
"gpu": 1,
})
mocked_warn.assert_not_called()


class RayTrialExecutorTest(unittest.TestCase):
def setUp(self):
# Wait up to five seconds for placement groups when starting a trial
47 changes: 47 additions & 0 deletions python/ray/tune/trial_executor.py
@@ -1,6 +1,9 @@
# coding: utf-8
from abc import ABCMeta, abstractmethod
from functools import lru_cache
import logging
import os
import time
from typing import Dict, List, Optional

from ray.tune.resources import Resources
@@ -12,6 +15,18 @@
logger = logging.getLogger(__name__)


# Accessing environment variable could be slow.

Contributor: wait really? do you have some reference for this?

Member: it should be as fast as accessing any other dict, right? os should load it only once.

Contributor Author: I was under the impression that accessing an environment variable incurs a penalty in some scripting languages. But looking closely at os.py, it seems to be just a normal dictionary wrapped by the process. So maybe not so much in this case.
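
As a quick, non-authoritative check of the cost discussed above, a timeit sketch like the following can be run
locally (numbers vary by platform; the cached helper mirrors the _get_warn_threshold function defined just below):

import os
import timeit
from functools import lru_cache

@lru_cache()
def cached_threshold():
    # Same pattern as below: read the env var once, then serve from the cache.
    return float(
        os.environ.get("TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S", "1"))

print(timeit.timeit(
    lambda: os.environ.get("TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S", "1"),
    number=100000))
print(timeit.timeit(cached_threshold, number=100000))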

@lru_cache()
def _get_warn_threshold(autoscaler_enabled: bool) -> float:
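    # Cached via lru_cache above, so each environment variable is read at most
    # once per process for a given value of autoscaler_enabled.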
if autoscaler_enabled:
return float(
os.environ.get(
"TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S_AUTOSCALER", "60"))
else:
return float(
os.environ.get("TUNE_WARN_INSUFFICENT_RESOURCE_THRESHOLD_S", "1"))


@DeveloperAPI
class TrialExecutor(metaclass=ABCMeta):
"""Module for interacting with remote trainables.
@@ -32,6 +47,12 @@ def __init__(self, queue_trials: bool = False):
self._queue_trials = queue_trials
self._cached_trial_state = {}
self._trials_to_cache = set()
        # The time since which all active trials have been in PENDING state,
        # or since the last time an "insufficient resources" warning was
        # emitted, whichever is later.
        # -1 means either that the TrialExecutor was just initialized and has
        # no trials yet, or that some trial is currently in RUNNING state.
self._no_running_trials_since = -1

def set_status(self, trial: Trial, status: str) -> None:
"""Sets status and checkpoints metadata if needed.
@@ -194,6 +215,31 @@ def on_step_end(self, trials: List[Trial]) -> None:
def force_reconcilation_on_next_step_end(self) -> None:
pass

def may_warn_insufficient_resources(self, all_trials):
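        # Called from on_no_available_trials(): emit a warning, and keep
        # repeating it every threshold seconds, while no trial is in the
        # RUNNING state.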
autoscaler_enabled = is_ray_cluster()
if not any(trial.status == Trial.RUNNING for trial in all_trials):
if self._no_running_trials_since == -1:
self._no_running_trials_since = time.monotonic()
            elif (time.monotonic() - self._no_running_trials_since >
                  _get_warn_threshold(autoscaler_enabled)):
warn_prefix = ("If autoscaler is still scaling up, ignore "
"this message." if autoscaler_enabled else
"Autoscaler is disabled.")
logger.warning(
f"{warn_prefix} "
f"No trial is running and no new trial has been started "
f"within at least the last "
f"{_get_warn_threshold(autoscaler_enabled)} seconds. "
f"This could be due to the cluster not having enough "
f"resources available to start the next trial. Please "
f"check if the requested resources can be fulfilled by "
f"your cluster, or will be fulfilled eventually (when "
f"using the Ray autoscaler).")

Contributor: A couple of comments here:

  1. IMO users that aren't using the Ray autoscaler should not see "Ray autoscaler".
  2. This doesn't actually provide any action for the user to take. For example, the user may not know what "requested resources" means, or even "cluster".

Instead, it would be good to say:

  1. which resource is not available, and how much is being requested
  2. what the total amount of that resource available on the cluster is

One suggestion would also be to say that they should stop their tuning job and reconfigure their resource request.

Does that make sense? In principle, we should provide 1. what went wrong on the Ray side (in terms that the end user understands), 2. what the user did wrong (if possible), and 3. what they should do instead :)

Contributor Author: This is good feedback. Practically there is no API that exposes that information. Left a TODO and filed #17799 to follow up.

self._no_running_trials_since = time.monotonic()
else:
self._no_running_trials_since = -1

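Picking up the review suggestion above, here is a purely hypothetical sketch (not part of this PR; the follow-up is
tracked in #17799) of how a more actionable message could be assembled from the public ray.cluster_resources() and
ray.available_resources() APIs. The requested argument is a stand-in for a trial's resource request, and the sketch
assumes ray.init() has already been called:

import ray

def _resource_hint(requested):
    # requested: e.g. {"CPU": 1, "GPU": 1} -- a stand-in for the trial's request.
    total = ray.cluster_resources()     # everything the cluster reports
    free = ray.available_resources()    # what is currently unclaimed
    parts = []
    for key, amount in requested.items():
        parts.append(f"{key}: requested {amount}, "
                     f"{free.get(key, 0)} free of {total.get(key, 0)} total")
    return "; ".join(parts)

# Example output (values depend on the cluster):
# "CPU: requested 1, 3.0 free of 4.0 total; GPU: requested 1, 1.0 free of 2.0 total"
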
def on_no_available_trials(self, trials: List[Trial]) -> None:
"""
Args:
Expand All @@ -203,6 +249,7 @@ def on_no_available_trials(self, trials: List[Trial]) -> None:

if self._queue_trials:
return
self.may_warn_insufficient_resources(trials)
for trial in trials:
if trial.uses_placement_groups:
return
2 changes: 2 additions & 0 deletions python/requirements/tune/requirements_tune.txt
@@ -7,6 +7,7 @@ dask[complete]==2021.03.0; python_version < '3.7'
dask[complete]==2021.06.1; python_version >= '3.7'
dragonfly-opt==0.1.6
flaml==0.5.2
freezegun==1.1.0
gluoncv==0.10.1.post0
gpy==1.10.0
gym[atari]==0.18.3
@@ -22,6 +23,7 @@ lightgbm==3.2.1
matplotlib==3.3.4; python_version < '3.7'
matplotlib==3.4.2; python_version >= '3.7'
mlflow==1.19.0
mock==4.0.3
mxnet==1.8.0.post0
nevergrad==0.4.3.post3
optuna==2.8.0