Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert idle classification when worker-saturation is set #7278

Merged
merged 3 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1504,8 +1504,12 @@ class SchedulerState:
#: Workers that are currently in running state
running: set[WorkerState]
#: Workers that are currently in running state and not fully utilized
#: Definition based on occupancy
#: (actually a SortedDict, but the sortedcontainers package isn't annotated)
idle: dict[str, WorkerState]
#: Similar to `idle`
#: Definition based on assigned tasks
idle_task_count: set[WorkerState]
#: Workers that are fully utilized. May include non-running workers.
saturated: set[WorkerState]
total_nthreads: int
Expand Down Expand Up @@ -1612,6 +1616,7 @@ def __init__(
self.extensions = {}
self.host_info = host_info
self.idle = SortedDict()
self.idle_task_count = set()
self.n_tasks = 0
self.resources = resources
self.saturated = set()
Expand Down Expand Up @@ -2146,13 +2151,16 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
# (and actually pass in the task).
assert not math.isinf(self.WORKER_SATURATION)

if not self.idle:
if not self.idle_task_count:
# All workers busy? Task gets/stays queued.
return None

# Just pick the least busy worker.
# NOTE: this will lead to worst-case scheduling with regards to co-assignment.
ws = min(self.idle.values(), key=lambda ws: len(ws.processing) / ws.nthreads)
ws = min(
self.idle_task_count,
key=lambda ws: len(ws.processing) / ws.nthreads,
)
if self.validate:
assert not _worker_full(ws, self.WORKER_SATURATION), (
ws,
Expand Down Expand Up @@ -2791,7 +2799,7 @@ def transition_waiting_queued(self, key, stimulus_id):
worker_msgs: dict = {}

if self.validate:
assert not self.idle, (ts, self.idle)
assert not self.idle_task_count, (ts, self.idle_task_count)
_validate_ready(self, ts)

ts.state = "queued"
Expand Down Expand Up @@ -3062,25 +3070,23 @@ def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0):

idle = self.idle
saturated = self.saturated
if (
self.is_unoccupied(ws, occ, p)
if math.isinf(self.WORKER_SATURATION)
else not _worker_full(ws, self.WORKER_SATURATION)
):
saturated.discard(ws)
if self.is_unoccupied(ws, occ, p):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm quite concerned that we're now calling is_unoccupied every time, even when queuing is enabled. This significantly slows down the scheduler: #7256. The urgency of fixing that was diminished by queuing being on by default and getting to skip that slow code path.

I'm not sure that a known and large scheduler performance degradation is worth avoiding hypothetical small changes to work-stealing behavior due to the changed definition of idle when queuing is on.

If we can fix #7256 before a release, then I'm happy with this change, otherwise I'd be concerned by this tradeoff.

Copy link
Collaborator

@gjoseph92 gjoseph92 Nov 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After running some benchmarks, it looks like occupancy might not have as much of an effect on end-to-end runtime as I'd expected: #7256 (comment). So I'm happy with this if we want to go with it.

For performance reasons and practicality though, I'd like to consider #7280 as another solution to #7085.

Edit: that uses occupancy too, so there's a similar performance cost. I think doing both PRs would be a good idea.

if ws.status == Status.running:
idle[ws.address] = ws
Comment on lines +3073 to 3076
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
saturated.discard(ws)
if self.is_unoccupied(ws, occ, p):
if ws.status == Status.running:
idle[ws.address] = ws
if self.is_unoccupied(ws, occ, p):
if ws.status == Status.running:
idle[ws.address] = ws
saturated.discard(ws)

This is more consistent with previous behavior. Notice that before, if a worker was occupied, but not saturated, it wouldn't be removed from the saturated set. This is probably not intentional or correct, but we're trying to match previous behavior here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. saturated.discard was always called unless the worker was truly classified as such, see
    idle = self.idle
    saturated = self.saturated
    if p < nc or occ < nc * avg / 2:
    idle[ws.address] = ws
    saturated.discard(ws)
    else:
    idle.pop(ws.address, None)
    if p > nc:
    pending: float = occ * (p - nc) / (p * nc)
    if 0.4 < pending > 1.9 * avg:
    saturated.add(ws)
    return
    saturated.discard(ws)
    so my behavior is consistent with what it was before
  2. Other than dashboard visuals, saturated is only used in stealing to avoid sorting over all workers, https://github.com/fjetter/distributed/blob/a5d686572e3289e9d7ce71c063205cc35d4a06c2/distributed/stealing.py#L422-L431 and I'm not too concerned about this since stealing is a bit erratic either way

saturated.discard(ws)
else:
idle.pop(ws.address, None)

nc = ws.nthreads
if p > nc:
pending = occ * (p - nc) / (p * nc)
if 0.4 < pending > 1.9 * (self.total_occupancy / self.total_nthreads):
saturated.add(ws)
return

saturated.discard(ws)
if not _worker_full(ws, self.WORKER_SATURATION):
if ws.status == Status.running:
self.idle_task_count.add(ws)
else:
self.idle_task_count.discard(ws)

def is_unoccupied(
self, ws: WorkerState, occupancy: float, nprocessing: int
Expand Down Expand Up @@ -4746,6 +4752,7 @@ async def remove_worker(
del self.stream_comms[address]
del self.aliases[ws.name]
self.idle.pop(ws.address, None)
self.idle_task_count.discard(ws)
self.saturated.discard(ws)
del self.workers[address]
ws.status = Status.closed
Expand Down Expand Up @@ -5333,6 +5340,7 @@ def handle_worker_status_change(
else:
self.running.discard(ws)
self.idle.pop(ws.address, None)
self.idle_task_count.discard(ws)

async def handle_request_refresh_who_has(
self, keys: Iterable[str], worker: str, stimulus_id: str
Expand Down
5 changes: 4 additions & 1 deletion distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ async def test_queued_paused_new_worker(c, s, a, b):
await asyncio.sleep(0.01)

assert not s.idle
assert not s.idle_task_count
assert not s.running

async with Worker(s.address, nthreads=2) as w:
Expand Down Expand Up @@ -446,6 +447,7 @@ async def test_queued_paused_unpaused(c, s, a, b, queue):

assert not s.running
assert not s.idle
assert not s.idle_task_count

# un-pause
a.status = Status.running
Expand All @@ -455,7 +457,8 @@ async def test_queued_paused_unpaused(c, s, a, b, queue):

if queue:
assert not s.idle # workers should have been (or already were) filled
# If queuing is disabled, all workers might already be saturated when they un-pause.
# If queuing is disabled, all workers might already be saturated when they un-pause.
assert not s.idle_task_count

await wait(final)

Expand Down