diff --git a/ibis/backends/flink/ddl.py b/ibis/backends/flink/ddl.py
index eee4ea86c404..df33de44818b 100644
--- a/ibis/backends/flink/ddl.py
+++ b/ibis/backends/flink/ddl.py
@@ -15,7 +15,8 @@
     format_partition,
     is_fully_qualified,
 )
-from ibis.backends.base.sql.registry import quote_identifier, type_to_sql_string
+from ibis.backends.base.sql.registry import quote_identifier
+from ibis.backends.flink.registry import type_to_sql_string

 if TYPE_CHECKING:
     from ibis.api import Watermark
diff --git a/ibis/backends/flink/registry.py b/ibis/backends/flink/registry.py
index 91730917b793..97fca3ff65c0 100644
--- a/ibis/backends/flink/registry.py
+++ b/ibis/backends/flink/registry.py
@@ -4,7 +4,13 @@
 import ibis.common.exceptions as com
 import ibis.expr.operations as ops
-from ibis.backends.base.sql.registry import aggregate, fixed_arity, helpers, unary
+from ibis.backends.base.sql.registry import (
+    aggregate,
+    fixed_arity,
+    helpers,
+    quote_identifier,
+    unary,
+)
 from ibis.backends.base.sql.registry import (
     operation_registry as base_operation_registry,
 )
@@ -17,6 +23,12 @@
 operation_registry = base_operation_registry.copy()


+def type_to_sql_string(tval):
+    if tval.is_array():
+        return f"array<{helpers.type_to_sql_string(tval.value_type)}>"
+    return helpers.type_to_sql_string(tval)
+
+
 def _not(translator: ExprTranslator, op: ops.Node) -> str:
     formatted_arg = translator.translate(op.arg)
     if helpers.needs_parens(op.arg):
@@ -61,10 +73,11 @@ def _cast(translator: ExprTranslator, op: ops.generic.Cast) -> str:
         return f"CAST({arg_translated} AS date)"
     elif to.is_json():
         return arg_translated
-
-    from ibis.backends.base.sql.registry.main import cast
-
-    return cast(translator=translator, op=op)
+    elif op.arg.dtype.is_temporal() and op.to.is_int64():
+        return f"1000000 * unix_timestamp({arg_translated})"
+    else:
+        sql_type = type_to_sql_string(op.to)
+        return f"CAST({arg_translated} AS {sql_type})"


 def _left_op_right(translator: ExprTranslator, op_node: ops.Node, op_sign: str) -> str:
@@ -96,7 +109,7 @@ def _try_cast(translator: ExprTranslator, op: ops.Node) -> str:
         # It's recommended to use UNIX_TIMESTAMP(CAST(timestamp_col AS STRING)) instead.
         return f"UNIX_TIMESTAMP(TRY_CAST({arg_formatted} AS STRING))"
     else:
-        sql_type = helpers.type_to_sql_string(op.to)
+        sql_type = type_to_sql_string(op.to)
         return f"TRY_CAST({arg_formatted} AS {sql_type})"
@@ -382,6 +395,11 @@ def _timestamp_from_ymdhms(
     return f"CAST({concat_string} AS TIMESTAMP)"


+def _struct_field(translator, op):
+    arg = translator.translate(op.arg)
+    return f"{arg}.{quote_identifier(op.field, force=True)}"
+
+
 operation_registry.update(
     {
         # Unary operations
@@ -444,6 +462,7 @@ def _timestamp_from_ymdhms(
         ops.TimestampFromUNIX: _timestamp_from_unix,
         ops.TimestampFromYMDHMS: _timestamp_from_ymdhms,
         ops.TimestampSub: _timestamp_sub,
+        ops.StructField: _struct_field,
     }
 )
diff --git a/ibis/backends/flink/tests/conftest.py b/ibis/backends/flink/tests/conftest.py
index c6be24c1818f..21374a673113 100644
--- a/ibis/backends/flink/tests/conftest.py
+++ b/ibis/backends/flink/tests/conftest.py
@@ -10,7 +10,6 @@


 class TestConf(BackendTest):
-    supports_structs = False
     force_sort = True
     deps = "pandas", "pyflink"
@@ -50,13 +49,14 @@ def connect(*, tmpdir, worker_id, **kw: Any):

     def _load_data(self, **_: Any) -> None:
         import pandas as pd

-        from ibis.backends.tests.data import json_types
+        from ibis.backends.tests.data import json_types, struct_types

         for table_name in TEST_TABLES:
             path = self.data_dir / "parquet" / f"{table_name}.parquet"
             self.connection.create_table(table_name, pd.read_parquet(path))

         self.connection.create_table("json_t", json_types)
+        self.connection.create_table("struct", struct_types)


 class TestConfForStreaming(TestConf):
diff --git a/ibis/backends/tests/test_struct.py b/ibis/backends/tests/test_struct.py
index 9ba8ede6bc11..ace35261ef2d 100644
--- a/ibis/backends/tests/test_struct.py
+++ b/ibis/backends/tests/test_struct.py
@@ -13,7 +13,7 @@
 pytestmark = [
     pytest.mark.never(["mysql", "sqlite", "mssql"], reason="No struct support"),
     pytest.mark.notyet(["impala"]),
-    pytest.mark.notimpl(["datafusion", "druid", "oracle", "flink"]),
+    pytest.mark.notimpl(["datafusion", "druid", "oracle"]),
 ]
@@ -55,6 +55,9 @@ def test_all_fields(struct, struct_df):
 @pytest.mark.notimpl(["postgres"])
 @pytest.mark.parametrize("field", ["a", "b", "c"])
+@pytest.mark.notyet(
+    ["flink"], reason="flink doesn't support creating struct columns from literals"
+)
 def test_literal(backend, con, field):
     query = _STRUCT_LITERAL[field]
     dtype = query.type().to_pandas()
@@ -69,6 +72,9 @@
 @pytest.mark.notyet(
     ["clickhouse"], reason="clickhouse doesn't support nullable nested types"
 )
+@pytest.mark.notyet(
+    ["flink"], reason="flink doesn't support creating struct columns from literals"
+)
 def test_null_literal(backend, con, field):
     query = _NULL_STRUCT_LITERAL[field]
     result = pd.Series([con.execute(query)])
@@ -78,6 +84,9 @@
 @pytest.mark.notimpl(["dask", "pandas", "postgres"])
+@pytest.mark.notyet(
+    ["flink"], reason="flink doesn't support creating struct columns from literals"
+)
 def test_struct_column(backend, alltypes, df):
     t = alltypes
     expr = ibis.struct(dict(a=t.string_col, b=1, c=t.bigint_col)).name("s")
@@ -91,6 +100,9 @@
 @pytest.mark.notimpl(["dask", "pandas", "postgres", "polars"])
+@pytest.mark.notyet(
+    ["flink"], reason="flink doesn't support creating struct columns from collect"
+)
 def test_collect_into_struct(alltypes):
     from ibis import _
diff --git a/ibis/backends/tests/test_timecontext.py b/ibis/backends/tests/test_timecontext.py
index ca300900dac6..754552c5fdd7 100644
--- a/ibis/backends/tests/test_timecontext.py
+++ b/ibis/backends/tests/test_timecontext.py
@@ -123,8 +123,8 @@ def test_context_adjustment_filter_before_window(
 @pytest.mark.notimpl(["duckdb", "pyspark"])
 @pytest.mark.notimpl(
     ["flink"],
-    raises=com.OperationNotDefinedError,
-    reason="No translation rule for ",
+    raises=com.UnsupportedOperationError,
+    reason="Flink engine does not support generic window clause with no order by",
 )
 def test_context_adjustment_multi_col_udf_non_grouped(
     backend, alltypes, context, monkeypatch
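
To illustrate the new Flink-specific `type_to_sql_string`, here is a minimal standalone sketch of the array handling; `base_type_to_sql_string` is a hypothetical stand-in for `helpers.type_to_sql_string` from `ibis.backends.base.sql.registry`, not the real helper:

```python
import ibis.expr.datatypes as dt


def base_type_to_sql_string(tval):
    # Hypothetical stand-in for helpers.type_to_sql_string: render a
    # primitive ibis dtype as a lowercase SQL type name.
    return tval.name.lower()


def type_to_sql_string(tval):
    # Mirrors the new wrapper in the patch: arrays render using Flink's
    # array<element_type> syntax; everything else falls through to the
    # base helper.
    if tval.is_array():
        return f"array<{base_type_to_sql_string(tval.value_type)}>"
    return base_type_to_sql_string(tval)


print(type_to_sql_string(dt.Array(dt.int64)))  # array<int64>
print(type_to_sql_string(dt.string))           # string
```

Because `_cast` and `_try_cast` now both route through this wrapper, `CAST`/`TRY_CAST` targets with array types get Flink's `array<...>` syntax.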
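
A usage sketch for the new temporal-to-integer cast branch (the table name and schema are hypothetical, and the compile entry point is an assumption that may vary across ibis versions):

```python
import ibis

t = ibis.table({"ts": "timestamp"}, name="events")  # hypothetical table
expr = t.ts.cast("int64")

# Under the new branch this should compile to something along the lines of
#   1000000 * unix_timestamp(`ts`)
# i.e. Flink's seconds-based unix_timestamp scaled to microseconds,
# rather than a bare CAST(`ts` AS bigint).
print(ibis.to_sql(expr, dialect="flink"))  # assumed entry point
```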
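
Finally, a sketch of the struct field access this patch enables (hypothetical schema): `ops.StructField` now translates to dotted access on the translated argument, with the field name force-quoted:

```python
import ibis

t = ibis.table({"s": "struct<a: string, b: int64>"}, name="t")  # hypothetical
expr = t.s.a  # compiles via the new ops.StructField rule

# Per _struct_field, this should render as `s`.`a`, with
# quote_identifier(..., force=True) supplying the backticks.
```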