Skip to content

Commit

Permalink
perf: use assign instead of concat in projections when possible
Browse files Browse the repository at this point in the history
  • Loading branch information
gerrymanoim authored and cpcloud committed Feb 11, 2022
1 parent 565f42f commit 985c242
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
15 changes: 15 additions & 0 deletions ibis/backends/dask/execution/selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
add_partitioned_sorted_column,
coerce_to_output,
compute_sorted_frame,
is_row_order_preserving,
)


Expand Down Expand Up @@ -130,7 +131,21 @@ def build_df_from_projection(
Build up a df from individual pieces by dispatching to `compute_projection`
for each expression.
"""
# Fast path for when we're assigning columns into the same table.
if (selection_exprs[0] is op.table) and all(
is_row_order_preserving(selection_exprs[1:])
):
for expr in selection_exprs[1:]:
projection = compute_projection(expr, op, data, **kwargs)
if isinstance(projection, dd.Series):
data = data.assign(**{projection.name: projection})
else:
data = data.assign(
**{c: projection[c] for c in projection.columns}
)
return data

# Slow path when we cannot do direct assigns
# Create a unique row identifier and set it as the index. This is
# used in dd.concat to merge the pieces back together.
data = add_partitioned_sorted_column(data)
Expand Down
19 changes: 19 additions & 0 deletions ibis/backends/dask/execution/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ibis.backends.pandas.client import ibis_dtype_to_pandas
from ibis.backends.pandas.trace import TraceTwoLevelDispatcher
from ibis.expr import datatypes as dt
from ibis.expr import lineage as lin
from ibis.expr import types as ir
from ibis.expr.scope import Scope
from ibis.expr.typing import TimeContext
Expand Down Expand Up @@ -436,3 +437,21 @@ def helper(
df = df.set_index(col_name, sorted=True, divisions=divisions)

return df


def is_row_order_preserving(exprs) -> bool:
    """Report whether the given expressions keep the parent's row order.

    Some operations are known not to disturb the ordering of rows in the
    dataframe (for example elementwise operations on ungrouped dataframes).
    When every expression is of that kind, the caller may be able to skip
    expensive joins and assign the results directly into the parent
    dataframe.

    Each visited expression is flagged ``False`` when its operation is an
    ``ops.Reduction`` or ``ops.WindowOp`` and ``True`` otherwise; the flags
    are handed back through ``lin.traverse``.

    NOTE(review): the ``-> bool`` annotation looks inaccurate. The result of
    ``lin.traverse`` is returned as-is and the caller in ``selection.py``
    wraps it in ``all(...)``, which suggests an iterable of per-expression
    flags rather than a single bool -- confirm against ``lin.traverse``.
    """

    def _visit(expr: ir.Expr):
        # Reductions and window operations are treated as order-breaking:
        # flag them and signal the traversal to halt at this node.
        if isinstance(expr.op(), (ops.Reduction, ops.WindowOp)):
            return lin.halt, False
        # Anything else is considered order-preserving; keep traversing.
        return lin.proceed, True

    return lin.traverse(_visit, exprs)

0 comments on commit 985c242

Please sign in to comment.