From eb857e69e2fa156c521839c2be12dfd4f5141334 Mon Sep 17 00:00:00 2001 From: Casey Artner Date: Thu, 8 Aug 2024 23:29:55 -0400 Subject: [PATCH] feat(flink): support `ArrayValue.collect` --- .env | 2 +- .github/workflows/ibis-backends.yml | 8 ++++---- conda/environment-arm64-flink.yml | 2 +- conda/environment.yml | 2 +- docker/flink/Dockerfile | 4 ++-- ibis/backends/sql/compilers/flink.py | 6 ++++++ ibis/backends/sql/dialects.py | 1 + ibis/backends/tests/test_aggregation.py | 20 +++----------------- ibis/backends/tests/test_array.py | 2 -- 9 files changed, 19 insertions(+), 28 deletions(-) diff --git a/.env b/.env index c5a9122db653..05c6411ce4e2 100644 --- a/.env +++ b/.env @@ -1,4 +1,4 @@ -FLINK_VERSION=1.19.1 +FLINK_VERSION=1.20.0 CLOUDSDK_ACTIVE_CONFIG_NAME=ibis-gbq GOOGLE_CLOUD_PROJECT="$CLOUDSDK_ACTIVE_CONFIG_NAME" PGPASSWORD="postgres" diff --git a/.github/workflows/ibis-backends.yml b/.github/workflows/ibis-backends.yml index a5a7fd315008..ae1e18c9f8be 100644 --- a/.github/workflows/ibis-backends.yml +++ b/.github/workflows/ibis-backends.yml @@ -227,7 +227,7 @@ jobs: extras: - flink additional_deps: - - "'apache-flink==1.19.1'" + - "'apache-flink==1.20.0'" - "'pandas<2.2'" - setuptools services: @@ -249,7 +249,7 @@ jobs: extras: - flink additional_deps: - - "'apache-flink==1.19.1'" + - "'apache-flink==1.20.0'" - "'pandas<2.2'" - setuptools services: @@ -400,7 +400,7 @@ jobs: extras: - flink additional_deps: - - "'apache-flink==1.19.1'" + - "'apache-flink==1.20.0'" - "'pandas<2.2'" - setuptools services: @@ -413,7 +413,7 @@ jobs: extras: - flink additional_deps: - - "'apache-flink==1.19.1'" + - "'apache-flink==1.20.0'" - "'pandas<2.2'" - setuptools services: diff --git a/conda/environment-arm64-flink.yml b/conda/environment-arm64-flink.yml index 0c76ae81a161..158e5478bec4 100644 --- a/conda/environment-arm64-flink.yml +++ b/conda/environment-arm64-flink.yml @@ -96,4 +96,4 @@ dependencies: - py4j =0.10.9.7 - pip - pip: - - apache-flink =1.19.1 + - apache-flink =1.20.0 diff --git a/conda/environment.yml b/conda/environment.yml index 2ed9223e38b6..9a2ddfe085c1 100644 --- a/conda/environment.yml +++ b/conda/environment.yml @@ -3,7 +3,7 @@ channels: - conda-forge dependencies: # runtime dependencies - - apache-flink + - apache-flink =1.20.0 - atpublic >=2.3 - black >=22.1.0,<25 - clickhouse-connect >=0.5.23 diff --git a/docker/flink/Dockerfile b/docker/flink/Dockerfile index 515d1ef66145..5fbe1a943623 100644 --- a/docker/flink/Dockerfile +++ b/docker/flink/Dockerfile @@ -1,8 +1,8 @@ -ARG FLINK_VERSION=1.19.1 +ARG FLINK_VERSION=1.20.0 FROM flink:${FLINK_VERSION} # ibis-flink requires PyFlink dependency -ARG FLINK_VERSION=1.19.1 +ARG FLINK_VERSION=1.20.0 RUN wget -nv -P $FLINK_HOME/lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-python/${FLINK_VERSION}/flink-python-${FLINK_VERSION}.jar # install python3 and pip3 diff --git a/ibis/backends/sql/compilers/flink.py b/ibis/backends/sql/compilers/flink.py index b00c248a7f8a..6a7f9cf456ee 100644 --- a/ibis/backends/sql/compilers/flink.py +++ b/ibis/backends/sql/compilers/flink.py @@ -575,5 +575,11 @@ def visit_MapMerge(self, op: ops.MapMerge, *, left, right): def visit_StructColumn(self, op, *, names, values): return self.cast(sge.Struct(expressions=list(values)), op.dtype) + def visit_ArrayCollect(self, op, *, arg, where, order_by, include_null): + if not include_null: + cond = arg.is_(sg.not_(NULL, copy=False)) + where = cond if where is None else sge.And(this=cond, expression=where) + return self.agg.array_agg(arg, where=where, order_by=order_by) + compiler = FlinkCompiler() diff --git a/ibis/backends/sql/dialects.py b/ibis/backends/sql/dialects.py index 217fdf34e1e2..e90c0a16787b 100644 --- a/ibis/backends/sql/dialects.py +++ b/ibis/backends/sql/dialects.py @@ -211,6 +211,7 @@ class Generator(Hive.Generator): sge.VariancePop: rename_func("var_pop"), sge.ArrayConcat: rename_func("array_concat"), sge.ArraySize: rename_func("cardinality"), + sge.ArrayAgg: rename_func("array_agg"), sge.Length: rename_func("char_length"), sge.TryCast: lambda self, e: f"TRY_CAST({e.this.sql(self.dialect)} AS {e.to.sql(self.dialect)})", diff --git a/ibis/backends/tests/test_aggregation.py b/ibis/backends/tests/test_aggregation.py index b0515729598c..a139359e2480 100644 --- a/ibis/backends/tests/test_aggregation.py +++ b/ibis/backends/tests/test_aggregation.py @@ -1394,25 +1394,11 @@ def test_group_concat_ordered(alltypes, df, filtered): @pytest.mark.notimpl( - [ - "druid", - "exasol", - "flink", - "impala", - "mssql", - "mysql", - "oracle", - "sqlite", - ], + ["druid", "exasol", "impala", "mssql", "mysql", "oracle", "sqlite"], raises=com.OperationNotDefinedError, ) @pytest.mark.notimpl( - [ - "clickhouse", - "dask", - "pandas", - "pyspark", - ], + ["clickhouse", "dask", "pandas", "pyspark", "flink"], raises=com.UnsupportedOperationError, ) @pytest.mark.parametrize( @@ -1447,7 +1433,7 @@ def test_collect_ordered(alltypes, df, filtered): @pytest.mark.notimpl( - ["druid", "exasol", "flink", "impala", "mssql", "mysql", "oracle", "sqlite"], + ["druid", "exasol", "impala", "mssql", "mysql", "oracle", "sqlite"], raises=com.OperationNotDefinedError, ) @pytest.mark.notimpl( diff --git a/ibis/backends/tests/test_array.py b/ibis/backends/tests/test_array.py index fe38b31f6fa8..d4ca266629ca 100644 --- a/ibis/backends/tests/test_array.py +++ b/ibis/backends/tests/test_array.py @@ -303,7 +303,6 @@ def test_unnest_complex(backend): @builtin_array -@pytest.mark.notimpl(["flink"], raises=com.OperationNotDefinedError) @pytest.mark.notyet( ["datafusion"], raises=Exception, @@ -331,7 +330,6 @@ def test_unnest_idempotent(backend): @builtin_array -@pytest.mark.notimpl(["flink"], raises=com.OperationNotDefinedError) @pytest.mark.notyet( ["datafusion"], raises=Exception,