Skip to content

Commit

Permalink
Breadth-first transitions
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Jun 17, 2022
1 parent 8cadc41 commit ab0e9a1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 19 deletions.
24 changes: 17 additions & 7 deletions distributed/tests/test_worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,23 @@ def test_WorkerState__to_dict():
"busy_workers": [],
"constrained": [],
"data": {"y": None},
"data_needed": ["x"],
"data_needed_per_worker": {"127.0.0.1:1235": ["x"]},
"data_needed": [],
"data_needed_per_worker": {"127.0.0.1:1235": []},
"executing": [],
"in_flight_tasks": [],
"in_flight_workers": {},
"in_flight_tasks": ["x"],
"in_flight_workers": {"127.0.0.1:1235": ["x"]},
"log": [
["x", "ensure-task-exists", "released", "s1"],
["x", "released", "fetch", "fetch", {}, "s1"],
["gather-dependencies", "127.0.0.1:1235", ["x"], "s1"],
[
"x",
"released",
"fetch",
"fetch",
{"x": ["flight", "127.0.0.1:1235"]},
"s1",
],
["x", "fetch", "flight", "flight", {}, "s1"],
["y", "put-in-memory", "s2"],
["y", "receive-from-scatter", "s2"],
],
Expand All @@ -137,10 +146,11 @@ def test_WorkerState__to_dict():
],
"tasks": {
"x": {
"coming_from": "127.0.0.1:1235",
"key": "x",
"nbytes": 123,
"priority": [1],
"state": "fetch",
"state": "flight",
"who_has": ["127.0.0.1:1235"],
},
"y": {
Expand All @@ -149,7 +159,7 @@ def test_WorkerState__to_dict():
"state": "memory",
},
},
"transition_counter": 1,
"transition_counter": 2,
}
assert actual == expect

Expand Down
28 changes: 16 additions & 12 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2262,18 +2262,22 @@ def _transitions(self, recommendations: Recs, *, stimulus_id: str) -> Instructio
reach a steady state
"""
instructions = []

remaining_recs = recommendations.copy()
tasks = set()
while remaining_recs:
ts, finish = remaining_recs.popitem()
tasks.add(ts)
a_recs, a_instructions = self._transition(
ts, finish, stimulus_id=stimulus_id
)

remaining_recs.update(a_recs)
instructions += a_instructions
next_recs = {}
tasks: set[TaskState] = set()

# Perform a breadth-first pass of all transitions before the next round.
# This is important when multiple tasks are recommended to transition to 'fetch'
# state together, so that _ensure_communicating can cluster them into less calls
# to gather_dep.
while recommendations:
tasks |= recommendations.keys()
for ts, finish in recommendations.items():
a_recs, a_instructions = self._transition(
ts, finish, stimulus_id=stimulus_id
)
next_recs.update(a_recs)
instructions += a_instructions
recommendations, next_recs = next_recs, {}

if self.validate:
# Full state validation is very expensive
Expand Down

0 comments on commit ab0e9a1

Please sign in to comment.