-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Yank state machine out of Worker class #6566
Conversation
62ffa83
to
4e882ab
Compare
4e882ab
to
f953262
Compare
Unit Test ResultsSee test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 15 files ± 0 15 suites ±0 6h 1m 54s ⏱️ - 16m 13s Results for commit 82bcf8b. ± Comparison against base commit 0fcc724. ♻️ This comment has been updated with latest results. |
0eeffe1
to
d45916f
Compare
d45916f
to
ac28a32
Compare
} | ||
info["data"] = list(self.data) | ||
info = {k: v for k, v in self.__dict__.items() if not k.startswith("_")} | ||
info["data"] = dict.fromkeys(self.data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For coherence with various equivalent operations in worker_state_machine:
WorkerState._to_dict
GatherDepSuccessEvent
ExecuteSuccessEvent
.. currentmodule:: distributed.worker_state_machine | ||
|
||
.. autoclass:: distributed.worker_state_machine.TaskState | ||
:members: | ||
|
||
.. autoclass:: distributed.worker_state_machine.WorkerState | ||
:members: | ||
|
||
.. autoclass:: distributed.worker_state_machine.BaseWorker | ||
:members: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should probably be moved to a separate document as part of #5413.
@@ -609,7 +545,7 @@ def __init__( | |||
profile_cycle_interval = parse_timedelta(profile_cycle_interval, default="ms") | |||
assert profile_cycle_interval | |||
|
|||
self._setup_logging(logger) | |||
self._setup_logging(logger, wsm_logger) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should I also add distributed.worker_memory.logger
?
@@ -609,7 +545,7 @@ def __init__( | |||
profile_cycle_interval = parse_timedelta(profile_cycle_interval, default="ms") | |||
assert profile_cycle_interval | |||
|
|||
self._setup_logging(logger) | |||
self._setup_logging(logger, wsm_logger) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should I also add distributed.worker_memory.logger
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest a follow up PR. I have thoughts on logging and would prefer this to be in a dedicated PR
if TYPE_CHECKING: | ||
# TODO import from typing (requires Python >=3.10) | ||
from typing_extensions import TypeAlias | ||
|
||
# Circular imports | ||
from distributed.actor import Actor | ||
from distributed.diagnostics.plugin import WorkerPlugin | ||
from distributed.worker import Worker |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is only for the sake of the deprecation machinery
|
||
.. note:: | ||
The data attributes of this class are implementation details and may be | ||
changed without a deprecation cycle. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMPORTANT
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine with this but if we encounter too many problems with this, we may need to reconsider
# assert self.waiting_for_data_count == waiting_for_data_count | ||
for worker, keys in self.has_what.items(): | ||
def validate_state(self) -> None: | ||
assert len(self.executing) >= 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes in this method are just a de-indentation.
Error control has remained in Worker.validate_state
.
ac28a32
to
82bcf8b
Compare
Ready for review and merge! |
@dataclass | ||
class PauseEvent(StateMachineEvent): | ||
__slots__ = () |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
follow up topic: I'm wondering if we should move the definition of the events into another file to keep file sizes smaller. The event definition + import is already at almost 1k lines of code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
Instructions + events definition amounts to 525 lines.
I explored changing the imports in worker.py to
import distributed.worker_state_machine as wsm
and add wsm.
in front of everything, but I found it rather unpleasing to the eye.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What we could do is move all the @_handle_event.register
methods of WorkerState to methods of their respective events and then have them self-register to the WorkerState and Worker classes in a plugin style:
class StateMachineEvent:
@abc.abstractmethod
def handle(self, state: Workerstate) -> RecsInstrs:
...
class WorkerState:
def _handle_event(self, ev: StateMachineEvent) -> RecsInstrs:
return ev.handle(self)
At that point it would totally make sense to move the events to a separate file.
Should I investigate it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instructions + events definition amounts to 525 lines.
My counting also included the TaskState, exceptions, imports, etc. never mind
Should I investigate it?
I don't hate the idea but I am not sure if it is worth it. After all, the handlers are modifying the WorkerState more or less directly. I think we should not overdo it and stop at this point and see how it feels for a while before engaging in the next iteration of refactoring.
Right now, the events are mostly dataclasses and I think that's good for now
|
||
.. note:: | ||
The data attributes of this class are implementation details and may be | ||
changed without a deprecation cycle. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine with this but if we encounter too many problems with this, we may need to reconsider
async def set_resources(self, **resources) -> None: | ||
for r, quantity in resources.items(): | ||
if r in self.total_resources: | ||
self.available_resources[r] += quantity - self.total_resources[r] | ||
self.state.available_resources[r] += quantity - self.total_resources[r] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
follow up: this should be encapsulated and communicated via an event. After all, this could / should trigger transitions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
""" | ||
if self.status not in WORKER_ANY_RUNNING: | ||
return None | ||
|
||
try: | ||
self.log.append(("request-dep", worker, to_gather, stimulus_id, time())) | ||
self.state.log.append( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
follow up: I don't think the worker class should log anythign on the state. I think these messages should be removed entirely, see also #6442 (comment)
expect = { | ||
"address": "127.0.0.1.1234", | ||
"busy_workers": [], | ||
"constrained": [], | ||
"data": {"y": None}, | ||
"data_needed": ["x"], | ||
"data_needed_per_worker": {"127.0.0.1:1235": ["x"]}, | ||
"executing": [], | ||
"in_flight_tasks": [], | ||
"in_flight_workers": {}, | ||
"log": [ | ||
["x", "ensure-task-exists", "released", "s1"], | ||
["x", "released", "fetch", "fetch", {}, "s1"], | ||
["y", "put-in-memory", "s2"], | ||
["y", "receive-from-scatter", "s2"], | ||
], | ||
"long_running": [], | ||
"nthreads": 8, | ||
"ready": [], | ||
"running": True, | ||
"stimulus_log": [ | ||
{ | ||
"cls": "AcquireReplicasEvent", | ||
"stimulus_id": "s1", | ||
"who_has": {"x": ["127.0.0.1:1235"]}, | ||
}, | ||
{ | ||
"cls": "UpdateDataEvent", | ||
"data": {"y": None}, | ||
"report": False, | ||
"stimulus_id": "s2", | ||
}, | ||
], | ||
"tasks": { | ||
"x": { | ||
"key": "x", | ||
"priority": [1], | ||
"state": "fetch", | ||
"who_has": ["127.0.0.1:1235"], | ||
}, | ||
"y": { | ||
"key": "y", | ||
"nbytes": 16, | ||
"state": "memory", | ||
}, | ||
}, | ||
"transition_counter": 1, | ||
} | ||
assert actual == expect |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we'll need to change this to be less verbose. no need to block the PR, though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really like this test though. It gives the reader a very good impression of what a much larger real-life dump looks like.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, but we'll need to change it for many unrelated reasons. I would like it better to have a developer documentation rendered by sphinx that would print this example instead of having it hard coded in place.
I've decided to break the PR in two: there will be a follow-up after this that enables the deprecation warning in the attributes, which in turn will require repetitive changes to thousands of unit test.
This PR is divided into three commits:
Changes of note
WorkerState.running
, which is a simplified mirror ofWorker.status
.WorkerState.running == (Worker.status == Status.running)
. Added newPauseEvent
, symmetrical to the already existingUnpauseEvent
.Out of scope