feat: add dask windowing
patcao authored and jreback committed May 23, 2023
1 parent ab758bd commit 9cb920a
Showing 23 changed files with 1,483 additions and 248 deletions.
210 changes: 210 additions & 0 deletions ibis/backends/dask/aggcontext.py
@@ -0,0 +1,210 @@
import operator
from typing import Any, Callable, Dict, Tuple, Union

import dask.dataframe as dd
from dask.dataframe.groupby import SeriesGroupBy

import ibis
from ibis.backends.pandas.aggcontext import (
    AggregationContext,
    compute_window_spec,
    construct_time_context_aware_series,
    get_time_col,
    window_agg_udf,
    wrap_for_agg,
)
from ibis.backends.pandas.aggcontext import Transform as PandasTransform

# TODO Consolidate this logic with the pandas aggcontext.
# This file is almost a direct port of the pandas aggcontext.
# https://github.com/ibis-project/ibis/issues/5911


class Summarize(AggregationContext):
    __slots__ = ()

    def agg(self, grouped_data, function, *args, **kwargs):
        if isinstance(function, str):
            return getattr(grouped_data, function)(*args, **kwargs)

        if not callable(function):
            raise TypeError(f'Object {function} is not callable or a string')
        elif isinstance(grouped_data, dd.Series):
            return grouped_data.reduction(wrap_for_agg(function, args, kwargs))
        else:
            return grouped_data.agg(wrap_for_agg(function, args, kwargs))


class Transform(PandasTransform):
    def agg(self, grouped_data, function, *args, **kwargs):
        res = super().agg(grouped_data, function, *args, **kwargs)
        index_name = res.index.name if res.index.name is not None else 'index'
        res = res.reset_index().set_index(index_name).iloc[:, 0]
        return res


def dask_window_agg_built_in(
    frame: dd.DataFrame,
    windowed: dd.rolling.Rolling,
    function: str,
    max_lookback: int,
    *args: Tuple[Any],
    **kwargs: Dict[str, Any],
) -> dd.Series:
    """Apply window aggregation with built-in aggregators."""
    assert isinstance(function, str)
    method = operator.methodcaller(function, *args, **kwargs)

    if max_lookback is not None:
        agg_method = method

        def sliced_agg(s):
            return agg_method(s.iloc[-max_lookback:])

        method = operator.methodcaller('apply', sliced_agg, raw=False)

    result = method(windowed)
    # No MultiIndex support in dask
    result.index = frame.index
    return result


class Window(AggregationContext):
    __slots__ = ('construct_window',)

    def __init__(self, kind, *args, **kwargs):
        super().__init__(
            parent=kwargs.pop('parent', None),
            group_by=kwargs.pop('group_by', None),
            order_by=kwargs.pop('order_by', None),
            output_type=kwargs.pop('output_type'),
            max_lookback=kwargs.pop('max_lookback', None),
        )
        self.construct_window = operator.methodcaller(kind, *args, **kwargs)

    def agg(
        self,
        grouped_data: Union[dd.Series, SeriesGroupBy],
        function: Union[str, Callable],
        *args: Any,
        **kwargs: Any,
    ) -> dd.Series:
        # avoid a pandas warning about numpy arrays being passed through
        # directly
        group_by = self.group_by
        order_by = self.order_by

        assert group_by or order_by

        # Get the DataFrame from which the operand originated
        # (passed in when constructing this context object in
        # execute_node(ops.Window))
        parent = self.parent
        frame = getattr(parent, 'obj', parent)
        grouped_meta = getattr(grouped_data, '_meta_nonempty', grouped_data)
        obj = getattr(grouped_meta, 'obj', grouped_data)
        name = obj.name
        if frame[name] is not obj or name in group_by or name in order_by:
            name = f"{name}_{ibis.util.guid()}"
            frame = frame.assign(**{name: obj})

        # set the index to our order_by keys and append it to the existing
        # index
        # TODO: see if we can do this in the caller, when the context
        # is constructed rather than pulling out the data
        columns = group_by + order_by + [name]
        # Create a new frame to avoid mutating the original one
        indexed_by_ordering = frame[columns].copy()
        # placeholder column to compute window_sizes below
        indexed_by_ordering['_placeholder'] = 0
        indexed_by_ordering = indexed_by_ordering.set_index(order_by)

        # regroup if needed
        if group_by:
            grouped_frame = indexed_by_ordering.groupby(group_by, group_keys=False)
        else:
            grouped_frame = indexed_by_ordering
        grouped = grouped_frame[name]

        if callable(function):
            # To compute the window_size, we need to construct a
            # RollingGroupby and compute count using construct_window.
            # However, if the RollingGroupby is not numeric, e.g.,
            # we are calling a window UDF on a timestamp column, we
            # cannot compute the rolling count directly because:
            # (1) windowed.count() will exclude NaN observations,
            #     which results in incorrect window sizes.
            # (2) windowed.apply(len, raw=True) will include NaN
            #     observations, but doesn't work on non-numeric types.
            #     https://github.com/pandas-dev/pandas/issues/23002
            # To deal with this, we create a _placeholder column.
            windowed_frame = self.construct_window(grouped_frame)
            window_sizes = windowed_frame['_placeholder'].count().reset_index(drop=True)
            mask = ~(window_sizes.isna())
            window_upper_indices = dd.Series(range(len(window_sizes))) + 1
            window_lower_indices = window_upper_indices - window_sizes
            # The result Series of the UDF may need to be trimmed by
            # timecontext. In order to do so, 'time' must be added
            # as an index to the Series, if present. Here we extract
            # the time column from the parent DataFrame `frame`.
            if get_time_col() in frame:
                result_index = construct_time_context_aware_series(obj, frame).index
            else:
                result_index = obj.index
            result = window_agg_udf(
                grouped_data,
                function,
                window_lower_indices,
                window_upper_indices,
                mask,
                result_index,
                self.dtype,
                self.max_lookback,
                *args,
                **kwargs,
            )
        else:
            # perform the per-group rolling operation
            windowed = self.construct_window(grouped)
            result = dask_window_agg_built_in(
                frame,
                windowed,
                function,
                self.max_lookback,
                *args,
                **kwargs,
            )
        try:
            return result.astype(self.dtype, copy=False)
        except (TypeError, ValueError):
            # The dtypes in result could have been promoted during the agg
            # computation. Trying to downcast back with self.dtype would fail,
            # but we want to return the result with the promoted types anyway.
            return result


class Cumulative(Window):
    __slots__ = ()

    def __init__(self, window, *args, **kwargs):
        super().__init__('rolling', *args, window=window, min_periods=1, **kwargs)


class Moving(Window):
    __slots__ = ()

    def __init__(self, start, max_lookback, *args, **kwargs):
        start = compute_window_spec(start.output_dtype, start.value)

        super().__init__(
            'rolling',
            start,
            *args,
            max_lookback=max_lookback,
            min_periods=1,
            **kwargs,
        )

    def short_circuit_method(self, grouped_data, function):
        raise AttributeError('No short circuit method for rolling operations')
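
To give a sense of how these contexts are reached, here is a minimal usage sketch through the public ibis API. The table name `events`, its columns, and the data are made up, and exact output may vary with the ibis and dask versions in use:

import dask.dataframe as dd
import pandas as pd

import ibis

# Hypothetical input data: a grouping key, an ordering column, and a value.
ddf = dd.from_pandas(
    pd.DataFrame(
        {
            "g": ["a", "a", "a", "b", "b"],
            "t": pd.date_range("2023-01-01", periods=5),
            "x": [1.0, 2.0, 3.0, 4.0, 5.0],
        }
    ),
    npartitions=1,
)

con = ibis.dask.connect({"events": ddf})
t = con.table("events")

# A per-group running sum ordered by time; executing this expression routes
# through the Cumulative/Window aggregation contexts defined above.
w = ibis.cumulative_window(group_by=t.g, order_by=t.t)
expr = t.mutate(running_x=t.x.sum().over(w))
print(expr.execute())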
3 changes: 1 addition & 2 deletions ibis/backends/dask/core.py
@@ -120,14 +120,14 @@
import ibis.expr.operations as ops
from ibis.backends.base.df.scope import Scope
from ibis.backends.base.df.timecontext import TimeContext, canonicalize_context
+from ibis.backends.dask import aggcontext as agg_ctx
from ibis.backends.dask.dispatch import (
    execute_literal,
    execute_node,
    post_execute,
    pre_execute,
)
from ibis.backends.dask.trace import trace
-from ibis.backends.pandas import aggcontext as agg_ctx
from ibis.backends.pandas.core import (
    compute_time_context,
    get_node_arguments,
Expand Down Expand Up @@ -320,7 +320,6 @@ def execute_until_in_scope(
        new_scope.get_value(arg, timecontext) if isinstance(arg, ops.Node) else arg
        for (arg, timecontext) in zip(computable_args, arg_timecontexts)
    ]
-
    result = execute_node(
        node,
        *data,
3 changes: 1 addition & 2 deletions ibis/backends/dask/execution/aggregations.py
@@ -19,10 +19,10 @@
import ibis.expr.operations as ops
from ibis.backends.base.df.scope import Scope
from ibis.backends.base.df.timecontext import TimeContext
+from ibis.backends.dask import aggcontext as agg_ctx
from ibis.backends.dask.core import execute
from ibis.backends.dask.dispatch import execute_node
from ibis.backends.dask.execution.util import coerce_to_output, safe_concat
-from ibis.backends.pandas.execution.generic import agg_ctx


# TODO - aggregations - #2553
@@ -144,7 +144,6 @@ def execute_notany_series(op, data, mask, aggcontext=None, **kwargs):
    # here for future scaffolding.
    method = operator.methodcaller(name)
    result = aggcontext.agg(data, lambda data: ~method(data))
-
    return result


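The dask Summarize context funnels callable aggregations through dd.Series.reduction rather than agg. Below is a small standalone sketch of that reduction pattern, expressing the same "not any" shape as execute_notany_series above; the data is made up:

import pandas as pd
import dask.dataframe as dd

s = dd.from_pandas(pd.Series([False, False, True]), npartitions=2)

# `any` per partition, then negate the combined per-partition results.
notany = s.reduction(
    chunk=lambda part: part.any(),
    aggregate=lambda parts: ~parts.any(),
)
print(bool(notany.compute()))  # False, because the series contains a True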
16 changes: 15 additions & 1 deletion ibis/backends/dask/execution/generic.py
@@ -19,13 +19,16 @@
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
import ibis.expr.types as ir
+import ibis.util
from ibis.backends.dask import Backend as DaskBackend
from ibis.backends.dask.core import execute
from ibis.backends.dask.dispatch import execute_node
from ibis.backends.dask.execution.util import (
    TypeRegistrationDict,
+    add_globally_consecutive_column,
    make_selected_obj,
    register_types_to_dispatcher,
+    rename_index,
)
from ibis.backends.pandas.core import (
    date_types,
@@ -341,7 +344,11 @@ def execute_cast_series_date(op, data, type, **kwargs):
@execute_node.register(ops.Limit, dd.DataFrame, integer_types, integer_types)
def execute_limit_frame(op, data, nrows, offset, **kwargs):
    # NOTE: Dask Dataframes do not support iloc row based indexing
-    return data.loc[offset : (offset + nrows) - 1]
+    # Need to add a globally consecutive index in order to select nrows number of rows
+    unique_col_name = ibis.util.guid()
+    df = add_globally_consecutive_column(data, col_name=unique_col_name)
+    ret = df.loc[offset : (offset + nrows) - 1]
+    return rename_index(ret, None)


@execute_node.register(ops.Not, (dd.core.Scalar, dd.Series))
@@ -537,3 +544,10 @@ def execute_node_coalesce(op, values, **kwargs):
    # TODO: this is slow
    values = [execute(arg, **kwargs) for arg in values]
    return compute_row_reduction(coalesce, values)
+
+
+@execute_node.register(ops.TableArrayView, dd.DataFrame)
+def execute_table_array_view(op, _, **kwargs):
+    # Need to compute dataframe in order to squeeze into a scalar
+    ddf = execute(op.table)
+    return ddf.compute().squeeze()
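
The new limit path relies on giving the dask frame a globally consecutive index, since dask has no positional .iloc over rows. Here is a rough standalone sketch of that idea; the real add_globally_consecutive_column helper in ibis.backends.dask.execution.util may differ in its details:

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=3)

# Number rows consecutively across all partitions, then make that the
# (sorted) index so label-based .loc slicing acts like positional selection.
ddf = ddf.assign(_row=1)
ddf["_row"] = ddf["_row"].cumsum() - 1
ddf = ddf.set_index("_row", sorted=True)

offset, nrows = 2, 4
print(ddf.loc[offset : offset + nrows - 1].compute())  # rows 2 through 5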
4 changes: 3 additions & 1 deletion ibis/backends/dask/execution/numeric.py
@@ -125,7 +125,9 @@ def execute_series_quantile(op, data, quantile, _, mask, **kwargs):
    ops.Quantile, ddgb.SeriesGroupBy, numeric_types, type(None), type(None)
)
def execute_series_quantile_group_by(op, data, quantile, *_, **kwargs):
-    return data.quantile(q=quantile)
+    raise NotImplementedError(
+        "Quantile not implemented for Dask SeriesGroupBy, Dask #9824"
+    )


@execute_node.register(
25 changes: 15 additions & 10 deletions ibis/backends/dask/execution/selection.py
@@ -16,10 +16,11 @@
from ibis.backends.dask.core import execute
from ibis.backends.dask.dispatch import execute_node
from ibis.backends.dask.execution.util import (
-    add_partitioned_sorted_column,
+    add_globally_consecutive_column,
    coerce_to_output,
    compute_sorted_frame,
    is_row_order_preserving,
+    rename_index,
)
from ibis.backends.pandas.execution.selection import (
    build_df_from_selection,
@@ -110,7 +111,6 @@ def compute_projection(
                )
                for t in an.find_immediate_parent_tables(node)
            )
-
            result = execute(node, scope=scope, timecontext=timecontext, **kwargs)
            return coerce_to_output(result, node, data.index)
        else:
@@ -137,10 +137,15 @@ def build_df_from_projection(
    # Slow path when we cannot do direct assigns
    # Create a unique row identifier and set it as the index. This is
    # used in dd.concat to merge the pieces back together.
-    data = add_partitioned_sorted_column(data)
-    data_pieces = [compute_projection(node, op, data, **kwargs) for node in selections]
-
-    return dd.concat(data_pieces, axis=1).reset_index(drop=True)
+    partitioned_data = add_globally_consecutive_column(data)
+    data_pieces = [
+        compute_projection(node, op, partitioned_data, **kwargs) for node in selections
+    ]
+    result = dd.concat(data_pieces, axis=1)
+    # _ibis_index was added and used to concat data_pieces together.
+    # Drop the index name here but keep the index as the dataframe is
+    # already partitioned on it.
+    return rename_index(result, None)


@execute_node.register(ops.Selection, dd.DataFrame)
@@ -174,7 +179,6 @@ def execute_selection_dataframe(
        timecontext=timecontext,
        **kwargs,
    )
-
    if op.sort_keys:
        if len(op.sort_keys) > 1:
            raise NotImplementedError(
@@ -187,13 +191,14 @@
            raise NotImplementedError(
                "Descending sort is not supported for the Dask backend"
            )
-        result = compute_sorted_frame(
-            result,
-            order_by=sort_key,
+        result, _, _ = compute_sorted_frame(
+            df=result,
+            order_by=[sort_key],
            scope=scope,
            timecontext=timecontext,
            **kwargs,
        )
+        result = add_globally_consecutive_column(result, col_name='_ibis_sort_index')

        return result
    else:
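The sort path above supports a single ascending key. With dask, sorting a frame typically means shuffling on that key as the index, after which the per-partition index restarts and a globally consecutive index is re-added. Here is a rough sketch of that pattern with plain dask (this is illustrative only, not the actual compute_sorted_frame implementation):

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(
    pd.DataFrame({"key": [3, 1, 2], "v": [30, 10, 20]}), npartitions=2
)

# Ascending sort by shuffling on the sort key as the index, then bring the
# key back as a column. The resulting index restarts within each partition,
# which is why a globally consecutive index must be re-added afterwards.
sorted_ddf = ddf.set_index("key").reset_index()
print(sorted_ddf.compute())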