Skip to content

Commit

Permalink
feat(api): implement negative slice indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud authored and jcrist committed Aug 23, 2023
1 parent af186a8 commit caee5c1
Show file tree
Hide file tree
Showing 25 changed files with 584 additions and 98 deletions.
39 changes: 37 additions & 2 deletions ibis/backends/base/sql/alchemy/query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,32 @@ def _add_limit(self, fragment):
if self.limit is None:
return fragment

fragment = fragment.limit(self.limit.n)
if offset := self.limit.offset:
frag = fragment

n = self.limit.n

if n is None:
n = self.context.compiler.null_limit
elif not isinstance(n, int):
n = (
sa.select(self._translate(n))
.select_from(frag.subquery())
.scalar_subquery()
)

if n is not None:
fragment = fragment.limit(n)

offset = self.limit.offset

if not isinstance(offset, int):
offset = (
sa.select(self._translate(offset))
.select_from(frag.subquery())
.scalar_subquery()
)

if offset != 0 and n != 0:
fragment = fragment.offset(offset)
return fragment

Expand Down Expand Up @@ -393,6 +417,17 @@ class AlchemyCompiler(Compiler):

supports_indexed_grouping_keys = True

# Value to use when the user specified `n` from the `limit` API is
# `None`.
#
# For some backends this is:
# * the identifier ALL (sa.literal_column('ALL'))
# * a NULL literal (sa.null())
#
# and some don't accept an unbounded limit at all: the `LIMIT`
# keyword must simply be left out of the query
null_limit = sa.null()

@classmethod
def to_sql(cls, expr, context=None, params=None, exists=False):
if context is None:
Expand Down
54 changes: 29 additions & 25 deletions ibis/backends/base/sql/compiler/query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import ibis.expr.types as ir
from ibis import util
from ibis.backends.base.sql.compiler.base import DML, QueryAST, SetOp
from ibis.backends.base.sql.compiler.select_builder import SelectBuilder, _LimitSpec
from ibis.backends.base.sql.compiler.select_builder import SelectBuilder
from ibis.backends.base.sql.compiler.translator import ExprTranslator, QueryContext
from ibis.backends.base.sql.registry import quote_identifier
from ibis.common.grounds import Comparable
Expand Down Expand Up @@ -422,14 +422,27 @@ def format_order_by(self):
return buf.getvalue()

def format_limit(self):
if not self.limit:
if self.limit is None:
return None

buf = StringIO()

n = self.limit.n
buf.write(f"LIMIT {n}")
if offset := self.limit.offset:

if n is None:
n = self.context.compiler.null_limit
elif not isinstance(n, int):
n = f"(SELECT {self._translate(n)} {self.format_table_set()})"

if n is not None:
buf.write(f"LIMIT {n}")

offset = self.limit.offset

if not isinstance(offset, int):
offset = f"(SELECT {self._translate(offset)} {self.format_table_set()})"

if offset != 0 and n != 0:
buf.write(f" OFFSET {offset}")

return buf.getvalue()
Expand Down Expand Up @@ -501,6 +514,7 @@ class Compiler:

cheap_in_memory_tables = False
support_values_syntax_in_select = True
null_limit = None

@classmethod
def make_context(cls, params=None):
Expand Down Expand Up @@ -555,27 +569,17 @@ def to_ast(cls, node, context=None):
@classmethod
def to_ast_ensure_limit(cls, expr, limit=None, params=None):
context = cls.make_context(params=params)
query_ast = cls.to_ast(expr, context)

# note: limit can still be None at this point, if the global
# default_limit is None
for query in reversed(query_ast.queries):
if (
isinstance(query, Select)
and not isinstance(expr, ir.Scalar)
and query.table_set is not None
):
if query.limit is None:
if limit == "default":
query_limit = options.sql.default_limit
else:
query_limit = limit
if query_limit:
query.limit = _LimitSpec(query_limit, offset=0)
elif limit is not None and limit != "default":
query.limit = _LimitSpec(limit, query.limit.offset)

return query_ast
table = expr.as_table()

if limit == "default":
query_limit = options.sql.default_limit
else:
query_limit = limit

if query_limit is not None:
table = table.limit(query_limit)

return cls.to_ast(table, context)

@classmethod
def to_sql(cls, node, context=None, params=None):
Expand Down
26 changes: 10 additions & 16 deletions ibis/backends/base/sql/compiler/select_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@


class _LimitSpec(NamedTuple):
n: int
offset: int
n: ops.Value | int | None
offset: ops.Value | int = 0


class SelectBuilder:
Expand Down Expand Up @@ -182,21 +182,15 @@ def _collect_FillNa(self, op, toplevel=False):
self._collect(new_op, toplevel=toplevel)

def _collect_Limit(self, op, toplevel=False):
if not toplevel:
return

n = op.n
offset = op.offset or 0

if self.limit is None:
self.limit = _LimitSpec(n, offset)
else:
self.limit = _LimitSpec(
min(n, self.limit.n),
offset + self.limit.offset,
)
if toplevel:
if isinstance(table := op.table, ops.Limit):
self.table_set = table
self.select_set = [table]
else:
self._collect(table, toplevel=toplevel)

self._collect(op.table, toplevel=toplevel)
assert self.limit is None
self.limit = _LimitSpec(op.n, op.offset)

def _collect_Union(self, op, toplevel=False):
if toplevel:
Expand Down
1 change: 1 addition & 0 deletions ibis/backends/bigquery/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class BigQueryCompiler(sql_compiler.Compiler):
difference_class = BigQueryDifference

support_values_syntax_in_select = False
null_limit = None

@staticmethod
def _generate_setup_queries(expr, context):
Expand Down
18 changes: 13 additions & 5 deletions ibis/backends/clickhouse/compiler/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,20 @@ def _set_op(op: ops.SetOp, *, left, right, **_):

@translate_rel.register
def _limit(op: ops.Limit, *, table, **kw):
n = op.n
limited = sg.select("*").from_(table).limit(n)
result = sg.select("*").from_(table)

if offset := op.offset:
limited = limited.offset(offset)
return limited
if (limit := op.n) is not None:
if not isinstance(limit, int):
limit = f"(SELECT {translate_val(limit, **kw)} FROM {table})"
result = result.limit(limit)

if not isinstance(offset := op.offset, int):
offset = f"(SELECT {translate_val(offset, **kw)} FROM {table})"

if offset != 0:
return result.offset(offset)
else:
return result


@translate_rel.register
Expand Down
12 changes: 12 additions & 0 deletions ibis/backends/dask/execution/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,24 @@ def execute_cast_series_date(op, data, type, **kwargs):
def execute_limit_frame(op, data, nrows, offset, **kwargs):
# NOTE: Dask Dataframes do not support iloc row based indexing
# Need to add a globally consecutive index in order to select nrows number of rows
if nrows == 0:
return dd.from_pandas(
pd.DataFrame(columns=data.columns).astype(data.dtypes), npartitions=1
)
unique_col_name = ibis.util.guid()
df = add_globally_consecutive_column(data, col_name=unique_col_name)
ret = df.loc[offset : (offset + nrows) - 1]
return rename_index(ret, None)


@execute_node.register(ops.Limit, dd.DataFrame, type(None), integer_types)
def execute_limit_frame_no_limit(op, data, nrows, offset, **kwargs):
unique_col_name = ibis.util.guid()
df = add_globally_consecutive_column(data, col_name=unique_col_name)
ret = df.loc[offset : (offset + len(df)) - 1]
return rename_index(ret, None)


@execute_node.register(ops.Not, (dd.core.Scalar, dd.Series))
def execute_not_scalar_or_series(op, data, **kwargs):
return ~data
Expand Down
14 changes: 11 additions & 3 deletions ibis/backends/datafusion/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,17 @@ def selection(op, **kw):

@translate.register(ops.Limit)
def limit(op, **kw):
if op.offset:
raise NotImplementedError("DataFusion does not support offset")
return translate(op.table, **kw).limit(op.n)
if (n := op.n) is not None and not isinstance(n, int):
raise NotImplementedError("Dynamic limit not supported")

if not isinstance(offset := op.offset, int) or (offset != 0 and n != 0):
raise NotImplementedError("Dynamic offset not supported")

t = translate(op.table, **kw)

if n is not None:
return t.limit(n)
return t


@translate.register(ops.Aggregation)
Expand Down
1 change: 1 addition & 0 deletions ibis/backends/druid/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ def translate(self, op):

class DruidCompiler(AlchemyCompiler):
translator_class = DruidExprTranslator
null_limit = sa.literal_column("ALL")
1 change: 1 addition & 0 deletions ibis/backends/mssql/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ class MsSqlCompiler(AlchemyCompiler):
translator_class = MsSqlExprTranslator

supports_indexed_grouping_keys = False
null_limit = None
1 change: 1 addition & 0 deletions ibis/backends/mysql/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ class MySQLExprTranslator(AlchemyExprTranslator):
class MySQLCompiler(AlchemyCompiler):
translator_class = MySQLExprTranslator
support_values_syntax_in_select = False
null_limit = None
1 change: 1 addition & 0 deletions ibis/backends/oracle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class OracleCompiler(AlchemyCompiler):
translator_class = OracleExprTranslator
support_values_syntax_in_select = False
supports_indexed_grouping_keys = False
null_limit = None


class Backend(BaseAlchemyBackend):
Expand Down
7 changes: 6 additions & 1 deletion ibis/backends/pandas/execution/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,15 @@ def execute_interval_literal(op, value, dtype, **kwargs):


@execute_node.register(ops.Limit, pd.DataFrame, integer_types, integer_types)
def execute_limit_frame(op, data, nrows, offset, **kwargs):
def execute_limit_frame(op, data, nrows: int, offset: int, **kwargs):
return data.iloc[offset : offset + nrows]


@execute_node.register(ops.Limit, pd.DataFrame, type(None), integer_types)
def execute_limit_frame_no_limit(op, data, nrows: None, offset: int, **kwargs):
return data.iloc[offset:]


@execute_node.register(ops.Cast, SeriesGroupBy, dt.DataType)
def execute_cast_series_group_by(op, data, type, **kwargs):
result = execute_cast_series_generic(op, data.obj, type, **kwargs)
Expand Down
10 changes: 7 additions & 3 deletions ibis/backends/polars/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,14 @@ def selection(op, **kw):

@translate.register(ops.Limit)
def limit(op, **kw):
if (n := op.n) is not None and not isinstance(n, int):
raise NotImplementedError("Dynamic limit not supported")

if not isinstance(offset := op.offset, int):
raise NotImplementedError("Dynamic offset not supported")

lf = translate(op.table, **kw)
if op.offset:
return lf.slice(op.offset, op.n)
return lf.limit(op.n)
return lf.slice(offset, n)


@translate.register(ops.Aggregation)
Expand Down
20 changes: 16 additions & 4 deletions ibis/backends/pyspark/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,13 +272,25 @@ def compile_cast(t, op, **kwargs):

@compiles(ops.Limit)
def compile_limit(t, op, **kwargs):
if op.offset != 0:
if (n := op.n) is not None and not isinstance(n, int):
raise com.UnsupportedArgumentError(
"PySpark backend does not support non-zero offset is for "
f"limit operation. Got offset {op.offset}."
"Dynamic LIMIT is not implemented upstream in PySpark"
)
if not isinstance(offset := op.offset, int):
raise com.UnsupportedArgumentError(
"Dynamic OFFSET is not implemented upstream in PySpark"
)
if n != 0 and offset != 0:
raise com.UnsupportedArgumentError(
"PySpark backend does not support non-zero offset values for "
f"the limit operation. Got offset {offset:d}."
)
df = t.translate(op.table, **kwargs)
return df.limit(op.n)

if n is not None:
return df.limit(n)
else:
return df


@compiles(ops.And)
Expand Down
1 change: 1 addition & 0 deletions ibis/backends/sqlite/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ class SQLiteExprTranslator(AlchemyExprTranslator):
class SQLiteCompiler(AlchemyCompiler):
translator_class = SQLiteExprTranslator
support_values_syntax_in_select = False
null_limit = None
Loading

0 comments on commit caee5c1

Please sign in to comment.