Skip to content

Commit

Permalink
perf(dask): avoid triggering compute for dynamic limit/offset (#8747)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfatihaktas authored Mar 26, 2024
1 parent 7308249 commit b3e27eb
Showing 1 changed file with 25 additions and 8 deletions.
33 changes: 25 additions & 8 deletions ibis/backends/dask/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@
# ruff: noqa: F811


def limit_df(
df: dd.DataFrame,
col: str,
n: int | pd.DataFrame,
offset: int | pd.DataFrame,
):
if isinstance(offset, pd.DataFrame):
offset = offset.iat[0, 0]
if isinstance(n, pd.DataFrame):
n = n.iat[0, 0]

if n is None:
return df[df[col] >= offset]

return df[df[col].between(offset, offset + n - 1)]


class DaskExecutor(PandasExecutor, DaskUtils):
name = "dask"
kernels = dask_kernels
Expand Down Expand Up @@ -291,17 +308,17 @@ def visit(cls, op: ops.DummyTable, values):

@classmethod
def visit(cls, op: PandasLimit, parent, n, offset):
n = n.compute().iat[0, 0]
offset = offset.compute().iat[0, 0]

name = gen_name("limit")
df = add_globally_consecutive_column(parent, name, set_as_index=False)
if n is None:
df = df[df[name] >= offset]
else:
df = df[df[name].between(offset, offset + n - 1)]

return df.drop(columns=[name])
return df.map_partitions(
limit_df,
col=name,
n=n,
offset=offset,
align_dataframes=False,
meta=df._meta,
).drop(columns=[name])

@classmethod
def visit(cls, op: PandasResetIndex, parent):
Expand Down

0 comments on commit b3e27eb

Please sign in to comment.