Skip to content

Commit

Permalink
Deprecate WorkerState accessors (#6579)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored Jun 16, 2022
1 parent 1797863 commit 8aab5e8
Show file tree
Hide file tree
Showing 24 changed files with 398 additions and 372 deletions.
16 changes: 8 additions & 8 deletions distributed/dashboard/components/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ def update(self):
w = self.worker
d = {
"Stored": [len(w.data)],
"Executing": ["%d / %d" % (w.executing_count, w.nthreads)],
"Ready": [len(w.ready)],
"Waiting": [w.waiting_for_data_count],
"Connections": [len(w.in_flight_workers)],
"Executing": ["%d / %d" % (w.state.executing_count, w.state.nthreads)],
"Ready": [len(w.state.ready)],
"Waiting": [w.state.waiting_for_data_count],
"Connections": [len(w.state.in_flight_workers)],
"Serving": [len(w._comms)],
}
update(self.source, d)
Expand Down Expand Up @@ -225,7 +225,7 @@ def __init__(self, worker, **kwargs):
fig = figure(
title="Communication History",
x_axis_type="datetime",
y_range=[-0.1, worker.total_out_connections + 0.5],
y_range=[-0.1, worker.state.total_out_connections + 0.5],
height=150,
tools="",
x_range=x_range,
Expand All @@ -247,7 +247,7 @@ def update(self):
{
"x": [time() * 1000],
"out": [len(self.worker._comms)],
"in": [len(self.worker.in_flight_workers)],
"in": [len(self.worker.state.in_flight_workers)],
},
10000,
)
Expand All @@ -263,7 +263,7 @@ def __init__(self, worker, **kwargs):
fig = figure(
title="Executing History",
x_axis_type="datetime",
y_range=[-0.1, worker.nthreads + 0.1],
y_range=[-0.1, worker.state.nthreads + 0.1],
height=150,
tools="",
x_range=x_range,
Expand All @@ -281,7 +281,7 @@ def __init__(self, worker, **kwargs):
@log_errors
def update(self):
self.source.stream(
{"x": [time() * 1000], "y": [self.worker.executing_count]}, 1000
{"x": [time() * 1000], "y": [self.worker.state.executing_count]}, 1000
)


Expand Down
6 changes: 3 additions & 3 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ async def test_stealing_events(c, s, a, b):
await wait(futures)
se.update()
assert len(first(se.source.data.values()))
assert b.tasks
assert sum(se.source.data["count"]) >= len(b.tasks)
assert b.state.tasks
assert sum(se.source.data["count"]) >= len(b.state.tasks)


@gen_cluster(client=True)
Expand All @@ -133,7 +133,7 @@ async def test_events(c, s, a, b):
slowinc, range(100), delay=0.1, workers=a.address, allow_other_workers=True
)

while not b.tasks:
while not b.state.tasks:
await asyncio.sleep(0.01)

e.update()
Expand Down
4 changes: 2 additions & 2 deletions distributed/deploy/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def test_procs():
assert len(c.workers) == 2
assert all(isinstance(w, Worker) for w in c.workers.values())
with Client(c.scheduler.address) as e:
assert all(w.nthreads == 3 for w in c.workers.values())
assert all(w.state.nthreads == 3 for w in c.workers.values())
assert all(isinstance(w, Worker) for w in c.workers.values())
repr(c)

Expand Down Expand Up @@ -324,7 +324,7 @@ async def test_defaults_2():
dashboard_address=":0",
asynchronous=True,
) as c:
assert sum(w.nthreads for w in c.workers.values()) == CPU_COUNT
assert sum(w.state.nthreads for w in c.workers.values()) == CPU_COUNT
assert all(isinstance(w, Worker) for w in c.workers.values())
assert len(c.workers) == 1

Expand Down
12 changes: 6 additions & 6 deletions distributed/deploy/tests/test_spec_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ async def test_specification():
assert isinstance(cluster.workers[1], Worker)
assert isinstance(cluster.workers["my-worker"], MyWorker)

assert cluster.workers[0].nthreads == 1
assert cluster.workers[1].nthreads == 2
assert cluster.workers["my-worker"].nthreads == 3
assert cluster.workers[0].state.nthreads == 1
assert cluster.workers[1].state.nthreads == 2
assert cluster.workers["my-worker"].state.nthreads == 3

async with Client(cluster, asynchronous=True) as client:
result = await client.submit(lambda x: x + 1, 10)
Expand All @@ -69,9 +69,9 @@ def test_spec_sync(loop):
assert isinstance(cluster.workers[1], Worker)
assert isinstance(cluster.workers["my-worker"], MyWorker)

assert cluster.workers[0].nthreads == 1
assert cluster.workers[1].nthreads == 2
assert cluster.workers["my-worker"].nthreads == 3
assert cluster.workers[0].state.nthreads == 1
assert cluster.workers[1].state.nthreads == 2
assert cluster.workers["my-worker"].state.nthreads == 3

with Client(cluster, loop=loop) as client:
assert cluster.loop is cluster.scheduler.loop
Expand Down
10 changes: 5 additions & 5 deletions distributed/diagnostics/tests/test_worker_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async def test_normal_task_transitions_called(c, s, w):

await c.register_worker_plugin(plugin)
await c.submit(lambda x: x, 1, key="task")
await async_wait_for(lambda: not w.tasks, timeout=10)
await async_wait_for(lambda: not w.state.tasks, timeout=10)


@gen_cluster(nthreads=[("127.0.0.1", 1)], client=True)
Expand Down Expand Up @@ -148,7 +148,7 @@ async def test_superseding_task_transitions_called(c, s, w):

await c.register_worker_plugin(plugin)
await c.submit(lambda x: x, 1, key="task", resources={"X": 1})
await async_wait_for(lambda: not w.tasks, timeout=10)
await async_wait_for(lambda: not w.state.tasks, timeout=10)


@gen_cluster(nthreads=[("127.0.0.1", 1)], client=True)
Expand All @@ -174,7 +174,7 @@ async def test_dependent_tasks(c, s, w):

await c.register_worker_plugin(plugin)
await c.get(dsk, "task", sync=False)
await async_wait_for(lambda: not w.tasks, timeout=10)
await async_wait_for(lambda: not w.state.tasks, timeout=10)


@gen_cluster(nthreads=[("127.0.0.1", 1)], client=True)
Expand Down Expand Up @@ -207,7 +207,7 @@ class Dummy(WorkerPlugin):
with warnings.catch_warnings(record=True) as record:
await c.register_worker_plugin(Dummy())
assert await c.submit(inc, 1, key="x") == 2
while "x" in a.tasks:
while "x" in a.state.tasks:
await asyncio.sleep(0.01)

assert not record
Expand Down Expand Up @@ -235,7 +235,7 @@ def teardown(self, worker):
await c.submit(inc, 0)
assert w.foo == 123

while s.tasks or w.tasks:
while s.tasks or w.state.tasks:
await asyncio.sleep(0.01)

class MyCustomPlugin(WorkerPlugin):
Expand Down
4 changes: 2 additions & 2 deletions distributed/http/scheduler/tests/test_scheduler_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ async def fetch_metrics():

# submit a task which should show up in the prometheus scraping
future = c.submit(slowinc, 1, delay=0.5)
while not any(future.key in w.tasks for w in [a, b]):
while not any(future.key in w.state.tasks for w in [a, b]):
await asyncio.sleep(0.001)

active_metrics, forgotten_tasks = await fetch_metrics()
Expand All @@ -157,7 +157,7 @@ async def fetch_metrics():

future.release()

while any(future.key in w.tasks for w in [a, b]):
while any(future.key in w.state.tasks for w in [a, b]):
await asyncio.sleep(0.001)

active_metrics, forgotten_tasks = await fetch_metrics()
Expand Down
10 changes: 5 additions & 5 deletions distributed/http/worker/prometheus/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ def collect(self):
labels=["state"],
)
tasks.add_metric(["stored"], len(self.server.data))
tasks.add_metric(["executing"], self.server.executing_count)
tasks.add_metric(["ready"], len(self.server.ready))
tasks.add_metric(["waiting"], self.server.waiting_for_data_count)
tasks.add_metric(["executing"], self.server.state.executing_count)
tasks.add_metric(["ready"], len(self.server.state.ready))
tasks.add_metric(["waiting"], self.server.state.waiting_for_data_count)
yield tasks

yield GaugeMetricFamily(
self.build_name("concurrent_fetch_requests"),
"Number of open fetch requests to other workers.",
value=len(self.server.in_flight_workers),
value=len(self.server.state.in_flight_workers),
)

yield GaugeMetricFamily(
self.build_name("threads"),
"Number of worker threads.",
value=self.server.nthreads,
value=self.server.state.nthreads,
)

yield GaugeMetricFamily(
Expand Down
16 changes: 8 additions & 8 deletions distributed/tests/test_active_memory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,12 +459,12 @@ async def test_drop_with_paused_workers_with_running_tasks_1(c, s, a, b):
x = (await c.scatter({"x": 1}, broadcast=True))["x"]
y = c.submit(slowinc, x, delay=2.5, key="y", workers=[a.address])

while "y" not in a.tasks or a.tasks["y"].state != "executing":
while "y" not in a.state.tasks or a.state.tasks["y"].state != "executing":
await asyncio.sleep(0.01)
a.status = Status.paused
while s.workers[a.address].status != Status.paused:
await asyncio.sleep(0.01)
assert a.tasks["y"].state == "executing"
assert a.state.tasks["y"].state == "executing"

s.extensions["amm"].run_once()
await y
Expand Down Expand Up @@ -509,7 +509,7 @@ async def test_drop_with_paused_workers_with_running_tasks_3_4(c, s, a, b, pause
"""
x = (await c.scatter({"x": 1}, broadcast=True))["x"]
y = c.submit(slowinc, x, delay=2.5, key="y", workers=[a.address])
while "y" not in a.tasks or a.tasks["y"].state != "executing":
while "y" not in a.state.tasks or a.state.tasks["y"].state != "executing":
await asyncio.sleep(0.01)

if pause:
Expand All @@ -519,7 +519,7 @@ async def test_drop_with_paused_workers_with_running_tasks_3_4(c, s, a, b, pause
await asyncio.sleep(0.01)

assert s.tasks["y"].state == "processing"
assert a.tasks["y"].state == "executing"
assert a.state.tasks["y"].state == "executing"

s.extensions["amm"].run_once()
await y
Expand All @@ -544,10 +544,10 @@ async def test_drop_with_paused_workers_with_running_tasks_5(c, s, w1, w2, w3):

def executing() -> bool:
return (
"y1" in w1.tasks
and w1.tasks["y1"].state == "executing"
and "y2" in w3.tasks
and w3.tasks["y2"].state == "executing"
"y1" in w1.state.tasks
and w1.state.tasks["y1"].state == "executing"
and "y2" in w3.state.tasks
and w3.state.tasks["y2"].state == "executing"
)

while not executing():
Expand Down
14 changes: 7 additions & 7 deletions distributed/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async def test_client_actions(s, a, b, direct_to_workers):

assert counter._address == a.address

assert isinstance(a.actors[counter.key], Counter)
assert isinstance(a.state.actors[counter.key], Counter)
assert s.tasks[counter.key].actor

await asyncio.gather(counter.increment(), counter.increment())
Expand Down Expand Up @@ -186,7 +186,7 @@ async def test_gc(c, s, a, b):
await wait(actor)
del actor

while a.actors or b.actors:
while a.state.actors or b.state.actors:
await asyncio.sleep(0.01)


Expand All @@ -200,15 +200,15 @@ async def test_track_dependencies(c, s, a, b):

await asyncio.sleep(0.3)

assert a.actors or b.actors
assert a.state.actors or b.state.actors


@gen_cluster(client=True)
async def test_future(c, s, a, b):
counter = c.submit(Counter, actor=True, workers=[a.address])
assert isinstance(counter, Future)
await wait(counter)
assert isinstance(a.actors[counter.key], Counter)
assert isinstance(a.state.actors[counter.key], Counter)

counter = await counter
assert isinstance(counter, Actor)
Expand Down Expand Up @@ -364,7 +364,7 @@ def add(n, counter):
while not done.done():
assert (
len([ws for ws in s.workers.values() if ws.processing])
<= a.nthreads + b.nthreads
<= a.state.nthreads + b.state.nthreads
)
await asyncio.sleep(0.01)

Expand Down Expand Up @@ -430,7 +430,7 @@ def __init__(self, x, y=None):
actors = c.map(Foo, range(10), y=b, actor=True)
await wait(actors)

assert all(len(w.actors) == 2 for w in workers)
assert all(len(w.state.actors) == 2 for w in workers)


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 4, Worker=Nanny)
Expand Down Expand Up @@ -588,7 +588,7 @@ async def test_worker_actor_handle_is_weakref(c, s, a, b):
del counter

start = time()
while a.actors or b.data:
while a.state.actors or b.data:
await asyncio.sleep(0.1)
assert time() < start + 30

Expand Down
Loading

0 comments on commit 8aab5e8

Please sign in to comment.