Merge branch 'main' into ensure_communicating
crusaderky committed May 6, 2022
2 parents 2c3ed5c + 9f7646a commit ef56014
Showing 8 changed files with 258 additions and 65 deletions.
6 changes: 5 additions & 1 deletion distributed/deploy/spec.py
@@ -340,7 +340,11 @@ async def _correct_state_internal(self):
opts["name"] = name
if isinstance(cls, str):
cls = import_term(cls)
worker = cls(self.scheduler.address, **opts)
worker = cls(
getattr(self.scheduler, "contact_address", None)
or self.scheduler.address,
**opts,
)
self._created.add(worker)
workers.append(worker)
if workers:
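
The `getattr` fallback keeps scheduler objects without a `contact_address` attribute working unchanged: workers are pointed at the advertised address when one is set, and at the bind address otherwise. A minimal sketch of the pattern, using illustrative addresses:

```python
class _SchedulerStub:
    """Stand-in for a scheduler object, for illustration only."""

    address = "tcp://10.0.0.5:8786"  # private bind address
    contact_address = "tcp://203.0.113.7:8786"  # address advertised to workers


s = _SchedulerStub()
# Falls back to ``address`` when ``contact_address`` is absent or None.
print(getattr(s, "contact_address", None) or s.address)
# -> tcp://203.0.113.7:8786
```
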
11 changes: 11 additions & 0 deletions distributed/distributed-schema.yaml
@@ -43,6 +43,17 @@ properties:
           For a list of handlers see the `dask.distributed.Scheduler.handlers` attribute.
+      contact-address:
+        type:
+        - string
+        - "null"
+        description: |
+          The address that the scheduler advertises to workers for communication with it.
+          To be specified when the address to which the scheduler binds cannot be the same
+          as the address that workers use to contact the scheduler (e.g. because the former
+          is private and the scheduler is in a different network than the workers).
       default-data-size:
         type:
         - string
1 change: 1 addition & 0 deletions distributed/distributed.yaml
@@ -12,6 +12,7 @@ distributed:
     allowed-failures: 3 # number of retries before a task is considered bad
     bandwidth: 100000000 # 100 MB/s estimated worker-worker bandwidth
     blocked-handlers: []
+    contact-address: null
     default-data-size: 1kiB
     # Number of seconds to wait until workers or clients are removed from the events log
     # after they have been removed from the scheduler
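
The same setting can also be supplied programmatically. A hedged sketch using `dask.config`; the address is a placeholder:

```python
import dask

# ``None`` (the default above) disables the feature; any address reachable
# by the workers is a valid value.
dask.config.set({"distributed.scheduler.contact-address": "tcp://203.0.113.7:8786"})
print(dask.config.get("distributed.scheduler.contact-address"))
```
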
10 changes: 10 additions & 0 deletions distributed/scheduler.py
@@ -2912,6 +2912,12 @@ class Scheduler(SchedulerState, ServerNode):
     Users typically do not interact with the scheduler directly but rather with
     the client object ``Client``.
 
+    The ``contact_address`` parameter allows advertising a specific address to
+    the workers for communication with the scheduler that is different from the
+    address the scheduler binds to. This is useful when the scheduler listens
+    on a private address which workers cannot use to contact it.
+
     **State**
 
     The scheduler contains the following state variables. Each variable is
@@ -2976,11 +2982,15 @@ def __init__(
         preload=None,
         preload_argv=(),
         plugins=(),
+        contact_address=None,
         **kwargs,
     ):
         self._setup_logging(logger)
 
         # Attributes
+        if contact_address is None:
+            contact_address = dask.config.get("distributed.scheduler.contact-address")
+        self.contact_address = contact_address
         if allowed_failures is None:
             allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
         self.allowed_failures = allowed_failures
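
A sketch of how the new keyword might be used when running the scheduler programmatically; both addresses are placeholders and deployment details will differ:

```python
import asyncio

from distributed import Scheduler


async def main():
    # Bind to a private interface but advertise a routable address to workers.
    async with Scheduler(
        host="10.0.0.5",  # private bind address (placeholder)
        contact_address="tcp://203.0.113.7:8786",  # address workers dial (placeholder)
    ) as scheduler:
        await scheduler.finished()


asyncio.run(main())
```
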
153 changes: 146 additions & 7 deletions distributed/tests/test_cancelled_state.py
@@ -89,17 +89,12 @@ def f(ev):
("f1", "ready", "executing", "executing", {}),
("free-keys", ("f1",)),
("f1", "executing", "released", "cancelled", {}),
("f1", "ensure-task-exists", "cancelled"),
("f1", "cancelled", "fetch", "cancelled", {"f1": ("resumed", "fetch")}),
("f1", "cancelled", "resumed", "resumed", {}),
("f1", "put-in-memory"),
("f1", "cancelled", "fetch", "resumed", {}),
("f1", "resumed", "memory", "memory", {"f2": "ready"}),
("free-keys", ("f1",)),
("f1", "release-key"),
("f1", "memory", "released", "released", {}),
("f1", "released", "forgotten", "forgotten", {}),
],
strict=True,
)
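
With `strict=True`, `assert_story` requires the worker story to consist of exactly the expected events, in order, rather than merely containing them as a subsequence. A small illustration under that assumption; story events carry a trailing stimulus id and timestamp that the expected events omit:

```python
from time import time

from distributed.utils_test import assert_story

story = [("f1", "ready", "executing", "executing", {}, "stimulus-1", time())]
# Passes: the one expected event matches, and the story has no extra events.
assert_story(story, [("f1", "ready", "executing", "executing", {})], strict=True)
```
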


@@ -164,8 +159,9 @@ def blockable_compute(x, lock):
         lock=block_compute,
         workers=[a.address],
         allow_other_workers=True,
+        key="fut1",
     )
-    fut2 = c.submit(inc, fut1, workers=[b.address])
+    fut2 = c.submit(inc, fut1, workers=[b.address], key="fut2")
 
     await enter_get_data.wait()
     await block_compute.acquire()
@@ -198,6 +194,9 @@ async def wait_and_raise(*args, **kwargs):

     fut = c.submit(wait_and_raise)
     await wait_for_state(fut.key, "executing", w)
+    # Queue up another task to ensure this is not affected by our error handling
+    fut2 = c.submit(inc, 1)
+    await wait_for_state(fut2.key, "ready", w)
 
     fut.release()
     await wait_for_state(fut.key, "cancelled", w)
@@ -210,6 +209,7 @@ async def wait_and_raise(*args, **kwargs):
     while fut.key in w.tasks:
         await asyncio.sleep(0.01)
 
+    assert await fut2 == 2
     # Everything should still be executing as usual after this
     assert await c.submit(sum, c.map(inc, range(10))) == sum(map(inc, range(10)))
 
@@ -257,3 +257,142 @@ async def get_data(self, comm, *args, **kwargs):
     a.block_get_data = False
     # Everything should still be executing as usual after this
     assert await c.submit(sum, c.map(inc, range(10))) == sum(map(inc, range(10)))
+
+
+@gen_cluster(client=True, nthreads=[("", 1)])
+async def test_in_flight_lost_after_resumed(c, s, b):
+    block_get_data = asyncio.Lock()
+    in_get_data = asyncio.Event()
+
+    lock_executing = Lock()
+
+    def block_execution(lock):
+        with lock:
+            return
+
+    class BlockedGetData(Worker):
+        async def get_data(self, comm, *args, **kwargs):
+            in_get_data.set()
+            async with block_get_data:
+                return await super().get_data(comm, *args, **kwargs)
+
+    async with BlockedGetData(s.address, name="blocked-get-data-worker") as a:
+        fut1 = c.submit(
+            block_execution,
+            lock_executing,
+            workers=[a.address],
+            allow_other_workers=True,
+            key="fut1",
+        )
+        # Ensure fut1 is in memory but block any further execution afterwards to
+        # ensure we control when the recomputation happens
+        await fut1
+        await lock_executing.acquire()
+        in_get_data.clear()
+        await block_get_data.acquire()
+        fut2 = c.submit(inc, fut1, workers=[b.address], key="fut2")
+
+        # This ensures that B has already started fetching the task, i.e. after
+        # this the task is guaranteed to be in flight
+        await in_get_data.wait()
+        assert fut1.key in b.tasks
+        assert b.tasks[fut1.key].state == "flight"
+
+        # It is removed, i.e. get_data is guaranteed to fail and f1 is scheduled
+        # to be recomputed on B
+        await s.remove_worker(a.address, "foo", close=False, safe=True)
+
+        while not b.tasks[fut1.key].state == "resumed":
+            await asyncio.sleep(0.01)
+
+        fut1.release()
+        fut2.release()
+
+        while not b.tasks[fut1.key].state == "cancelled":
+            await asyncio.sleep(0.01)
+
+        block_get_data.release()
+        while b.tasks:
+            await asyncio.sleep(0.01)
+
+    assert_story(
+        b.story(fut1.key),
+        expect=[
+            # The initial free-keys is rejected
+            ("free-keys", (fut1.key,)),
+            (fut1.key, "resumed", "released", "cancelled", {}),
+            # After gather_dep receives the data, it tries to transition to
+            # memory, but the task will be released instead
+            (fut1.key, "cancelled", "memory", "released", {fut1.key: "forgotten"}),
+        ],
+    )
+
+
+@gen_cluster(client=True)
+async def test_cancelled_error(c, s, a, b):
+    executing = Event()
+    lock_executing = Lock()
+    await lock_executing.acquire()
+
+    def block_execution(event, lock):
+        event.set()
+        with lock:
+            raise RuntimeError()
+
+    fut1 = c.submit(
+        block_execution,
+        executing,
+        lock_executing,
+        workers=[b.address],
+        allow_other_workers=True,
+        key="fut1",
+    )
+
+    await executing.wait()
+    assert b.tasks[fut1.key].state == "executing"
+    fut1.release()
+    while b.tasks[fut1.key].state == "executing":
+        await asyncio.sleep(0.01)
+    await lock_executing.release()
+    while b.tasks:
+        await asyncio.sleep(0.01)
+
+    assert_story(
+        b.story(fut1.key),
+        [
+            (fut1.key, "executing", "released", "cancelled", {}),
+            (fut1.key, "cancelled", "error", "error", {fut1.key: "released"}),
+        ],
+    )
+
+
+@gen_cluster(client=True, nthreads=[("", 1, {"resources": {"A": 1}})])
+async def test_cancelled_error_with_resources(c, s, a):
+    executing = Event()
+    lock_executing = Lock()
+    await lock_executing.acquire()
+
+    def block_execution(event, lock):
+        event.set()
+        with lock:
+            raise RuntimeError()
+
+    fut1 = c.submit(
+        block_execution,
+        executing,
+        lock_executing,
+        resources={"A": 1},
+        key="fut1",
+    )
+
+    await executing.wait()
+    fut2 = c.submit(inc, 1, resources={"A": 1}, key="fut2")
+
+    while fut2.key not in a.tasks:
+        await asyncio.sleep(0.01)
+
+    fut1.release()
+    while a.tasks[fut1.key].state == "executing":
+        await asyncio.sleep(0.01)
+    await lock_executing.release()
+
+    assert await fut2 == 2
10 changes: 9 additions & 1 deletion distributed/tests/test_scheduler.py
@@ -1682,12 +1682,20 @@ async def test_closing_scheduler_closes_workers(s, a, b):
     client=True, nthreads=[("127.0.0.1", 1)], worker_kwargs={"resources": {"A": 1}}
 )
 async def test_resources_reset_after_cancelled_task(c, s, w):
-    future = c.submit(sleep, 0.2, resources={"A": 1})
+    lock = Lock()
+
+    def block(lock):
+        with lock:
+            return
+
+    await lock.acquire()
+    future = c.submit(block, lock, resources={"A": 1})
 
     while not w.executing_count:
         await asyncio.sleep(0.01)
 
     await future.cancel()
+    await lock.release()
 
     while w.executing_count:
         await asyncio.sleep(0.01)
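
Replacing `sleep(0.2)` with a `Lock` removes the timing sensitivity: the test decides exactly when the task may finish instead of racing a fixed delay. A condensed sketch of the pattern, assuming an in-process asynchronous client:

```python
import asyncio

from distributed import Client, Lock


def block(lock):
    with lock:  # parks here until the test releases the lock
        return


async def main():
    async with Client(processes=False, asynchronous=True) as client:
        lock = Lock()
        await lock.acquire()  # hold the lock before submitting
        future = client.submit(block, lock)
        # ... assert on intermediate state while the task is provably running ...
        await lock.release()  # deterministically allow the task to complete
        await future


asyncio.run(main())
```
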
4 changes: 4 additions & 0 deletions distributed/tests/test_worker_memory.py
@@ -12,6 +12,7 @@

 import distributed.system
 from distributed import Client, Event, Nanny, Worker, wait
+from distributed.compatibility import MACOS
 from distributed.core import Status
 from distributed.metrics import monotonic
 from distributed.spill import has_zict_210
@@ -763,6 +764,9 @@ def __reduce__(self):


 @pytest.mark.slow
+@pytest.mark.skipif(
+    condition=MACOS, reason="https://github.com/dask/distributed/issues/6233"
+)
 @gen_cluster(
     nthreads=[("", 1)],
     client=True,
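
`MACOS` is a boolean constant from `distributed.compatibility`, so the same guard can be reused on any test that is unreliable on macOS. A hedged sketch with a hypothetical test:

```python
import pytest

from distributed.compatibility import MACOS  # True when running on macOS


@pytest.mark.skipif(MACOS, reason="https://github.com/dask/distributed/issues/6233")
def test_macos_flaky_example():  # hypothetical test, for illustration only
    assert True
```
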
(The diff for the eighth changed file did not load and is not shown.)
