
Commit

Time-based warning threshold, passed via an env variable.
Also extended to cover w/ and w/o autoscaler, and added unit tests.
xwjiang2010 committed Aug 4, 2021
1 parent 09d055c commit 51a7ab2
Showing 3 changed files with 93 additions and 23 deletions.
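
For context on how the new knob is meant to be used: the threshold is read from an environment variable inside the trial executor (see get_warn_threshold in the diff below), so it can be set before Tune starts. A minimal usage sketch follows; the 30-second value and the trivial train function are illustrative only, while the variable names, defaults, and warning behavior come from this diff.

import os

from ray import tune

# Illustrative value: raise the no-autoscaler warning threshold from its
# default of 5 seconds to 30 seconds. The variable name matches the one read
# by get_warn_threshold() in this commit; the autoscaler variant is
# TUNE_INSUFFICENT_RESOURCE_WARN_THRESHOLD_S_AUTOSCALER (default 60 seconds).
os.environ["TUNE_INSUFFICENT_RESOURCE_WARN_THRESHOLD_S"] = "30"


def train(config):
    pass


# If no trial can leave the PENDING state for longer than the threshold, the
# executor logs a warning asking whether the requested resources can ever be
# satisfied.
tune.run(train, resources_per_trial={"cpu": 1, "gpu": 1})
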
55 changes: 55 additions & 0 deletions python/ray/tune/tests/test_ray_trial_executor.py
@@ -1,4 +1,6 @@
# coding: utf-8
from freezegun import freeze_time
from mock import patch
import os
import unittest

@@ -280,6 +282,59 @@ def create_trial(cpu, gpu=0):
            self.trial_executor.has_resources_for_trial(cpu_only_trial3))


class TrialExecutorInsufficientResourceTest(unittest.TestCase):
    def setUp(self):
        os.environ["TUNE_INSUFFICENT_RESOURCE_WARN_THRESHOLD_S"] = "1"
        self.cluster = Cluster(
            initialize_head=True,
            connect=True,
            head_node_args={
                "num_cpus": 4,
                "num_gpus": 2,
                "_system_config": {
                    "num_heartbeats_timeout": 10
                }
            })
        # Pytest doesn't play nicely with imports
        _register_all()

    def tearDown(self):
        ray.shutdown()
        self.cluster.shutdown()
        _register_all()  # re-register the evicted objects

    @freeze_time("2012-01-14", auto_tick_seconds=15)
    def testOutputWarningMessage(self):
        def train(config):
            pass

        with self.assertLogs(logger="ray.tune.trial_executor") as ctx:
            out = tune.run(
                train, resources_per_trial={
                    "cpu": 1,
                    "gpu": 1,
                })
        msg = "Autoscaler is disabled. Resource is not ready after " \
              "extended amount of time without any trials running - " \
              "please consider if the allocated resource is not enough."
        assert ctx.records[0].msg == msg

    @freeze_time("2012-01-14")
    def testNotOutputWarningMessage(self):
        def train(config):
            pass

        # apparently there is no assertNoLogs yet...
        with patch.object(ray.tune.trial_executor.logger,
                          "warning") as warning_method:
            out = tune.run(
                train, resources_per_trial={
                    "cpu": 1,
                    "gpu": 1,
                })
            warning_method.assert_not_called()


class RayExecutorPlacementGroupTest(unittest.TestCase):
    def setUp(self):
        self.head_cpus = 8
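
A short aside on the freezegun pattern that testOutputWarningMessage relies on, as I read it: with auto_tick_seconds=15, every lookup of the frozen clock advances it by 15 seconds, so consecutive time.monotonic() calls inside the executor appear far enough apart to cross the 1-second threshold configured in setUp (and the 5-second default). Below is a standalone sketch, assuming a freezegun version new enough to patch time.monotonic; the helper name is made up for the sketch.

import time

from freezegun import freeze_time


@freeze_time("2012-01-14", auto_tick_seconds=15)
def elapsed_between_two_reads():
    # Each read of the frozen clock advances it by auto_tick_seconds, so the
    # two reads below differ by 15 seconds even though no real time passes.
    start = time.monotonic()
    return time.monotonic() - start


print(elapsed_between_two_reads())  # 15.0, assuming monotonic() is patched
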
57 changes: 38 additions & 19 deletions python/ray/tune/trial_executor.py
@@ -1,21 +1,23 @@
# coding: utf-8
import logging
import os
import time

from ray.tune.trial import Trial, Checkpoint
from ray.tune.error import TuneError
from ray.tune.cluster_info import is_ray_cluster

logger = logging.getLogger(__name__)

# We may prompt user to check if resource is insufficient to start even one
# single trial run if we observe the following:
# 1. all trials are in pending state
# 2. autoscaler is disabled
# 3. No progress is made after this number of iterations (executor.step()
# is looped this number of times).
# Shown every this number of times as a warning msg so as not to pollute
# logging.
SHOW_MAYBE_INSUFFICIENT_RESOURCE_WARNING_ITER_DELAY = 100

def get_warn_threshold(autoscaler_enabled: bool):
    if autoscaler_enabled:
        return float(
            os.environ.get(
                "TUNE_INSUFFICENT_RESOURCE_WARN_THRESHOLD_S_AUTOSCALER", "60"))
    else:
        return float(
            os.environ.get("TUNE_INSUFFICENT_RESOURCE_WARN_THRESHOLD_S", "5"))


class TrialExecutor:
@@ -37,6 +39,12 @@ def __init__(self, queue_trials: bool = False):
        self._queue_trials = queue_trials
        self._cached_trial_state = {}
        self._trials_to_cache = set()
        # The monotonic time since which all active trials have been in
        # PENDING state, or since the last time we emitted the insufficient
        # resource warning, whichever is later.
        # -1 means either the TrialExecutor was just initialized without any
        # trials yet, or some trials are in RUNNING state.
        self._no_running_trials_since = -1

    def set_status(self, trial, status):
        """Sets status and checkpoints metadata if needed.
@@ -184,21 +192,32 @@ def on_step_end(self, trial_runner):
    def force_reconcilation_on_next_step_end(self):
        pass

    def may_warn_insufficient_resource(self, all_trials):
        autoscaler_enabled = is_ray_cluster()
        if not any(trial.status == Trial.RUNNING for trial in all_trials):
            if self._no_running_trials_since == -1:
                self._no_running_trials_since = time.monotonic()
            elif (time.monotonic() - self._no_running_trials_since >
                  get_warn_threshold(autoscaler_enabled)):
                warn_prefix = ("If autoscaler is still scaling up, ignore "
                               "this message. " if autoscaler_enabled else
                               "Autoscaler is disabled. ")
                logger.warning(
                    warn_prefix +
                    "Resource is not ready after extended amount of time "
                    "without any trials running - please consider if the "
                    "allocated resource is not enough.")
                # Restart the window with a monotonic timestamp so the
                # warning is not repeated on every step.
                self._no_running_trials_since = time.monotonic()
        else:
            self._no_running_trials_since = -1

    def on_no_available_trials(self, trial_runner):
        if self._queue_trials:
            return

        all_trials = trial_runner.get_trials()
        all_trials_are_pending = all(
            trial.status == Trial.PENDING for trial in all_trials)
        if all_trials_are_pending and not is_ray_cluster() and (
                trial_runner.iteration +
                1) % SHOW_MAYBE_INSUFFICIENT_RESOURCE_WARNING_ITER_DELAY == 0:
            logger.warning(
                "Autoscaler is not enabled and resource is not ready after "
                "extended amount of time - please consider if the allocated "
                "resource is not enough for starting even a single trial."
            )
        self.may_warn_insufficient_resource(all_trials)
        for trial in all_trials:
            if trial.uses_placement_groups:
                return
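
To restate the control flow added to trial_executor.py in isolation: the executor keeps a single timestamp that is -1 while any trial is RUNNING, is stamped when no trial is running, and triggers the warning once the elapsed time exceeds the env-configurable threshold (60 s by default when an autoscaler may still be scaling up, 5 s when it is disabled), after which the window restarts so the warning is not logged on every step. Below is a simplified sketch of that logic, not the committed code; the class and method names are invented for illustration.

import os
import time


def get_warn_threshold(autoscaler_enabled: bool) -> float:
    # Defaults mirror the diff: 60s when an autoscaler may still be scaling
    # up, 5s when it is disabled.
    if autoscaler_enabled:
        return float(os.environ.get(
            "TUNE_INSUFFICENT_RESOURCE_WARN_THRESHOLD_S_AUTOSCALER", "60"))
    return float(os.environ.get(
        "TUNE_INSUFFICENT_RESOURCE_WARN_THRESHOLD_S", "5"))


class InsufficientResourceTimer:
    """Invented name; simplified restatement of may_warn_insufficient_resource."""

    def __init__(self):
        # -1 means "some trial is running" or "no trial has been seen yet".
        self._no_running_trials_since = -1

    def should_warn(self, any_trial_running: bool,
                    autoscaler_enabled: bool) -> bool:
        if any_trial_running:
            self._no_running_trials_since = -1
            return False
        now = time.monotonic()
        if self._no_running_trials_since == -1:
            self._no_running_trials_since = now
            return False
        if now - self._no_running_trials_since > get_warn_threshold(
                autoscaler_enabled):
            # Restart the window so the warning is not logged on every step.
            self._no_running_trials_since = now
            return True
        return False
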
4 changes: 0 additions & 4 deletions python/ray/tune/trial_runner.py
@@ -350,10 +350,6 @@ def search_alg(self):
    def scheduler_alg(self):
        return self._scheduler_alg

    @property
    def iteration(self):
        return self._iteration

    def _validate_resume(self, resume_type):
        """Checks whether to resume experiment.
