fix(context-adjustment): pass scope when calling adjust_context in pyspark backend

The Pandas and Dask backends always pass scope to adjust_context, so
that adjust_context can potentially make use of extra information in
scope. The PySpark backend was not passing scope to adjust_context,
which can lead to behavior differences or errors across backends.
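
As an illustration of what passing scope enables, here is a minimal
sketch of a context-adjustment rule that receives it (the registration
pattern and signature mirror the test added in this commit; the use of
scope in the body is hypothetical):

    from typing import Optional

    import ibis.expr.operations as ops
    from ibis.expr.scope import Scope
    from ibis.expr.timecontext import adjust_context
    from ibis.expr.types import TimeContext

    @adjust_context.register(ops.WindowOp)
    def adjust_context_window_with_scope(
        op: ops.WindowOp,
        timecontext: TimeContext,
        scope: Optional[Scope] = None,
    ) -> TimeContext:
        # scope is now supplied by all backends, so a rule may consult
        # it (hypothetically) before widening the context; returning
        # timecontext unchanged is a valid no-op adjustment.
        if scope is not None:
            pass  # e.g. inspect already-computed results for op's inputs
        return timecontext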

fixes #3108
Timothy Dijamco authored and cpcloud committed Jan 19, 2022
1 parent c868783 commit 33aad7b
Showing 2 changed files with 47 additions and 1 deletion.
2 changes: 1 addition & 1 deletion ibis/backends/pyspark/compiler.py
@@ -123,7 +123,7 @@ def compile_selection(t, expr, scope, timecontext, **kwargs):
# time context among child nodes, and pass this as context to
# source table to get all data within time context loaded.
arg_timecontexts = [
adjust_context(node.op(), timecontext)
adjust_context(node.op(), timecontext, scope=scope)
for node in op.selections
if timecontext
]
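
The comment above describes merging the child nodes' time contexts into
one context for the source table. A rough sketch of the assumed union
semantics (the real helper is combine_time_context, exercised by
test_combine_time_context below; union_time_contexts here is a
hypothetical stand-in):

    from typing import List, Optional

    from ibis.expr.types import TimeContext

    def union_time_contexts(
        contexts: List[Optional[TimeContext]],
    ) -> Optional[TimeContext]:
        # Assumed behavior: take the earliest begin and the latest end
        # across the non-None child contexts, so loading the source
        # table once covers every child's required window.
        valid = [c for c in contexts if c is not None]
        if not valid:
            return None
        begins, ends = zip(*valid)
        return min(begins), max(ends)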
46 changes: 46 additions & 0 deletions ibis/backends/pyspark/tests/test_timecontext.py
@@ -1,8 +1,15 @@
from typing import Optional

import pandas as pd
import pandas.testing as tm
import pytest

import ibis
import ibis.expr.operations as ops
from ibis.backends.pyspark.timecontext import combine_time_context
from ibis.expr.scope import Scope
from ibis.expr.timecontext import adjust_context
from ibis.expr.types import TimeContext


def test_table_with_timecontext(client):
@@ -55,3 +62,42 @@ def test_table_with_timecontext(client):
)
def test_combine_time_context(contexts, expected):
assert combine_time_context(contexts) == expected


def test_adjust_context_scope(client):
"""Test that `adjust_context` has access to `scope` by default."""
table = client.table('time_indexed_table')

# WindowOp is the only context-adjusted node that the PySpark backend
# can compile.
# Override its `adjust_context` function with a function that simply
# checks that `scope` has been passed in:

@adjust_context.register(ops.WindowOp)
def adjust_context_window_check_scope(
op: ops.WindowOp,
timecontext: TimeContext,
scope: Optional[Scope] = None,
) -> TimeContext:
"""Confirms that `scope` is passed in."""
assert scope is not None
return timecontext

# Do an operation that will trigger context adjustment
# on a WindowOp
expr = table.mutate(
win=(
table['value']
.count()
.over(
ibis.window(
ibis.interval(hours=1),
0,
order_by='time',
group_by='key',
)
)
)
)
context = (pd.Timestamp('20170105'), pd.Timestamp('20170111'))
expr.execute(timecontext=context)
