Fix test_stress_scatter_death #6404

Merged · 3 commits · Jun 6, 2022
26 changes: 0 additions & 26 deletions distributed/diagnostics/tests/test_worker_plugin.py
@@ -195,32 +195,6 @@ class MyCustomPlugin(WorkerPlugin):
    assert next(iter(w.plugins)).startswith("MyCustomPlugin-")


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_release_key_deprecated(c, s, a):
    class ReleaseKeyDeprecated(WorkerPlugin):
        def __init__(self):
            self._called = False

        def release_key(self, key, state, cause, reason, report):
            # Ensure that the handler still works
            self._called = True
            assert state == "memory"
            assert key == "task"

        def teardown(self, worker):
            assert self._called
            return super().teardown(worker)

    await c.register_worker_plugin(ReleaseKeyDeprecated())

    with pytest.warns(
        FutureWarning, match="The `WorkerPlugin.release_key` hook is deprecated"
    ):
        assert await c.submit(inc, 1, key="x") == 2
    while "x" in a.tasks:
        await asyncio.sleep(0.01)


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_assert_no_warning_no_overload(c, s, a):
    """Assert we do not receive a deprecation warning if we do not overload any
101 changes: 101 additions & 0 deletions distributed/tests/test_cancelled_state.py
@@ -1,5 +1,7 @@
import asyncio

import pytest

import distributed
from distributed import Event, Lock, Worker
from distributed.client import wait
@@ -452,3 +454,102 @@ async def get_data(self, comm, *args, **kwargs):
    # w1 closed

    assert await f4 == 6


@pytest.mark.parametrize("wait_for_processing", [True, False])
@pytest.mark.parametrize("raise_error", [True, False])
@gen_cluster(client=True)
async def test_resumed_cancelled_handle_compute(
    c, s, a, b, raise_error, wait_for_processing
):
Comment on lines +462 to +464, @fjetter (Member, Author), May 24, 2022:

    This covers the changes in handle_compute where I moved around task
    states in the switch statement and added an error handler that was
    missing before.

(A hypothetical sketch of such a switch follows this file's diff below.)

"""
Given the history of a task
Executing -> Cancelled -> Fetch -> Resumed

A handle_compute should properly restore executing
"""
# This test is heavily using set_restrictions to simulate certain scheduler
# decisions of placing keys

lock_compute = Lock()
await lock_compute.acquire()
enter_compute = Event()

def block(x, lock, enter_event):
enter_event.set()
with lock:
if raise_error:
raise RuntimeError("test error")
else:
return x + 1

f1 = c.submit(inc, 1, key="f1", workers=[a.address])
f2 = c.submit(inc, f1, key="f2", workers=[a.address])
f3 = c.submit(
block,
f2,
lock=lock_compute,
enter_event=enter_compute,
key="f3",
workers=[b.address],
)

f4 = c.submit(sum, [f1, f3], workers=[b.address])

await enter_compute.wait()

async def release_all_futures():
futs = [f1, f2, f3, f4]
for fut in futs:
fut.release()

while any(fut.key in s.tasks for fut in futs):
await asyncio.sleep(0.05)

await release_all_futures()
await wait_for_state(f3.key, "cancelled", b)

f1 = c.submit(inc, 1, key="f1", workers=[a.address])
f2 = c.submit(inc, f1, key="f2", workers=[a.address])
f3 = c.submit(inc, f2, key="f3", workers=[a.address])
f4 = c.submit(sum, [f1, f3], workers=[b.address])

await wait_for_state(f3.key, "resumed", b)
await release_all_futures()

f1 = c.submit(inc, 1, key="f1", workers=[a.address])
f2 = c.submit(inc, f1, key="f2", workers=[a.address])
f3 = c.submit(inc, f2, key="f3", workers=[b.address])
f4 = c.submit(sum, [f1, f3], workers=[b.address])

if wait_for_processing:
await wait_for_state(f3.key, "processing", s)

await lock_compute.release()

if not raise_error:
assert await f4 == 4 + 2

assert_story(
b.story(f3.key),
expect=[
(f3.key, "ready", "executing", "executing", {}),
(f3.key, "executing", "released", "cancelled", {}),
(f3.key, "cancelled", "fetch", "resumed", {}),
(f3.key, "resumed", "memory", "memory", {}),
],
)

else:
with pytest.raises(RuntimeError, match="test error"):
await f3

assert_story(
b.story(f3.key),
expect=[
(f3.key, "ready", "executing", "executing", {}),
(f3.key, "executing", "released", "cancelled", {}),
(f3.key, "cancelled", "fetch", "resumed", {}),
(f3.key, "resumed", "error", "error", {}),
],
)
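
Editor's note: fjetter's comment above describes reordering task states in
handle_compute's switch statement and adding a previously missing error
handler. The snippet below is a hypothetical sketch of that kind of switch;
all names (TaskState, handle_compute_task, the returned decision strings) are
invented for illustration and are not the actual distributed.worker internals.

# Hypothetical sketch of a handle_compute-style switch over a task's prior
# state. TaskState and handle_compute_task are invented for illustration.
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskState:
    key: str
    state: str = "released"
    next: Optional[str] = None  # intended state once the interruption resolves


def handle_compute_task(ts: TaskState) -> str:
    """Decide how to react to a compute request for ``ts``."""
    if ts.state == "cancelled":
        # Executing -> Cancelled: the compute request revives the task,
        # so resume it toward executing instead of forgetting it.
        ts.state = "resumed"
        ts.next = "executing"
        return "resume-toward-executing"
    if ts.state == "resumed" and ts.next == "fetch":
        # Cancelled -> Fetch -> Resumed: flip the intent back to executing;
        # the scheduler now wants this worker to compute the key itself.
        ts.next = "executing"
        return "redirect-toward-executing"
    if ts.state == "error":
        # The branch that was missing before: report the stored failure
        # instead of silently dropping the compute request.
        return "report-error"
    # Default: a fresh task goes through the normal ready -> executing path.
    ts.state = "ready"
    return "schedule-execution"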
1 change: 0 additions & 1 deletion distributed/tests/test_worker.py
@@ -3047,7 +3047,6 @@ async def test_task_flight_compute_oserror(c, s, a, b):
        ),
        # inc is lost and needs to be recomputed. Therefore, sum is released
        ("free-keys", ("f1",)),
        ("f1", "release-key"),
        # The recommendations here are hard to predict. Whatever key is
        # currently scheduled to be fetched, if any, will be recommended to be
        # released.
40 changes: 40 additions & 0 deletions distributed/tests/test_worker_state_machine.py
@@ -495,3 +495,43 @@ async def test_forget_data_needed(c, s, a, b):
    x = c.submit(inc, 2, key="x", workers=[a.address])
    y = c.submit(inc, x, key="y", workers=[b.address])
    assert await y == 4


@gen_cluster(client=True, nthreads=[("", 1)] * 3)
async def test_missing_handle_compute_dependency(c, s, w1, w2, w3):
"""Test that it is OK for a dependency to be in state missing if a dependent is asked to be computed"""

    w3.periodic_callbacks["find-missing"].stop()

    f1 = c.submit(inc, 1, key="f1", workers=[w1.address])
    f2 = c.submit(inc, 2, key="f2", workers=[w1.address])
Comment from a Collaborator:

    f2 is unnecessary for the purpose of this test

Reply from @fjetter (Member, Author):

    I added this to have multiple keys fetched from w1. I don't think this
    is absolutely required but I decided to keep this in the test.
    await wait_for_state(f1.key, "memory", w1)

    w3.handle_acquire_replicas(
        keys=[f1.key], who_has={f1.key: [w2.address]}, stimulus_id="acquire"
    )

    await wait_for_state(f1.key, "missing", w3)

    f3 = c.submit(sum, [f1, f2], key="f3", workers=[w3.address])

    await f3
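
Editor's note: the tests in this file poll with wait_for_state, whose
definition the diff does not show; it lives in distributed's test utilities.
Below is a minimal sketch of such a polling helper, assuming the worker (or
scheduler) exposes a tasks mapping whose values carry a .state attribute.

# Minimal sketch of a wait_for_state-style polling helper. The signature is
# an assumption modeled on how the tests above call it.
import asyncio


async def wait_for_state(key, state, server, interval=0.01):
    """Block until ``key`` reaches ``state`` on ``server``."""
    while key not in server.tasks or server.tasks[key].state != state:
        await asyncio.sleep(interval)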


@gen_cluster(client=True, nthreads=[("", 1)] * 3)
async def test_missing_to_waiting(c, s, w1, w2, w3):
    w3.periodic_callbacks["find-missing"].stop()

    f1 = c.submit(inc, 1, key="f1", workers=[w1.address], allow_other_workers=True)
    await wait_for_state(f1.key, "memory", w1)

    w3.handle_acquire_replicas(
        keys=[f1.key], who_has={f1.key: [w2.address]}, stimulus_id="acquire"
    )

    await wait_for_state(f1.key, "missing", w3)

    await w2.close()
    await w1.close()

    await f1
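
Editor's note: the assert_story calls in test_resumed_cancelled_handle_compute
above compare a worker's recorded transition log against expected tuples. A
rough sketch of how such an assertion could work, assuming each expected tuple
must match the head of some recorded entry and that matches must appear in
order (real story entries carry extra trailing fields such as a stimulus id
and a timestamp):

# Rough sketch of an assert_story-style check: every expected tuple must
# prefix-match some recorded entry, and the matches must occur in order.
def assert_story(story, expect):
    entries = iter(story)
    for expected in expect:
        for actual in entries:
            if tuple(actual[: len(expected)]) == tuple(expected):
                break
        else:
            raise AssertionError(f"story entry not found: {expected!r}")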