Skip to content

Commit

Permalink
feat(flink): implement table-related ddl in Flink backend to support …
Browse files Browse the repository at this point in the history
…streaming connectors
  • Loading branch information
chloeh13q authored and jcrist committed Sep 6, 2023
1 parent 2069e99 commit 8dabefd
Show file tree
Hide file tree
Showing 5 changed files with 439 additions and 27 deletions.
213 changes: 187 additions & 26 deletions ibis/backends/flink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@
import ibis.common.exceptions as exc
import ibis.expr.operations as ops
import ibis.expr.schema as sch
from ibis.backends.base import BaseBackend, CanListDatabases
from ibis.backends.base import BaseBackend, CanCreateDatabase
from ibis.backends.base.sql.ddl import fully_qualified_re, is_fully_qualified
from ibis.backends.flink.compiler.core import FlinkCompiler
from ibis.backends.flink.ddl import (
CreateDatabase,
CreateTableFromConnector,
DropDatabase,
DropTable,
)

if TYPE_CHECKING:
from collections.abc import Mapping
Expand All @@ -24,7 +30,7 @@
import ibis.expr.types as ir


class Backend(BaseBackend, CanListDatabases):
class Backend(BaseBackend, CanCreateDatabase):
name = "flink"
compiler = FlinkCompiler
supports_temporary_tables = True
Expand All @@ -48,29 +54,117 @@ def do_connect(self, table_env: TableEnvironment) -> None:
"""
self._table_env = table_env

def _exec_sql(self, query: str) -> None:
    """Run a SQL statement on the underlying table environment.

    Parameters
    ----------
    query : str
        SQL text passed unchanged to the table environment's
        ``execute_sql`` method. Whatever that call returns is discarded.
    """
    table_env = self._table_env
    table_env.execute_sql(query)

def list_databases(self, like: str | None = None) -> list[str]:
    """Return the database names reported by the table environment.

    Parameters
    ----------
    like : str, optional
        Pattern forwarded to ``_filter_with_like`` to narrow the result.

    Returns
    -------
    list[str]
        The names produced by ``_filter_with_like``.
    """
    return self._filter_with_like(self._table_env.list_databases(), like)

@property
def current_catalog(self) -> str:
    """Name of the catalog the table environment reports as current."""
    catalog_name = self._table_env.get_current_catalog()
    return catalog_name

@property
def current_database(self) -> str:
    """Name of the database the table environment reports as current."""
    database_name = self._table_env.get_current_database()
    return database_name

def create_database(
    self,
    name: str,
    db_properties: dict | None = None,
    catalog: str | None = None,
    force: bool = False,
) -> None:
    """Create a new database.

    Parameters
    ----------
    name : str
        Name of the new database.
    db_properties : dict, optional
        Properties of the database. Accepts dictionary of key-value pairs
        (key1=val1, key2=val2, ...).
    catalog : str, optional
        Name of the catalog in which the new database will be created.
    force : bool, optional
        If `False`, an exception is raised if the database already exists.
        Forwarded to the DDL builder as ``can_exist``.
    """
    # Compile the DDL first, then hand the SQL text to the table environment.
    ddl = CreateDatabase(
        name=name,
        db_properties=db_properties,
        catalog=catalog,
        can_exist=force,
    ).compile()
    self._exec_sql(ddl)

def drop_database(
    self, name: str, catalog: str | None = None, force: bool = False
) -> None:
    """Drop a database with name `name`.

    Parameters
    ----------
    name : str
        Database to drop.
    catalog : str, optional
        Name of the catalog from which the database will be dropped.
    force : bool, optional
        If `False`, an exception is raised if the database does not exist.
        The DDL builder receives the negation as ``must_exist``.
    """
    must_exist = not force
    ddl = DropDatabase(name=name, catalog=catalog, must_exist=must_exist).compile()
    self._exec_sql(ddl)

def list_tables(
    self,
    like: str | None = None,
    database: str | None = None,
    catalog: str | None = None,
) -> list[str]:
    """Return the list of table names in the specified database and catalog.

    The current database and/or catalog is used for whichever of the two
    is not specified.

    Parameters
    ----------
    like : str, optional
        A pattern in Python's regex format.
    database : str, optional
        The database to list tables of, if not the current one.
    catalog : str, optional
        The catalog to list tables of, if not the current one.

    Returns
    -------
    list[str]
        The list of the table names that match the pattern `like`.
    """
    target_catalog = catalog or self.current_catalog
    target_database = database or self.current_database
    # This is equivalent to the SQL query string `SHOW TABLES FROM|IN`,
    # but executing the SQL string directly yields a `TableResult` object.
    names = self._table_env._j_tenv.listTables(target_catalog, target_database)
    return self._filter_with_like(names, like)

def _fully_qualified_name(
    self,
    name: str,
    database: str | None,
    catalog: str | None,
) -> str:
    """Build the qualified identifier for a table name.

    Parameters
    ----------
    name : str
        Table name; returned untouched when ``is_fully_qualified`` accepts it.
    database : str or None
        Database to qualify with; the current database is used when falsy.
    catalog : str or None
        Catalog to qualify with; the current catalog is used when falsy.

    Returns
    -------
    str
        ``name`` itself, or the identifier rendered in the ``hive`` dialect.
    """
    if not is_fully_qualified(name):
        db_name = database or self.current_database
        catalog_name = catalog or self.current_catalog
        identifier = sg.table(name, db=db_name, catalog=catalog_name)
        return identifier.sql(dialect="hive")

    return name

def table(self, name: str, database: str | None = None) -> ir.Table:
def table(
self,
name: str,
database: str | None = None,
catalog: str | None = None,
) -> ir.Table:
"""Return a table expression from a table or view in the database.
Parameters
Expand All @@ -79,6 +173,8 @@ def table(self, name: str, database: str | None = None) -> ir.Table:
Table name
database
Database in which the table resides
catalog
Catalog in which the table resides
Returns
-------
Expand All @@ -89,29 +185,38 @@ def table(self, name: str, database: str | None = None) -> ir.Table:
raise exc.IbisTypeError(
f"`database` must be a string; got {type(database)}"
)
schema = self.get_schema(name, database=database)
qualified_name = self._fully_qualified_name(name, database)
schema = self.get_schema(name, catalog=catalog, database=database)
qualified_name = self._fully_qualified_name(name, catalog, database)
_, quoted, unquoted = fully_qualified_re.search(qualified_name).groups()
unqualified_name = quoted or unquoted
node = ops.DatabaseTable(unqualified_name, schema, self, namespace=database)
node = ops.DatabaseTable(
unqualified_name, schema, self, namespace=database
) # TODO(chloeh13q): look into namespacing with catalog + db
return node.to_expr()

def get_schema(self, table_name: str, database: str | None = None) -> sch.Schema:
def get_schema(
self,
table_name: str,
database: str | None = None,
catalog: str | None = None,
) -> sch.Schema:
"""Return a Schema object for the indicated table and database.
Parameters
----------
table_name
table_name : str
Table name
database
database : str, optional
Database name
catalog : str, optional
Catalog name
Returns
-------
sch.Schema
Ibis schema
"""
qualified_name = self._fully_qualified_name(table_name, database)
qualified_name = self._fully_qualified_name(table_name, catalog, database)
table = self._table_env.from_path(qualified_name)
schema = table.get_schema()
return sch.Schema.from_pyarrow(
Expand Down Expand Up @@ -148,11 +253,21 @@ def create_table(
*,
schema: sch.Schema | None = None,
database: str | None = None,
catalog: str | None = None,
tbl_properties: dict | None = None,
temp: bool = False,
overwrite: bool = False,
) -> ir.Table:
"""Create a new table in Flink.
Note that: in Flink, tables can be either virtual (VIEWS) or regular
(TABLES).
VIEWS can be created from an existing Table object, usually the result
of a Table API or SQL query. TABLES describe external data, such as a
file, database table, or message queue. In other words, TABLES refer
explicitly to tables constructed directly from source/sink connectors.
Parameters
----------
name
Expand All @@ -167,6 +282,13 @@ def create_table(
database
Name of the database where the table will be created, if not the
default.
catalog
Name of the catalog where the table will be created, if not the
default.
tbl_properties
Table properties used to create a table source/sink. The properties
are usually used to find and create the underlying connector. Accepts
dictionary of key-value pairs (key1=val1, key2=val2, ...).
temp
Whether a table is temporary or not
overwrite
Expand All @@ -180,17 +302,40 @@ def create_table(
import pandas as pd
import pyarrow as pa

import ibis.expr.types as ir

if obj is None and schema is None:
raise exc.IbisError("The schema or obj parameter is required")

if overwrite:
self.drop_table(name=name, catalog=catalog, database=database, force=True)

if isinstance(obj, pa.Table):
obj = obj.to_pandas()
if isinstance(obj, pd.DataFrame):
qualified_name = self._fully_qualified_name(name, database)
qualified_name = self._fully_qualified_name(name, database, catalog)
table = self._table_env.from_pandas(obj)
# FIXME(deepyaman): Create a catalog table, not a temp view.
# in-memory data is created as views in `pyflink`
# TODO(chloeh13q): alternatively, we can do CREATE TABLE and then INSERT
# INTO ... VALUES to keep things consistent
self._table_env.create_temporary_view(qualified_name, table)
else:
raise NotImplementedError # TODO(deepyaman)
if isinstance(obj, ir.Table):
# TODO(chloeh13q): implement CREATE TABLE for expressions
raise NotImplementedError
if schema is not None:
if not tbl_properties:
raise exc.IbisError(
"tbl_properties is required when creating table with schema"
)
statement = CreateTableFromConnector(
table_name=name,
schema=schema,
tbl_properties=tbl_properties,
temp=temp,
database=database,
catalog=catalog,
)
self._exec_sql(statement.compile())

return self.table(name, database=database)

Expand All @@ -199,6 +344,8 @@ def drop_table(
name: str,
*,
database: str | None = None,
catalog: str | None = None,
temp: bool = False,
force: bool = False,
) -> None:
"""Drop a table.
Expand All @@ -209,14 +356,21 @@ def drop_table(
Name of the table to drop.
database
Name of the database where the table exists, if not the default.
catalog
Name of the catalog where the table exists, if not the default.
temp
Whether a table is temporary or not.
force
If `False`, an exception is raised if the table does not exist.
"""
qualified_name = self._fully_qualified_name(name, database)
if not (self._table_env.drop_temporary_table(qualified_name) or force):
raise exc.IntegrityError(f"Table {name} does not exist.")

# TODO(deepyaman): Support (and differentiate) permanent tables.
statement = DropTable(
table_name=name,
database=database,
catalog=catalog,
must_exist=not force,
temp=temp,
)
self._exec_sql(statement.compile())

def create_view(
self,
Expand Down Expand Up @@ -248,7 +402,12 @@ def create_view(
raise NotImplementedError

def drop_view(
self, name: str, *, database: str | None = None, force: bool = False
self,
name: str,
*,
database: str | None = None,
catalog: str | None = None,
force: bool = False,
) -> None:
"""Drop a view.
Expand All @@ -258,10 +417,12 @@ def drop_view(
Name of the view to drop.
database
Name of the database where the view exists, if not the default.
catalog
Name of the catalog where the view exists, if not the default.
force
If `False`, an exception is raised if the view does not exist.
"""
qualified_name = self._fully_qualified_name(name, database)
qualified_name = self._fully_qualified_name(name, database, catalog)
if not (self._table_env.drop_temporary_view(qualified_name) or force):
raise exc.IntegrityError(f"View {name} does not exist.")

Expand Down
Loading

0 comments on commit 8dabefd

Please sign in to comment.