Cubed blockwise #357

Merged · 4 commits · May 29, 2024
2 changes: 1 addition & 1 deletion ci/environment.yml
@@ -6,7 +6,7 @@ dependencies:
   - cachey
   - cftime
   - codecov
-  - cubed>=0.14.2
+  - cubed>=0.14.3
   - dask-core
   - pandas
   - numpy>=1.22
31 changes: 31 additions & 0 deletions docs/source/user-stories/climatology-hourly-cubed.ipynb
@@ -86,6 +86,37 @@
"source": [
"hourly.compute()"
]
},
{
"cell_type": "markdown",
"id": "7",
"metadata": {},
"source": [
"## Other climatologies: resampling by month\n",
"\n",
"This uses the \"blockwise\" strategy."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8",
"metadata": {},
"outputs": [],
"source": [
"monthly = ds.tp.resample(time=\"ME\").sum(method=\"blockwise\")\n",
"monthly"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9",
"metadata": {},
"outputs": [],
"source": [
"monthly.compute()"
]
}
],
"metadata": {
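For orientation, here is a numpy-only sketch (an illustration, not code from this PR) of what the "blockwise" strategy used in the new notebook cells amounts to for a resample-style grouping: because the month labels are sorted, each group occupies one contiguous run of the time axis, so every run can be reduced on its own and no cross-chunk combine step is needed.

import numpy as np
import pandas as pd

time = pd.date_range("2000-01-01", "2000-12-31", freq="D")
tp = np.random.default_rng(0).random(time.size)

labels = time.month.to_numpy()  # sorted group labels, one per day
boundaries = np.searchsorted(labels, np.arange(1, 14))
monthly_sum = np.array(
    [tp[start:stop].sum() for start, stop in zip(boundaries[:-1], boundaries[1:])]
)
assert monthly_sum.shape == (12,)  # one value per month, computed slice by slice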
182 changes: 109 additions & 73 deletions flox/core.py
@@ -1798,87 +1798,123 @@ def cubed_groupby_agg(
assert isinstance(axis, Sequence)
assert all(ax >= 0 for ax in axis)

inds = tuple(range(array.ndim))
if method == "blockwise":
assert by.ndim == 1
assert expected_groups is not None

by_input = by
def _reduction_func(a, by, axis, start_group, num_groups):
# adjust group labels to start from 0 for each chunk
by_for_chunk = by - start_group
expected_groups_for_chunk = pd.RangeIndex(num_groups)

# Unifying chunks is necessary for argreductions.
# We need to rechunk before zipping up with the index
# let's always do it anyway
if not is_chunked_array(by):
# chunk numpy arrays like the input array
chunks = tuple(array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0))
axis = (axis,) # convert integral axis to tuple

by = cubed.from_array(by, chunks=chunks, spec=array.spec)
_, (array, by) = cubed.core.unify_chunks(array, inds, by, inds[-by.ndim :])
blockwise_method = partial(
_reduce_blockwise,
agg=agg,
axis=axis,
expected_groups=expected_groups_for_chunk,
fill_value=fill_value,
engine=engine,
sort=sort,
reindex=reindex,
)
out = blockwise_method(a, by_for_chunk)
return out[agg.name]

# Cubed's groupby_reduction handles the generation of "intermediates", and the
# "map-reduce" combination step, so we don't have to do that here.
# Only the equivalent of "_simple_combine" is supported, there is no
# support for "_grouped_combine".
labels_are_unknown = is_chunked_array(by_input) and expected_groups is None
do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown
num_groups = len(expected_groups)
result = cubed.core.groupby.groupby_blockwise(
array, by, axis=axis, func=_reduction_func, num_groups=num_groups
)
groups = (expected_groups.to_numpy(),)
return (result, groups)

assert do_simple_combine
assert method == "map-reduce"
assert expected_groups is not None
assert reindex is True
assert len(axis) == 1 # one axis/grouping
else:
inds = tuple(range(array.ndim))

def _groupby_func(a, by, axis, intermediate_dtype, num_groups):
blockwise_method = partial(
_get_chunk_reduction(agg.reduction_type),
func=agg.chunk,
fill_value=agg.fill_value["intermediate"],
dtype=agg.dtype["intermediate"],
reindex=reindex,
user_dtype=agg.dtype["user"],
by_input = by

# Unifying chunks is necessary for argreductions.
# We need to rechunk before zipping up with the index
# let's always do it anyway
if not is_chunked_array(by):
# chunk numpy arrays like the input array
chunks = tuple(
array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0)
)

by = cubed.from_array(by, chunks=chunks, spec=array.spec)
_, (array, by) = cubed.core.unify_chunks(array, inds, by, inds[-by.ndim :])

# Cubed's groupby_reduction handles the generation of "intermediates", and the
# "map-reduce" combination step, so we don't have to do that here.
# Only the equivalent of "_simple_combine" is supported, there is no
# support for "_grouped_combine".
labels_are_unknown = is_chunked_array(by_input) and expected_groups is None
do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown

assert do_simple_combine
assert method == "map-reduce"
assert expected_groups is not None
assert reindex is True
assert len(axis) == 1 # one axis/grouping

def _groupby_func(a, by, axis, intermediate_dtype, num_groups):
blockwise_method = partial(
_get_chunk_reduction(agg.reduction_type),
func=agg.chunk,
fill_value=agg.fill_value["intermediate"],
dtype=agg.dtype["intermediate"],
reindex=reindex,
user_dtype=agg.dtype["user"],
axis=axis,
expected_groups=expected_groups,
engine=engine,
sort=sort,
)
out = blockwise_method(a, by)
# Convert dict to one that cubed understands, dropping groups since they are
# known, and the same for every block.
return {
f"f{idx}": intermediate for idx, intermediate in enumerate(out["intermediates"])
}

def _groupby_combine(a, axis, dummy_axis, dtype, keepdims):
# this is similar to _simple_combine, except the dummy axis and concatenation is handled by cubed
# only combine over the dummy axis, to preserve grouping along 'axis'
dtype = dict(dtype)
out = {}
for idx, combine in enumerate(agg.simple_combine):
field = f"f{idx}"
out[field] = combine(a[field], axis=dummy_axis, keepdims=keepdims)
return out

def _groupby_aggregate(a):
# Convert cubed dict to one that _finalize_results works with
results = {"groups": expected_groups, "intermediates": a.values()}
out = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
return out[agg.name]

# convert list of dtypes to a structured dtype for cubed
intermediate_dtype = [(f"f{i}", dtype) for i, dtype in enumerate(agg.dtype["intermediate"])]
dtype = agg.dtype["final"]
num_groups = len(expected_groups)

result = cubed.core.groupby.groupby_reduction(
array,
by,
func=_groupby_func,
combine_func=_groupby_combine,
aggregate_func=_groupby_aggregate,
axis=axis,
expected_groups=expected_groups,
engine=engine,
sort=sort,
intermediate_dtype=intermediate_dtype,
dtype=dtype,
num_groups=num_groups,
)
out = blockwise_method(a, by)
# Convert dict to one that cubed understands, dropping groups since they are
# known, and the same for every block.
return {f"f{idx}": intermediate for idx, intermediate in enumerate(out["intermediates"])}

def _groupby_combine(a, axis, dummy_axis, dtype, keepdims):
# this is similar to _simple_combine, except the dummy axis and concatenation is handled by cubed
# only combine over the dummy axis, to preserve grouping along 'axis'
dtype = dict(dtype)
out = {}
for idx, combine in enumerate(agg.simple_combine):
field = f"f{idx}"
out[field] = combine(a[field], axis=dummy_axis, keepdims=keepdims)
return out

def _groupby_aggregate(a):
# Convert cubed dict to one that _finalize_results works with
results = {"groups": expected_groups, "intermediates": a.values()}
out = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
return out[agg.name]

# convert list of dtypes to a structured dtype for cubed
intermediate_dtype = [(f"f{i}", dtype) for i, dtype in enumerate(agg.dtype["intermediate"])]
dtype = agg.dtype["final"]
num_groups = len(expected_groups)

result = cubed.core.groupby.groupby_reduction(
array,
by,
func=_groupby_func,
combine_func=_groupby_combine,
aggregate_func=_groupby_aggregate,
axis=axis,
intermediate_dtype=intermediate_dtype,
dtype=dtype,
num_groups=num_groups,
)

groups = (expected_groups.to_numpy(),)

return (result, groups)


def _collapse_blocks_along_axes(reduced: DaskArray, axis: T_Axes, group_chunks) -> DaskArray:
@@ -2467,9 +2503,9 @@ def groupby_reduce(
if method is None:
method = "map-reduce"

-        if method != "map-reduce":
+        if method not in ("map-reduce", "blockwise"):
             raise NotImplementedError(
-                "Reduction for Cubed arrays is only implemented for method 'map-reduce'."
+                "Reduction for Cubed arrays is only implemented for methods 'map-reduce' and 'blockwise'."
             )

partial_agg = partial(cubed_groupby_agg, **kwargs)
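The heart of the new blockwise path above is _reduction_func. Judging from its signature, cubed's groupby_blockwise hands it one block at a time together with that block's slice of by, the id of the first group in the block (start_group), and the number of groups the block holds (num_groups); the labels are then shifted to start at zero so the block can be reduced in isolation. Below is a hedged numpy sketch of that relabelling step — the real code delegates to flox's _reduce_blockwise, and np.add.at stands in here for the engine-specific grouped reduction.

import numpy as np
import pandas as pd

def reduce_one_block(a, by, start_group, num_groups):
    # shift global group ids so this block's groups run 0..num_groups-1
    by_for_chunk = by - start_group
    expected_groups_for_chunk = pd.RangeIndex(num_groups)
    out = np.zeros(len(expected_groups_for_chunk), dtype=a.dtype)
    np.add.at(out, by_for_chunk, a)  # grouped sum within this block only
    return out

block = np.array([1.0, 2.0, 3.0, 4.0])
labels = np.array([4, 4, 5, 6])  # global group ids present in this block
print(reduce_one_block(block, labels, start_group=4, num_groups=3))  # [3. 3. 4.]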
34 changes: 34 additions & 0 deletions tests/test_core.py
@@ -1205,6 +1205,40 @@ def test_group_by_datetime(engine, method):
assert_equal(expected, actual)


@requires_cubed
@pytest.mark.parametrize("method", ["blockwise", "map-reduce"])
def test_group_by_datetime_cubed(engine, method):
    kwargs = dict(
        func="mean",
        method=method,
        engine=engine,
    )
    t = pd.date_range("2000-01-01", "2000-12-31", freq="D").to_series()
    data = t.dt.dayofyear
    cubedarray = cubed.from_array(data.values, chunks=30)

    actual, _ = groupby_reduce(cubedarray, t, **kwargs)
    expected = data.to_numpy().astype(float)
    assert_equal(expected, actual)

    edges = pd.date_range("1999-12-31", "2000-12-31", freq="ME").to_series().to_numpy()
    actual, _ = groupby_reduce(
        cubedarray, t.to_numpy(), isbin=True, expected_groups=edges, **kwargs
    )
    expected = data.resample("ME").mean().to_numpy()
    assert_equal(expected, actual)

    actual, _ = groupby_reduce(
        cubed.array_api.broadcast_to(cubedarray, (2, 3, cubedarray.shape[-1])),
        t.to_numpy(),
        isbin=True,
        expected_groups=edges,
        **kwargs,
    )
    expected = np.broadcast_to(expected, (2, 3, expected.shape[-1]))
    assert_equal(expected, actual)


def test_factorize_values_outside_bins():
# pd.factorize returns intp
vals = factorize_(
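The binned case in the new test leans on an equivalence: reducing daily values into bins bounded by month-end edges (isbin=True) reproduces pandas' resample("ME") result. A small pandas/numpy sketch of that equivalence follows — the cubed-specific machinery is what the test itself exercises.

import numpy as np
import pandas as pd

t = pd.date_range("2000-01-01", "2000-12-31", freq="D").to_series()
data = t.dt.dayofyear

edges = pd.date_range("1999-12-31", "2000-12-31", freq="ME").to_series().to_numpy()
bins = pd.cut(t.to_numpy(), edges)  # left-open, right-closed datetime bins
binned_mean = data.groupby(bins.codes).mean().to_numpy()

expected = data.resample("ME").mean().to_numpy()
np.testing.assert_allclose(binned_mean, expected)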