Commit
fix test
crusaderky committed May 30, 2022
1 parent 1b4b42f commit 2c32233
Showing 1 changed file with 92 additions and 103 deletions.
195 changes: 92 additions & 103 deletions distributed/tests/test_scheduler.py
@@ -132,115 +132,104 @@ async def test_decide_worker_with_restrictions(client, s, a, b, c):


 @pytest.mark.parametrize("ndeps", [0, 1, 4])
-@pytest.mark.parametrize(
-    "nthreads",
-    [
-        [("127.0.0.1", 1)] * 5,
-        [("127.0.0.1", 3), ("127.0.0.1", 2), ("127.0.0.1", 1)],
-    ],
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 3), ("127.0.0.1", 2), ("127.0.0.1", 1)],
+    config={"distributed.scheduler.work-stealing": False},
 )
-def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads):
-    @gen_cluster(
-        client=True,
-        nthreads=nthreads,
-        config={"distributed.scheduler.work-stealing": False},
-    )
-    async def test_decide_worker_coschedule_order_neighbors_(c, s, *workers):
-        r"""
-        Ensure that sibling root tasks are scheduled to the same node, reducing future
-        data transfer.
-
-        We generate a wide layer of "root" tasks (random NumPy arrays). All of those
-        tasks share 0-5 trivial dependencies. The ``ndeps=0`` and ``ndeps=1`` cases are
-        most common in real-world use (``ndeps=1`` is basically ``da.from_array(...,
-        inline_array=False)`` or ``da.from_zarr``). The graph is structured like this
-        (though the number of tasks and workers is different):
-
-            |-W1-|  |-W2-|  |-W3-|  |-W4-|   < ---- ideal task scheduling
-
-              q       r       s       t      < --- `sum-aggregate-`
-             / \     / \     / \     / \
-            i   j   k   l   m   n   o   p    < --- `sum-`
-            |   |   |   |   |   |   |   |
-            a   b   c   d   e   f   g   h    < --- `random-`
-             \   \   \   |   |   /   /   /
-                    TRIVIAL * 0..5
-
-        Neighboring `random-` tasks should be scheduled on the same worker. We test that
-        generally, only one worker holds each row of the array, that the `random-` tasks
-        are never transferred, and that there are few transfers overall.
-        """
-        da = pytest.importorskip("dask.array")
-        np = pytest.importorskip("numpy")
+async def test_decide_worker_coschedule_order_neighbors(c, s, *workers, ndeps):
+    r"""
+    Ensure that sibling root tasks are scheduled to the same node, reducing future
+    data transfer.
+
+    We generate a wide layer of "root" tasks (random NumPy arrays). All of those
+    tasks share 0-5 trivial dependencies. The ``ndeps=0`` and ``ndeps=1`` cases are
+    most common in real-world use (``ndeps=1`` is basically ``da.from_array(...,
+    inline_array=False)`` or ``da.from_zarr``). The graph is structured like this
+    (though the number of tasks and workers is different):
+
+        |-W1-|  |-W2-|  |-W3-|  |-W4-|   < ---- ideal task scheduling
+
+          q       r       s       t      < --- `sum-aggregate-`
+         / \     / \     / \     / \
+        i   j   k   l   m   n   o   p    < --- `sum-`
+        |   |   |   |   |   |   |   |
+        a   b   c   d   e   f   g   h    < --- `random-`
+         \   \   \   |   |   /   /   /
+                TRIVIAL * 0..5
+
+    Neighboring `random-` tasks should be scheduled on the same worker. We test that
+    generally, only one worker holds each row of the array, that the `random-` tasks
+    are never transferred, and that there are few transfers overall.
+    """
+    da = pytest.importorskip("dask.array")
+    np = pytest.importorskip("numpy")
 
-        if ndeps == 0:
-            x = da.random.random((100, 100), chunks=(10, 10))
-        else:
+    if ndeps == 0:
+        x = da.random.random((100, 100), chunks=(10, 10))
+    else:
 
-            def random(**kwargs):
-                assert len(kwargs) == ndeps
-                return np.random.random((10, 10))
+        def random(**kwargs):
+            assert len(kwargs) == ndeps
+            return np.random.random((10, 10))
 
-            trivial_deps = {f"k{i}": delayed(object()) for i in range(ndeps)}
+        trivial_deps = {f"k{i}": delayed(object()) for i in range(ndeps)}
 
-            # TODO is there a simpler (non-blockwise) way to make this sort of graph?
-            x = da.blockwise(
-                random,
-                "yx",
-                new_axes={"y": (10,) * 10, "x": (10,) * 10},
-                dtype=float,
-                **trivial_deps,
-            )
+        # TODO is there a simpler (non-blockwise) way to make this sort of graph?
+        x = da.blockwise(
+            random,
+            "yx",
+            new_axes={"y": (10,) * 10, "x": (10,) * 10},
+            dtype=float,
+            **trivial_deps,
+        )
 
-        xx, xsum = dask.persist(x, x.sum(axis=1, split_every=20))
-        await xsum
-
-        # Check that each chunk-row of the array is (mostly) stored on the same worker
-        primary_worker_key_fractions = []
-        secondary_worker_key_fractions = []
-        for i, keys in enumerate(x.__dask_keys__()):
-            # Iterate along rows of the array.
-            keys = {stringify(k) for k in keys}
-
-            # No more than 2 workers should have any keys
-            assert sum(any(k in w.data for k in keys) for w in workers) <= 2
-
-            # What fraction of the keys for this row does each worker hold?
-            key_fractions = [
-                len(set(w.data).intersection(keys)) / len(keys) for w in workers
-            ]
-            key_fractions.sort()
-            # Primary worker: holds the highest percentage of keys
-            # Secondary worker: holds the second highest percentage of keys
-            primary_worker_key_fractions.append(key_fractions[-1])
-            secondary_worker_key_fractions.append(key_fractions[-2])
-
-        # There may be one or two rows that were poorly split across workers,
-        # but the vast majority of rows should only be on one worker.
-        assert np.mean(primary_worker_key_fractions) >= 0.9
-        assert np.median(primary_worker_key_fractions) == 1.0
-        assert np.mean(secondary_worker_key_fractions) <= 0.1
-        assert np.median(secondary_worker_key_fractions) == 0.0
-
-        # Check that there were few transfers
-        unexpected_transfers = []
-        for worker in workers:
-            for log in worker.incoming_transfer_log:
-                keys = log["keys"]
-                # The root-ish tasks should never be transferred
-                assert not any(k.startswith("random") for k in keys), keys
-                # `object-` keys (the trivial deps of the root random tasks) should be
-                # transferred
-                if any(not k.startswith("object") for k in keys):
-                    # But not many other things should be
-                    unexpected_transfers.append(list(keys))
-
-        # A transfer at the very end to move aggregated results is fine (necessary with
-        # unbalanced workers in fact), but generally there should be very very few
-        # transfers.
-        assert len(unexpected_transfers) <= 3, unexpected_transfers
-
-    test_decide_worker_coschedule_order_neighbors_()
+    xx, xsum = dask.persist(x, x.sum(axis=1, split_every=20))
+    await xsum
+
+    # Check that each chunk-row of the array is (mostly) stored on the same worker
+    primary_worker_key_fractions = []
+    secondary_worker_key_fractions = []
+    for i, keys in enumerate(x.__dask_keys__()):
+        # Iterate along rows of the array.
+        keys = {stringify(k) for k in keys}
+
+        # No more than 2 workers should have any keys
+        assert sum(any(k in w.data for k in keys) for w in workers) <= 2
+
+        # What fraction of the keys for this row does each worker hold?
+        key_fractions = [
+            len(set(w.data).intersection(keys)) / len(keys) for w in workers
+        ]
+        key_fractions.sort()
+        # Primary worker: holds the highest percentage of keys
+        # Secondary worker: holds the second highest percentage of keys
+        primary_worker_key_fractions.append(key_fractions[-1])
+        secondary_worker_key_fractions.append(key_fractions[-2])
+
+    # There may be one or two rows that were poorly split across workers,
+    # but the vast majority of rows should only be on one worker.
+    assert np.mean(primary_worker_key_fractions) >= 0.9
+    assert np.median(primary_worker_key_fractions) == 1.0
+    assert np.mean(secondary_worker_key_fractions) <= 0.1
+    assert np.median(secondary_worker_key_fractions) == 0.0
+
+    # Check that there were few transfers
+    unexpected_transfers = []
+    for worker in workers:
+        for log in worker.incoming_transfer_log:
+            keys = log["keys"]
+            # The root-ish tasks should never be transferred
+            assert not any(k.startswith("random") for k in keys), keys
+            # `object-` keys (the trivial deps of the root random tasks) should be
+            # transferred
+            if any(not k.startswith("object") for k in keys):
+                # But not many other things should be
+                unexpected_transfers.append(list(keys))
+
+    # A transfer at the very end to move aggregated results is fine (necessary with
+    # unbalanced workers in fact), but generally there should be very few transfers.
+    assert len(unexpected_transfers) <= 6, unexpected_transfers
 
 
 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
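The shape of the fix is worth noting: the old test wrapped a @gen_cluster-decorated coroutine inside a synchronous, parametrized outer function and invoked it by hand, while the new test applies @gen_cluster directly to the parametrized async test and receives ndeps as a keyword-only argument after *workers. Below is a minimal sketch of the two styles, assuming gen_cluster from distributed.utils_test forwards pytest.mark.parametrize arguments to the test as keyword arguments (which the new test's ndeps argument relies on); the test names and bodies are illustrative, not from this commit.

    import pytest

    from distributed.utils_test import gen_cluster


    # Old style: a synchronous test defines a decorated coroutine and runs it by hand.
    @pytest.mark.parametrize("n", [1, 2])
    def test_old_style(n):
        @gen_cluster(client=True)
        async def inner(c, s, a, b):
            assert await c.submit(lambda: n) == n  # `n` reaches the body via closure

        inner()


    # New style: decorate the parametrized async test directly; the parametrized
    # argument arrives as a keyword argument alongside the cluster objects.
    @pytest.mark.parametrize("n", [1, 2])
    @gen_cluster(client=True)
    async def test_new_style(c, s, a, b, n):
        assert await c.submit(lambda: n) == n

The direct style removes one layer of indirection, so failures surface in the test function itself rather than through a manual call at the end of a wrapper.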
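On the docstring's point that ndeps=1 is essentially da.from_array(..., inline_array=False) or da.from_zarr: with inline_array=False the source array enters the graph as a single key, and every chunk task depends on that one shared key, the same one-trivial-dependency fan-out the test builds with blockwise. A small illustrative sketch of that shape (not part of the test):

    import numpy as np
    import dask.array as da

    source = np.random.random((100, 100))
    # inline_array=False stores `source` under a single graph key; each of the
    # 100 chunk tasks then depends on that one shared key, mirroring ndeps=1.
    x = da.from_array(source, chunks=(10, 10), inline_array=False)
    # Expect one task per chunk plus the single source key (~101 keys here).
    print(len(dict(x.__dask_graph__())))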
