diff --git a/ibis/backends/flink/registry.py b/ibis/backends/flink/registry.py index aa15829d5509a..83b91cdbedac2 100644 --- a/ibis/backends/flink/registry.py +++ b/ibis/backends/flink/registry.py @@ -255,6 +255,21 @@ def _floor_divide(translator: ExprTranslator, op: ops.Node) -> str: return f"FLOOR(({left}) / ({right}))" +def _array_column(translator: ExprTranslator, op: ops.arrays.ArrayColumn) -> str: + return "ARRAY[{}]".format(", ".join(map(translator.translate, op.cols))) + + +def _array_contains(translator: ExprTranslator, op: ops.arrays.ArrayContains) -> str: + arg = translator.translate(op.arg) + other = translator.translate(op.other) + return f"ARRAY_CONTAINS({arg}, {other})" + + +def _array_distinct(translator: ExprTranslator, op: ops.arrays.ArrayDistinct) -> str: + arg = translator.translate(op.arg) + return f"ARRAY_DISTINCT({arg})" + + def _array_index(translator: ExprTranslator, op: ops.arrays.ArrayIndex): table_column = op.arg index = op.index @@ -269,6 +284,31 @@ def _array_length(translator: ExprTranslator, op: ops.arrays.ArrayLength) -> str return f"CARDINALITY({translator.translate(op.arg)})" +def _array_position(translator: ExprTranslator, op: ops.arrays.ArrayPosition) -> str: + arg = translator.translate(op.arg) + other = translator.translate(op.other) + return f"ARRAY_POSITION({arg}, {other}) - 1" + + +def _array_slice(translator: ExprTranslator, op: ops.arrays.ArraySlice) -> str: + array = translator.translate(op.arg) + start = op.start.value + # Note (mehmet): The offsets are 1-based for ARRAY_SLICE. + # Ref: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions + if start >= 0: + start += 1 + + if op.stop is None: + return f"ARRAY_SLICE({array}, {start})" + + stop = op.stop.value + if stop >= 0: + return f"ARRAY_SLICE({array}, {start}, {stop})" + else: + # Note (mehmet): To imitate the behavior of pandas array slicing. + return f"ARRAY_SLICE({array}, {start}, CARDINALITY({array}) - {abs(stop)})" + + def _json_get_item(translator: ExprTranslator, op: ops.json.JSONGetItem) -> str: arg_translated = translator.translate(op.arg) if op.index.dtype.is_integer(): @@ -442,9 +482,16 @@ def _struct_field(translator, op): # Binary operations ops.Power: fixed_arity("power", 2), ops.FloorDivide: _floor_divide, - # Collection functions + # Collection operations + ops.ArrayColumn: _array_column, + ops.ArrayContains: _array_contains, + ops.ArrayDistinct: _array_distinct, ops.ArrayIndex: _array_index, ops.ArrayLength: _array_length, + ops.ArrayPosition: _array_position, + ops.ArrayRemove: fixed_arity("ARRAY_REMOVE", 2), + ops.ArraySlice: _array_slice, + ops.ArrayUnion: fixed_arity("ARRAY_UNION", 2), ops.JSONGetItem: _json_get_item, ops.Map: _map, ops.MapGet: _map_get, diff --git a/ibis/backends/flink/tests/conftest.py b/ibis/backends/flink/tests/conftest.py index 3977093e615bc..2d817f7cc84ff 100644 --- a/ibis/backends/flink/tests/conftest.py +++ b/ibis/backends/flink/tests/conftest.py @@ -49,7 +49,7 @@ def connect(*, tmpdir, worker_id, **kw: Any): def _load_data(self, **_: Any) -> None: import pandas as pd - from ibis.backends.tests.data import json_types, struct_types + from ibis.backends.tests.data import array_types, json_types, struct_types for table_name in TEST_TABLES: path = self.data_dir / "parquet" / f"{table_name}.parquet" @@ -57,6 +57,7 @@ def _load_data(self, **_: Any) -> None: self.connection.create_table("json_t", json_types, temp=True) self.connection.create_table("struct", struct_types, temp=True) + self.connection.create_table("array_types", array_types, temp=True) class TestConfForStreaming(TestConf): diff --git a/ibis/backends/tests/test_array.py b/ibis/backends/tests/test_array.py index 8e91d22c49fc5..2cbde2a2c076c 100644 --- a/ibis/backends/tests/test_array.py +++ b/ibis/backends/tests/test_array.py @@ -40,7 +40,6 @@ # list. -@pytest.mark.notimpl(["flink"], raises=com.OperationNotDefinedError) def test_array_column(backend, alltypes, df): expr = ibis.array([alltypes["double_col"], alltypes["double_col"]]) assert isinstance(expr, ir.ArrayColumn) @@ -78,7 +77,7 @@ def test_array_scalar(con, backend): assert con.execute(expr.typeof()) == ARRAY_BACKEND_TYPES[backend_name] -@pytest.mark.notimpl(["polars", "flink"], raises=com.OperationNotDefinedError) +@pytest.mark.notimpl(["flink", "polars"], raises=com.OperationNotDefinedError) def test_array_repeat(con): expr = ibis.array([1.0, 2.0]) * 2 @@ -178,8 +177,6 @@ def test_array_index(con, idx): pytest.mark.never( ["sqlite"], reason="array types are unsupported", raises=NotImplementedError ), - # someone needs to implement these - pytest.mark.notimpl(["flink"], raises=Exception), ) @@ -219,7 +216,7 @@ def test_array_discovery(backend): raises=GoogleBadRequest, ) @pytest.mark.notimpl(["dask"], raises=ValueError) -@pytest.mark.notimpl(["datafusion"], raises=com.OperationNotDefinedError) +@pytest.mark.notimpl(["datafusion", "flink"], raises=com.OperationNotDefinedError) def test_unnest_simple(backend): array_types = backend.array_types expected = ( @@ -236,7 +233,7 @@ def test_unnest_simple(backend): @builtin_array @pytest.mark.notimpl("dask", raises=com.OperationNotDefinedError) -@pytest.mark.notimpl(["datafusion"], raises=com.OperationNotDefinedError) +@pytest.mark.notimpl(["datafusion", "flink"], raises=com.OperationNotDefinedError) def test_unnest_complex(backend): array_types = backend.array_types df = array_types.execute() @@ -274,7 +271,7 @@ def test_unnest_complex(backend): raises=AssertionError, ) @pytest.mark.notimpl(["dask"], raises=ValueError) -@pytest.mark.notimpl(["datafusion"], raises=com.OperationNotDefinedError) +@pytest.mark.notimpl(["datafusion", "flink"], raises=com.OperationNotDefinedError) def test_unnest_idempotent(backend): array_types = backend.array_types df = array_types.execute() @@ -295,7 +292,7 @@ def test_unnest_idempotent(backend): @builtin_array @pytest.mark.notimpl("dask", raises=ValueError) -@pytest.mark.notimpl(["datafusion"], raises=com.OperationNotDefinedError) +@pytest.mark.notimpl(["datafusion", "flink"], raises=com.OperationNotDefinedError) def test_unnest_no_nulls(backend): array_types = backend.array_types df = array_types.execute() @@ -322,7 +319,7 @@ def test_unnest_no_nulls(backend): @builtin_array @pytest.mark.notimpl("dask", raises=ValueError) -@pytest.mark.notimpl(["datafusion"], raises=com.OperationNotDefinedError) +@pytest.mark.notimpl(["datafusion", "flink"], raises=com.OperationNotDefinedError) def test_unnest_default_name(backend): array_types = backend.array_types df = array_types.execute() @@ -349,13 +346,27 @@ def test_unnest_default_name(backend): (None, None), (3, None), (-3, None), - (None, -3), (-3, -1), + param( + None, + -3, + marks=[ + pytest.mark.notyet( + ["flink"], + raises=AssertionError, + reason=( + "ArraySlice in Flink behaves unexpectedly when" + "`start` is None and `stop` is negative." + ), + ) + ], + id="nulls", + ), ], ) @pytest.mark.notimpl(["polars"], raises=com.OperationNotDefinedError) @pytest.mark.notimpl( - ["datafusion", "flink"], raises=Exception, reason="array_types table isn't defined" + ["datafusion"], raises=Exception, reason="array_types table isn't defined" ) @pytest.mark.notimpl(["dask"], raises=com.OperationNotDefinedError) def test_array_slice(backend, start, stop): @@ -370,7 +381,16 @@ def test_array_slice(backend, start, stop): @builtin_array @pytest.mark.notimpl( - ["datafusion", "impala", "mssql", "polars", "snowflake", "sqlite", "mysql"], + [ + "datafusion", + "flink", + "impala", + "mssql", + "polars", + "snowflake", + "sqlite", + "mysql", + ], raises=com.OperationNotDefinedError, ) @pytest.mark.notimpl( @@ -416,7 +436,17 @@ def test_array_map(backend, con, input, output): @builtin_array @pytest.mark.notimpl( - ["dask", "datafusion", "impala", "mssql", "pandas", "polars", "snowflake", "mysql"], + [ + "dask", + "datafusion", + "flink", + "impala", + "mssql", + "pandas", + "polars", + "snowflake", + "mysql", + ], raises=com.OperationNotDefinedError, ) @pytest.mark.notimpl( @@ -465,6 +495,11 @@ def test_array_filter(backend, con, input, output): ) @pytest.mark.notimpl(["dask"], raises=com.OperationNotDefinedError) @pytest.mark.never(["impala"], reason="array_types table isn't defined") +@pytest.mark.broken( + ["flink"], + raises=Py4JJavaError, + reason="Caused by: java.lang.NullPointerException", +) def test_array_contains(backend, con): t = backend.array_types expr = t.x.contains(1) @@ -473,19 +508,50 @@ def test_array_contains(backend, con): backend.assert_series_equal(result, expected, check_names=False) +@pytest.mark.parametrize( + ("a", "expected_array"), + [ + param( + [[1], [], [42, 42], []], + [-1, -1, 0, -1], + id="including-empty-array", + marks=[ + pytest.mark.notyet( + ["flink"], + raises=Py4JJavaError, + reason=( + "SQL validation failed; Flink does not seem to support ARRAY[]" + ), + ), + pytest.mark.broken( + ["datafusion"], + raises=Exception, + reason="Internal error: start_from index out of bounds", + ), + ], + ), + param( + [[1], [1], [42, 42], [1]], + [-1, -1, 0, -1], + id="all-non-empty-arrays", + ), + param( + [[1], [1, 42], [42, 42, 42], [42, 1]], + [-1, 1, 0, 0], + id="all-non-empty-arrays-2", + ), + ], +) @builtin_array @pytest.mark.notimpl( ["dask", "impala", "mssql", "pandas", "polars"], raises=com.OperationNotDefinedError, ) -@pytest.mark.broken( - ["datafusion"], reason="internal error as of 34.0.0", raises=Exception -) -def test_array_position(backend, con): - t = ibis.memtable({"a": [[1], [], [42, 42], []]}) +def test_array_position(backend, con, a, expected_array): + t = ibis.memtable({"a": a}) expr = t.a.index(42) result = con.execute(expr) - expected = pd.Series([-1, -1, 0, -1], dtype="object") + expected = pd.Series(expected_array, dtype="object") backend.assert_series_equal(result, expected, check_names=False, check_dtype=False) @@ -494,8 +560,27 @@ def test_array_position(backend, con): ["dask", "impala", "mssql", "pandas", "polars"], raises=com.OperationNotDefinedError, ) -def test_array_remove(backend, con): - t = ibis.memtable({"a": [[3, 2], [], [42, 2], [2, 2], []]}) +@pytest.mark.parametrize( + ("a"), + [ + param( + [[3, 2], [], [42, 2], [2, 2], []], + id="including-empty-array", + marks=[ + pytest.mark.notyet( + ["flink"], + raises=Py4JJavaError, + reason=( + "SQL validation failed; Flink does not seem to support ARRAY[]" + ), + ) + ], + ), + param([[3, 2], [2], [42, 2], [2, 2], [2]], id="all-non-empty-arrays"), + ], +) +def test_array_remove(backend, con, a): + t = ibis.memtable({"a": a}) expr = t.a.remove(2) result = con.execute(expr) expected = pd.Series([[3], [], [42], [], []], dtype="object") @@ -540,6 +625,9 @@ def test_array_remove(backend, con): ), ], ) +@pytest.mark.notimpl( + ["flink"], raises=NotImplementedError, reason="`from_ibis()` is not implemented" +) def test_array_unique(backend, con, input, expected): t = ibis.memtable(input) expr = t.a.unique() @@ -550,7 +638,7 @@ def test_array_unique(backend, con, input, expected): @builtin_array @pytest.mark.notimpl( - ["dask", "datafusion", "impala", "mssql", "pandas", "polars"], + ["dask", "datafusion", "flink", "impala", "mssql", "pandas", "polars"], raises=com.OperationNotDefinedError, ) def test_array_sort(backend, con): @@ -571,11 +659,37 @@ def test_array_sort(backend, con): raises=GoogleBadRequest, reason="BigQuery doesn't support arrays with null elements", ) -def test_array_union(con): - t = ibis.memtable({"a": [[3, 2], [], []], "b": [[1, 3], [None], [5]]}) +@pytest.mark.parametrize( + ("a", "b", "expected_array"), + [ + param( + [[3, 2], [], []], + [[1, 3], [None], [5]], + [{1, 2, 3}, {None}, {5}], + id="including-empty-array", + marks=[ + pytest.mark.notyet( + ["flink"], + raises=Py4JJavaError, + reason=( + "SQL validation failed; Flink does not seem to support ARRAY[]" + ), + ) + ], + ), + param( + [[3, 2], [1], [5]], + [[1, 3], [1], [5]], + [{1, 2, 3}, {1}, {5}], + id="all-non-empty-arrays", + ), + ], +) +def test_array_union(con, a, b, expected_array): + t = ibis.memtable({"a": a, "b": b}) expr = t.a.union(t.b) result = con.execute(expr).map(set, na_action="ignore") - expected = pd.Series([{1, 2, 3}, {None}, {5}], dtype="object") + expected = pd.Series(expected_array, dtype="object") assert len(result) == len(expected) for i, (lhs, rhs) in enumerate(zip(result, expected)): @@ -627,7 +741,10 @@ def test_array_intersect(con, data): reason="ClickHouse won't accept dicts for struct type values", ) @pytest.mark.notimpl(["postgres"], raises=sa.exc.ProgrammingError) -@pytest.mark.notimpl(["datafusion"], raises=com.OperationNotDefinedError) +@pytest.mark.notimpl( + ["datafusion", "flink"], + raises=com.OperationNotDefinedError, +) def test_unnest_struct(con): data = {"value": [[{"a": 1}, {"a": 2}], [{"a": 3}, {"a": 4}]]} t = ibis.memtable(data, schema=ibis.schema({"value": "!array>"})) @@ -644,7 +761,7 @@ def test_unnest_struct(con): ["impala", "mssql"], raises=com.OperationNotDefinedError, reason="no array support" ) @pytest.mark.notimpl( - ["dask", "datafusion", "druid", "oracle", "pandas", "polars", "postgres"], + ["dask", "datafusion", "druid", "flink", "oracle", "pandas", "polars", "postgres"], raises=com.OperationNotDefinedError, ) def test_zip(backend): @@ -672,7 +789,7 @@ def test_zip(backend): reason="https://github.com/ClickHouse/ClickHouse/issues/41112", ) @pytest.mark.notimpl(["postgres"], raises=sa.exc.ProgrammingError) -@pytest.mark.notimpl(["datafusion"], raises=com.OperationNotDefinedError) +@pytest.mark.notimpl(["datafusion", "flink"], raises=com.OperationNotDefinedError) @pytest.mark.notimpl( ["polars"], raises=com.OperationNotDefinedError, @@ -763,8 +880,7 @@ def flatten_data(): ), ], ) -@pytest.mark.notimpl(["flink"], raises=com.OperationNotDefinedError) -@pytest.mark.notyet(["datafusion"], raises=com.OperationNotDefinedError) +@pytest.mark.notimpl(["datafusion", "flink"], raises=com.OperationNotDefinedError) def test_array_flatten(backend, flatten_data, column, expected): data = flatten_data[column] t = ibis.memtable( @@ -870,6 +986,7 @@ def test_unnest_empty_array(con): @pytest.mark.notimpl( [ "datafusion", + "flink", "impala", "mssql", "polars", @@ -896,6 +1013,7 @@ def test_array_map_with_conflicting_names(backend, con): @pytest.mark.notimpl( [ "datafusion", + "flink", "impala", "mssql", "polars", @@ -1050,8 +1168,14 @@ def test_timestamp_range_zero_step(con, start, stop, step, tzinfo): assert list(result) == [] -@pytest.mark.notimpl(["flink"], raises=Py4JJavaError) @pytest.mark.notimpl(["datafusion"], raises=Exception) +@pytest.mark.notimpl( + ["flink"], + raises=Py4JJavaError, + reason=( + "No match found for function signature datetime(, ..., )" + ), +) def test_repr_timestamp_array(con, monkeypatch): monkeypatch.setattr(ibis.options, "interactive", True) monkeypatch.setattr(ibis.options, "default_backend", con) diff --git a/ibis/backends/tests/test_map.py b/ibis/backends/tests/test_map.py index 19ec3e71fa213..3eaffd5adb7d8 100644 --- a/ibis/backends/tests/test_map.py +++ b/ibis/backends/tests/test_map.py @@ -8,6 +8,7 @@ import ibis import ibis.common.exceptions as exc import ibis.expr.datatypes as dt +from ibis.backends.tests.errors import Py4JJavaError pytestmark = [ pytest.mark.never( @@ -235,8 +236,8 @@ def test_map_construct_dict(con, keys, values): @pytest.mark.notyet(["postgres"], reason="only support maps of string -> string") @pytest.mark.notimpl( ["flink"], - raises=exc.OperationNotDefinedError, - reason="No translation rule for ", + raises=Py4JJavaError, + reason="Map key type should be non-nullable", ) def test_map_construct_array_column(con, alltypes, df): expr = ibis.map(ibis.array([alltypes.string_col]), ibis.array([alltypes.int_col]))