Worker stuck in closing_gracefully state #6223

Closed
bnaul opened this issue Apr 27, 2022 · 25 comments · Fixed by #6234
Labels: bug (Something is broken), deadlock (The cluster appears to not make any progress)

@bnaul (Contributor) commented Apr 27, 2022

(Note: @mrocklin heavily edited this issue; the words are his (sorry for impersonating you, Brett), but the logs are from Brett.)

We've found that workers can be stuck in a closing_gracefully state indefinitely. Here is some context:

Cluster

We're running an adaptively scaled cluster of roughly 4000 workers.

cluster = KubeHelmCluster(release_name=release_name)  # custom wrapper around dask_kubernetes.KubeCluster
cluster.adapt(minimum=0, maximum=4000, interval="10s", target_duration="60s")

Computation

Our workload is pretty simple. We're just reading a bunch of data and then calling map_partitions, and that's it. We see that the cluster scales up pretty fast, and then scales down pretty fast.

dd.read_parquet("gs://.../*.parquet").map_partitions(len).compute()  # ~11k parquet files

Some findings

Both the scheduler and the worker agree that the worker is in the closing_gracefully state (this is after a long while):

In [30]: c.run_on_scheduler(lambda dask_scheduler: dask_scheduler.workers[w].status)
Out[30]: <Status.closing_gracefully: 'closing_gracefully'>

In [31]: c.run(lambda dask_worker: str(dask_worker), workers=["tcp://10.36.228.19:34273"])
Out[31]: {'tcp://10.36.228.19:34273': "<Worker 'tcp://10.36.228.19:34273', name: 128, status: closing_gracefully, stored: 1, running: 0/4, ready: 3, comm: 0, waiting: 0>"}

Interestingly, they both also agree that there are three tasks that are ready to go. An artificial call to Worker._ensure_computing at this point doesn't do anything because the first check in that method is if self.status != Status.running: return.

Here are the logs on the scheduler that refer to this worker.

(.venv) ➜  model git:(ta_backfill) ✗ kl  brett-f4d63eb0-daskscheduler-667b4fc95-f897l | grep '10.36.228.19:34273'
2022-04-27 13:22:17,287 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://10.36.228.19:34273', name: 128, status: undefined, memory: 0, processing: 0>
2022-04-27 13:22:17,291 - distributed.scheduler - INFO - Starting worker compute stream, tcp://10.36.228.19:34273
2022-04-27 13:23:01,001 - distributed.scheduler - INFO - Retiring worker tcp://10.36.228.19:34273
2022-04-27 13:23:01,448 - distributed.active_memory_manager - INFO - Retiring worker tcp://10.36.228.19:34273; 3 keys are being moved away.
2022-04-27 13:23:02,341 - distributed.active_memory_manager - INFO - Retiring worker tcp://10.36.228.19:34273; 3 keys are being moved away.
2022-04-27 13:23:03,716 - distributed.active_memory_manager - INFO - Retiring worker tcp://10.36.228.19:34273; 2 keys are being moved away.
2022-04-27 13:23:05,544 - distributed.active_memory_manager - INFO - Retiring worker tcp://10.36.228.19:34273; no unique keys need to be moved away.

And the events from the scheduler's perspective

In [32]: c.run_on_scheduler(lambda dask_scheduler: dask_scheduler.events[w])
Out[32]:
deque([(1651065737.2876427, {'action': 'add-worker'}),
       (1651065737.629527,
        {'action': 'worker-status-change',
         'prev-status': 'undefined',
         'status': 'running'}),
       (1651065793.225713,
        {'action': 'missing-data',
         'key': "('_split-659a14d513522415a58d9ec3d470e072', 8087)"}),
       (1651065793.2970057,
        {'action': 'missing-data',
         'key': "('_split-659a14d513522415a58d9ec3d470e072', 8087)"})])

And the logs on the worker side

(.venv) ➜  model git:(ta_backfill) ✗ kl -f brett-f4d63eb0-daskworkers-584d467f76-9zctq
/usr/src/python/distributed/distributed/cli/dask_worker.py:319: FutureWarning: The --nprocs flag will be removed in a future release. It has been renamed to --nworkers.
  warnings.warn(
2022-04-27 13:22:12,055 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.36.228.19:46595'
2022-04-27 13:22:13,553 - distributed.worker - INFO -       Start worker at:   tcp://10.36.228.19:34273
2022-04-27 13:22:13,553 - distributed.worker - INFO -          Listening to:   tcp://10.36.228.19:34273
2022-04-27 13:22:13,553 - distributed.worker - INFO -          dashboard at:         10.36.228.19:33865
2022-04-27 13:22:13,553 - distributed.worker - INFO - Waiting to connect to: tcp://brett-f4d63eb0-daskscheduler:8786
2022-04-27 13:22:13,553 - distributed.worker - INFO - -------------------------------------------------
2022-04-27 13:22:13,553 - distributed.worker - INFO -               Threads:                          4
2022-04-27 13:22:13,553 - distributed.worker - INFO -                Memory:                   7.82 GiB
2022-04-27 13:22:13,553 - distributed.worker - INFO -       Local Directory: /src/dask-worker-space/worker-ykvkqgn_
2022-04-27 13:22:13,554 - distributed.worker - INFO - -------------------------------------------------
2022-04-27 13:22:17,292 - distributed.worker - INFO -         Registered to: tcp://brett-f4d63eb0-daskscheduler:8786
2022-04-27 13:22:17,292 - distributed.worker - INFO - -------------------------------------------------
2022-04-27 13:22:17,293 - distributed.core - INFO - Starting established connection
2022-04-27 13:22:56,885 - distributed.utils_perf - INFO - full garbage collection released 105.93 MiB from 0 reference cycles (threshold: 9.54 MiB)

Thoughts

My guess is that when the worker enters the closing_gracefully state, we don't immediately clear out ready work, though maybe we should.

Also, we should have some sanity checks so that close_gracefully can't go on forever.

cc @crusaderky @fjetter

@fjetter added the bug (Something is broken) and deadlock (The cluster appears to not make any progress) labels on Apr 27, 2022
@mrocklin (Member) commented Apr 27, 2022

I hope that this is easy to resolve (we should be able to manage state in the close_gracefully method).

In the meantime, a plugin like this might be helpful.

import sys

from distributed.core import Status
from distributed.diagnostics.plugin import WorkerPlugin
from tornado.ioloop import PeriodicCallback


class CloseUngracefully(WorkerPlugin):
    def setup(self, worker):
        self.worker = worker
        self.count = 0

        # poll the worker's status once per second
        self.pc = PeriodicCallback(self.check, 1000)
        self.pc.start()

    def check(self):
        if self.worker.status == Status.closing_gracefully:
            self.count += 1
        else:
            self.count = 0  # reset whenever we leave closing_gracefully

        if self.count >= 10:
            # stuck in closing_gracefully for ~10s: kill the process so the
            # nanny (or the deployment system) can replace it
            sys.exit(0)
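
For reference, a plugin like this would be installed on every worker with something like the line below (a usage sketch, assuming a connected Client object named client):

client.register_worker_plugin(CloseUngracefully())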

@mrocklin (Member) commented:

To be clear, the code above is not actually tested. It is likely wrong (but hopefully right directionally)

@gjoseph92 (Collaborator) commented:

I don't think this is the worker's fault. Status.closing_gracefully is sort of just a sentinel value on the worker, where the scheduler is telling the worker, "don't do any more work, but don't shut down yet until I tell you it's okay". The reason being that retire_workers (what adaptive scaling uses to scale down) doesn't tell the worker to shut down until its data has been copied to other workers in the cluster. The worker in question obviously can't know when all those keys have been replicated elsewhere; the scheduler is the one that has to orchestrate this.

Having a sanity timeout so that close_gracefully can't go on forever probably makes sense. But I'm guessing it should be on the scheduler side, not the workers, since the scheduler is the one coordinating anyway—fewer race conditions to worry about if we do it there. In fact, the scheduler may tell a worker that's closing_gracefully "actually wait no, don't go! we still need you!". This gets tricky to reason about if the worker might also be shutting itself down due to a deadline.
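
For reference, the same retirement path can be exercised by hand (a minimal sketch, assuming an async Client named client; adaptive scaling goes through the scheduler-side equivalent):

# Ask the scheduler to gracefully retire one worker: it replicates the
# worker's unique keys elsewhere first, and only then tells it to shut down.
await client.retire_workers(workers=["tcp://10.36.228.19:34273"])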


Anyway, my guess is that there's a discrepancy between RetireWorker.done

def done(self) -> bool:
    """Return True if it is safe to close the worker down; False otherwise"""
    ws = self.manager.scheduler.workers.get(self.address)
    if ws is None:
        return True
    return all(len(ts.who_has) > 1 for ts in ws.has_what)

and the cases in which the RetireWorker AMM policy removes itself from the AMM policies.

Because Scheduler._track_retire_worker is waiting for policy.done() to flip before it tells the worker to actually shut down, if the policy gets removed before it's done(), it may never become done. Or, because there's a poll interval, maybe done() actually was true at the point when the policy removed itself, but somehow things changed in between (a different worker holding the other replica suddenly died, for example), and by the time done() gets called, it no longer matches the done() criteria (and without a running policy, never will)?

@crusaderky what do you think? It still feels odd to me that the criteria for self.done() isn't just "the policy has been removed". (Some previous discussion in https://github.com/dask/distributed/pull/5381/files#r794919819.) What about:

diff --git a/distributed/active_memory_manager.py b/distributed/active_memory_manager.py
index 448a7581..b549d300 100644
--- a/distributed/active_memory_manager.py
+++ b/distributed/active_memory_manager.py
@@ -661,6 +661,8 @@ class RetireWorker(ActiveMemoryManagerPolicy):
 
     def done(self) -> bool:
         """Return True if it is safe to close the worker down; False otherwise"""
+        if self not in self.manager.policies:
+            return True
         ws = self.manager.scheduler.workers.get(self.address)
         if ws is None:
             return True

Also, the obvious place to add a timeout would be in this while not policy.done() loop. But if the timeout expires, do we want to close the worker (even if it still has essential data), or switch it back to running?
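
For illustration, a timeout in that loop could look roughly like the standalone sketch below (not the actual _track_retire_worker code; the function name, defaults, and poll interval are assumptions):

import asyncio
from time import time

async def wait_for_retirement(policy, timeout: float = 300.0, poll: float = 0.5) -> bool:
    # Poll policy.done() the way Scheduler._track_retire_worker does,
    # but give up once `timeout` seconds have elapsed.
    deadline = time() + timeout
    while not policy.done():
        if time() > deadline:
            # Timed out: the scheduler would then have to either close the
            # worker anyway (risking data loss) or flip it back to running.
            return False
        await asyncio.sleep(poll)
    return True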

My guess is that when entering a closing_gracefully state on the worker side we don't immediately clear out ready work, when maybe we should.

This might be a nice optimization (steal tasks immediately from workers that are closing gracefully, instead of waiting for them to close, and letting the normal remove_worker handling reassign the tasks). But if the tasks on the worker depend on data that's on the worker, having work-stealing replicating data around at the same time as a RetireWorkers policy might be counterproductive. Data replication has to happen before those tasks can run elsewhere. Once RetireWorkers is done, the data is replicated. So I don't think we gain much from this. We just have to be patient and wait for data transfer.

@mrocklin (Member) commented:

I don't think this is the worker's fault. Status.closing_gracefully is sort of just a sentinel value on the worker, where the scheduler is telling the worker, "don't do any more work, but don't shut down yet until I tell you it's okay".

Should the scheduler immediately reschedule processing tasks on other workers?

@mrocklin (Member) commented:

@gjoseph92 do you think it is doable to get a fix for this issue in by the release on Friday?

@gjoseph92 (Collaborator) commented:

Should the scheduler immediately reschedule processing tasks on other workers?

Probably not. I discussed this in the last paragraph of my previous comment. This would only help for tasks that are queued on the worker that don't have any dependencies on the worker, which is rare enough it doesn't seem worth optimizing for right now. For tasks besides that, it adds more complexity and chaos.

do you think it is doable to get a fix that would resolve this issue in by the release on Friday?

@bnaul can you try with 2fa193a (git+https://github.com/gjoseph92/distributed.git@2fa193ab0f4b8b17f721a1484ea4b2b0b70b7d05) and see if it fixes things? Not exactly what I'd merge, but it tells us if we're on the right track.

If it works, and @crusaderky is okay with this approach, I would be too. However, I've been trying to write a test to trigger this behavior and can't quite make it happen. If we can poke at a cluster that's doing this, it will be much faster. But if we can't figure out what the problem is or how to test it, then I'm not sure we could get a fix in by Friday. I think we can probably figure that out though, if we devote the time to it.

To be clear: the fact that we see Retiring worker tcp://10.36.228.19:34273; no unique keys need to be moved away in the logs, but we don't then see All unique keys on worker %s have been replicated elsewhere and Retired worker %s (and the worker isn't removed), is clearly a bug.

Adding a timeout to _track_retire_worker (and resuming the worker if the timeout fails) is the one other option I'd consider, but it's just a bandaid on this bug. I'd defer to @crusaderky on that.

@mrocklin (Member) commented:

@bnaul it sounds like @gjoseph92 would benefit from access to a live cluster. When something next fails can I ask you to ping him?

Gabe, for reference Brett pings me from time to time saying "I've got a failure if you want to come play" and I drop things and we screenshare. It's been effective so far if you're interruptible.

@mrocklin (Member) commented:

Probably not. I discussed this in the last paragraph of my previous comment. This would only help for tasks that are queued on the worker that don't have any dependencies on the worker, which is rare enough it doesn't seem worth optimizing for right now. For tasks besides that, it adds more complexity and chaos.

Well, this could be pretty common if, for example, the worker had more ready tasks than threads.

If it works, and @crusaderky is okay with this approach, I would be too. However, I've been trying to write a test to trigger this behavior and can't quite make it happen. If we can poke at a cluster that's doing this, it will be much faster. But if we can't figure out what the problem is or how to test it, then I'm not sure we could get a fix in by Friday

I would be ok getting a fix in now without a dedicated test for it, and delaying the test for after the release if we have confirmation that it resolves the live issue. I think that there is significant value here.

@mrocklin (Member) commented:

Adding a timeout to _track_retire_worker (and resuming the worker if the timeout fails) is the one other option I'd consider, but it's just a bandaid on this bug

It sounds like this bandaid would stop the bleeding though, which I suspect would be highly appreciated by the folks that are currently bleeding out (like @bnaul). Bandaids aren't bad if they're good to have anyway. They buy us more time to think without stress.

@gjoseph92 (Collaborator) commented:

Well, this could be pretty common if, for example, the worker had more ready tasks than threads.

If those ready tasks depend on data that's on the worker, that data has to be transferred to a different worker before they can execute elsewhere. And that transferring is what RetireWorker is doing already. Agreed, adding work-stealing on top of that would be nice eventually, because it would let the worker retire sooner (by removing queued tasks) and let those tasks start running sooner (by running elsewhere).
I think that's just a nice-to-have optimization though. If most of the ready tasks don't have dependencies, the worker retirement should happen pretty quickly. (Here, it took 4s for the worker to be ready to be closed. That seems quick enough for now? The problem is that the close didn't actually happen.)

It sounds like this bandaid would stop the bleeding though, which I suspect would be highly appreciated by the folks that are currently bleeding out

Let's see if the patch helps. From reading the logs and the code, I think there's a very limited place where this bug can be happening. If that's not actually true, then adding the timeout is pretty easy.

@mrocklin (Member) commented:

Let's see if the patch helps. From reading the logs and the code, I think there's a very limited place where this bug can be happening. If that's not actually true, then adding the timeout is pretty easy

Sounds good to me. In general I like the approach of "Try to solve the deeper problem quickly. If we can't, see if there is a bandaid that we like, get that out quickly, and then refocus on the deeper problem for a longer period of time". I think that approach balances short-term pain and long-term design. If you agree with that approach generally then I'm happy. Mostly I want to make sure that we don't spend a couple of weeks on the deeper problem, leaving users feeling bad, when we could stop the bleeding with an hour or two of work.

I think I'm hearing you say something similar above. Let me know if you disagree in principle.

@crusaderky (Collaborator) commented Apr 27, 2022

Interestingly, they both also agree that there are three tasks that are ready to go. An artificial call to Worker._ensure_computing at this point doesn't do anything because the first check in that method is if self.status != Status.running: return.

This is a known issue: #3761
In the case of closing_gracefully, this should be just a nuisance - pending and running tasks don't stop the retirement, and the scheduler will (re)run them elsewhere as soon as the worker is retired. #3761 will just make it happen faster.

Having a sanity timeout so that close_gracefully can't go on forever probably makes sense. But I'm guessing it should be on the scheduler side, not the workers, since the scheduler is the one coordinating anyway—fewer race conditions to worry about if we do it there. In fact, the scheduler may tell a worker that's closing_gracefully "actually wait no, don't go! we still need you!". This gets tricky to reason about if the worker might also be shutting itself down due to a deadline.

This is already implemented:

if nno_rec:
    # All workers are paused or closing_gracefully.
    # Scheduler.retire_workers will read this flag and exit immediately.
    # TODO after we implement the automatic transition of workers from paused
    # to closing_gracefully after a timeout expires, we should revisit this
    # code to wait for paused workers and only exit immediately if all
    # workers are in closing_gracefully status.
    self.no_recipients = True
    logger.warning(
        f"Tried retiring worker {self.address}, but {nno_rec} tasks could not "
        "be moved as there are no suitable workers to receive them. "
        "The worker will not be retired."
    )
    self.manager.policies.remove(self)

and
if policy.no_recipients:
    # Abort retirement. This time we don't need to worry about race
    # conditions and we can wait for a scheduler->worker->scheduler
    # round-trip.
    self.stream_comms[ws.address].send(
        {"op": "worker-status-change", "status": prev_status.name}
    )
    return None, {}

The timeout here is implicit: if, while you're waiting indefinitely to copy your worker's data to apparently healthy recipients, you keep failing because they're unresponsive, then eventually the scheduler will notice (timeout) and remove them; at which point the above code will be triggered.
This won't save you from deadlocked workers that are still sending the heartbeat. I don't think a recipient's deadlock should be dealt with via a timeout, though.

Anyway, my guess is that there's a discrepancy between RetireWorker.done
and the cases in which the RetireWorker AMM policy removes itself from the AMM policies.

Good point; however, that's not the right bit of code.
This one is fine:

if nno_rec:
    # All workers are paused or closing_gracefully.
    # Scheduler.retire_workers will read this flag and exit immediately.
    # TODO after we implement the automatic transition of workers from paused
    # to closing_gracefully after a timeout expires, we should revisit this
    # code to wait for paused workers and only exit immediately if all
    # workers are in closing_gracefully status.
    self.no_recipients = True
    logger.warning(
        f"Tried retiring worker {self.address}, but {nno_rec} tasks could not "
        "be moved as there are no suitable workers to receive them. "
        "The worker will not be retired."
    )
    self.manager.policies.remove(self)

as it will trigger this response in the scheduler:
if policy.no_recipients:
    # Abort retirement. This time we don't need to worry about race
    # conditions and we can wait for a scheduler->worker->scheduler
    # round-trip.
    self.stream_comms[ws.address].send(
        {"op": "worker-status-change", "status": prev_status.name}
    )
    return None, {}

This one however is vulnerable to a race condition:

self.manager.policies.remove(self)

What I think is happening:
AMM runs. There are no unique tasks in memory, so the policy detaches itself. However, there are tasks in running state. Before the next poll of done() happens, those tasks terminate and their outputs land in memory - so done() returns False.

In theory, there could also be tasks in flight state (and before #6195, they could also be in fetch state). Before the next poll of done() happens, they reach the worker and land in memory AND the worker they received the data from loses them - so done() returns False. This last bit is not impossible, just very very unlikely. It should never be the AMM's fault - maybe the other worker was closed ungracefully, or maybe it had been unresponsive to the scheduler for several tens of seconds but for some weird reason managed to send one last message to its peer worker before the scheduler declared it dead.

In theory, you could just remove that line. The policy will be uninstalled by

self.manager.policies.remove(self)
after the worker is removed.
There is one caveat, which is if you submit this:

futs = [c.submit(slowinc, 1, delay=0.5 + 2 * i, workers=[a.address]) for i in range(16)]
await c.retire_workers(a.address)

and a has 16 threads, it will take 32.5s to shut down. In other words, if at least one task, which was already running when the worker transitioned from running to closing_gracefully, transitions to memory between each iteration of the AMM, then you'll have to wait until the next iteration.
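
(Worked through: the delays in that snippet are staggered so the tasks finish at 0.5 s, 2.5 s, ..., 30.5 s - roughly one per default ~2 s AMM interval - so each iteration finds a freshly completed unique key to move, and the worker is only released on the iteration after the last one, at about 32.5 s.)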

To fix it,

  1. remove line 660
  2. in its place, populate an ignorelist of tasks in running state
  3. in done(), return True if the only unique tasks in memory are in the ignorelist.
  4. Their output will be lost and recomputed somewhere else by the scheduler.
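
A rough, standalone sketch of steps 1-3 above (not a patch to the real RetireWorker; the class name and the ignored attribute are made up for illustration, and only has_what, who_has, and processing mirror real scheduler state):

class RetireWorkerSketch:
    # Toy model of the proposed fix: never self-remove; instead remember which
    # tasks were still running when the last unique key was replicated, and
    # ignore their outputs in done(). Those outputs are then recomputed
    # elsewhere by the scheduler after the worker is removed (step 4).

    def __init__(self, scheduler, address):
        self.scheduler = scheduler
        self.address = address
        self.ignored = set()

    def run(self):
        ws = self.scheduler.workers.get(self.address)
        if ws is None:
            return
        if not any(len(ts.who_has) == 1 for ts in ws.has_what):
            # Steps 1-2: where line 660 used to remove the policy, snapshot
            # the tasks currently processing on this worker instead.
            self.ignored |= set(ws.processing)

    def done(self) -> bool:
        # Step 3: safe to close once every unique in-memory key is either
        # replicated elsewhere or on the ignorelist.
        ws = self.scheduler.workers.get(self.address)
        if ws is None:
            return True
        return all(len(ts.who_has) > 1 or ts in self.ignored for ts in ws.has_what)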

@crusaderky what do you think? It still feels odd to me that the criteria for self.done() isn't just "the policy has been removed".

The problem is that all transfers could finish successfully in 0.1s, but the policy won't rerun before 2s, and the user may have set a much slower period for the AMM - say, 60s. That's the same reason why there's polling with a sleep in retire_workers() and not just an asyncio.Event that is set when removing the policy.

Also, the obvious place to add a timeout would be in this while not policy.done() loop.

What would a timeout protect you from?
If the other workers are unresponsive, they will eventually timeout and be removed - and the retirement will be aborted.
If the other workers are deadlocked, you have bigger problems to worry about 😉

I do not agree that we should implement a timeout in _track_retire_worker.

But if the timeout expires, do we want to close the worker (even if it still has essential data), or switch it back to running?

The current algorithm switches back to running.

This might be a nice optimization (steal tasks immediately from workers that are closing gracefully, instead of waiting for them to close, and letting the normal remove_worker handling reassign the tasks). But if the tasks on the worker depend on data that's on the worker, having work-stealing replicating data around at the same time as a RetireWorkers policy might be counterproductive.

The worst that can happen is that steal will choose one worker and RetireWorker a different one, so you'll end up with an extra replica of the data and some extra network activity. The last bit doesn't worry me, as retirement is a somewhat infrequent operation. The extra replica is also not a problem if ReduceReplicas is running - which by default currently isn't.

To clarify: this is already happening, except that it's not steal vs. RetireWorker, but the basic scheduler vs. RetireWorker after the worker has been retired.

@bnaul it sounds like @gjoseph92 would benefit from access to a live cluster.

Tip: add this to your config to dump the full reasoning of the AMM:

distributed:
  logging:
    distributed.active_memory_manager: debug
    distributed.active_memory_manager.tasks: debug

If most of the ready tasks don't have dependencies, the worker retirement should happen pretty quickly

The number of ready tasks and their dependencies is inconsequential to how long it takes to retire.

@gjoseph92 (Collaborator) commented Apr 28, 2022

Thanks for the great explanation @crusaderky.

eventually the scheduler will notice (timeout) and remove them

Just to note: this wasn't the case by default until very recently (#6200). Another reason why having a TTL is important, but agreed it's not the AMM's problem.

AMM runs. There are no unique tasks in memory, so the policy detaches itself. However, there are tasks in running state. Before the next poll of done() happens, those tasks terminate and their outputs land in memory - so done() returns False.

This totally makes sense. These are the cases I was looking for. It also makes sense with @bnaul's workload (which only has root tasks).

To fix it,

  1. remove line 660
  2. in its place, populate an ignorelist of tasks in running state
  3. in done(), return True if the only unique tasks in memory are in the ignorelist.
  4. Their output will be lost and recomputed somewhere else by the scheduler.

What about the fetch scenario? We could also traverse all the processing tasks and add their dependencies to this set? Something like gjoseph92@a176149?

How much benefit do we get from leaving the RetireWorker policy around, even after it's done, and adding this ignorelist? Pros: if executing tasks finish after RetireWorker.done() is True but before the worker has fully left, maybe we get very very lucky and are able to replicate the key instead of recomputing it. Cons: it's more complex and hard to test.

Versus something simple like eae1bd4?

@mrocklin (Member) commented:

As a stopgap: #6230

@fjetter (Member) commented Apr 28, 2022

I have to admit that I didn't understand the AMM race condition wizardry above so maybe this is not helpful.

Since this is about a worker closing, I would like to point to two related issues that can prevent a worker from closing. The latter would even prevent us from introducing an asyncio timeout on the worker side as proposed in #6230.

I've been debugging these issues as part of closing and not closing_gracefully. I'll rebase the two PRs to see where we are.

@mrocklin (Member) commented:

The latter would even prohibit us from introducing an asyncio timeout on worker side as proposed in (#6230)

Hrm, I'm not seeing the connection here yet. In practice what I saw with @bnaul's setup was that there was nothing running in the ThreadPoolExecutor. The worker was quiet, and was just waiting on the scheduler to give it the "time to close" signal. The diff in that PR is a bit wonky though, so I'll wait until the rebase and look again to see what's going on.

Also, to be clear, do you think that #6230 is a bad idea?

@mrocklin (Member) commented:

Looking at the commit in #6091, yes, I think that would be useful. It seems somewhat orthogonal to this topic though (but still a good and easy thing to fix).

@fjetter (Member) commented Apr 28, 2022

Hrm, I'm not seeing the connection here yet. In practice what I saw with @bnaul's setup was that there was nothing running in the ThreadPoolExecutor. The worker was quiet, and was just waiting on the scheduler to give it the "time to close" signal.

Not claiming this is the problem. I just wanted to raise awareness that there is something ongoing in this space. I haven't had enough time to familiarize myself with this specific problem to make a call.
The threadpool is likely not a problem from what you're saying. The other one might explain why the worker never leaves, though.

Also, to be clear, do you think that #6230 is a bad idea?

I am reviewing. If there is actually a problem in the worker closing method, I am wondering how much this is helping. In that case only a timeout on the scheduler side would help. I believe this is discussed in the thread above, but I am struggling to follow the entire conversation right now.

@fjetter (Member) commented Apr 28, 2022

FWIW I 100% agree we should have a reliable timeout for graceful shutdown

@crusaderky (Collaborator) commented Apr 28, 2022

What about the fetch scenario? We could also traverse all the processing tasks and add their dependencies to this set? Something like gjoseph92@a176149?

Unnecessary. When an in-flight task lands and becomes in-memory, by definition there are now two replicas of it. So you won't waste any AMM iterations. In the weird edge case where the sender worker dies in the meantime, you are covered by not removing the policy - so it will just take an extra iteration of the AMM to deal with it.

@crusaderky (Collaborator) commented:

How much benefit do we get from leaving the RetireWorker policy around, even after it's done?

It lets you stop worrying about race conditions. To clarify, the ignorelist is a nice-to-have that makes the retirement faster in the edge case of a multitude of tasks that transition from running to memory at ~2 seconds interval.

Versus something simple like eae1bd4?

This also works.

@crusaderky (Collaborator) commented:

Also, to be clear, do you think that #6230 is a bad idea?

Yes I think it's a bad idea - it adds to the complexity of the whole system, and it does not work when the retirement is initiated from the client or the scheduler, at all.
A timeout, if any, should be in Scheduler._track_retire_worker.

@gjoseph92 (Collaborator) commented Apr 28, 2022

Versus something simple like eae1bd4?

This also works.

This is my vote then. It's simple and easy to understand. And it should work just as well for the "multitude of tasks that transition from running to memory at ~2 seconds interval" scenario, right? (The worker will ignore them and retire immediately.)

@crusaderky (Collaborator) commented:

Yes, it does. +1 from me.

@gjoseph92 (Collaborator) commented:

I'll open a PR.
