RFC Sign every compute task with a unique counter to correlate responses #7372
I realize this may not be entirely sufficient for the purposes of shuffling, since the attempt counter on the worker side may be bumped by a ComputeTaskEvent after the task has finished on the threadpool but before the response has been generated. Will need to think a bit more about this.
The current implementation would not be sufficient for my shuffle requirements due to the cancelled->resumed logic. Interestingly, we may be able to drop the cancelled(executing) and resumed states entirely with this counter. Will follow shortly with some explanation...
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
19 files -3, 19 suites -3, 13h 45m 16s ⏱️ +3h 57m 12s
For more details on these failures and errors, see this check.
Results for commit 4ef8975. ± Comparison against base commit 9d8e18c.
♻️ This comment has been updated with latest results.
The gist of removing cancelled/resumed: At the very least for the execute call this attempt counter would allow us to immediately release a task even if it is still on the threadpool. The threadpool would still be blocked but the task state can be released immediately assuming the If it is identical, the task would be transitioned to successful as usual. If the counter shows any drift, the worker throws away the result, regardless of success or failure, and simply issues a "new" Regarding the (FYI: instead of counters, any arbitrary unique identifiers can be used. Ordering is not important, but counters are simple and fast.)
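A minimal sketch of the worker-side comparison described above, not the actual `distributed` worker state machine. The class `Worker`, its methods, and the return values `"memory"` and `"discarded"` are hypothetical names chosen for illustration; what the worker does after discarding a result is left out.

```python
class Worker:
    """Toy worker that tags each execution with the attempt it was started for."""

    def __init__(self):
        self.current_attempt = {}  # key -> attempt the scheduler last asked for
        self.data = {}             # key -> accepted result

    def handle_compute_task(self, key, attempt):
        # Remember the newest attempt; the execution captures the value it started with.
        self.current_attempt[key] = attempt
        return attempt

    def handle_free_keys(self, key):
        # The task state is released immediately, even if a thread is still busy with it.
        self.current_attempt.pop(key, None)

    def on_execute_done(self, key, started_attempt, result):
        # Any drift between the captured and the current attempt means the result
        # belongs to an outdated request and is thrown away, success or failure.
        if self.current_attempt.get(key) != started_attempt:
            return "discarded"
        self.data[key] = result
        return "memory"


worker = Worker()
first = worker.handle_compute_task("x", 1)   # execution starts on the threadpool
worker.handle_free_keys("x")                 # scheduler releases the task
second = worker.handle_compute_task("x", 2)  # scheduler asks again
outcome_first = worker.on_execute_done("x", first, 10)
outcome_second = worker.on_execute_done("x", second, 20)
```

In this toy model the late result of the first execution never reaches `data`, so no dedicated cancelled or resumed bookkeeping is needed to tell the two executions apart.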
#7356 suggested using the transition counter to sign the task. That would also work; we don't necessarily need to know about the "attempt", i.e. that's a bit less state on the scheduler.
This fixes `test_deadlock_resubmit_queued_tasks_fast`.

The problem:
- an "old" TaskState `f-0` is forgotten and removed from the `queued` HeapSet. Internally, this removes it from a set, but a weakref is still on the heap
- a new TaskState `f-0` is added to the HeapSet. It's pushed onto the heap, but since its priority is higher (newer generation), it comes after the old `f-0` object that's still on the front of the heap
- `pop` pops old `f-0` off the heap. The weakref is still alive (for whatever reason). `value in self._data` (the set) is True, because the _hash_ of old `f-0` and new `f-0` are the same (same key). So we return the stale, old TaskState object.

Much like `WorkerState`s, there should be exactly 1 `TaskState` instance per task. If there are multiple instances with the same key, they are different tasks, and have different state.

xref dask#7372, dask#7356.
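The stale-object problem in the list above can be reproduced with a small stand-alone sketch. This is not the real `HeapSet` or `TaskState`; the class `Task` is hypothetical, and it assumes that both hashing and equality are based on the key alone.

```python
import heapq
import weakref


class Task:
    """Hypothetical stand-in whose hash and equality depend only on the key."""

    def __init__(self, key, priority):
        self.key = key
        self.priority = priority

    def __hash__(self):
        return hash(self.key)

    def __eq__(self, other):
        return isinstance(other, Task) and other.key == self.key


data = set()  # membership set
heap = []     # (priority, weakref) entries; entries are not removed on discard

old = Task("f-0", priority=0)
data.add(old)
heapq.heappush(heap, (old.priority, weakref.ref(old)))
data.discard(old)  # "forgotten": gone from the set, weakref still on the heap

new = Task("f-0", priority=1)  # newer generation sorts after the old entry
data.add(new)
heapq.heappush(heap, (new.priority, weakref.ref(new)))

_, ref = heapq.heappop(heap)
popped = ref()                   # the old object, still alive via `old`
looks_valid = popped in data     # key-based lookup matches the *new* task
is_current = popped is new       # identity shows it is the stale instance
```

The membership test succeeds even though the popped object was already removed, which is why one instance per key (or an identity check) is needed.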
79d9e65 to 4ef8975
@@ -1354,6 +1362,7 @@ def __init__(self, key: str, run_spec: object, state: TaskStateState):
         self.metadata = {}
         self.annotations = {}
         self.erred_on = set()
+        self._attempt = 0
IIUC, this means that if we re-create a task, e.g. by re-running a workload after it has been forgotten, its attempt count would be reset to 0. This would mean that the attempt counter is not unique. We should use a global counter instead that we also actively increment. `transition_counter` would be one possible candidate if we make sure to increment it every time we update `_attempt`.
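To illustrate the concern, here is a small sketch comparing a per-task counter with a process-wide one. All names (`PerTaskSigned`, `GloballySigned`, `run_id`, `_global_counter`) are hypothetical and are not taken from the PR.

```python
import itertools


class PerTaskSigned:
    """Counter lives on the task object, so it restarts when the task is re-created."""

    def __init__(self, key):
        self.key = key
        self._attempt = 0

    def sign(self):
        self._attempt += 1
        return self._attempt


_global_counter = itertools.count(1)  # shared by all tasks, never reset


class GloballySigned:
    """Signature is drawn from a shared counter, so it survives task re-creation."""

    def __init__(self, key):
        self.key = key
        self.run_id = None

    def sign(self):
        self.run_id = next(_global_counter)
        return self.run_id


# The task "x" is assigned once, forgotten, then re-created and assigned again.
local_first = PerTaskSigned("x").sign()
local_second = PerTaskSigned("x").sign()

global_first = GloballySigned("x").sign()
global_second = GloballySigned("x").sign()
```

With the per-task counter both assignments carry the same value, so a late response to the first one cannot be told apart from a response to the second; the shared counter avoids that.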
Sure, but I'd use a different name for this then, since it's no longer an attempt.
This was proposed in the context of shuffle resilience; see #6105 and, primarily, #7353.
This introduces a new unique counter in the TaskState objects that counts the attempts to assign the task to a worker. This attempt counter is unique and is used to sign the `compute-task` message. The worker is expected to sign its response accordingly. The scheduler, in turn, can use this to correlate `task-finished` messages with the compute messages it sent and can intentionally accept or reject a response.

In an ancient version of this code base we had a similar check at this position, where we verified that a task-finished message originated from the worker the task is `processing_on`. I don't remember exactly what happened to this check, but I can no longer find it. This is a different take on it, but practically the same functionality.
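The round trip described here could look roughly like the following sketch. It is a simplified model, not the scheduler code in this PR: the `Scheduler` class, its method names, the message dictionaries, and the `attempt` field name are assumptions made for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TaskState:
    key: str
    attempt: int = 0                  # hypothetical public name for the counter
    processing_on: str | None = None
    state: str = "released"


class Scheduler:
    def __init__(self) -> None:
        self.tasks: dict[str, TaskState] = {}

    def send_compute_task(self, key: str, worker: str) -> dict:
        # Bump the counter on every assignment and sign the outgoing message.
        ts = self.tasks.setdefault(key, TaskState(key))
        ts.attempt += 1
        ts.processing_on = worker
        ts.state = "processing"
        return {"op": "compute-task", "key": key, "attempt": ts.attempt}

    def handle_task_finished(self, msg: dict) -> bool:
        # Accept the response only if it carries the signature of the latest assignment.
        ts = self.tasks.get(msg["key"])
        if ts is None or ts.state != "processing" or msg["attempt"] != ts.attempt:
            return False
        ts.state = "memory"
        return True


def worker_reply(compute_msg: dict) -> dict:
    # The worker echoes the signature it received.
    return {
        "op": "task-finished",
        "key": compute_msg["key"],
        "attempt": compute_msg["attempt"],
    }


scheduler = Scheduler()
msg_a = scheduler.send_compute_task("x", worker="a")
msg_b = scheduler.send_compute_task("x", worker="b")  # task was reassigned
stale_accepted = scheduler.handle_task_finished(worker_reply(msg_a))
fresh_accepted = scheduler.handle_task_finished(worker_reply(msg_b))
```

Here the late response from worker `a` is rejected because its signature no longer matches, while the response from worker `b` is accepted.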