refactor(ir): remove node.root_tables() and unify parent table handling
BREAKING CHANGE: removed Node.root_tables() method, use ibis.expr.analysis.find_immediate_parent_tables() instead
kszucs committed Sep 16, 2022
1 parent be1cdda commit fbb07c1
Showing 33 changed files with 130 additions and 232 deletions.
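For code that still calls the removed method, the migration is mechanical: pass the expression to the new helper in `ibis.expr.analysis` instead of calling `root_tables()` on its operation. A minimal before/after sketch (assuming the ibis expression API as of this commit; the pandas backend changes below show the helper also accepts a list of inputs):

```python
import ibis
import ibis.expr.analysis as an

t = ibis.table([("a", "int64"), ("b", "string")], name="t")
expr = t.a + 1

# Before: ask the operation for its root tables.
# roots = expr.op().root_tables()

# After: ask the analysis module, passing the expression itself.
roots = an.find_immediate_parent_tables(expr)  # a list of table operations
assert roots[0].equals(t.op())
```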
14 changes: 7 additions & 7 deletions ibis/backends/base/sql/alchemy/query_builder.py
@@ -5,6 +5,7 @@
import sqlalchemy as sa
import sqlalchemy.sql as sql

import ibis.expr.analysis as an
import ibis.expr.operations as ops
import ibis.expr.schema as sch
import ibis.expr.types as ir
@@ -154,17 +155,16 @@ def _can_lower_sort_column(table_set, expr):
# in the generic SQL compiler that "fuses" the sort with the
# aggregation so they appear in the same query. It's generally for
# cosmetics and doesn't really affect query semantics.
bases = {op: op.to_expr() for op in expr.op().root_tables()}
bases = an.find_immediate_parent_tables(expr)
if len(bases) != 1:
return False

base = list(bases.values())[0]
base_op = base.op()
base = bases[0]

if isinstance(base_op, ops.Aggregation):
return base_op.table.equals(table_set)
elif isinstance(base_op, ops.Selection):
return base.equals(table_set)
if isinstance(base, ops.Aggregation):
return base.table.equals(table_set)
elif isinstance(base, ops.Selection):
return base.equals(table_set.op())
else:
return False

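Note that the rewrite above also changes what the intermediate variables hold: the helper returns operation nodes directly, so the `{op: expr}` dict and the extra `.op()` round-trip disappear. A rough sketch of the returned values, assuming the ibis 3.x expression API:

```python
import ibis
import ibis.expr.analysis as an
import ibis.expr.operations as ops

t = ibis.table([("key", "string"), ("value", "int64")], name="t")
agg = t.group_by("key").aggregate(total=t.value.sum())

# The immediate parent table of a column on the aggregation result is the
# Aggregation operation itself, returned as an op rather than an expression.
bases = an.find_immediate_parent_tables(agg.total)
assert len(bases) == 1
assert isinstance(bases[0], ops.Aggregation)
```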
4 changes: 3 additions & 1 deletion ibis/backends/base/sql/compiler/select_builder.py
@@ -27,7 +27,9 @@ def __init__(self, query, expr):
self.query = query
self.ctx = query.context
self.expr = expr
self.query_roots = frozenset(self.query.table_set.op().root_tables())
self.query_roots = frozenset(
L.find_immediate_parent_tables(self.query.table_set)
)
self.has_foreign_root = False
self.has_query_root = False

3 changes: 2 additions & 1 deletion ibis/backends/base/sql/registry/binary_infix.py
@@ -2,6 +2,7 @@

from typing import Literal

import ibis.expr.analysis as an
import ibis.expr.types as ir
from ibis.backends.base.sql.registry import helpers

@@ -81,7 +82,7 @@ def translate(translator, expr):
# TableColumn compilation
and not any(
ctx.is_foreign_expr(leaf.to_expr())
for leaf in right.op().root_tables()
for leaf in an.find_immediate_parent_tables(right)
)
):
if not right.has_name():
8 changes: 4 additions & 4 deletions ibis/backends/dask/execution/selection.py
@@ -10,6 +10,7 @@
import pandas
from toolz import concatv

import ibis.expr.analysis as an
import ibis.expr.operations as ops
import ibis.expr.types as ir
from ibis.backends.dask.core import execute
@@ -44,7 +45,6 @@ def compute_projection_scalar_expr(
name = expr.get_name()
assert name is not None, 'Scalar selection name is None'

op = expr.op()
parent_table_op = parent.table.op()

data_columns = frozenset(data.columns)
@@ -60,7 +60,7 @@
},
timecontext,
)
for t in op.root_tables()
for t in an.find_immediate_parent_tables(expr)
)
scalar = execute(expr, scope=scope, **kwargs)
return data.assign(**{name: scalar})[name]
@@ -105,7 +105,7 @@ def compute_projection_column_expr(
},
timecontext,
)
for t in op.root_tables()
for t in an.find_immediate_parent_tables(expr)
)

result = execute(expr, scope=scope, timecontext=timecontext, **kwargs)
@@ -266,7 +266,7 @@ def _compute_predicates(
# predicates on the result instead of any left or right tables if the
# Selection is on a Join. Project data to include only columns from
# the root table.
root_tables = predicate.op().root_tables()
root_tables = an.find_immediate_parent_tables(predicate)

# handle suffixes
data_columns = frozenset(data.columns)
4 changes: 3 additions & 1 deletion ibis/backends/dask/execution/util.py
@@ -8,6 +8,7 @@

import ibis.backends.pandas.execution.util as pd_util
import ibis.common.exceptions as com
import ibis.expr.analysis as an
import ibis.expr.operations as ops
import ibis.util
from ibis.backends.dask.core import execute
@@ -325,7 +326,8 @@ def compute_sort_key(
if scope is None:
scope = Scope()
scope = scope.merge_scopes(
Scope({t: data}, timecontext) for t in by.op().root_tables()
Scope({t: data}, timecontext)
for t in an.find_immediate_parent_tables(by)
)
new_column = execute(by, scope=scope, **kwargs)
new_column.name = name
5 changes: 3 additions & 2 deletions ibis/backends/dask/execution/window.py
@@ -5,6 +5,7 @@

import dask.dataframe as dd

import ibis.expr.analysis as an
import ibis.expr.operations as ops
import ibis.expr.window as win
from ibis.backends.dask.core import execute, execute_with_scope
@@ -111,7 +112,7 @@ def execute_grouped_window_op(
**kwargs,
):
# extract the parent
(root,) = op.root_tables()
root = an.find_first_base_table(op.to_expr())
root_expr = root.to_expr()

root_data = execute(
@@ -132,7 +133,7 @@
scope = scope.merge_scopes(
[
Scope({t: grouped_root_data}, timecontext)
for t in op.expr.op().root_tables()
for t in an.find_immediate_parent_tables(op.expr)
],
overwrite=True,
)
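Where the old code unpacked exactly one root with `(root,) = op.root_tables()`, the window executors now ask for the first base table reachable from the expression. A small usage sketch, assuming `find_first_base_table` returns a table operation (the `.to_expr()` round-trip in the diff suggests it does):

```python
import ibis
import ibis.expr.analysis as an

t = ibis.table([("g", "string"), ("x", "int64")], name="t")
expr = t.x.sum().over(ibis.window(group_by=t.g))

root = an.find_first_base_table(expr)  # the table op backing the windowed expr
root_expr = root.to_expr()             # same round-trip as in the diff above
```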
3 changes: 2 additions & 1 deletion ibis/backends/datafusion/__init__.py
@@ -8,6 +8,7 @@
import pyarrow as pa

import ibis.common.exceptions as com
import ibis.expr.analysis as an
import ibis.expr.operations as ops
import ibis.expr.schema as sch
import ibis.expr.types as ir
@@ -172,7 +173,7 @@ def execute(
table = _to_pyarrow_table(frame)
return table['tmp'].to_pandas()
elif isinstance(expr, ir.Scalar):
if expr.op().root_tables():
if an.find_immediate_parent_tables(expr):
# there are associated datafusion tables so convert the expr
# to a selection which we can directly convert to a datafusion
# plan
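The truthiness check works because the helper returns an empty list for expressions with no table parents, which is how the datafusion backend tells a table-backed scalar apart from a bare literal. A quick illustration of the assumed behaviour:

```python
import ibis
import ibis.expr.analysis as an

t = ibis.table([("a", "int64")], name="t")

an.find_immediate_parent_tables(t.a.sum())        # non-empty: scalar derived from a table
an.find_immediate_parent_tables(ibis.literal(1))  # []: no backing table to plan against
```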
14 changes: 14 additions & 0 deletions ibis/backends/datafusion/compiler.py
@@ -140,6 +140,20 @@ def invert(op, _):
return ~arg


@translate.register(ops.And)
def and_(op, _):
left = translate(op.left)
right = translate(op.right)
return left & right


@translate.register(ops.Or)
def or_(op, _):
left = translate(op.left)
right = translate(op.right)
return left | right


@translate.register(ops.Abs)
def abs(op, _):
arg = translate(op.arg)
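The new `ops.And`/`ops.Or` registrations give boolean conjunction and disjunction (the `&` and `|` operators) datafusion translations. A hedged usage sketch; the connection setup in the comment is hypothetical:

```python
import ibis

t = ibis.table([("a", "int64"), ("b", "int64")], name="t")

# Both operators below lower to the And/Or translators registered above.
expr = t[(t.a > 1) & ((t.b < 10) | (t.b > 100))]

# With a datafusion connection this could now be compiled and executed, e.g.:
# con = ibis.datafusion.connect({"t": "t.parquet"})  # hypothetical table registration
# con.execute(expr)
```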
3 changes: 2 additions & 1 deletion ibis/backends/pandas/execution/join.py
@@ -2,6 +2,7 @@

import pandas as pd

import ibis.expr.analysis as an
import ibis.expr.operations as ops
from ibis.backends.pandas.core import execute
from ibis.backends.pandas.dispatch import execute_node
@@ -15,7 +16,7 @@ def _compute_join_column(column_expr, **kwargs):
new_column = column_op.name
else:
new_column = execute(column_expr, **kwargs)
(root_table,) = column_op.root_tables()
root_table, *_ = an.find_immediate_parent_tables(column_expr)
return new_column, root_table


68 changes: 13 additions & 55 deletions ibis/backends/pandas/execution/selection.py
@@ -4,13 +4,13 @@
import functools
import operator
from collections import defaultdict
from operator import methodcaller
from typing import List, Optional

import pandas as pd
from multipledispatch import Dispatcher
from toolz import compose, concat, concatv, unique
from toolz import concatv, first

import ibis.expr.analysis as an
import ibis.expr.operations as ops
import ibis.expr.types as ir
from ibis.backends.pandas.core import execute
@@ -58,7 +58,6 @@ def compute_projection_scalar_expr(
name = expr.get_name()
assert name is not None, 'Scalar selection name is None'

op = expr.op()
parent_table_op = parent.table.op()

data_columns = frozenset(data.columns)
@@ -78,7 +77,7 @@
},
timecontext,
)
for t in op.root_tables()
for t in an.find_immediate_parent_tables(expr)
)
scalar = execute(expr, scope=scope, **kwargs)
result = pd.Series([scalar], name=name).repeat(len(data.index))
@@ -127,7 +126,7 @@ def compute_projection_column_expr(
},
timecontext,
)
for t in op.root_tables()
for t in an.find_immediate_parent_tables(expr)
)

result = coerce_to_output(
@@ -179,7 +178,9 @@ def remap_overlapping_column_names(table_op, root_table, data_columns):
if not isinstance(table_op, ops.Join):
return None

left_root, right_root = ops.distinct_roots(table_op.left, table_op.right)
left_root, right_root = an.find_immediate_parent_tables(
[table_op.left, table_op.right]
)
suffixes = {
left_root: constants.LEFT_JOIN_SUFFIX,
right_root: constants.RIGHT_JOIN_SUFFIX,
@@ -201,12 +202,12 @@ def remap_overlapping_column_names(table_op, root_table, data_columns):
({name, f"{name}{suffix}"} & data_columns, name)
for name in root_table.schema.names
]

return {
col_names.pop(): final_name
for col_names, final_name in column_names
if col_names
mapping = {
first(col_name): final_name
for col_name, final_name in column_names
if col_name
}
return mapping


def map_new_column_names_to_data(mapping, df):
@@ -249,7 +250,7 @@ def _compute_predicates(
# predicates on the result instead of any left or right tables if the
# Selection is on a Join. Project data to include only columns from
# the root table.
root_tables = predicate.op().root_tables()
root_tables = an.find_immediate_parent_tables(predicate)

# handle suffixes
data_columns = frozenset(data.columns)
@@ -268,49 +269,6 @@ def _compute_predicates(
yield execute(predicate, scope=scope, **kwargs)


physical_tables = Dispatcher(
'physical_tables',
doc="""\
Return the underlying physical tables nodes of a
:class:`~ibis.expr.types.Node`.
Parameters
----------
op : ops.Node
Returns
-------
tables : List[ops.Node]
""",
)


@physical_tables.register(ops.Selection)
def physical_tables_selection(sel):
return physical_tables(sel.table.op())


@physical_tables.register(ops.PhysicalTable)
def physical_tables_physical_table(t):
# Base case. PhysicalTable nodes are their own root physical tables.
return [t]


@physical_tables.register(ops.Join)
def physical_tables_join(join):
# Physical roots of Join nodes are the unique physical roots of their
# left and right TableNodes.
func = compose(physical_tables, methodcaller('op'))
return list(unique(concat(map(func, (join.left, join.right)))))


@physical_tables.register(ops.Node)
def physical_tables_node(node):
# Iterative case. Any other Node's physical roots are the unique physical
# roots of that Node's root tables.
return list(unique(concat(map(physical_tables, node.root_tables()))))


def build_df_from_selection(
selection_exprs: List[ir.Column],
data: pd.DataFrame,
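The `remap_overlapping_column_names` change above swaps `set.pop()` for `toolz.first` but keeps the same idea: for each column of the root table, find whichever of `name` or `name<suffix>` survived the pandas join and map it back to the original name. A standalone illustration with made-up column names and an assumed `_x` left-join suffix:

```python
from toolz import first

schema_names = ["a", "b"]                      # columns of the (left) root table
suffix = "_x"                                  # assumed LEFT_JOIN_SUFFIX
data_columns = frozenset({"a", "b_x", "b_y"})  # columns of the joined DataFrame

column_names = [
    ({name, f"{name}{suffix}"} & data_columns, name) for name in schema_names
]
mapping = {
    first(col_name): final_name
    for col_name, final_name in column_names
    if col_name
}
print(mapping)  # {'a': 'a', 'b_x': 'b'}
```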
10 changes: 7 additions & 3 deletions ibis/backends/pandas/execution/util.py
@@ -6,6 +6,7 @@
import toolz

import ibis.common.exceptions as com
import ibis.expr.analysis as an
import ibis.util
from ibis.backends.pandas.core import execute
from ibis.backends.pandas.execution import constants
@@ -16,8 +17,10 @@


def get_join_suffix_for_op(op: ops.TableColumn, join_op: ops.Join):
(root_table,) = op.root_tables()
left_root, right_root = ops.distinct_roots(join_op.left, join_op.right)
(root_table,) = an.find_immediate_parent_tables(op.to_expr())
left_root, right_root = an.find_immediate_parent_tables(
[join_op.left, join_op.right]
)
return {
left_root: constants.LEFT_JOIN_SUFFIX,
right_root: constants.RIGHT_JOIN_SUFFIX,
@@ -34,7 +37,8 @@ def compute_sort_key(key, data, timecontext, scope=None, **kwargs):
if scope is None:
scope = Scope()
scope = scope.merge_scopes(
Scope({t: data}, timecontext) for t in by.op().root_tables()
Scope({t: data}, timecontext)
for t in an.find_immediate_parent_tables(by)
)
new_column = execute(by, scope=scope, **kwargs)
name = ibis.util.guid()
6 changes: 3 additions & 3 deletions ibis/backends/pandas/execution/window.py
@@ -10,6 +10,7 @@
from pandas.core.groupby import SeriesGroupBy

import ibis.common.exceptions as com
import ibis.expr.analysis as an
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
import ibis.expr.window as win
@@ -301,8 +302,7 @@ def execute_window_op(
scope = pre_executed_scope
else:
scope = scope.merge_scope(pre_executed_scope)
(root,) = op.root_tables()
root_expr = root.to_expr()
root_expr = an.find_first_base_table(op.to_expr()).to_expr()

data = execute(
root_expr,
@@ -384,7 +384,7 @@ def execute_window_op(
new_scope = scope.merge_scopes(
[
Scope({t: source}, adjusted_timecontext)
for t in operand.op().root_tables()
for t in an.find_immediate_parent_tables(operand)
],
overwrite=True,
)