From 8dabefd4b2d157fce3b0bb932749eaae3b2901a5 Mon Sep 17 00:00:00 2001 From: Chloe He Date: Tue, 5 Sep 2023 12:01:24 -0700 Subject: [PATCH] feat(flink): implement table-related ddl in Flink backend to support streaming connectors --- ibis/backends/flink/__init__.py | 213 ++++++++++++++++++++++---- ibis/backends/flink/ddl.py | 152 ++++++++++++++++++ ibis/backends/flink/tests/conftest.py | 15 ++ ibis/backends/flink/tests/test_ddl.py | 84 ++++++++++ ibis/backends/pyspark/__init__.py | 2 +- 5 files changed, 439 insertions(+), 27 deletions(-) create mode 100644 ibis/backends/flink/ddl.py create mode 100644 ibis/backends/flink/tests/test_ddl.py diff --git a/ibis/backends/flink/__init__.py b/ibis/backends/flink/__init__.py index 3dbfa88d42a7..e70f410e020b 100644 --- a/ibis/backends/flink/__init__.py +++ b/ibis/backends/flink/__init__.py @@ -10,9 +10,15 @@ import ibis.common.exceptions as exc import ibis.expr.operations as ops import ibis.expr.schema as sch -from ibis.backends.base import BaseBackend, CanListDatabases +from ibis.backends.base import BaseBackend, CanCreateDatabase from ibis.backends.base.sql.ddl import fully_qualified_re, is_fully_qualified from ibis.backends.flink.compiler.core import FlinkCompiler +from ibis.backends.flink.ddl import ( + CreateDatabase, + CreateTableFromConnector, + DropDatabase, + DropTable, +) if TYPE_CHECKING: from collections.abc import Mapping @@ -24,7 +30,7 @@ import ibis.expr.types as ir -class Backend(BaseBackend, CanListDatabases): +class Backend(BaseBackend, CanCreateDatabase): name = "flink" compiler = FlinkCompiler supports_temporary_tables = True @@ -48,29 +54,117 @@ def do_connect(self, table_env: TableEnvironment) -> None: """ self._table_env = table_env + def _exec_sql(self, query: str) -> None: + self._table_env.execute_sql(query) + def list_databases(self, like: str | None = None) -> list[str]: databases = self._table_env.list_databases() return self._filter_with_like(databases, like) + @property + def current_catalog(self) -> str: + return self._table_env.get_current_catalog() + @property def current_database(self) -> str: return self._table_env.get_current_database() + def create_database( + self, + name: str, + db_properties: dict | None = None, + catalog: str | None = None, + force: bool = False, + ) -> None: + """Create a new database. + + Parameters + ---------- + name : str + Name of the new database. + db_properties : dict, optional + Properties of the database. Accepts dictionary of key-value pairs + (key1=val1, key2=val2, ...). + catalog : str, optional + Name of the catalog in which the new database will be created. + force : bool, optional + If `False`, an exception is raised if the database already exists. + """ + statement = CreateDatabase( + name=name, db_properties=db_properties, catalog=catalog, can_exist=force + ) + self._exec_sql(statement.compile()) + + def drop_database( + self, name: str, catalog: str | None = None, force: bool = False + ) -> None: + """Drop a database with name `name`. + + Parameters + ---------- + name : str + Database to drop. + catalog : str, optional + Name of the catalog from which the database will be dropped. + force : bool, optional + If `False`, an exception is raised if the database does not exist. + """ + statement = DropDatabase(name=name, catalog=catalog, must_exist=not force) + self._exec_sql(statement.compile()) + def list_tables( - self, like: str | None = None, database: str | None = None + self, + like: str | None = None, + database: str | None = None, + catalog: str | None = None, ) -> list[str]: + """Return the list of table names. + + Return the list of table names in the specified database and catalog. + or the default one if no database/catalog is specified. + + Parameters + ---------- + like : str, optional + A pattern in Python's regex format. + database : str, optional + The database to list tables of, if not the current one. + catalog : str, optional + The catalog to list tables of, if not the current one. + + Returns + ------- + list[str] + The list of the table names that match the pattern `like`. + """ tables = self._table_env._j_tenv.listTables( - self._table_env.get_current_catalog(), database or self.current_database - ) + catalog or self.current_catalog, + database or self.current_database, + ) # this is equivalent to the SQL query string `SHOW TABLES FROM|IN`, + # but executing the SQL string directly yields a `TableResult` object return self._filter_with_like(tables, like) - def _fully_qualified_name(self, name: str, database: str | None) -> str: + def _fully_qualified_name( + self, + name: str, + database: str | None, + catalog: str | None, + ) -> str: if is_fully_qualified(name): return name - return sg.table(name, db=database or self.current_database).sql(dialect="hive") + return sg.table( + name, + db=database or self.current_database, + catalog=catalog or self.current_catalog, + ).sql(dialect="hive") - def table(self, name: str, database: str | None = None) -> ir.Table: + def table( + self, + name: str, + database: str | None = None, + catalog: str | None = None, + ) -> ir.Table: """Return a table expression from a table or view in the database. Parameters @@ -79,6 +173,8 @@ def table(self, name: str, database: str | None = None) -> ir.Table: Table name database Database in which the table resides + catalog + Catalog in which the table resides Returns ------- @@ -89,29 +185,38 @@ def table(self, name: str, database: str | None = None) -> ir.Table: raise exc.IbisTypeError( f"`database` must be a string; got {type(database)}" ) - schema = self.get_schema(name, database=database) - qualified_name = self._fully_qualified_name(name, database) + schema = self.get_schema(name, catalog=catalog, database=database) + qualified_name = self._fully_qualified_name(name, catalog, database) _, quoted, unquoted = fully_qualified_re.search(qualified_name).groups() unqualified_name = quoted or unquoted - node = ops.DatabaseTable(unqualified_name, schema, self, namespace=database) + node = ops.DatabaseTable( + unqualified_name, schema, self, namespace=database + ) # TODO(chloeh13q): look into namespacing with catalog + db return node.to_expr() - def get_schema(self, table_name: str, database: str | None = None) -> sch.Schema: + def get_schema( + self, + table_name: str, + database: str | None = None, + catalog: str | None = None, + ) -> sch.Schema: """Return a Schema object for the indicated table and database. Parameters ---------- - table_name + table_name : str Table name - database + database : str, optional Database name + catalog : str, optional + Catalog name Returns ------- sch.Schema Ibis schema """ - qualified_name = self._fully_qualified_name(table_name, database) + qualified_name = self._fully_qualified_name(table_name, catalog, database) table = self._table_env.from_path(qualified_name) schema = table.get_schema() return sch.Schema.from_pyarrow( @@ -148,11 +253,21 @@ def create_table( *, schema: sch.Schema | None = None, database: str | None = None, + catalog: str | None = None, + tbl_properties: dict | None = None, temp: bool = False, overwrite: bool = False, ) -> ir.Table: """Create a new table in Flink. + Note that: in Flink, tables can be either virtual (VIEWS) or regular + (TABLES). + + VIEWS can be created from an existing Table object, usually the result + of a Table API or SQL query. TABLES describe external data, such as a + file, database table, or message queue. In other words, TABLES refer + explicitly to tables constructed directly from source/sink connectors. + Parameters ---------- name @@ -167,6 +282,13 @@ def create_table( database Name of the database where the table will be created, if not the default. + catalog + Name of the catalog where the table will be created, if not the + default. + tbl_properties + Table properties used to create a table source/sink. The properties + are usually used to find and create the underlying connector. Accepts + dictionary of key-value pairs (key1=val1, key2=val2, ...). temp Whether a table is temporary or not overwrite @@ -180,17 +302,40 @@ def create_table( import pandas as pd import pyarrow as pa + import ibis.expr.types as ir + if obj is None and schema is None: raise exc.IbisError("The schema or obj parameter is required") + + if overwrite: + self.drop_table(name=name, catalog=catalog, database=database, force=True) + if isinstance(obj, pa.Table): obj = obj.to_pandas() if isinstance(obj, pd.DataFrame): - qualified_name = self._fully_qualified_name(name, database) + qualified_name = self._fully_qualified_name(name, database, catalog) table = self._table_env.from_pandas(obj) - # FIXME(deepyaman): Create a catalog table, not a temp view. + # in-memory data is created as views in `pyflink` + # TODO(chloeh13q): alternatively, we can do CREATE TABLE and then INSERT + # INTO ... VALUES to keep things consistent self._table_env.create_temporary_view(qualified_name, table) - else: - raise NotImplementedError # TODO(deepyaman) + if isinstance(obj, ir.Table): + # TODO(chloeh13q): implement CREATE TABLE for expressions + raise NotImplementedError + if schema is not None: + if not tbl_properties: + raise exc.IbisError( + "tbl_properties is required when creating table with schema" + ) + statement = CreateTableFromConnector( + table_name=name, + schema=schema, + tbl_properties=tbl_properties, + temp=temp, + database=database, + catalog=catalog, + ) + self._exec_sql(statement.compile()) return self.table(name, database=database) @@ -199,6 +344,8 @@ def drop_table( name: str, *, database: str | None = None, + catalog: str | None = None, + temp: bool = False, force: bool = False, ) -> None: """Drop a table. @@ -209,14 +356,21 @@ def drop_table( Name of the table to drop. database Name of the database where the table exists, if not the default. + catalog + Name of the catalog where the table exists, if not the default. + temp + Whether a table is temporary or not. force If `False`, an exception is raised if the table does not exist. """ - qualified_name = self._fully_qualified_name(name, database) - if not (self._table_env.drop_temporary_table(qualified_name) or force): - raise exc.IntegrityError(f"Table {name} does not exist.") - - # TODO(deepyaman): Support (and differentiate) permanent tables. + statement = DropTable( + table_name=name, + database=database, + catalog=catalog, + must_exist=not force, + temp=temp, + ) + self._exec_sql(statement.compile()) def create_view( self, @@ -248,7 +402,12 @@ def create_view( raise NotImplementedError def drop_view( - self, name: str, *, database: str | None = None, force: bool = False + self, + name: str, + *, + database: str | None = None, + catalog: str | None = None, + force: bool = False, ) -> None: """Drop a view. @@ -258,10 +417,12 @@ def drop_view( Name of the view to drop. database Name of the database where the view exists, if not the default. + catalog + Name of the catalog where the view exists, if not the default. force If `False`, an exception is raised if the view does not exist. """ - qualified_name = self._fully_qualified_name(name, database) + qualified_name = self._fully_qualified_name(name, database, catalog) if not (self._table_env.drop_temporary_view(qualified_name) or force): raise exc.IntegrityError(f"View {name} does not exist.") diff --git a/ibis/backends/flink/ddl.py b/ibis/backends/flink/ddl.py new file mode 100644 index 000000000000..69f02adb5d12 --- /dev/null +++ b/ibis/backends/flink/ddl.py @@ -0,0 +1,152 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import sqlglot as sg + +from ibis.backends.base.sql.ddl import ( + CreateTableWithSchema, + DropObject, + _CreateDDL, + _format_properties, + _is_quoted, + is_fully_qualified, +) +from ibis.backends.base.sql.registry import quote_identifier + +if TYPE_CHECKING: + import ibis.expr.schema as sch + + +class _CatalogAwareBaseQualifiedSQLStatement: + def _get_scoped_name( + self, obj_name: str, database: str | None = None, catalog: str | None = None + ) -> str: + if is_fully_qualified(obj_name): + return obj_name + if _is_quoted(obj_name): + obj_name = obj_name[1:-1] + return sg.table(obj_name, db=database, catalog=catalog, quoted=True).sql( + dialect="hive" + ) + + +class CreateTableFromConnector( + _CatalogAwareBaseQualifiedSQLStatement, CreateTableWithSchema +): + def __init__( + self, + table_name: str, + schema: sch.Schema, + tbl_properties: dict, + database: str | None = None, + catalog: str | None = None, + temp: bool = False, + **kwargs, + ): + super().__init__( + table_name=table_name, + database=database, + schema=schema, + table_format=None, + format=None, + path=None, + tbl_properties=tbl_properties, + **kwargs, + ) + self.catalog = catalog + self.temp = temp + + def _storage(self) -> str: + return f"STORED AS {self.format}" if self.format else None + + def _format_tbl_properties(self) -> str: + return f"WITH {_format_properties(self.tbl_properties)}" + + @property + def _prefix(self) -> str: + # `TEMPORARY` is not documented in Flink's documentation + modifier = " TEMPORARY" if self.temp else "" + return f"CREATE{modifier} TABLE" + + def _create_line(self) -> str: + scoped_name = self._get_scoped_name( + self.table_name, self.database, self.catalog + ) + return f"{self._prefix} {self._if_exists()}{scoped_name}" + + @property + def _pieces(self): + yield from super()._pieces + yield self._format_tbl_properties() + + +class DropTable(_CatalogAwareBaseQualifiedSQLStatement, DropObject): + _object_type = "TABLE" + + def __init__( + self, + table_name: str, + database: str | None = None, + catalog: str | None = None, + must_exist: bool = True, + temp: bool = False, + ): + super().__init__(must_exist=must_exist) + self.table_name = table_name + self.database = database + self.catalog = catalog + self.temp = temp + + def _object_name(self): + return self._get_scoped_name(self.table_name, self.database, self.catalog) + + def compile(self): + temp = "TEMPORARY " if self.temp else "" + if_exists = "" if self.must_exist else "IF EXISTS " + object_name = self._object_name() + return f"DROP {temp}{self._object_type} {if_exists}{object_name}" + + +class _DatabaseObject: + def _object_name(self): + scoped_name = f"{quote_identifier(self.catalog)}." if self.catalog else "" + scoped_name += quote_identifier(self.name) + return scoped_name + + +class CreateDatabase(_DatabaseObject, _CreateDDL): + def __init__( + self, + name: str, + db_properties: dict | None, + catalog: str | None = None, + can_exist: bool = False, + ): + # TODO(chloeh13q): support COMMENT + self.name = name + self.db_properties = db_properties + self.catalog = catalog + self.can_exist = can_exist + + def _format_db_properties(self) -> str: + return ( + f"WITH {_format_properties(self.db_properties)}" + if self.db_properties + else "" + ) + + def compile(self): + create_decl = "CREATE DATABASE" + create_line = f"{create_decl} {self._if_exists()}{self._object_name()}" + + return f"{create_line}\n{self._format_db_properties()}" + + +class DropDatabase(_DatabaseObject, DropObject): + _object_type = "DATABASE" + + def __init__(self, name: str, catalog: str | None = None, must_exist: bool = True): + super().__init__(must_exist=must_exist) + self.name = name + self.catalog = catalog diff --git a/ibis/backends/flink/tests/conftest.py b/ibis/backends/flink/tests/conftest.py index 525233556b06..67a692207f38 100644 --- a/ibis/backends/flink/tests/conftest.py +++ b/ibis/backends/flink/tests/conftest.py @@ -49,3 +49,18 @@ def simple_schema(): @pytest.fixture def simple_table(simple_schema): return ibis.table(simple_schema, name="table") + + +@pytest.fixture(scope="session") +def con(tmp_path_factory, data_dir, worker_id): + return TestConf.load_data(data_dir, tmp_path_factory, worker_id).connection + + +@pytest.fixture(scope="session") +def db(con): + return con.database() + + +@pytest.fixture(scope="session") +def alltypes(con): + return con.tables.functional_alltypes diff --git a/ibis/backends/flink/tests/test_ddl.py b/ibis/backends/flink/tests/test_ddl.py new file mode 100644 index 000000000000..02b0f63b7d1c --- /dev/null +++ b/ibis/backends/flink/tests/test_ddl.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +import pytest +from py4j.protocol import Py4JJavaError + +import ibis.expr.datatypes as dt +import ibis.expr.schema as sch + + +@pytest.fixture +def awards_players_schema(): + return sch.Schema( + { + "playerID": dt.string, + "awardID": dt.string, + "yearID": dt.int32, + "lgID": dt.string, + "tie": dt.string, + "notes": dt.string, + } + ) + + +@pytest.fixture +def awards_players_csv_connector_configs(): + return { + "connector": "filesystem", + "path": "ci/ibis-testing-data/csv/awards_players.csv", + "format": "csv", + "csv.ignore-parse-errors": "true", + } + + +def test_list_tables(con): + assert len(con.list_tables()) == 4 + assert ( + len(con.list_tables(catalog="default_catalog", database="default_database")) + == 4 + ) + + +def test_create_table_from_schema( + con, awards_players_schema, temp_table, awards_players_csv_connector_configs +): + new_table = con.create_table( + temp_table, + schema=awards_players_schema, + tbl_properties=awards_players_csv_connector_configs, + ) + assert len(con.list_tables()) == 5 + assert temp_table in con.list_tables() + assert new_table.schema() == awards_players_schema + + +def test_drop_table( + con, awards_players_schema, temp_table, awards_players_csv_connector_configs +): + con.create_table( + temp_table, + schema=awards_players_schema, + tbl_properties=awards_players_csv_connector_configs, + ) + assert len(con.list_tables()) == 5 + con.drop_table(temp_table) + assert len(con.list_tables()) == 4 + assert temp_table not in con.list_tables() + + +def test_temp_table( + con, awards_players_schema, temp_table, awards_players_csv_connector_configs +): + con.create_table( + temp_table, + schema=awards_players_schema, + tbl_properties=awards_players_csv_connector_configs, + temp=True, + ) + assert len(con.list_tables()) == 5 + assert temp_table in con.list_tables() + with pytest.raises(Py4JJavaError): + con.drop_table(temp_table) + con.drop_table(temp_table, temp=True) + assert len(con.list_tables()) == 4 + assert temp_table not in con.list_tables() diff --git a/ibis/backends/pyspark/__init__.py b/ibis/backends/pyspark/__init__.py index 9d1aafa8421d..524b23e60452 100644 --- a/ibis/backends/pyspark/__init__.py +++ b/ibis/backends/pyspark/__init__.py @@ -294,7 +294,7 @@ def create_database( path Path where to store the database data; otherwise uses Spark default force - Whether to append `IF EXISTS` to the database creation SQL + Whether to append `IF NOT EXISTS` to the database creation SQL """ statement = CreateDatabase(name, path=path, can_exist=force) return self.raw_sql(statement.compile())