Skip to content

Commit

Permalink
Remove transition-counter-max from config (#6349)
Browse files: browse the repository at this point in the history
  • Loading branch information
crusaderky authored May 16, 2022
1 parent 5ca7a5a commit af3b93e
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 23 deletions.
7 changes: 0 additions & 7 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1027,13 +1027,6 @@ properties:
type: boolean
description: Enter Python Debugger on scheduling error

transition-counter-max:
oneOf:
- enum: [false]
- type: integer
description: Cause the scheduler or workers to break if they reach this
number of transitions

system-monitor:
type: object
description: |
Expand Down
4 changes: 0 additions & 4 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,6 @@ distributed:
log-length: 10000 # default length of logs to keep in memory
log-format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
pdb-on-err: False # enter debug mode on scheduling error
# Cause scheduler and workers to break if they reach this many transitions.
# Used to debug infinite transition loops.
# Note: setting this will cause healthy long-running services to eventually break.
transition-counter-max: False
system-monitor:
interval: 500ms
event-loop: tornado
Expand Down
7 changes: 4 additions & 3 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,7 @@ def __init__(
unrunnable: set,
validate: bool,
plugins: Iterable[SchedulerPlugin] = (),
transition_counter_max: int | Literal[False] = False,
**kwargs, # Passed verbatim to Server.__init__()
):
logger.info("State start")
Expand Down Expand Up @@ -1338,9 +1339,7 @@ def __init__(
/ 2.0
)
self.transition_counter = 0
self.transition_counter_max = dask.config.get(
"distributed.admin.transition-counter-max"
)
self.transition_counter_max = transition_counter_max

@property
def memory(self) -> MemoryState:
Expand Down Expand Up @@ -2878,6 +2877,7 @@ def __init__(
preload_argv=(),
plugins=(),
contact_address=None,
transition_counter_max=False,
**kwargs,
):
self._setup_logging(logger)
Expand Down Expand Up @@ -3109,6 +3109,7 @@ def __init__(
unrunnable=unrunnable,
validate=validate,
plugins=plugins,
transition_counter_max=transition_counter_max,
)
ServerNode.__init__(
self,
Expand Down
3 changes: 2 additions & 1 deletion distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3264,7 +3264,8 @@ async def test_transition_counter_max_worker(c, s, a):
@gen_cluster(
client=True,
nthreads=[("", 1)],
config={"distributed.admin.transition-counter-max": False},
scheduler_kwargs={"transition_counter_max": False},
worker_kwargs={"transition_counter_max": False},
)
async def test_disable_transition_counter_max(c, s, a, b):
"""Test that the cluster can run indefinitely if transition_counter_max is disabled.
Expand Down
6 changes: 4 additions & 2 deletions distributed/tests/test_stress.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ async def test_stress_steal(c, s, *workers):
nthreads=[("", 1)] * 10,
client=True,
timeout=180,
config={"distributed.admin.transition-counter-max": 500_000},
scheduler_kwargs={"transition_counter_max": 500_000},
worker_kwargs={"transition_counter_max": 500_000},
)
async def test_close_connections(c, s, *workers):
da = pytest.importorskip("dask.array")
Expand Down Expand Up @@ -287,7 +288,8 @@ async def test_no_delay_during_large_transfer(c, s, w):
client=True,
Worker=Nanny,
nthreads=[("", 2)] * 6,
config={"distributed.admin.transition-counter-max": 500_000},
scheduler_kwargs={"transition_counter_max": 500_000},
worker_kwargs={"transition_counter_max": 500_000},
)
async def test_chaos_rechunk(c, s, *workers):
s.allowed_failures = 10000
Expand Down
15 changes: 12 additions & 3 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -989,12 +989,21 @@ async def test_foo(scheduler, worker1, worker2, pytest_fixture_a, pytest_fixture
timeout = 3600

scheduler_kwargs = merge(
{"dashboard": False, "dashboard_address": ":0"}, scheduler_kwargs
dict(
dashboard=False,
dashboard_address=":0",
transition_counter_max=50_000,
),
scheduler_kwargs,
)
worker_kwargs = merge(
{"memory_limit": system.MEMORY_LIMIT, "death_timeout": 15}, worker_kwargs
dict(
memory_limit=system.MEMORY_LIMIT,
death_timeout=15,
transition_counter_max=50_000,
),
worker_kwargs,
)
config = merge({"distributed.admin.transition-counter-max": 50_000}, config)

def _(func):
if not iscoroutinefunction(func):
Expand Down
5 changes: 2 additions & 3 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ def __init__(
lifetime: Any | None = None,
lifetime_stagger: Any | None = None,
lifetime_restart: bool | None = None,
transition_counter_max: int | Literal[False] = False,
###################################
# Parameters to WorkerMemoryManager
memory_limit: str | float = "auto",
Expand Down Expand Up @@ -610,9 +611,7 @@ def __init__(
self.validate = validate

self.transition_counter = 0
self.transition_counter_max = dask.config.get(
"distributed.admin.transition-counter-max"
)
self.transition_counter_max = transition_counter_max
self.incoming_transfer_log = deque(maxlen=100000)
self.incoming_count = 0
self.outgoing_transfer_log = deque(maxlen=100000)
Expand Down

0 comments on commit af3b93e

Please sign in to comment.