Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Nov 10, 2022
1 parent b199066 commit 7215427
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
15 changes: 8 additions & 7 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1509,7 +1509,7 @@ class SchedulerState:
idle: dict[str, WorkerState]
#: Similar to `idle`
#: Definition based on assigned tasks
idle_task_count: dict[str, WorkerState]
idle_task_count: set[WorkerState]
#: Workers that are fully utilized. May include non-running workers.
saturated: set[WorkerState]
total_nthreads: int
Expand Down Expand Up @@ -1616,7 +1616,7 @@ def __init__(
self.extensions = {}
self.host_info = host_info
self.idle = SortedDict()
self.idle_task_count = dict()
self.idle_task_count = set()
self.n_tasks = 0
self.resources = resources
self.saturated = set()
Expand Down Expand Up @@ -2158,7 +2158,7 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
# Just pick the least busy worker.
# NOTE: this will lead to worst-case scheduling with regards to co-assignment.
ws = min(
self.idle_task_count.values(),
self.idle_task_count,
key=lambda ws: len(ws.processing) / ws.nthreads,
)
if self.validate:
Expand Down Expand Up @@ -3082,10 +3082,11 @@ def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0):
if 0.4 < pending > 1.9 * (self.total_occupancy / self.total_nthreads):
saturated.add(ws)

self.idle_task_count.pop(ws.address, None)
if not _worker_full(ws, self.WORKER_SATURATION):
if ws.status == Status.running:
self.idle_task_count[ws.address] = ws
self.idle_task_count.add(ws)
else:
self.idle_task_count.discard(ws)

def is_unoccupied(
self, ws: WorkerState, occupancy: float, nprocessing: int
Expand Down Expand Up @@ -4751,7 +4752,7 @@ async def remove_worker(
del self.stream_comms[address]
del self.aliases[ws.name]
self.idle.pop(ws.address, None)
self.idle_task_count.pop(ws.address, None)
self.idle_task_count.discard(ws)
self.saturated.discard(ws)
del self.workers[address]
ws.status = Status.closed
Expand Down Expand Up @@ -5339,7 +5340,7 @@ def handle_worker_status_change(
else:
self.running.discard(ws)
self.idle.pop(ws.address, None)
self.idle_task_count.pop(ws.address, None)
self.idle_task_count.discard(ws)

async def handle_request_refresh_who_has(
self, keys: Iterable[str], worker: str, stimulus_id: str
Expand Down
5 changes: 4 additions & 1 deletion distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ async def test_queued_paused_new_worker(c, s, a, b):
await asyncio.sleep(0.01)

assert not s.idle
assert not s.idle_task_count
assert not s.running

async with Worker(s.address, nthreads=2) as w:
Expand Down Expand Up @@ -446,6 +447,7 @@ async def test_queued_paused_unpaused(c, s, a, b, queue):

assert not s.running
assert not s.idle
assert not s.idle_task_count

# un-pause
a.status = Status.running
Expand All @@ -455,7 +457,8 @@ async def test_queued_paused_unpaused(c, s, a, b, queue):

if queue:
assert not s.idle # workers should have been (or already were) filled
# If queuing is disabled, all workers might already be saturated when they un-pause.
# If queuing is disabled, all workers might already be saturated when they un-pause.
assert not s.idle_task_count

await wait(final)

Expand Down

0 comments on commit 7215427

Please sign in to comment.