Skip to content

Commit

Permalink
Make worker state machine methods private (#6564)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored Jun 13, 2022
1 parent 058e629 commit 0fcc724
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 150 deletions.
32 changes: 19 additions & 13 deletions distributed/tests/test_failed_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from distributed.metrics import time
from distributed.utils import CancelledError, sync
from distributed.utils_test import (
BlockedGatherDep,
captured_logger,
cluster,
div,
Expand All @@ -24,7 +25,7 @@
slowadd,
slowinc,
)
from distributed.worker_state_machine import FreeKeysEvent, TaskState
from distributed.worker_state_machine import FreeKeysEvent

pytestmark = pytest.mark.ci1

Expand Down Expand Up @@ -469,22 +470,27 @@ async def test_worker_time_to_live(c, s, a, b):
assert time() - start < interval + 2.0


@gen_cluster()
async def test_forget_data_not_supposed_to_have(s, a, b):
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_forget_data_not_supposed_to_have(c, s, a):
"""If a dependency fetch finishes on a worker after the scheduler already released
everything, the worker might be stuck with a redundant replica which is never
cleaned up.
"""
# FIXME: Replace with "blackbox test" which shows an actual example where
# this situation is provoked if this is even possible.
ts = TaskState("key", state="flight")
a.tasks["key"] = ts
recommendations = {ts: ("memory", 123)}
a.transitions(recommendations, stimulus_id="test")

assert a.data
while a.data:
await asyncio.sleep(0.001)
async with BlockedGatherDep(s.address) as b:
x = c.submit(inc, 1, key="x", workers=[a.address])
y = c.submit(inc, x, key="y", workers=[b.address])

await b.in_gather_dep.wait()
assert b.tasks["x"].state == "flight"

x.release()
y.release()
while s.tasks:
await asyncio.sleep(0.01)

b.block_gather_dep.set()
while b.tasks:
await asyncio.sleep(0.01)


@gen_cluster(
Expand Down
Loading

0 comments on commit 0fcc724

Please sign in to comment.