
Deadlock fetching key from retiring worker, when scheduler thinks we already have the key #6244

Closed
gjoseph92 opened this issue Apr 28, 2022 · 12 comments · Fixed by #6248
Labels: bug (Something is broken) · deadlock (The cluster appears to not make any progress)

Comments

@gjoseph92
Collaborator

This is somewhat conjecture, based on a partial cluster dump from @bnaul (the full one wasn't working for some reason). Hopefully we can attach the dump at some point so others can see it. I only got to see the dump of a single worker; if we could see the scheduler state, we could confirm this theory.

This was the worker story of the bad key
In [9]: dump.worker_stories("('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)")
Out[9]: 
{'tcp://10.125.88.48:37099': [["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'ensure-task-exists',
   'released',
   'active_memory_manager-1651174357.5785856',
   datetime.datetime(2022, 4, 28, 13, 32, 37, 586432)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'released',
   'fetch',
   'fetch',
   {},
   'active_memory_manager-1651174357.5785856',
   datetime.datetime(2022, 4, 28, 13, 32, 37, 586584)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'compute-task',
   'compute-task-1651174375.3724043',
   datetime.datetime(2022, 4, 28, 13, 32, 55, 378109)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'release-key',
   'compute-task-1651174375.3724043',
   datetime.datetime(2022, 4, 28, 13, 32, 55, 378176)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'fetch',
   'released',
   'released',
   {"('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)": 'forgotten'},
   'compute-task-1651174375.3724043',
   datetime.datetime(2022, 4, 28, 13, 32, 55, 378205)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'released',
   'forgotten',
   'forgotten',
   {},
   'compute-task-1651174375.3724043',
   datetime.datetime(2022, 4, 28, 13, 32, 55, 378212)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'fetch',
   'waiting',
   'forgotten',
   {"('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)": 'forgotten'},
   'compute-task-1651174375.3724043',
   datetime.datetime(2022, 4, 28, 13, 32, 55, 378214)]],
 'tcp://10.124.15.5:35871': [["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'ensure-task-exists',
   'released',
   'active_memory_manager-1651174147.6869376',
   datetime.datetime(2022, 4, 28, 13, 29, 7, 714652)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'released',
   'fetch',
   'fetch',
   {},
   'active_memory_manager-1651174147.6869376',
   datetime.datetime(2022, 4, 28, 13, 29, 7, 714735)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'ensure-task-exists',
   'fetch',
   'active_memory_manager-1651174149.5793703',
   datetime.datetime(2022, 4, 28, 13, 29, 9, 587408)],
  ['receive-dep-failed',
   'tcp://10.124.227.26:35167',
   ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 2764)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 5726)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 9555)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 7277)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 257)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 8041)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 2732)"],
   'ensure-communicating-1651174136.9395597',
   datetime.datetime(2022, 4, 28, 13, 31, 0, 999395)],
  ['missing-who-has',
   'tcp://10.124.227.26:35167',
   "('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'ensure-communicating-1651174136.9395597',
   datetime.datetime(2022, 4, 28, 13, 31, 0, 999403)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'fetch',
   'missing',
   'missing',
   {},
   'ensure-communicating-1651174136.9395597',
   datetime.datetime(2022, 4, 28, 13, 31, 0, 999475)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'missing',
   'fetch',
   'fetch',
   {},
   'find-missing-1651174261.5426853',
   datetime.datetime(2022, 4, 28, 13, 31, 1, 556907)],
  ['gather-dependencies',
   'tcp://10.124.227.26:35167',
   ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 2764)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 5726)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 9555)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 7277)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 257)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 8041)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 2732)"],
   'ensure-communicating-1651174261.556927',
   datetime.datetime(2022, 4, 28, 13, 31, 1, 557029)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'fetch',
   'flight',
   'flight',
   {},
   'ensure-communicating-1651174261.556927',
   datetime.datetime(2022, 4, 28, 13, 31, 1, 557074)],
  ['request-dep',
   'tcp://10.124.227.26:35167',
   ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 5726)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 9555)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 7277)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 257)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 2764)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 2732)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 8041)"],
   'ensure-communicating-1651174261.556927',
   datetime.datetime(2022, 4, 28, 13, 31, 1, 557221)],
  ['receive-dep-failed',
   'tcp://10.124.227.26:35167',
   ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 5726)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 9555)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 7277)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 257)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 2764)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 2732)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 8041)"],
   'ensure-communicating-1651174261.556927',
   datetime.datetime(2022, 4, 28, 13, 33, 5, 778931)],
  ['missing-who-has',
   'tcp://10.124.227.26:35167',
   "('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'ensure-communicating-1651174261.556927',
   datetime.datetime(2022, 4, 28, 13, 33, 5, 778938)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'flight',
   'missing',
   'missing',
   {},
   'ensure-communicating-1651174261.556927',
   datetime.datetime(2022, 4, 28, 13, 33, 5, 779073)]]}

In #6112, we fixed the gather_dep logic so that, if communication with a worker fails, all keys held by that worker are transitioned to missing. We assume the find_missing PeriodicCallback will then ask the scheduler where to fetch them from next.

In the case of other errors, we would inform the scheduler of this:

elif ts not in recommendations:
    ts.who_has.discard(worker)
    self.has_what[worker].discard(ts.key)
    self.log.append((d, "missing-dep", stimulus_id, time()))
    self.batched_stream.send(
        {
            "op": "missing-data",
            "errant_worker": worker,
            "key": d,
            "stimulus_id": stimulus_id,
        }
    )

However, in the OSError case, we don't send the missing-data message (because we've already added recommendations to transition those keys to missing).
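
To make the asymmetry concrete, here's a minimal, self-contained sketch (not the actual gather_dep code; handle_gather_failure and notify_scheduler are illustrative stand-ins for the worker internals):

def handle_gather_failure(exc, keys, worker_addr, notify_scheduler):
    """Sketch of the two branches described above.

    ``notify_scheduler`` stands in for ``batched_stream.send``; ``keys`` are the
    keys the failing worker was believed to hold.
    """
    recommendations = {}
    if isinstance(exc, OSError):
        # Comm to the remote worker died: every key it held is recommended to
        # transition to "missing" locally, but the scheduler is never told that
        # its who_has for these keys is stale.
        for key in keys:
            recommendations[key] = "missing"
    else:
        # Other failures do send "missing-data", letting the scheduler fix its
        # bookkeeping for the errant worker.
        for key in keys:
            notify_scheduler(
                {"op": "missing-data", "errant_worker": worker_addr, "key": key}
            )
    return recommendations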


Simultaneously, AMM is now used for retire_workers. When retiring a worker, we try to move all its keys onto other workers. To do this, AMM calls Scheduler.request_acquire_replicas:

def request_acquire_replicas(self, addr: str, keys: list, *, stimulus_id: str):
    """Asynchronously ask a worker to acquire a replica of the listed keys from
    other workers. This is a fire-and-forget operation which offers no feedback for
    success or failure, and is intended for housekeeping and not for computation.
    """
    who_has = {}
    for key in keys:
        ts = self.tasks[key]
        who_has[key] = {ws.address for ws in ts.who_has}
    self.stream_comms[addr].send(
        {
            "op": "acquire-replicas",
            "keys": keys,
            "who_has": who_has,
            "stimulus_id": stimulus_id,
        },
    )

request_acquire_replicas optimistically adds the key to the worker's who_has, before the worker has actually confirmed it's received it. So imagine this flow:

  • Worker A is retiring gracefully. It holds key-foo
  • AMM calls request_acquire_replicas("worker-b-addr", ["key-foo"]).
    • key-foo is immediately assigned to worker B from the scheduler's perspective
    • Scheduler tells worker B: "go fetch key-foo from worker A"
  • Next AMM cycle runs. AMM thinks, "both worker A and B hold key-foo now" (not actually true). "I'll tell worker A to drop its copy." request_remove_replicas also eagerly updates scheduler state so worker A no longer holds the key (this is also not true, but it's more reasonable).
  • Next AMM cycle runs. AMM thinks, "worker A holds no keys anymore. It can shut down"
  • Worker A shuts down
  • Worker B was busy, so it didn't get around to requesting key-foo from worker A until too late. Worker A is already shut down. The key is lost.
  • The OSError happens trying to fetch the key. The key goes to missing on worker B.
  • In find_missing, Worker B asks the scheduler, "who has key-foo"?
  • Scheduler replies, "just you do!"
  • Worker B: 🤔

I can't quite figure out what happens from here. I would have hoped that eventually worker B would try to gather_dep from itself, it would reply to itself "I don't have that key", and then it would finally send the missing-data signal to the scheduler, letting the scheduler realize the key is actually missing. That doesn't seem to be happening, from the cluster dump I'm looking at where this situation happened.

A few things to consider changing:

  • request_acquire_replicas should not be so optimistic as to assume the key is on a new worker before we've confirmed it's there. This creates a race condition, where we may drop the key from the sender before the receiver has gotten it. This means RetireWorker (and all AMM policies) may lead to lost data, when they shouldn't. cc @crusaderky
  • maybe the OSError handling logic should also send missing-data to the scheduler here-ish (not in a for-loop though)? cc @mrocklin
  • Worker.find_missing should definitely take action if the scheduler reports nobody (besides it) holds the key that's missing. That's a clear state mismatch, and we shouldn't assume anything else will resolve it. cc @fjetter @mrocklin
  • Include Worker._missing_dep_flight in cluster dumps #6243

I think maybe a change like this would be a good approach (note: this doesn't plumb stimulus_id through in the other calls to update_who_has, so it will fail as is. Also not sure if update_who_has is always the right place to do this):

diff --git a/distributed/worker.py b/distributed/worker.py
index e5563ed0..04c6bfd6 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -3202,7 +3202,7 @@ class Worker(ServerNode):
                 keys=[ts.key for ts in self._missing_dep_flight],
             )
             who_has = {k: v for k, v in who_has.items() if v}
-            self.update_who_has(who_has)
+            self.update_who_has(who_has, stimulus_id=stimulus_id)
             recommendations: Recs = {}
             for ts in self._missing_dep_flight:
                 if ts.who_has:
@@ -3216,7 +3216,9 @@ class Worker(ServerNode):
             ].callback_time = self.periodic_callbacks["heartbeat"].callback_time
             self.ensure_communicating()
 
-    def update_who_has(self, who_has: dict[str, Collection[str]]) -> None:
+    def update_who_has(
+        self, who_has: dict[str, Collection[str]], *, stimulus_id: str
+    ) -> None:
         try:
             for dep, workers in who_has.items():
                 if not workers:
@@ -3232,6 +3234,21 @@ class Worker(ServerNode):
                         )
                         # Do not mutate the input dict. That's rude
                         workers = set(workers) - {self.address}
+                        if not workers:
+                            logger.warning(
+                                f"Scheduler claims this worker {self.address} is the only one holding {dep!r}, which is not true. "
+                                f"{dep!r} likely needs to be recomputed."
+                            )
+                            self.batched_stream.send(
+                                {
+                                    "op": "missing-data",
+                                    "errant_worker": self.address,
+                                    "key": dep,
+                                    "stimulus_id": stimulus_id,
+                                }
+                            )
+                            continue
+
                     dep_ts.who_has.update(workers)
 
                     for worker in workers:
gjoseph92 added the bug and deadlock labels on Apr 28, 2022
@mrocklin
Member

I'm curious: what is the end state of this? How does it present to the user? Or rather, what does the state look like that makes things seem odd? A worker task stuck in a fetch state, in some loop asking the scheduler who has it? Is there some unhandled exception?

I think that the intent of my question is to ask: "is there an obvious failure on the worker or scheduler side where we can just throw up our hands and say 'screw it, worker.close'?"

@gjoseph92
Collaborator Author

Something like this is all I saw in worker logs:

2022-04-28 19:31:00,998 - distributed.worker - ERROR - Worker stream died during communication: tcp://10.124.227.26:35167
Traceback (most recent call last):
  File "/usr/src/python/distributed/distributed/comm/tcp.py", line 439, in connect
    stream = await self.client.connect(
  File "/usr/local/lib/python3.9/site-packages/tornado/tcpclient.py", line 275, in connect
    af, addr, stream = await connector.start(connect_timeout=timeout)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/asyncio/tasks.py", line 490, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/src/python/distributed/distributed/comm/core.py", line 289, in connect
    comm = await asyncio.wait_for(
  File "/usr/local/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/src/python/distributed/distributed/worker.py", line 3082, in gather_dep
    response = await get_data_from_worker(
  File "/usr/src/python/distributed/distributed/worker.py", line 4408, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/usr/src/python/distributed/distributed/utils_comm.py", line 381, in retry_operation
    return await retry(
  File "/usr/src/python/distributed/distributed/utils_comm.py", line 366, in retry
    return await coro()
  File "/usr/src/python/distributed/distributed/worker.py", line 4385, in _get_data
    comm = await rpc.connect(worker)
  File "/usr/src/python/distributed/distributed/core.py", line 1163, in connect
    return await connect_attempt
  File "/usr/src/python/distributed/distributed/core.py", line 1099, in _connect
    comm = await connect(
  File "/usr/src/python/distributed/distributed/comm/core.py", line 315, in connect
    raise OSError(
OSError: Timed out trying to connect to tcp://10.124.227.26:35167 after 30 s

This presents to the user just like any other deadlock: tasks in processing, workers not executing anything, nothing happening.

is there an obvious failure on the worker or scheduler side where we can just throw up our hands and say 'screw it, worker.close'"

diff --git a/distributed/worker.py b/distributed/worker.py
index e5563ed0..35c5e800 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -3232,6 +3232,9 @@ class Worker(ServerNode):
                         )
                         # Do not mutate the input dict. That's rude
                         workers = set(workers) - {self.address}
+                        if not workers:
+                            # something is broken, shut down desperately!
+                            sys.exit(1)
                     dep_ts.who_has.update(workers)
 
                     for worker in workers:

would probably do it. But there might be a better place for it (like in ensure_communicating, if the only who_has for a key is us, and we don't have the key already?).
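
For illustration, here is a standalone sketch of the check that would be needed in that case (the function name and arguments are hypothetical, not actual Worker attributes or methods):

def scheduler_state_mismatch(key, who_has, self_address, data):
    """Return True if, according to the scheduler, no worker other than this
    one holds ``key`` while the key is not in local memory: the impossible
    state described above, which should trigger a ``missing-data`` message
    (or a hard shutdown) rather than being silently ignored.
    """
    return key not in data and set(who_has) <= {self_address}

# e.g. scheduler_state_mismatch(
#     "key-foo", {"tcp://worker-b:1234"}, "tcp://worker-b:1234", {}
# ) -> True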

If you're asking, "could we @fail_hard on some exception?" I don't think so.

@mrocklin
Member

I think that @bnaul also answered my question somewhat over here: #6245

@bnaul
Contributor

bnaul commented Apr 28, 2022

Re-posting here from #6245 to consolidate

Similar circumstances to #6223 and #6198, but different symptoms. In particular, the workload is identical, but this run includes the changes from #6234, which seem to have addressed that specific symptom but uncovered another.

Now the stuck workers (in this case there are 2) are responsive and show up as "running", but their tasks never execute.

In [52]: running = {k: v for k, v in c.processing().items() if v}
In [53]: running
Out[53]:
{'tcp://10.124.15.5:35871': ('partition_inputs-256b735f58774d8cb99d30daf41cc187',
  "('_split-bec215abf9d605cb60c27727e9a88b7e', 5726)"),
 'tcp://10.125.88.48:37099': ("('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",)}

In [63]: {k: c.run_on_scheduler(lambda dask_scheduler: {k: str(v) for k, v in dask_scheduler.workers.items()})[k] for k, v in running.items()}
Out[63]:
{'tcp://10.124.15.5:35871': "<WorkerState 'tcp://10.124.15.5:35871', name: brett-2ad617c8-daskworkers-6c659d57d6-hfblx, status: running, memory: 510, processing: 2>",
 'tcp://10.125.88.48:37099': "<WorkerState 'tcp://10.125.88.48:37099', name: brett-2ad617c8-daskworkers-6c659d57d6-bgf69, status: running, memory: 36, processing: 1>"}

We were having issues pulling an actual cluster dump but @gjoseph92 suggested this alternative:

In [36]: logs = c.run(lambda dask_worker: dask_worker._to_dict(exclude=("run_spec",)), workers=["tcp://10.125.88.48:37099", "tcp://10.124.15.5:35871"])

which I've uploaded here.

@mrocklin
Member

@bnaul I hope that you're having a good time with this. If not, you could consider a WorkerPlugin that tracks Worker._transition_counter and kills the worker if the transition counter hasn't changed in some suitable amount of time while tasks are still in a running state. That might help you avoid some pain short-term.
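
For concreteness, a rough sketch of such a watchdog plugin. This is untested; Worker._transition_counter comes from the comment above, and the other attribute names are assumptions, so treat it as an illustration rather than a drop-in fix.

import os

from tornado.ioloop import PeriodicCallback

from distributed.diagnostics.plugin import WorkerPlugin


class StuckWorkerWatchdog(WorkerPlugin):
    """Kill the worker if no state transitions happen for ``timeout`` seconds
    while it still has tasks outstanding."""

    def __init__(self, timeout=300, interval=30):
        self.timeout = timeout    # seconds of no transitions before bailing out
        self.interval = interval  # how often to check, in seconds
        self._last_counter = None
        self._stalled_for = 0

    def setup(self, worker):
        self.worker = worker
        self._pc = PeriodicCallback(self._check, self.interval * 1000)
        self._pc.start()

    def teardown(self, worker):
        self._pc.stop()

    def _check(self):
        counter = getattr(self.worker, "_transition_counter", None)
        busy = bool(self.worker.tasks)  # the worker still thinks it owns tasks
        if busy and counter == self._last_counter:
            self._stalled_for += self.interval
            if self._stalled_for >= self.timeout:
                # Assume we're deadlocked and die loudly so the nanny or the
                # orchestrator restarts us.
                os._exit(1)
        else:
            self._stalled_for = 0
        self._last_counter = counter

Registered with client.register_worker_plugin(StuckWorkerWatchdog()), this would at least turn a silent deadlock into a worker restart.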

@fjetter
Member

fjetter commented Apr 29, 2022

Based on the above story, two things are happening here:

  1. The first part of the story is a plain and simple transition error. What we see is:

     signal            start     finish
     fetch this key    released  fetch
     compute this key  fetch     released
     ...               released  forgotten ⚠️

which should instead be:

     signal            start     finish
     fetch this key    released  fetch
     compute this key  fetch     released
     ...               released  waiting ✅

TL;DR: The worker is being told to compute something but ignores the request and instead forgets everything about the key.

  2. The second part of the log, which shows the missing-who-has entries, is a red herring. At the very least, it does not belong to the log of the first story, since the IPs don't match up. We cannot infer what it means without the context of the other worker or the scheduler (that's why partial dumps are sometimes problematic).

I should be able to reproduce the first transition problem

@fjetter
Member

fjetter commented Apr 29, 2022

What version did this occur on?

@mrocklin
Member

I suspect that it was run on #6234

@bnaul
Contributor

bnaul commented Apr 29, 2022

^ correct

@fjetter
Member

fjetter commented Apr 29, 2022

Got a reproducer of the issue with the same story:

@gen_cluster(client=True)
async def test_fetch_via_amm_to_compute(c, s, a, b):
    # Block ensure_communicating to ensure we indeed know that the task is in
    # fetch and doesn't leave it accidentally
    old_out_connections, b.total_out_connections = b.total_out_connections, 0
    old_comm_threshold, b.comm_threshold_bytes = b.comm_threshold_bytes, 0

    f1 = c.submit(inc, 1, workers=[a.address], key="f1", allow_other_workers=True)

    await f1
    s.request_acquire_replicas(b.address, [f1.key], stimulus_id=f"test")

    await wait_for_state(f1.key, "fetch", b)
    await a.close()

    b.total_out_connections = old_out_connections
    b.comm_threshold_bytes = old_comm_threshold

    await f1

@crusaderky
Collaborator

crusaderky commented Apr 29, 2022

request_acquire_replicas optimistically adds the key to the worker's who_has, before the worker has actually confirmed it's received it.

This is definitely not right and it will cause a race condition.
If you switch it to the other way around, however, you have to make sure that tasks that consistently take more than 2s to transfer don't bounce between fetch and flight forever. XREF #5759.

request_remove_replicas also eagerly updates scheduler state so worker A no longer holds the key (this is also not true, but it's more reasonable).

This is deliberate, and designed specifically to avoid race conditions. The worker will lose the key soon, but the scheduler can't know when, so it should not rely on it. If for whatever reason the worker refuses, it will re-add the key. See test_remove_replicas_while_computing.

gjoseph92 self-assigned this on Apr 29, 2022
@crusaderky
Collaborator

request_acquire_replicas optimistically adds the key to the worker's who_has, before the worker has actually confirmed it's received it.

Correction: this is not true. There is nothing in request_acquire_replicas that eagerly adds the recipient worker to who_has.
The scheduler waits for the whole round-trip, as it should:

  1. the scheduler suggests that the worker should acquire the replica through "op": "acquire-replicas"
  2. the worker calls gather_dep (not necessarily right away, if its comms are under heavy load)
  3. the worker waits until the replica arrives
  4. the worker informs the scheduler
  5. the scheduler updates who_has.

If this whole round-trip doesn't happen within 2 seconds, the AMM will repeat the suggestion - which is somewhat problematic if the transfer is genuinely slow, as described in #5759, but won't cause a deadlock.
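
For reference, here's a quick sketch in the style of the reproducer above that would check this ordering. It is not an actual test in the repository and assumes the usual distributed.utils_test helpers (gen_cluster, inc):

import asyncio

@gen_cluster(client=True)
async def test_who_has_updated_only_after_roundtrip(c, s, a, b):
    f1 = c.submit(inc, 1, workers=[a.address], key="f1")
    await f1

    # The suggestion alone must not add worker B to who_has ...
    s.request_acquire_replicas(b.address, [f1.key], stimulus_id="test")
    assert {ws.address for ws in s.tasks["f1"].who_has} == {a.address}

    # ... only after the replica has actually arrived on B, and B has informed
    # the scheduler, does B show up as a holder.
    while b.address not in {ws.address for ws in s.tasks["f1"].who_has}:
        await asyncio.sleep(0.01)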
