Skip to content

Commit

Permalink
feat(flink): implement array operators
Browse files Browse the repository at this point in the history
  • Loading branch information
mfatihaktas committed Jan 22, 2024
1 parent 141edea commit 266e8fb
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 39 deletions.
49 changes: 48 additions & 1 deletion ibis/backends/flink/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,21 @@ def _floor_divide(translator: ExprTranslator, op: ops.Node) -> str:
return f"FLOOR(({left}) / ({right}))"


def _array(translator: ExprTranslator, op: ops.arrays.Array) -> str:
return "ARRAY[{}]".format(", ".join(map(translator.translate, op.exprs)))

Check warning on line 286 in ibis/backends/flink/registry.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/registry.py#L286

Added line #L286 was not covered by tests


def _array_contains(translator: ExprTranslator, op: ops.arrays.ArrayContains) -> str:
arg = translator.translate(op.arg)
other = translator.translate(op.other)
return f"ARRAY_CONTAINS({arg}, {other})"

Check warning on line 292 in ibis/backends/flink/registry.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/registry.py#L290-L292

Added lines #L290 - L292 were not covered by tests


def _array_distinct(translator: ExprTranslator, op: ops.arrays.ArrayDistinct) -> str:
arg = translator.translate(op.arg)
return f"ARRAY_DISTINCT({arg})"

Check warning on line 297 in ibis/backends/flink/registry.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/registry.py#L296-L297

Added lines #L296 - L297 were not covered by tests


def _array_index(translator: ExprTranslator, op: ops.arrays.ArrayIndex):
table_column = op.arg
index = op.index
Expand All @@ -296,6 +311,31 @@ def _array_length(translator: ExprTranslator, op: ops.arrays.ArrayLength) -> str
return f"CARDINALITY({translator.translate(op.arg)})"


def _array_position(translator: ExprTranslator, op: ops.arrays.ArrayPosition) -> str:
arg = translator.translate(op.arg)
other = translator.translate(op.other)
return f"ARRAY_POSITION({arg}, {other}) - 1"

Check warning on line 317 in ibis/backends/flink/registry.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/registry.py#L315-L317

Added lines #L315 - L317 were not covered by tests


def _array_slice(translator: ExprTranslator, op: ops.arrays.ArraySlice) -> str:
array = translator.translate(op.arg)
start = op.start.value

Check warning on line 322 in ibis/backends/flink/registry.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/registry.py#L321-L322

Added lines #L321 - L322 were not covered by tests
# Note (mehmet): The offsets are 1-based for ARRAY_SLICE.
# Ref: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions
if start >= 0:
start += 1

Check warning on line 326 in ibis/backends/flink/registry.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/registry.py#L326

Added line #L326 was not covered by tests

if op.stop is None:
return f"ARRAY_SLICE({array}, {start})"

Check warning on line 329 in ibis/backends/flink/registry.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/registry.py#L329

Added line #L329 was not covered by tests

stop = op.stop.value

Check warning on line 331 in ibis/backends/flink/registry.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/registry.py#L331

Added line #L331 was not covered by tests
if stop >= 0:
return f"ARRAY_SLICE({array}, {start}, {stop})"

Check warning on line 333 in ibis/backends/flink/registry.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/registry.py#L333

Added line #L333 was not covered by tests
else:
# Note (mehmet): To imitate the behavior of pandas array slicing.
return f"ARRAY_SLICE({array}, {start}, CARDINALITY({array}) - {abs(stop)})"

Check warning on line 336 in ibis/backends/flink/registry.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/registry.py#L336

Added line #L336 was not covered by tests


def _json_get_item(translator: ExprTranslator, op: ops.json.JSONGetItem) -> str:
arg_translated = translator.translate(op.arg)
if op.index.dtype.is_integer():
Expand Down Expand Up @@ -532,9 +572,16 @@ def _struct_field(translator, op):
# Binary operations
ops.Power: fixed_arity("power", 2),
ops.FloorDivide: _floor_divide,
# Collection functions
# Collection operations
ops.Array: _array,
ops.ArrayContains: _array_contains,
ops.ArrayDistinct: _array_distinct,
ops.ArrayIndex: _array_index,
ops.ArrayLength: _array_length,
ops.ArrayPosition: _array_position,
ops.ArrayRemove: fixed_arity("ARRAY_REMOVE", 2),
ops.ArraySlice: _array_slice,
ops.ArrayUnion: fixed_arity("ARRAY_UNION", 2),
ops.JSONGetItem: _json_get_item,
ops.Map: _map,
ops.MapGet: _map_get,
Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/flink/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ def connect(*, tmpdir, worker_id, **kw: Any):
def _load_data(self, **_: Any) -> None:
import pandas as pd

from ibis.backends.tests.data import json_types, struct_types, win
from ibis.backends.tests.data import array_types, json_types, struct_types, win

Check warning on line 55 in ibis/backends/flink/tests/conftest.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/tests/conftest.py#L55

Added line #L55 was not covered by tests

for table_name in TEST_TABLES:
path = self.data_dir / "parquet" / f"{table_name}.parquet"
self.connection.create_table(table_name, pd.read_parquet(path), temp=True)

self.connection.create_table("array_types", array_types, temp=True)

Check warning on line 61 in ibis/backends/flink/tests/conftest.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/flink/tests/conftest.py#L61

Added line #L61 was not covered by tests
self.connection.create_table("json_t", json_types, temp=True)
self.connection.create_table("struct", struct_types, temp=True)
self.connection.create_table("win", win, temp=True)
Expand Down
Loading

0 comments on commit 266e8fb

Please sign in to comment.