From da04679df47100fde89fcc8caf83b8e8dc9c78ec Mon Sep 17 00:00:00 2001
From: mfatihaktas
Date: Mon, 18 Dec 2023 13:50:25 -0500
Subject: [PATCH] feat(flink): add primary key support

---
 ibis/backends/flink/__init__.py       |  7 +++
 ibis/backends/flink/ddl.py            | 44 +++++++++++++++--
 ibis/backends/flink/tests/test_ddl.py | 70 +++++++++++++++++++++++++--
 3 files changed, 113 insertions(+), 8 deletions(-)

diff --git a/ibis/backends/flink/__init__.py b/ibis/backends/flink/__init__.py
index 7aa374be3b83..d540114533e7 100644
--- a/ibis/backends/flink/__init__.py
+++ b/ibis/backends/flink/__init__.py
@@ -312,6 +312,7 @@ def create_table(
         catalog: str | None = None,
         tbl_properties: dict | None = None,
         watermark: Watermark | None = None,
+        primary_key: str | list[str] | None = None,
         temp: bool = False,
         overwrite: bool = False,
     ) -> ir.Table:
@@ -349,6 +350,11 @@ def create_table(
             dictionary of key-value pairs (key1=val1, key2=val2, ...).
         watermark
             Watermark strategy for the table, only applicable on sources.
+        primary_key
+            A single column or a list of columns to be designated as the primary
+            key. Raises an error if the columns in `primary_key` are not a subset
+            of the columns in `schema`. Flink requires primary key columns to be
+            non-nullable, so the columns marked as primary key become non-nullable.
         temp
             Whether a table is temporary or not.
         overwrite
@@ -441,6 +447,7 @@ def create_table(
             schema=schema,
             tbl_properties=tbl_properties,
             watermark=watermark,
+            primary_key=primary_key,
             temporary=temp,
             database=database,
             catalog=catalog,
diff --git a/ibis/backends/flink/ddl.py b/ibis/backends/flink/ddl.py
index 379acd68cb4e..3f8f497ee4f8 100644
--- a/ibis/backends/flink/ddl.py
+++ b/ibis/backends/flink/ddl.py
@@ -4,6 +4,7 @@
 
 import sqlglot as sg
 
+import ibis.common.exceptions as exc
 import ibis.expr.schema as sch
 from ibis.backends.base.sql.ddl import (
     CreateTable,
@@ -19,15 +20,19 @@
 )
 from ibis.backends.base.sql.registry import quote_identifier
 from ibis.backends.flink.registry import type_to_sql_string
+from ibis.util import promote_list
 
 if TYPE_CHECKING:
+    from collections.abc import Sequence
+
     from ibis.api import Watermark
 
 
-def format_schema(schema):
+def format_schema(schema: sch.Schema):
     elements = [
         _format_schema_element(name, t) for name, t in zip(schema.names, schema.types)
     ]
+
     return "({})".format(",\n ".join(elements))
 
 
@@ -54,15 +59,31 @@ def _format_watermark_strategy(watermark: Watermark) -> str:
 
 
 def format_schema_with_watermark(
-    schema: sch.Schema, watermark: Watermark | None = None
+    schema: sch.Schema,
+    watermark: Watermark | None = None,
+    primary_keys: Sequence[str] | None = None,
 ) -> str:
     elements = [
         _format_schema_element(name, t) for name, t in zip(schema.names, schema.types)
     ]
+
     if watermark is not None:
         elements.append(
             f"WATERMARK FOR {watermark.time_col} AS {_format_watermark_strategy(watermark)}"
         )
+
+    if primary_keys is not None and primary_keys:
+        # Note (mehmet): Currently supports "NOT ENFORCED" only. The following
+        # quote from the Flink docs explains this choice:
+        # "SQL standard specifies that a constraint can either be ENFORCED or
+        # NOT ENFORCED. This controls if the constraint checks are performed on
+        # the incoming/outgoing data. Flink does not own the data therefore the
+        # only mode we want to support is the NOT ENFORCED mode. It is up to the
+        # user to ensure that the query enforces key integrity."
+        # Ref: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/create/#primary-key
+        comma_separated_keys = ", ".join(f"`{key}`" for key in primary_keys)
+        elements.append(f"PRIMARY KEY ({comma_separated_keys}) NOT ENFORCED")
+
     return "({})".format(",\n ".join(elements))
 
 
@@ -88,6 +109,7 @@ def __init__(
         schema: sch.Schema,
         tbl_properties: dict,
         watermark: Watermark | None = None,
+        primary_key: str | Sequence[str] | None = None,
         database: str | None = None,
         catalog: str | None = None,
         temporary: bool = False,
@@ -107,6 +129,16 @@ def __init__(
         self.temporary = temporary
         self.watermark = watermark
 
+        self.primary_keys = promote_list(primary_key)
+
+        # Check if `primary_keys` is a subset of the columns in `schema`.
+        if self.primary_keys and not set(self.primary_keys) <= set(schema.names):
+            raise exc.IbisError(
+                "`primary_key` must be a subset of the columns in `schema`. \n"
+                f"\t primary_key= {primary_key} \n"
+                f"\t schema.names= {schema.names}"
+            )
+
     def _storage(self) -> str:
         return f"STORED AS {self.format}" if self.format else None
 
@@ -142,10 +174,14 @@ def _pieces(self):
             }
             main_schema = sch.Schema(fields)
 
-            yield format_schema_with_watermark(main_schema, self.watermark)
+            yield format_schema_with_watermark(
+                main_schema, self.watermark, self.primary_keys
+            )
 
             yield f"PARTITIONED BY {format_schema(part_schema)}"
         else:
-            yield format_schema_with_watermark(self.schema, self.watermark)
+            yield format_schema_with_watermark(
+                self.schema, self.watermark, self.primary_keys
+            )
 
         yield self._format_tbl_properties()
diff --git a/ibis/backends/flink/tests/test_ddl.py b/ibis/backends/flink/tests/test_ddl.py
index f5f1b24307bc..c946b341b159 100644
--- a/ibis/backends/flink/tests/test_ddl.py
+++ b/ibis/backends/flink/tests/test_ddl.py
@@ -8,6 +8,7 @@
 import pytest
 
 import ibis
+import ibis.common.exceptions as exc
 import ibis.expr.datatypes as dt
 import ibis.expr.schema as sch
 from ibis.backends.conftest import TEST_TABLES
@@ -258,19 +259,80 @@ def test_force_recreate_in_mem_table(
     assert new_table.schema() == schema
 
 
-def test_create_source_table_with_watermark(
-    con, functional_alltypes_schema, temp_table, csv_source_configs
+@pytest.fixture
+def functional_alltypes_schema_w_nonnullable_columns():
+    return sch.Schema(
+        {
+            "id": dt.int32(nullable=False),
+            "bool_col": dt.bool(nullable=False),
+            "smallint_col": dt.int16(nullable=False),
+            "int_col": dt.int32(nullable=False),
+            "bigint_col": dt.int64(nullable=False),
+            "float_col": dt.float32(nullable=False),
+            "double_col": dt.float64(nullable=False),
+            "date_string_col": dt.string(nullable=False),
+            "string_col": dt.string(nullable=False),
+            "year": dt.int32(nullable=False),
+            "month": dt.int32(nullable=False),
+            "timestamp_col": dt.timestamp(scale=3),
+        }
+    )
+
+
+@pytest.mark.parametrize(
+    "primary_key",
+    [
+        None,
+        "id",
+        ["id"],
+        ["month"],
+        ["id", "string_col"],
+        ["id", "string_col", "year"],
+    ],
+)
+def test_create_source_table_with_watermark_and_primary_key(
+    con,
+    temp_table,
+    functional_alltypes_schema_w_nonnullable_columns,
+    csv_source_configs,
+    primary_key,
 ):
     new_table = con.create_table(
         temp_table,
-        schema=functional_alltypes_schema,
+        schema=functional_alltypes_schema_w_nonnullable_columns,
         tbl_properties=csv_source_configs("functional_alltypes"),
         watermark=ibis.watermark(
            time_col="timestamp_col", allowed_delay=ibis.interval(seconds=15)
        ),
+        primary_key=primary_key,
     )
 
     assert temp_table in con.list_tables()
-    assert new_table.schema() == functional_alltypes_schema
+    assert new_table.schema() == functional_alltypes_schema_w_nonnullable_columns
+
+
+@pytest.mark.parametrize(
+    "primary_key",
+    [
+        "nonexistent_column",
+        ["nonexistent_column"],
+        ["id", "nonexistent_column"],
+    ],
+)
+def test_create_table_failure_with_invalid_primary_keys(
+    con,
+    temp_table,
+    functional_alltypes_schema_w_nonnullable_columns,
+    csv_source_configs,
+    primary_key,
+):
+    with pytest.raises(exc.IbisError):
+        con.create_table(
+            temp_table,
+            schema=functional_alltypes_schema_w_nonnullable_columns,
+            tbl_properties=csv_source_configs("functional_alltypes"),
+            primary_key=primary_key,
+        )
+    assert temp_table not in con.list_tables()
 
 
 @pytest.mark.parametrize("temp", [True, False])
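
Usage sketch (not part of the patch): the snippet below shows how the new
`primary_key` argument would be exercised end to end. The pyflink
TableEnvironment setup and the filesystem/CSV connector properties are
illustrative assumptions; the tests above rely on the `con` and
`csv_source_configs` fixtures for this instead.

# A minimal sketch, assuming a local pyflink TableEnvironment and a
# placeholder filesystem/CSV source; path and properties are hypothetical.
from pyflink.table import EnvironmentSettings, TableEnvironment

import ibis
import ibis.expr.datatypes as dt
import ibis.expr.schema as sch

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
con = ibis.flink.connect(table_env)

schema = sch.Schema(
    {
        # Primary key columns must be non-nullable in Flink.
        "id": dt.int32(nullable=False),
        "string_col": dt.string(nullable=False),
        "timestamp_col": dt.timestamp(scale=3),
    }
)

t = con.create_table(
    "functional_alltypes_pk",  # hypothetical table name
    schema=schema,
    tbl_properties={
        "connector": "filesystem",  # placeholder source configuration
        "path": "/tmp/functional_alltypes.csv",
        "format": "csv",
    },
    watermark=ibis.watermark(
        time_col="timestamp_col", allowed_delay=ibis.interval(seconds=15)
    ),
    primary_key=["id", "string_col"],
)

# The generated CREATE TABLE statement should contain the clause:
#   PRIMARY KEY (`id`, `string_col`) NOT ENFORCED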