Store ready and constrained tasks in heapsets
crusaderky committed Jul 11, 2022
1 parent 91a9e5d commit 1ddce73
Showing 4 changed files with 207 additions and 97 deletions.
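
The gist of the change: the worker's ready and constrained task collections become heap-backed sets ("heapsets") of TaskState objects, giving both priority-ordered peek()/pop() and O(1) membership tests and discards (the old ready was a plain heapq list of (priority, key) tuples, as the removed lines below show). The real class lives in distributed.collections; what follows is a minimal sketch of the data structure only, assuming a lazy-deletion heap, not the actual implementation:

import heapq
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class HeapSet(Generic[T]):
    """Sketch: a set of unique elements with heap-ordered access.

    Membership lives in a plain set (O(1) ``in`` and ``discard``); ordering
    lives in a lazily pruned heap, sorted by ``key(value)`` - e.g. a task's
    priority tuple.
    """

    def __init__(self, *, key: Callable[[T], tuple]):
        self._key = key
        self._members: set = set()
        self._heap: list = []

    def add(self, value: T) -> None:
        if value not in self._members:
            self._members.add(value)
            # id() breaks ties, so values themselves are never compared
            heapq.heappush(self._heap, (self._key(value), id(value), value))

    def discard(self, value: T) -> None:
        # O(1): the matching heap entry is pruned lazily by peek()/pop()
        self._members.discard(value)

    def __contains__(self, value: object) -> bool:
        return value in self._members

    def __len__(self) -> int:
        return len(self._members)

    def peek(self) -> T:
        """Return, without removing, the element with the lowest key."""
        while self._heap:
            value = self._heap[0][2]
            if value in self._members:
                return value
            heapq.heappop(self._heap)  # stale entry left behind by discard()
        raise KeyError("peek into an empty HeapSet")

    def pop(self) -> T:
        value = self.peek()  # after this, heap[0] is guaranteed to be live
        heapq.heappop(self._heap)
        self._members.discard(value)
        return value

The worker keeps one such heap for ready tasks and one for resource-constrained tasks, both keyed by priority.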
136 changes: 123 additions & 13 deletions distributed/tests/test_resources.py
@@ -1,7 +1,6 @@
from __future__ import annotations

import asyncio
-from time import time

import pytest

@@ -12,6 +11,13 @@
from distributed import Lock, Worker
from distributed.client import wait
from distributed.utils_test import gen_cluster, inc, lock_inc, slowadd, slowinc
from distributed.worker_state_machine import (
    ComputeTaskEvent,
    Execute,
    ExecuteSuccessEvent,
    FreeKeysEvent,
    TaskFinishedMsg,
)


@gen_cluster(
@@ -235,20 +241,124 @@ async def test_minimum_resource(c, s, a):
    assert a.state.total_resources == a.state.available_resources


-@gen_cluster(client=True, nthreads=[("127.0.0.1", 2, {"resources": {"A": 1}})])
-async def test_prefer_constrained(c, s, a):
-    futures = c.map(slowinc, range(1000), delay=0.1)
-    constrained = c.map(inc, range(10), resources={"A": 1})
-
-    start = time()
-    await wait(constrained)
-    end = time()
-    assert end - start < 4
-    assert (
-        len([ts for ts in s.tasks.values() if ts.state == "memory"])
-        <= len(constrained) + 2
-    )
-    assert s.workers[a.address].processing


@pytest.mark.parametrize("swap", [False, True])
@pytest.mark.parametrize("p1,p2,expect_key", [(1, 0, "y"), (0, 1, "x")])
def test_constrained_vs_ready_priority_1(ws, p1, p2, expect_key, swap):
    """If there are both ready and constrained tasks, those with the highest
    priority win (note: on the Worker, priorities have their sign inverted)
    """
    ws.available_resources = {"R": 1}
    ws.total_resources = {"R": 1}
    RR = {"resource_restrictions": {"R": 1}}

    ws.handle_stimulus(ComputeTaskEvent.dummy(key="clog", stimulus_id="clog"))

    stimuli = [
        ComputeTaskEvent.dummy("x", priority=(p1,), stimulus_id="s1"),
        ComputeTaskEvent.dummy("y", priority=(p2,), **RR, stimulus_id="s2"),
    ]
    if swap:
        stimuli = stimuli[::-1]  # This must be inconsequential

    instructions = ws.handle_stimulus(
        *stimuli,
        ExecuteSuccessEvent.dummy("clog", stimulus_id="s3"),
    )
    assert instructions == [
        TaskFinishedMsg.match(key="clog", stimulus_id="s3"),
        Execute(key=expect_key, stimulus_id="s3"),
    ]


@pytest.mark.parametrize("swap", [False, True])
@pytest.mark.parametrize("p1,p2,expect_key", [(1, 0, "y"), (0, 1, "x")])
def test_constrained_vs_ready_priority_2(ws, p1, p2, expect_key, swap):
"""If there are both ready and constrained tasks, but not enough available
resources, priority is inconsequential - the tasks in the ready queue are picked up.
"""
ws.nthreads = 2
ws.available_resources = {"R": 1}
ws.total_resources = {"R": 1}
RR = {"resource_restrictions": {"R": 1}}

ws.handle_stimulus(
ComputeTaskEvent.dummy(key="clog1", stimulus_id="clog1"),
ComputeTaskEvent.dummy(key="clog2", **RR, stimulus_id="clog2"),
)

# Test that both priorities and order are inconsequential
stimuli = [
ComputeTaskEvent.dummy("x", priority=(p1,), stimulus_id="s1"),
ComputeTaskEvent.dummy("y", priority=(p2,), **RR, stimulus_id="s2"),
]
if swap:
stimuli = stimuli[::-1]

instructions = ws.handle_stimulus(
*stimuli,
ExecuteSuccessEvent.dummy("clog1", stimulus_id="s3"),
)
assert instructions == [
TaskFinishedMsg.match(key="clog1", stimulus_id="s3"),
Execute(key="x", stimulus_id="s3"),
]
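
Taken together, the two tests above pin down the selection rule: when a slot frees up, the worker runs whichever head-of-heap has the best (lowest, since signs are inverted) priority, but the constrained head is only eligible while its resources fit. A sketch of that decision; next_task and resources_available are hypothetical names, not the actual worker state machine code:

def resources_available(state, ts) -> bool:
    # Hypothetical helper: can all of ts's resource restrictions be met now?
    return all(
        state.available_resources.get(resource, 0) >= amount
        for resource, amount in ts.resource_restrictions.items()
    )


def next_task(state):
    # Hypothetical sketch of the pick between the two heaps
    cts = state.constrained.peek() if state.constrained else None
    if cts is not None and not resources_available(state, cts):
        cts = None  # resources exhausted: ignore the constrained queue
    rts = state.ready.peek() if state.ready else None
    if cts is None and rts is None:
        return None
    if rts is None or (cts is not None and cts.priority < rts.priority):
        return state.constrained.pop()  # lower tuple = higher priority
    return state.ready.pop()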


def test_constrained_tasks_respect_priority(ws):
    ws.available_resources = {"R": 1}
    ws.total_resources = {"R": 1}
    RR = {"resource_restrictions": {"R": 1}}

    instructions = ws.handle_stimulus(
        ComputeTaskEvent.dummy(key="clog", **RR, stimulus_id="clog"),
        ComputeTaskEvent.dummy(key="x1", priority=(1,), **RR, stimulus_id="s1"),
        ComputeTaskEvent.dummy(key="x2", priority=(2,), **RR, stimulus_id="s2"),
        ComputeTaskEvent.dummy(key="x3", priority=(0,), **RR, stimulus_id="s3"),
        ExecuteSuccessEvent.dummy(key="clog", stimulus_id="s4"),  # start x3
        ExecuteSuccessEvent.dummy(key="x3", stimulus_id="s5"),  # start x1
        ExecuteSuccessEvent.dummy(key="x1", stimulus_id="s6"),  # start x2
    )
    assert instructions == [
        Execute(key="clog", stimulus_id="clog"),
        TaskFinishedMsg.match(key="clog", stimulus_id="s4"),
        Execute(key="x3", stimulus_id="s4"),
        TaskFinishedMsg.match(key="x3", stimulus_id="s5"),
        Execute(key="x1", stimulus_id="s5"),
        TaskFinishedMsg.match(key="x1", stimulus_id="s6"),
        Execute(key="x2", stimulus_id="s6"),
    ]
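
The execution order asserted above (x3, x1, x2) is simply the heap order of the constrained set. As a usage example against the HeapSet sketch at the top of this page (SimpleNamespace stands in for a real TaskState):

from types import SimpleNamespace

constrained = HeapSet(key=lambda ts: ts.priority)
for key, priority in [("x1", (1,)), ("x2", (2,)), ("x3", (0,))]:
    constrained.add(SimpleNamespace(key=key, priority=priority))

# Lowest priority tuple pops first: (0,), then (1,), then (2,)
assert [constrained.pop().key for _ in range(3)] == ["x3", "x1", "x2"]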


def test_task_cancelled_and_readded_with_resources(ws):
    """See https://github.com/dask/distributed/issues/6710

    A task is enqueued without resources, then cancelled by the client, then
    re-added with the same key, this time with resources.
    Test that resources are respected.
    """
    ws.available_resources = {"R": 1}
    ws.total_resources = {"R": 1}
    RR = {"resource_restrictions": {"R": 1}}

    ws.handle_stimulus(
        ComputeTaskEvent.dummy(key="clog", **RR, stimulus_id="s1"),
        ComputeTaskEvent.dummy(key="x", stimulus_id="s2"),
    )
    ts = ws.tasks["x"]
    assert ts.state == "ready"
    assert ts in ws.ready
    assert ts not in ws.constrained
    assert ts.resource_restrictions == {}

    ws.handle_stimulus(
        FreeKeysEvent(keys=["x"], stimulus_id="clog"),
        ComputeTaskEvent.dummy(key="x", **RR, stimulus_id="s2"),
    )
    ts = ws.tasks["x"]
    assert ts.state == "constrained"
    assert ts not in ws.ready
    assert ts in ws.constrained
    assert ts.resource_restrictions == {"R": 1}


@pytest.mark.skip(reason="")
10 changes: 4 additions & 6 deletions distributed/tests/test_steal.py
@@ -1063,12 +1063,12 @@ async def test_steal_concurrent_simple(c, s, *workers):
        await asyncio.sleep(0.1)

    # ready is a heap but we don't need last, just not the next
-    _, victim_key = w0.state.ready[-1]
    victim_key = w0.state.ready.peek().key
    victim_ts = s.tasks[victim_key]

    ws0 = s.workers[w0.address]
    ws1 = s.workers[w1.address]
    ws2 = s.workers[w2.address]
-    victim_ts = s.tasks[victim_key]
    steal.move_task_request(victim_ts, ws0, ws1)
    steal.move_task_request(victim_ts, ws0, ws2)

@@ -1098,8 +1098,7 @@ async def test_steal_reschedule_reset_in_flight_occupancy(c, s, *workers):
        await asyncio.sleep(0.01)

    # ready is a heap but we don't need last, just not the next
-    _, victim_key = w0.state.ready[-1]
-
    victim_key = w0.state.ready.peek().key
    victim_ts = s.tasks[victim_key]

    wsA = victim_ts.processing_on
@@ -1157,8 +1156,7 @@ async def test_steal_worker_dies_same_ip(c, s, w0, w1):
    while not w0.active_keys:
        await asyncio.sleep(0.01)

-    victim_key = list(w0.state.ready)[-1][1]
-
    victim_key = w0.state.ready.peek().key
    victim_ts = s.tasks[victim_key]

    wsA = victim_ts.processing_on
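
The rewritten lines above also touch a subtlety: the old comment "we don't need last, just not the next" leaned on the fact that in a plain heapq list only the root, heap[0], has a guaranteed position, so ready[-1] was merely "some leaf"; HeapSet.peek() instead deterministically returns the head of the heap. A stdlib-only illustration:

import heapq

heap: list = []
for priority in [(2,), (0,), (1,)]:
    heapq.heappush(heap, priority)

assert heap[0] == (0,)  # the root is the only element with a fixed position
assert heap[-1] in {(1,), (2,)}  # some leaf; which one is unspecified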
2 changes: 1 addition & 1 deletion distributed/worker.py
@@ -1877,7 +1877,7 @@ def stateof(self, key: str) -> dict[str, Any]:
        return {
            "executing": ts.state == "executing",
            "waiting_for_data": bool(ts.waiting_for_data),
-            "heap": key in pluck(1, self.state.ready),
            "heap": ts in self.state.ready or ts in self.state.constrained,
            "data": key in self.data,
        }
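
A side benefit visible in this hunk: the old check scanned the whole ready heap (pluck(1, ...) iterates every (priority, key) tuple), while membership in a heapset is a plain set lookup, so this stateof() field drops from O(n) to O(1) and now also reports tasks sitting in the constrained queue.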
