From d06dfd724231d39504440269dbce0c46112abec0 Mon Sep 17 00:00:00 2001 From: Florian Jetter Date: Fri, 6 May 2022 08:38:04 -0500 Subject: [PATCH 1/3] Skip test_release_evloop_while_spilling on OSX (#6291) --- distributed/tests/test_worker_memory.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/distributed/tests/test_worker_memory.py b/distributed/tests/test_worker_memory.py index ff410ae0d0..04c7b6c5c2 100644 --- a/distributed/tests/test_worker_memory.py +++ b/distributed/tests/test_worker_memory.py @@ -12,6 +12,7 @@ import distributed.system from distributed import Client, Event, Nanny, Worker, wait +from distributed.compatibility import MACOS from distributed.core import Status from distributed.metrics import monotonic from distributed.spill import has_zict_210 @@ -763,6 +764,9 @@ def __reduce__(self): @pytest.mark.slow +@pytest.mark.skipif( + condition=MACOS, reason="https://github.com/dask/distributed/issues/6233" +) @gen_cluster( nthreads=[("", 1)], client=True, From 7c57fdc07998d7a6d561fc062b5f37b6029d555e Mon Sep 17 00:00:00 2001 From: Florian Jetter Date: Fri, 6 May 2022 08:56:50 -0500 Subject: [PATCH 2/3] Ensure resumed tasks are not accidentally forgotten (#6217) Co-authored-by: crusaderky --- distributed/tests/test_cancelled_state.py | 153 +++++++++++++++++++++- distributed/tests/test_scheduler.py | 10 +- distributed/worker.py | 128 ++++++++++-------- 3 files changed, 227 insertions(+), 64 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index 273f7f1c74..261aa29683 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -89,17 +89,12 @@ def f(ev): ("f1", "ready", "executing", "executing", {}), ("free-keys", ("f1",)), ("f1", "executing", "released", "cancelled", {}), - ("f1", "ensure-task-exists", "cancelled"), - ("f1", "cancelled", "fetch", "cancelled", {"f1": ("resumed", "fetch")}), - ("f1", "cancelled", "resumed", "resumed", {}), - 
("f1", "put-in-memory"), + ("f1", "cancelled", "fetch", "resumed", {}), ("f1", "resumed", "memory", "memory", {"f2": "ready"}), ("free-keys", ("f1",)), - ("f1", "release-key"), ("f1", "memory", "released", "released", {}), ("f1", "released", "forgotten", "forgotten", {}), ], - strict=True, ) @@ -164,8 +159,9 @@ def blockable_compute(x, lock): lock=block_compute, workers=[a.address], allow_other_workers=True, + key="fut1", ) - fut2 = c.submit(inc, fut1, workers=[b.address]) + fut2 = c.submit(inc, fut1, workers=[b.address], key="fut2") await enter_get_data.wait() await block_compute.acquire() @@ -198,6 +194,9 @@ async def wait_and_raise(*args, **kwargs): fut = c.submit(wait_and_raise) await wait_for_state(fut.key, "executing", w) + # Queue up another task to ensure this is not affected by our error handling + fut2 = c.submit(inc, 1) + await wait_for_state(fut2.key, "ready", w) fut.release() await wait_for_state(fut.key, "cancelled", w) @@ -210,6 +209,7 @@ async def wait_and_raise(*args, **kwargs): while fut.key in w.tasks: await asyncio.sleep(0.01) + assert await fut2 == 2 # Everything should still be executing as usual after this await c.submit(sum, c.map(inc, range(10))) == sum(map(inc, range(10))) @@ -257,3 +257,142 @@ async def get_data(self, comm, *args, **kwargs): a.block_get_data = False # Everything should still be executing as usual after this assert await c.submit(sum, c.map(inc, range(10))) == sum(map(inc, range(10))) + + +@gen_cluster(client=True, nthreads=[("", 1)]) +async def test_in_flight_lost_after_resumed(c, s, b): + block_get_data = asyncio.Lock() + in_get_data = asyncio.Event() + + lock_executing = Lock() + + def block_execution(lock): + with lock: + return + + class BlockedGetData(Worker): + async def get_data(self, comm, *args, **kwargs): + in_get_data.set() + async with block_get_data: + return await super().get_data(comm, *args, **kwargs) + + async with BlockedGetData(s.address, name="blocked-get-dataworker") as a: + fut1 = c.submit( + 
block_execution, + lock_executing, + workers=[a.address], + allow_other_workers=True, + key="fut1", + ) + # Ensure fut1 is in memory but block any further execution afterwards to + # ensure we control when the recomputation happens + await fut1 + await lock_executing.acquire() + in_get_data.clear() + await block_get_data.acquire() + fut2 = c.submit(inc, fut1, workers=[b.address], key="fut2") + + # This ensures that B already fetches the task, i.e. after this the task + # is guaranteed to be in flight + await in_get_data.wait() + assert fut1.key in b.tasks + assert b.tasks[fut1.key].state == "flight" + + # It is removed, i.e. get_data is guaranteed to fail and f1 is scheduled + # to be recomputed on B + await s.remove_worker(a.address, "foo", close=False, safe=True) + + while not b.tasks[fut1.key].state == "resumed": + await asyncio.sleep(0.01) + + fut1.release() + fut2.release() + + while not b.tasks[fut1.key].state == "cancelled": + await asyncio.sleep(0.01) + + block_get_data.release() + while b.tasks: + await asyncio.sleep(0.01) + + assert_story( + b.story(fut1.key), + expect=[ + # The initial free-keys is rejected + ("free-keys", (fut1.key,)), + (fut1.key, "resumed", "released", "cancelled", {}), + # After gather_dep receives the data, it tries to transition to memory but the task will release instead + (fut1.key, "cancelled", "memory", "released", {fut1.key: "forgotten"}), + ], + ) + + +@gen_cluster(client=True) +async def test_cancelled_error(c, s, a, b): + executing = Event() + lock_executing = Lock() + await lock_executing.acquire() + + def block_execution(event, lock): + event.set() + with lock: + raise RuntimeError() + + fut1 = c.submit( + block_execution, + executing, + lock_executing, + workers=[b.address], + allow_other_workers=True, + key="fut1", + ) + + await executing.wait() + assert b.tasks[fut1.key].state == "executing" + fut1.release() + while b.tasks[fut1.key].state == "executing": + await asyncio.sleep(0.01) + await lock_executing.release() + 
while b.tasks: + await asyncio.sleep(0.01) + + assert_story( + b.story(fut1.key), + [ + (fut1.key, "executing", "released", "cancelled", {}), + (fut1.key, "cancelled", "error", "error", {fut1.key: "released"}), + ], + ) + + +@gen_cluster(client=True, nthreads=[("", 1, {"resources": {"A": 1}})]) +async def test_cancelled_error_with_ressources(c, s, a): + executing = Event() + lock_executing = Lock() + await lock_executing.acquire() + + def block_execution(event, lock): + event.set() + with lock: + raise RuntimeError() + + fut1 = c.submit( + block_execution, + executing, + lock_executing, + resources={"A": 1}, + key="fut1", + ) + + await executing.wait() + fut2 = c.submit(inc, 1, resources={"A": 1}, key="fut2") + + while fut2.key not in a.tasks: + await asyncio.sleep(0.01) + + fut1.release() + while a.tasks[fut1.key].state == "executing": + await asyncio.sleep(0.01) + await lock_executing.release() + + assert await fut2 == 2 diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 58d5a180da..784448c588 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1682,12 +1682,20 @@ async def test_closing_scheduler_closes_workers(s, a, b): client=True, nthreads=[("127.0.0.1", 1)], worker_kwargs={"resources": {"A": 1}} ) async def test_resources_reset_after_cancelled_task(c, s, w): - future = c.submit(sleep, 0.2, resources={"A": 1}) + lock = Lock() + + def block(lock): + with lock: + return + + await lock.acquire() + future = c.submit(block, lock, resources={"A": 1}) while not w.executing_count: await asyncio.sleep(0.01) await future.cancel() + await lock.release() while w.executing_count: await asyncio.sleep(0.01) diff --git a/distributed/worker.py b/distributed/worker.py index 2d5fd2be85..a45454ff08 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -605,7 +605,6 @@ def __init__( validate = dask.config.get("distributed.scheduler.validate") self.validate = validate 
self._transitions_table = { - ("cancelled", "resumed"): self.transition_cancelled_resumed, ("cancelled", "fetch"): self.transition_cancelled_fetch, ("cancelled", "released"): self.transition_cancelled_released, ("cancelled", "waiting"): self.transition_cancelled_waiting, @@ -614,7 +613,7 @@ def __init__( ("cancelled", "error"): self.transition_cancelled_error, ("resumed", "memory"): self.transition_generic_memory, ("resumed", "error"): self.transition_generic_error, - ("resumed", "released"): self.transition_generic_released, + ("resumed", "released"): self.transition_resumed_released, ("resumed", "waiting"): self.transition_resumed_waiting, ("resumed", "fetch"): self.transition_resumed_fetch, ("resumed", "missing"): self.transition_resumed_missing, @@ -2207,28 +2206,28 @@ def transition_cancelled_error( *, stimulus_id: str, ) -> RecsInstrs: - recs: Recs = {} - instructions: Instructions = [] - if ts._previous == "executing": - recs, instructions = self.transition_executing_error( - ts, - exception, - traceback, - exception_text, - traceback_text, - stimulus_id=stimulus_id, - ) - elif ts._previous == "flight": - recs, instructions = self.transition_flight_error( - ts, - exception, - traceback, - exception_text, - traceback_text, - stimulus_id=stimulus_id, - ) - if ts._next: - recs[ts] = ts._next + assert ts._previous == "executing" or ts.key in self.long_running + recs, instructions = self.transition_executing_error( + ts, + exception, + traceback, + exception_text, + traceback_text, + stimulus_id=stimulus_id, + ) + # We'll ignore instructions, i.e. we choose to not submit the failure + # message to the scheduler since from the schedulers POV it already + # released this task + if self.validate: + assert len(instructions) == 1 + assert isinstance(instructions[0], TaskErredMsg) + assert instructions[0].key == ts.key + instructions.clear() + # Workers should never "retry" tasks. A transition to error should, by + # default, be the end. 
Since cancelled indicates that the scheduler lost + # interest, we can transition straight to released + assert ts not in recs + recs[ts] = "released" return recs, instructions def transition_generic_error( @@ -2286,7 +2285,7 @@ def transition_executing_error( ) def _transition_from_resumed( - self, ts: TaskState, finish: TaskStateState, *, stimulus_id: str + self, ts: TaskState, finish: TaskStateState, *args, stimulus_id: str ) -> RecsInstrs: """`resumed` is an intermediate degenerate state which splits further up into two states depending on what the last signal / next state is @@ -2309,18 +2308,29 @@ def _transition_from_resumed( """ recs: Recs = {} instructions: Instructions = [] - if ts.done: + + if ts._previous == finish: + # We're back where we started. We should forget about the entire + # cancellation attempt + ts.state = finish + ts._next = None + ts._previous = None + elif not ts.done: + # If we're not done yet, just remember where we want to be next + ts._next = finish + else: + # Flight/executing finished unsuccessfully, i.e. 
not in memory + assert finish != "memory" next_state = ts._next - # if the next state is already intended to be waiting or if the - # coro/thread is still running (ts.done==False), this is a noop - if ts._next != finish: + assert next_state in {"waiting", "fetch"}, next_state + assert ts._previous in {"executing", "flight"}, ts._previous + + if next_state != finish: recs, instructions = self.transition_generic_released( ts, stimulus_id=stimulus_id ) - assert next_state recs[ts] = next_state - else: - ts._next = finish + return recs, instructions def transition_resumed_fetch( @@ -2339,7 +2349,19 @@ def transition_resumed_missing( """ return self._transition_from_resumed(ts, "missing", stimulus_id=stimulus_id) - def transition_resumed_waiting(self, ts: TaskState, *, stimulus_id: str): + def transition_resumed_released( + self, ts: TaskState, *, stimulus_id: str + ) -> RecsInstrs: + if not ts.done: + ts.state = "cancelled" + ts._next = None + return {}, [] + else: + return self.transition_generic_released(ts, stimulus_id=stimulus_id) + + def transition_resumed_waiting( + self, ts: TaskState, *, stimulus_id: str + ) -> RecsInstrs: """ See Worker._transition_from_resumed """ @@ -2355,14 +2377,9 @@ def transition_cancelled_fetch( return {}, [] else: assert ts._previous == "executing" - return {ts: ("resumed", "fetch")}, [] - - def transition_cancelled_resumed( - self, ts: TaskState, next: TaskStateState, *, stimulus_id: str - ) -> RecsInstrs: - ts._next = next - ts.state = "resumed" - return {}, [] + ts.state = "resumed" + ts._next = "fetch" + return {}, [] def transition_cancelled_waiting( self, ts: TaskState, *, stimulus_id: str @@ -2374,7 +2391,9 @@ def transition_cancelled_waiting( return {}, [] else: assert ts._previous == "flight" - return {ts: ("resumed", "waiting")}, [] + ts.state = "resumed" + ts._next = "waiting" + return {}, [] def transition_cancelled_forgotten( self, ts: TaskState, *, stimulus_id: str @@ -2390,24 +2409,19 @@ def 
transition_cancelled_released( if not ts.done: ts._next = "released" return {}, [] - next_state = ts._next - assert next_state self._executing.discard(ts) self._in_flight_tasks.discard(ts) for resource, quantity in ts.resource_restrictions.items(): self.available_resources[resource] += quantity - return merge_recs_instructions( - self.transition_generic_released(ts, stimulus_id=stimulus_id), - ({ts: next_state} if next_state != "released" else {}, []), - ) + return self.transition_generic_released(ts, stimulus_id=stimulus_id) def transition_executing_released( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: ts._previous = ts.state - ts._next = "released" + ts._next = None # See https://github.com/dask/distributed/pull/5046#discussion_r685093940 ts.state = "cancelled" ts.done = False @@ -2546,16 +2560,18 @@ def transition_flight_released( return self.transition_generic_released(ts, stimulus_id=stimulus_id) else: ts._previous = "flight" - ts._next = "released" + ts._next = None # See https://github.com/dask/distributed/pull/5046#discussion_r685093940 ts.state = "cancelled" return {}, [] - def transition_cancelled_memory( - self, ts: TaskState, value, *, stimulus_id: str - ) -> RecsInstrs: - assert ts._next - return {ts: ts._next}, [] + def transition_cancelled_memory(self, ts, value, *, stimulus_id): + # We only need this because the to-memory signatures require a value but + # we do not want to store a cancelled result and want to release + # immediately + assert ts.done + + return self.transition_cancelled_released(ts, stimulus_id=stimulus_id) def transition_executing_long_running( self, ts: TaskState, compute_duration: float, *, stimulus_id: str @@ -4082,7 +4098,7 @@ def validate_task_missing(self, ts): def validate_task_cancelled(self, ts): assert ts.key not in self.data assert ts._previous - assert ts._next + assert ts._next is None # We'll always transition to released after it is done def validate_task_resumed(self, ts): assert ts.key not in 
self.data From 9f7646abc81b5b0a1acdca24cef7c17f29a6acb4 Mon Sep 17 00:00:00 2001 From: Enric Tejedor Date: Fri, 6 May 2022 16:10:55 +0200 Subject: [PATCH 3/3] Add option to specify a scheduler address for workers to use (#5944) --- distributed/deploy/spec.py | 6 +++++- distributed/distributed-schema.yaml | 11 +++++++++++ distributed/distributed.yaml | 1 + distributed/scheduler.py | 10 ++++++++++ 4 files changed, 27 insertions(+), 1 deletion(-) diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index 5cd561ea84..96a66d63ae 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -340,7 +340,11 @@ async def _correct_state_internal(self): opts["name"] = name if isinstance(cls, str): cls = import_term(cls) - worker = cls(self.scheduler.address, **opts) + worker = cls( + getattr(self.scheduler, "contact_address", None) + or self.scheduler.address, + **opts, + ) self._created.add(worker) workers.append(worker) if workers: diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 0ff6802f7f..942eab8ff9 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -43,6 +43,17 @@ properties: For a list of handlers see the `dask.distributed.Scheduler.handlers` attribute. + contact-address: + type: + - string + - "null" + description: | + The address that the scheduler advertises to workers for communication with it. + + To be specified when the address to which the scheduler binds cannot be the same + as the address that workers use to contact the scheduler (e.g. because the former + is private and the scheduler is in a different network than the workers). 
+ default-data-size: type: - string diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 19baf8343b..74d59addb3 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -12,6 +12,7 @@ distributed: allowed-failures: 3 # number of retries before a task is considered bad bandwidth: 100000000 # 100 MB/s estimated worker-worker bandwidth blocked-handlers: [] + contact-address: null default-data-size: 1kiB # Number of seconds to wait until workers or clients are removed from the events log # after they have been removed from the scheduler diff --git a/distributed/scheduler.py b/distributed/scheduler.py index b80ae3e9d0..9cb0806a5b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2912,6 +2912,12 @@ class Scheduler(SchedulerState, ServerNode): Users typically do not interact with the scheduler directly but rather with the client object ``Client``. + The ``contact_address`` parameter allows advertising a specific address to + the workers for communication with the scheduler, which is different from + the address the scheduler binds to. This is useful when the scheduler + listens on a private address, which therefore cannot be used by the workers + to contact it. + **State** The scheduler contains the following state variables. Each variable is @@ -2976,11 +2982,15 @@ def __init__( preload=None, preload_argv=(), plugins=(), + contact_address=None, **kwargs, ): self._setup_logging(logger) # Attributes + if contact_address is None: + contact_address = dask.config.get("distributed.scheduler.contact-address") + self.contact_address = contact_address if allowed_failures is None: allowed_failures = dask.config.get("distributed.scheduler.allowed-failures") self.allowed_failures = allowed_failures