Skip to content

Commit

Permalink
Merge branch 'main' into pause_while_spilling
Browse files (browse the repository at this point in the history)
  • Loading branch information
crusaderky committed Apr 26, 2022
2 parents: f8ea6c2 + 1adbb7a; commit: 9fa6fd8
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 10 deletions.
10 changes: 7 additions & 3 deletions distributed/cli/tests/test_dask_spec.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,37 @@
import random
import sys

import pytest
import yaml

from distributed import Client
from distributed.utils_test import gen_cluster, gen_test, popen


@pytest.mark.flaky(reruns=2)
@gen_test(timeout=120)
async def test_text():
port = random.randint(10000, 50000)
with popen(
[
sys.executable,
"-m",
"distributed.cli.dask_spec",
"--spec",
'{"cls": "dask.distributed.Scheduler", "opts": {"port": 9373}}',
'{"cls": "dask.distributed.Scheduler", "opts": {"port": %d}}' % port,
]
):
with popen(
[
sys.executable,
"-m",
"distributed.cli.dask_spec",
"tcp://localhost:9373",
"tcp://localhost:%d" % port,
"--spec",
'{"cls": "dask.distributed.Worker", "opts": {"nanny": false, "nthreads": 3, "name": "foo"}}',
]
):
async with Client("tcp://localhost:9373", asynchronous=True) as client:
async with Client("tcp://localhost:%d" % port, asynchronous=True) as client:
await client.wait_for_workers(1)
info = await client.scheduler.identity()
[w] = info["workers"].values()
Expand Down
2 changes: 2 additions & 0 deletions distributed/comm/tests/test_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
xfail_ssl_issue5601,
)

pytestmark = pytest.mark.flaky(reruns=2)


def test_registered():
assert "ws" in backends
Expand Down
13 changes: 9 additions & 4 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -851,10 +851,15 @@ def watch_stop_q():
Wait for an incoming stop message and then stop the
worker cleanly.
"""
msg = child_stop_q.get()
child_stop_q.close()
assert msg.pop("op") == "stop"
loop.add_callback(do_stop, **msg)
try:
msg = child_stop_q.get()
except (TypeError, OSError):
logger.error("Worker process died unexpectedly")
msg = {"op": "stop"}
finally:
child_stop_q.close()
assert msg.pop("op") == "stop"
loop.add_callback(do_stop, **msg)

thread = threading.Thread(
target=watch_stop_q, name="Nanny stop queue watch"
Expand Down
5 changes: 2 additions & 3 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -810,9 +810,8 @@ async def test_hold_onto_dependents(c, s, a, b):
await asyncio.sleep(0.1)


# Normally takes >2s but it has been observed to take >30s occasionally
@pytest.mark.slow
@gen_test(timeout=120)
@pytest.mark.xfail(reason="asyncio.wait_for bug")
@gen_test()
async def test_worker_death_timeout():
w = Worker("tcp://127.0.0.1:12345", death_timeout=0.1)
with pytest.raises(TimeoutError) as info:
Expand Down

0 comments on commit 9fa6fd8

Please sign in to comment.