refactor(ir): remove DestructValue expressions
The DestructValue expressions didn't have corresponding operation nodes.

BREAKING CHANGE: removed ir.DestructValue, ir.DestructScalar and ir.DestructColumn, use table.unpack() instead
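
For reference, a minimal migration sketch (the table and struct schema below are illustrative, not from this commit): instead of relying on the removed Destruct* expressions to spread a struct's fields into columns, keep the struct column and call Table.unpack() on it.

import ibis

t = ibis.table([("s", "struct<x: int64, y: string>")], name="t")

# before (removed): Destruct* expressions spread struct fields into columns
# after: promote the struct's fields to top-level columns with Table.unpack()
expr = t.unpack("s")  # yields columns x and y alongside the other columns of t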
kszucs committed Sep 16, 2022
1 parent aa3aa3b commit 762d384
Showing 19 changed files with 141 additions and 356 deletions.
5 changes: 5 additions & 0 deletions ibis/backends/dask/core.py
@@ -349,6 +349,7 @@ def main_execute(
scope=None,
timecontext: Optional[TimeContext] = None,
aggcontext=None,
cache=None,
**kwargs,
):
"""Execute an expression against data that are bound to it. If no data
@@ -394,6 +395,9 @@ def main_execute(
if params is None:
params = {}

if cache is None:
cache = {}

# TODO: make expressions hashable so that we can get rid of these .op()
# calls everywhere
params = {k.op() if hasattr(k, 'op') else k: v for k, v in params.items()}
@@ -403,6 +407,7 @@
scope,
timecontext=timecontext,
aggcontext=aggcontext,
cache=cache,
**kwargs,
)

17 changes: 10 additions & 7 deletions ibis/backends/dask/execution/structs.py
@@ -10,21 +10,24 @@
from ibis.backends.dask.execution.util import make_selected_obj


@execute_node.register(ops.StructField, dd.DataFrame)
def execute_node_struct_field_dict(op, data, **kwargs):
return data[op.field]


@execute_node.register(ops.StructField, dd.Series)
def execute_node_struct_field_series(op, data, **kwargs):
field = op.field
# TODO This meta is not necessarily right
return data.map(
operator.itemgetter(field), meta=(data.name, data.dtype)
).rename(field)
getter = operator.itemgetter(op.field)
return data.map(getter, meta=(data.name, data.dtype)).rename(op.field)


@execute_node.register(ops.StructField, ddgb.SeriesGroupBy)
def execute_node_struct_field_series_group_by(op, data, **kwargs):
field = op.field
selected_obj = make_selected_obj(data)
getter = operator.itemgetter(op.field)
return (
selected_obj.map(operator.itemgetter(field), meta=selected_obj._meta)
.rename(field)
selected_obj.map(getter, meta=selected_obj._meta)
.rename(op.field)
.groupby(data.index)
)
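
A standalone sketch of the pattern the dd.Series handler above uses: struct values arrive as plain Python mappings, and field selection maps operator.itemgetter over each partition (the data here is illustrative).

import operator
import pandas as pd
import dask.dataframe as dd

s = dd.from_pandas(
    pd.Series([{"x": 1, "y": "a"}, {"x": 2, "y": "b"}], name="s"),
    npartitions=1,
)
# select field "x" from every struct value and name the result after the field
x = s.map(operator.itemgetter("x"), meta=(s.name, s.dtype)).rename("x")
print(x.compute())  # index 0 -> 1, index 1 -> 2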
146 changes: 25 additions & 121 deletions ibis/backends/dask/execution/util.py
@@ -12,9 +12,7 @@
import ibis.expr.operations as ops
import ibis.util
from ibis.backends.dask.core import execute
from ibis.backends.pandas.client import ibis_dtype_to_pandas
from ibis.backends.pandas.trace import TraceTwoLevelDispatcher
from ibis.expr import datatypes as dt
from ibis.expr import lineage as lin
from ibis.expr import types as ir
from ibis.expr.scope import Scope
@@ -127,42 +125,34 @@ def coerce_to_output(
0 [1, 2, 3]
Name: result, dtype: object
"""
result_name = expr._safe_name
dataframe_exprs = (
ir.DestructColumn,
ir.StructColumn,
ir.DestructScalar,
ir.StructScalar,
)
if isinstance(expr, dataframe_exprs):
return _coerce_to_dataframe(
result, expr.type().names, expr.type().types
)
elif isinstance(result, (pd.Series, dd.Series)):
result_name = expr.get_name()

if isinstance(result, (pd.DataFrame, dd.DataFrame)):
result = result.map_partitions(pd.DataFrame.to_dict, orient='records')
return result.rename(result_name)

if isinstance(result, (pd.Series, dd.Series)):
# Series from https://github.com/ibis-project/ibis/issues/2711
return result.rename(result_name)
elif isinstance(expr, ir.Scalar):
if isinstance(result, dd.core.Scalar):
# wrap the scalar in a series
out_dtype = _pandas_dtype_from_dd_scalar(result)
out_len = 1 if index is None else len(index)
meta = make_meta_series(dtype=out_dtype, name=result_name)
# Specify `divisions` so that the created Dask object has
# known divisions (to be concatenatable with Dask objects
# created using `dd.from_pandas`)
series = dd.from_delayed(
_wrap_dd_scalar(result, result_name, out_len),
meta=meta,
divisions=(0, out_len - 1),
)

return series
else:
return dd.from_pandas(
pd_util.coerce_to_output(result, expr, index), npartitions=1
)
else:
raise ValueError(f"Cannot coerce_to_output. Result: {result}")
if isinstance(result, dd.core.Scalar):
# wrap the scalar in a series
out_dtype = _pandas_dtype_from_dd_scalar(result)
out_len = 1 if index is None else len(index)
meta = make_meta_series(dtype=out_dtype, name=result_name)
# Specify `divisions` so that the created Dask object has
# known divisions (to be concatenatable with Dask objects
# created using `dd.from_pandas`)
series = dd.from_delayed(
_wrap_dd_scalar(result, result_name, out_len),
meta=meta,
divisions=(0, out_len - 1),
)
return series

return dd.from_pandas(
pd_util.coerce_to_output(result, expr, index), npartitions=1
)
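
As a standalone illustration of the scalar branch above (names and values are made up): a lazy dd.core.Scalar is wrapped in a delayed one-row pandas Series so the resulting dask Series has known divisions and can be concatenated with objects built via dd.from_pandas.

import dask
import dask.dataframe as dd
import pandas as pd

@dask.delayed
def _as_series(value, name, length):
    # materialize the scalar inside a small pandas Series
    return pd.Series([value] * length, name=name)

scalar = dd.from_pandas(pd.Series([1, 2, 3]), npartitions=1).sum()  # dd.core.Scalar
meta = pd.Series([], name="result", dtype="int64")
series = dd.from_delayed(_as_series(scalar, "result", 1), meta=meta, divisions=(0, 0))
print(series.compute())  # 0    6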


@dask.delayed
@@ -177,92 +167,6 @@ def _pandas_dtype_from_dd_scalar(x: dd.core.Scalar):
return pd.Series([x._meta]).dtype


def _coerce_to_dataframe(
data: Any,
column_names: List[str],
types: List[dt.DataType],
) -> dd.DataFrame:
"""
Clone of ibis.util.coerce_to_dataframe that deals well with dask types
Coerce the following shapes to a DataFrame.
The following shapes are allowed:
(1) A list/tuple of Series -> each series is a column
(2) A list/tuple of scalars -> each scalar is a column
(3) A Dask Series of list/tuple -> each element inside becomes a column
(4) dd.DataFrame -> the data is unchanged
Examples
--------
Note: these examples demonstrate functionality with pandas objects in order
to make them more legible, but this works the same with dask.
>>> coerce_to_dataframe(pd.DataFrame({'a': [1, 2, 3]}), ['b'])
b
0 1
1 2
2 3
>>> coerce_to_dataframe(pd.Series([[1, 2, 3]]), ['a', 'b', 'c'])
a b c
0 1 2 3
>>> coerce_to_dataframe(pd.Series([range(3), range(3)]), ['a', 'b', 'c'])
a b c
0 0 1 2
1 0 1 2
>>> coerce_to_dataframe([pd.Series(x) for x in [1, 2, 3]], ['a', 'b', 'c'])
a b c
0 1 2 3
>>> coerce_to_dataframe([1, 2, 3], ['a', 'b', 'c'])
a b c
0 1 2 3
"""
if isinstance(data, dd.DataFrame):
result = data

elif isinstance(data, dd.Series):
# This takes a series where the values are iterables and converts each
# value into its own row in a new dataframe.

# NOTE - We add a detailed meta here so we do not drop the key index
# downstream. This seems to be fixed in versions of dask > 2020.12.0
dtypes = map(ibis_dtype_to_pandas, types)
series = [
data.apply(
_select_item_in_iter,
selection=i,
meta=make_meta_series(
dtype, meta_index=data._meta_nonempty.index
),
)
for i, dtype in enumerate(dtypes)
]

result = dd.concat(series, axis=1)

elif isinstance(data, (tuple, list)):
if len(data) == 0:
result = dd.from_pandas(
pd.DataFrame(columns=column_names), npartitions=1
)
elif isinstance(data[0], dd.Series):
result = dd.concat(data, axis=1)
else:
result = dd.from_pandas(
pd.concat([pd.Series([v]) for v in data], axis=1),
npartitions=1,
)
else:
raise ValueError(f"Cannot coerce to DataFrame: {data}")

result.columns = column_names
return result


def _select_item_in_iter(t, selection):
return t[selection]


def safe_concat(dfs: List[Union[dd.Series, dd.DataFrame]]) -> dd.DataFrame:
"""
Concat a list of `dd.Series` or `dd.DataFrame` objects into one DataFrame
16 changes: 8 additions & 8 deletions ibis/backends/dask/udf.py
@@ -63,29 +63,29 @@ def execute_udf_node_groupby(op, *args, **kwargs):
@execute_node.register(
ops.ElementWiseVectorizedUDF, *(itertools.repeat(dd.Series, nargs))
)
def execute_udf_node(op, *args, **kwargs):
def execute_udf_node(op, *args, cache=None, timecontext=None, **kwargs):
# We have rewritten op.func to be a closure enclosing
# the kwargs, and therefore, we do not need to pass
# kwargs here. This is true for all udf execution in this
# file.
# See ibis.udf.vectorized.UserDefinedFunction
try:
return cache[(op, timecontext)]
except KeyError:
pass

if isinstance(op.return_type, dt.Struct):
meta = make_struct_op_meta(op)

df = dd.map_partitions(op.func, *args, meta=meta)
return df
else:
name = args[0].name if len(args) == 1 else None
meta = pandas.Series([], name=name, dtype=op.return_type.to_dask())
df = dd.map_partitions(op.func, *args, meta=meta)

return df
cache[(op, timecontext)] = df

@execute_node.register(
ops.ElementWiseVectorizedUDF, *(itertools.repeat(object, nargs))
)
def execute_udf_node_non_dask(op, *args, **kwargs):
return op.func(*args)
return df

return scope
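
The cache threaded through main_execute is a plain dict shared for the duration of a single execute call; a minimal sketch of the memoization pattern it enables (names here are illustrative, not the ibis internals):

def execute_memoized(op, timecontext, cache, compute):
    # results are keyed by (op, timecontext), so an identical UDF node within
    # the same execution is materialized only once
    key = (op, timecontext)
    try:
        return cache[key]
    except KeyError:
        result = cache[key] = compute()
        return result

cache = {}
first = execute_memoized("node", None, cache, lambda: "expensive result")
second = execute_memoized("node", None, cache, lambda: "never evaluated")
assert first is second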

9 changes: 8 additions & 1 deletion ibis/backends/pandas/core.py
@@ -369,7 +369,9 @@ def execute_until_in_scope(
clients=clients,
**kwargs,
)
computed = post_execute_(op, result, timecontext=timecontext)
computed = post_execute_(
op, result, timecontext=timecontext, aggcontext=aggcontext, **kwargs
)
return Scope({op: computed}, timecontext)


@@ -384,6 +386,7 @@ def main_execute(
scope=None,
timecontext: Optional[TimeContext] = None,
aggcontext=None,
cache=None,
**kwargs,
):
"""Execute an expression against data that are bound to it. If no data
@@ -429,6 +432,9 @@ def main_execute(
if params is None:
params = {}

if cache is None:
cache = {}

# TODO: make expressions hashable so that we can get rid of these .op()
# calls everywhere
params = {k.op() if hasattr(k, 'op') else k: v for k, v in params.items()}
@@ -438,6 +444,7 @@
scope,
timecontext=timecontext,
aggcontext=aggcontext,
cache=cache,
**kwargs,
)

23 changes: 11 additions & 12 deletions ibis/backends/pandas/execution/structs.py
@@ -10,7 +10,9 @@
from ibis.backends.pandas.dispatch import execute_node


@execute_node.register(ops.StructField, collections.abc.Mapping)
@execute_node.register(
ops.StructField, (collections.abc.Mapping, pd.DataFrame)
)
def execute_node_struct_field_dict(op, data, **kwargs):
return data[op.field]

@@ -23,25 +25,22 @@ def execute_node_struct_field_none(op, data, **_):
return pd.NA


@execute_node.register(ops.StructField, pd.Series)
def execute_node_struct_field_series(op, data, **kwargs):
field = op.field
return data.map(functools.partial(_safe_getter, field=field)).rename(field)


def _safe_getter(value, field: str):
if pd.isna(value):
return pd.NA
else:
return value[field]


@execute_node.register(ops.StructField, pd.Series)
def execute_node_struct_field_series(op, data, **kwargs):
getter = functools.partial(_safe_getter, field=op.field)
return data.map(getter).rename(op.field)


@execute_node.register(ops.StructField, SeriesGroupBy)
def execute_node_struct_field_series_group_by(op, data, **kwargs):
field = op.field

getter = functools.partial(_safe_getter, field=op.field)
return (
data.obj.map(functools.partial(_safe_getter, field=field))
.rename(field)
.groupby(data.grouper.groupings)
data.obj.map(getter).rename(op.field).groupby(data.grouper.groupings)
)
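
A small standalone check of how the pandas Series handler above treats missing struct values (the data is illustrative): NA entries propagate as pd.NA instead of raising.

import functools
import pandas as pd

def safe_getter(value, field):
    # mirror of the _safe_getter helper above: missing struct values stay missing
    return pd.NA if pd.isna(value) else value[field]

s = pd.Series([{"x": 1}, None, {"x": 3}], name="s")
print(s.map(functools.partial(safe_getter, field="x")).rename("x"))
# 0       1
# 1    <NA>
# 2       3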