Skip to content

Commit

Permalink
Merge branch 'main' into WSMR/retry_busy_worker
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed May 21, 2022
2 parents 02e4193 + 9bb999d commit 5e2c5a0
Show file tree
Hide file tree
Showing 21 changed files with 619 additions and 836 deletions.
6 changes: 5 additions & 1 deletion distributed/_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,8 @@ def handle_signal(signum, frame):
for sig in signals:
old_handlers[sig] = signal.signal(sig, handle_signal)

await event.wait()
try:
await event.wait()
finally:
for sig in signals:
signal.signal(sig, old_handlers[sig])
4 changes: 1 addition & 3 deletions distributed/chaos.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ async def setup(self, worker):
)

def graceful(self):
asyncio.create_task(
self.worker.close(report=False, nanny=False, executor_wait=False)
)
asyncio.create_task(self.worker.close(nanny=False, executor_wait=False))

def sys_exit(self):
sys.exit(0)
Expand Down
19 changes: 17 additions & 2 deletions distributed/cli/dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@
)
@click.option(
"--reconnect/--no-reconnect",
default=True,
help="Reconnect to scheduler if disconnected [default: --reconnect]",
default=None,
help="Deprecated, has no effect. Passing --reconnect is an error. [default: --no-reconnect]",
)
@click.option(
"--nanny/--no-nanny",
Expand Down Expand Up @@ -280,6 +280,7 @@ def main(
dashboard_address,
worker_class,
preload_nanny,
reconnect,
**kwargs,
):
g0, g1, g2 = gc.get_threshold() # https://github.com/dask/distributed/issues/1653
Expand All @@ -299,6 +300,20 @@ def main(
"The --bokeh/--no-bokeh flag has been renamed to --dashboard/--no-dashboard. "
)
dashboard = bokeh
if reconnect is not None:
if reconnect:
logger.error(
"The `--reconnect` option has been removed. "
"To improve cluster stability, workers now always shut down in the face of network disconnects. "
"For details, or if this is an issue for you, see https://github.com/dask/distributed/issues/6350."
)
sys.exit(1)
else:
logger.warning(
"The `--no-reconnect/--reconnect` flag is deprecated, and will be removed in a future release. "
"Worker reconnection is now always disabled, so `--no-reconnect` is unnecessary. "
"See https://github.com/dask/distributed/issues/6350 for details.",
)

sec = {
k: v
Expand Down
19 changes: 18 additions & 1 deletion distributed/cli/tests/test_dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def test_version_option():


@pytest.mark.slow
def test_idle_timeout(loop):
def test_idle_timeout():
start = time()
runner = CliRunner()
result = runner.invoke(
Expand All @@ -424,6 +424,23 @@ def test_idle_timeout(loop):
assert result.exit_code == 0


@pytest.mark.slow
def test_restores_signal_handler():
# another test could have altered the signal handler, so use a new function
# that both has sensible sigint behaviour *and* can be used as a sentinel
def raise_ki():
raise KeyboardInterrupt

original_handler = signal.signal(signal.SIGINT, raise_ki)
try:
CliRunner().invoke(
distributed.cli.dask_scheduler.main, ["--idle-timeout", "1s"]
)
assert signal.getsignal(signal.SIGINT) is raise_ki
finally:
signal.signal(signal.SIGINT, original_handler)


def test_multiple_workers_2(loop):
text = """
def dask_setup(worker):
Expand Down
105 changes: 60 additions & 45 deletions distributed/cli/tests/test_dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@

from distributed import Client
from distributed.cli.dask_worker import _apportion_ports, main
from distributed.compatibility import LINUX, WINDOWS, to_thread
from distributed.compatibility import LINUX, WINDOWS
from distributed.deploy.utils import nprocesses_nthreads
from distributed.metrics import time
from distributed.utils import open_port
from distributed.utils_test import gen_cluster, popen, requires_ipv6


Expand Down Expand Up @@ -275,56 +276,34 @@ async def test_no_nanny(c, s):
await c.wait_for_workers(1)


@pytest.mark.slow
@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
@gen_cluster(client=True, nthreads=[])
async def test_no_reconnect(c, s, nanny):
async def test_reconnect_deprecated(c, s):
with popen(
[
"dask-worker",
s.address,
"--no-reconnect",
nanny,
"--no-dashboard",
]
["dask-worker", s.address, "--reconnect"],
flush_output=False,
) as worker:
# roundtrip works
assert await c.submit(lambda x: x + 1, 10) == 11

(comm,) = s.stream_comms.values()
comm.abort()

# worker terminates as soon as the connection is aborted
await to_thread(worker.wait, timeout=5)
assert worker.returncode == 0

for _ in range(10):
line = worker.stdout.readline()
print(line)
if b"`--reconnect` option has been removed" in line:
break
else:
raise AssertionError("Message not printed, see stdout")
assert worker.wait() == 1

@pytest.mark.slow
@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
@gen_cluster(client=True, nthreads=[])
async def test_reconnect(c, s, nanny):
with popen(
[
"dask-worker",
s.address,
"--reconnect",
nanny,
"--no-dashboard",
]
["dask-worker", s.address, "--no-reconnect"],
flush_output=False,
) as worker:
# roundtrip works
assert await c.submit(lambda x: x + 1, 10) == 11

(comm,) = s.stream_comms.values()
comm.abort()

# roundtrip still works, which means the worker reconnected
assert await c.submit(lambda x: x + 1, 11) == 12

# closing the scheduler cleanly does terminate the worker
await s.close()
await to_thread(worker.wait, timeout=5)
assert worker.returncode == 0
for _ in range(10):
line = worker.stdout.readline()
print(line)
if b"flag is deprecated, and will be removed" in line:
break
else:
raise AssertionError("Message not printed, see stdout")
await c.wait_for_workers(1)
await c.shutdown()


@pytest.mark.slow
Expand Down Expand Up @@ -735,3 +714,39 @@ async def test_signal_handling(c, s, nanny, sig):
assert "timed out" not in logs
assert "error" not in logs
assert "exception" not in logs


@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
def test_error_during_startup(monkeypatch, nanny):
# see https://github.com/dask/distributed/issues/6320
scheduler_port = str(open_port())
scheduler_addr = f"tcp://127.0.0.1:{scheduler_port}"

monkeypatch.setenv("DASK_SCHEDULER_ADDRESS", scheduler_addr)
with popen(
[
"dask-scheduler",
"--port",
scheduler_port,
],
flush_output=False,
) as scheduler:
start = time()
# Wait for the scheduler to be up
while line := scheduler.stdout.readline():
if b"Scheduler at" in line:
break
# Ensure this is not killed by pytest-timeout
if time() - start > 5:
raise TimeoutError("Scheduler failed to start in time.")

with popen(
[
"dask-worker",
scheduler_addr,
nanny,
"--worker-port",
scheduler_port,
],
) as worker:
assert worker.wait(5) == 1
2 changes: 1 addition & 1 deletion distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1630,7 +1630,7 @@ async def _shutdown(self):
else:
with suppress(CommClosedError):
self.status = "closing"
await self.scheduler.terminate(close_workers=True)
await self.scheduler.terminate()

def shutdown(self):
"""Shut down the connected scheduler and workers
Expand Down
2 changes: 1 addition & 1 deletion distributed/deploy/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ async def _close(self):
if self.scheduler_comm:
async with self._lock:
with suppress(OSError):
await self.scheduler_comm.terminate(close_workers=True)
await self.scheduler_comm.terminate()
await self.scheduler_comm.close_rpc()
else:
logger.warning("Cluster closed without starting up")
Expand Down
24 changes: 22 additions & 2 deletions distributed/deploy/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@

from dask.system import CPU_COUNT

from distributed import Client, Nanny, Worker, get_client
from distributed import Client, LocalCluster, Nanny, Worker, get_client
from distributed.compatibility import LINUX
from distributed.core import Status
from distributed.deploy.local import LocalCluster
from distributed.deploy.utils_test import ClusterTest
from distributed.metrics import time
from distributed.system import MEMORY_LIMIT
Expand All @@ -29,6 +28,7 @@
clean,
gen_test,
inc,
raises_with_cause,
slowinc,
tls_only_security,
xfail_ssl_issue5601,
Expand Down Expand Up @@ -1155,3 +1155,23 @@ async def test_connect_to_closed_cluster():
# Raises during init without actually connecting since we're not
# awaiting anything
Client(cluster, asynchronous=True)


class MyPlugin:
def setup(self, worker=None):
import my_nonexistent_library # noqa


@pytest.mark.slow
@gen_test(
clean_kwargs={
# FIXME: This doesn't close the LoopRunner properly, leaving a thread around
"threads": False
}
)
async def test_localcluster_start_exception():
with raises_with_cause(RuntimeError, None, ImportError, "my_nonexistent_library"):
async with LocalCluster(
plugins={MyPlugin()},
):
return
2 changes: 1 addition & 1 deletion distributed/diagnostics/tests/test_cluster_dump_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async def test_cluster_dump_plugin(c, s, *workers, tmp_path):
f2 = c.submit(inc, f1)

assert (await f2) == 3
await s.close(close_workers=True)
await s.close()

dump = DumpArtefact.from_url(str(dump_file))
assert {f1.key, f2.key} == set(dump.scheduler_story(f1.key, f2.key).keys())
Expand Down
11 changes: 11 additions & 0 deletions distributed/diagnostics/tests/test_scheduler_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,14 @@ def f():
await c.submit(f)

assert ("foo", 123) in s._recorded_events


@gen_cluster(client=True)
async def test_register_plugin_on_scheduler(c, s, a, b):
class MyPlugin(SchedulerPlugin):
async def start(self, scheduler: Scheduler) -> None:
scheduler._foo = "bar" # type: ignore

await s.register_scheduler_plugin(MyPlugin())

assert s._foo == "bar"
13 changes: 4 additions & 9 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,6 @@ async def instantiate(self) -> Status:
nanny=self.address,
name=self.name,
memory_limit=self.memory_manager.memory_limit,
reconnect=self.reconnect,
resources=self.resources,
validate=self.validate,
silence_logs=self.silence_logs,
Expand Down Expand Up @@ -471,7 +470,7 @@ async def plugin_remove(self, name=None):

return {"status": "OK"}

async def restart(self, timeout=30, executor_wait=True):
async def restart(self, timeout=30):
async def _():
if self.process is not None:
await self.kill()
Expand Down Expand Up @@ -557,7 +556,7 @@ def close_gracefully(self):
"""
self.status = Status.closing_gracefully

async def close(self, comm=None, timeout=5, report=None):
async def close(self, timeout=5):
"""
Close the worker process, stop all comms.
"""
Expand All @@ -570,9 +569,8 @@ async def close(self, comm=None, timeout=5, report=None):

self.status = Status.closing
logger.info(
"Closing Nanny at %r. Report closure to scheduler: %s",
"Closing Nanny at %r.",
self.address_safe,
report,
)

for preload in self.preloads:
Expand All @@ -595,9 +593,8 @@ async def close(self, comm=None, timeout=5, report=None):
self.process = None
await self.rpc.close()
self.status = Status.closed
if comm:
await comm.write("OK")
await super().close()
return "OK"

async def _log_event(self, topic, msg):
await self.scheduler.log_event(
Expand Down Expand Up @@ -838,9 +835,7 @@ def _run(
async def do_stop(timeout=5, executor_wait=True):
try:
await worker.close(
report=True,
nanny=False,
safe=True, # TODO: Graceful or not?
executor_wait=executor_wait,
timeout=timeout,
)
Expand Down
Loading

0 comments on commit 5e2c5a0

Please sign in to comment.