Skip to content

Commit

Permalink
Rework some tests related to gather_dep (#6472)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored Jun 15, 2022
1 parent 2778cf5 commit cb88e3b
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 15 deletions.
7 changes: 3 additions & 4 deletions distributed/diagnostics/tests/test_eventstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,13 @@ async def test_eventstream(c, s, *workers):
name: collections.deque(maxlen=100)
for name in "start duration key name color worker worker_thread y alpha".split()
}
workers = dict()
workers = {}
for msg in es.buffer:
task_stream_append(lists, msg, workers)

assert len([n for n in lists["name"] if n.startswith("transfer")]) == 2
assert sum(n == "transfer-sum" for n in lists["name"]) == 2
for name, color in zip(lists["name"], lists["color"]):
if name == "transfer":
assert color == "red"
assert (name == "transfer-sum") == (color == "red")

assert any(c == "black" for c in lists["color"])

Expand Down
28 changes: 17 additions & 11 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,17 +745,18 @@ async def test_clean_nbytes(c, s, a, b):


@pytest.mark.parametrize("as_deps", [True, False])
@gen_cluster(client=True, nthreads=[("", 1)] * 20)
async def test_gather_many_small(c, s, a, *workers, as_deps):
"""If the dependencies of a given task are very small, do not limit the
number of concurrent outgoing connections
@gen_cluster(client=True, nthreads=[("", 1)] * 21)
async def test_gather_many_small(c, s, a, *snd_workers, as_deps):
"""If the dependencies of a given task are very small, do not limit the number of
concurrent outgoing connections. If multiple small fetches from the same worker are
scheduled all at once, they will result in a single call to gather_dep.
"""
a.total_out_connections = 2
futures = await c.scatter(
{f"x{i}": i for i in range(100)},
workers=[w.address for w in workers],
workers=[w.address for w in snd_workers],
)
assert all(w.data for w in workers)
assert all(w.data for w in snd_workers)

if as_deps:
future = c.submit(lambda _: None, futures, key="y", workers=[a.address])
Expand All @@ -765,13 +766,18 @@ async def test_gather_many_small(c, s, a, *workers, as_deps):
while len(a.data) < 100:
await asyncio.sleep(0.01)

types = list(pluck(0, a.log))
req = [i for i, t in enumerate(types) if t == "request-dep"]
recv = [i for i, t in enumerate(types) if t == "receive-dep"]
assert len(req) == len(recv) == 19
assert min(recv) > max(req)
assert a.comm_nbytes == 0

story = a.story("request-dep", "receive-dep")
assert len(story) == 40 # 1 request-dep + 1 receive-dep per sender worker
# All GatherDep instructions are fired at the same time; each fetches all keys
# available on the sender worker
for ev in story[:20]:
assert ev[0] == "request-dep"
assert len(ev[2]) > 1
for ev in story[20:]:
assert ev[0] == "receive-dep"


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
async def test_multiple_transfers(c, s, w1, w2, w3):
Expand Down

0 comments on commit cb88e3b

Please sign in to comment.