Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Make simple types of repartition #512

Merged
merged 1 commit into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 65 additions & 19 deletions src/dask_awkward/lib/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,29 +942,75 @@ def repartition(
npartitions: int | None = None,
divisions: tuple[int, ...] | None = None,
rows_per_partition: int | None = None,
one_to_n: int | None = None,
n_to_one: int | None = None,
) -> Array:
"""Restructure the partitioning of the whole array

Various schemes are possible, with one of the mutually exclusive
optional arguments for each. Of these, the first three require
knowledge of the number of rows in each existing partition, which
will be eagerly computed if not already known, and some shuffling of
data between partitions.

- npartitions: split all the rows as evenly as possible into this
many output partitions.
- divisions: exact row count offsets of each output partition
- rows_per_partition: each partition will have this many rows,
except the last, which will have this number or fewer
- one_to_n: each input partition becomes n output partitions
- n_to_one: every n adjacent input partitions becomes one
output partition. Note that exactly one output partition
(npartitions=1) is a special case of this.
"""
from dask_awkward.layers import AwkwardMaterializedLayer
from dask_awkward.lib.structure import repartition_layer
from dask_awkward.lib.structure import (
repartition_layer,
simple_repartition_layer,
)

if sum(bool(_) for _ in [npartitions, divisions, rows_per_partition]) != 1:
if (
sum(
bool(_)
for _ in (
npartitions,
divisions,
rows_per_partition,
one_to_n,
n_to_one,
)
)
!= 1
):
raise ValueError("Please specify exactly one of the inputs")
if not self.known_divisions:
self.eager_compute_divisions()
nrows = self.defined_divisions[-1]
new_divisions: tuple[int, ...] = tuple()
if divisions:
new_divisions = divisions
elif npartitions:
rows_per_partition = math.ceil(nrows / npartitions)
if rows_per_partition:
new_divs = list(range(0, nrows, rows_per_partition))
new_divs.append(nrows)
new_divisions = tuple(new_divs)

token = tokenize(self, divisions)
key = f"repartition-{token}"

new_layer_raw = repartition_layer(self, key, new_divisions)
new_divisions: tuple[int, ...] = ()
if npartitions and npartitions == 1:
npartitions, n_to_one = None, self.npartitions
if n_to_one or one_to_n:
token = tokenize(self, n_to_one, one_to_n)
key = f"repartition-{token}"
new_layer_raw, new_divisions = simple_repartition_layer(
self, n_to_one, one_to_n, key
)
else:
if not self.known_divisions:
self.eager_compute_divisions()
nrows = self.defined_divisions[-1]
if divisions:
if divisions == self.divisions:
# noop
return self
new_divisions = divisions
elif npartitions:
rows_per_partition = math.ceil(nrows / npartitions)
if rows_per_partition:
new_divs = list(range(0, nrows, rows_per_partition))
new_divs.append(nrows)
new_divisions = tuple(new_divs)
token = tokenize(self, divisions)
key = f"repartition-{token}"
new_layer_raw = repartition_layer(self, key, new_divisions)

new_layer = AwkwardMaterializedLayer(
new_layer_raw,
previous_layer_names=[self.name],
Expand Down
43 changes: 43 additions & 0 deletions src/dask_awkward/lib/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -1350,6 +1350,49 @@ def repartition_layer(arr: Array, key: str, divisions: tuple[int, ...]) -> dict:
return layer


def _subpart(data: ak.Array, parts: int, part: int) -> ak.Array:
from dask_awkward.lib.core import is_typetracer

if is_typetracer(data):
return data
rows_per = len(data) // parts
return data[
part * rows_per : None if part == (parts - 1) else (part + 1) * rows_per
]


def _subcat(*arrs: tuple[ak.Array, ...]) -> ak.Array:
return ak.concatenate(arrs)


def simple_repartition_layer(
arr: Array, n_to_one: int | None, one_to_n: int | None, key: str
) -> tuple[dict, tuple[Any, ...]]:
layer: dict[tuple[str, int], tuple[Any, ...]] = {}
new_divisions: tuple[Any, ...]
if n_to_one:
for i in range(0, arr.npartitions, n_to_one):
layer[(key, i)] = (_subcat,) + tuple(
(arr.name, part)
for part in range(i, min(i + n_to_one, arr.npartitions))
)
new_divisions = arr.divisions[::n_to_one]
elif one_to_n:
for i in range(arr.npartitions):
for part in range(one_to_n):
layer[(key, (i * one_to_n + part))] = (
_subpart,
(arr.name, i),
one_to_n,
part,
)
# TODO: if arr.known_divisions:
new_divisions = (None,) * (arr.npartitions * one_to_n + 1)
else:
raise ValueError
return layer, new_divisions


@borrow_docstring(ak.enforce_type)
def enforce_type(
array: Array,
Expand Down
6 changes: 6 additions & 0 deletions tests/test_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,12 @@ def test_repartition_whole(daa):
assert_eq(daa, daa1, check_divisions=False)


def test_repartition_one_to_n(daa):
daa1 = daa.repartition(one_to_n=2)
assert daa1.npartitions == daa.npartitions * 2
assert_eq(daa, daa1, check_divisions=False)


def test_repartition_no_change(daa):
daa1 = daa.repartition(divisions=(0, 5, 10, 15))
assert daa1.npartitions == 3
Expand Down
Loading