
fix(flink): use lazy import to prevent premature loading of pyflink during gen_matrix
zhenzhongxu authored and gforsyth committed Dec 6, 2023
1 parent 33e1a31 commit d042402
Showing 7 changed files with 53 additions and 79 deletions.
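
The fix is the standard lazy-import idiom: the pyflink-backed module is imported inside the functions that need it rather than at module top level, so merely importing `ibis.backends.flink` (as `gen_matrix.py` does when collecting backend metadata) no longer loads pyflink. A minimal sketch of the pattern; `build_row_type` is an illustrative name, not part of ibis:

```python
# Importing the enclosing module stays cheap: pyflink is only loaded the
# first time the function runs, and Python caches it in sys.modules after.
def build_row_type(schema):
    # Deferred import: executes at call time, not at `import` time.
    # Assumes pyflink is installed in the environment doing the calling.
    from ibis.backends.flink.datatypes import FlinkRowSchema

    return FlinkRowSchema.from_ibis(schema)
```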
18 changes: 0 additions & 18 deletions .github/workflows/ibis-docs-lint.yml
@@ -186,9 +186,6 @@ jobs:
       - name: checkout
         uses: actions/checkout@v4
 
-      - name: install pyflink
-        run: nix develop --ignore-environment --keep HOME -c pip install apache-flink
-
       - name: run doctest
         # keep HOME because duckdb (which we use for doctests) wants to use
         # that for extensions
@@ -203,21 +200,6 @@
       - name: check that all frozen computations were done before push
         run: git diff --exit-code --stat
 
-      - name: ls links 1
-        run: ls /home/runner/work/ibis/ibis/docs/_output/backends/
-
-      - name: ls links 2
-        run: ls /home/runner/work/ibis/ibis/docs/_output/
-
-      - name: ls links 3
-        run: ls /home/runner/work/ibis/ibis/docs/
-
-      - name: support
-        run: nix develop --ignore-environment -c python ./gen_matrix.py
-
-      - name: redirect links
-        run: python ./gen_redirects.py
-
       - name: verify internal links
         run: nix develop --ignore-environment '.#links' -c just checklinks --offline --no-progress
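With the lazy import in place, `gen_matrix.py` can import the Flink backend without pyflink installed, which is presumably why the docs job can drop the ad-hoc `pip install apache-flink` step along with the temporary `ls links` debugging steps and the separate `support` and `redirect links` steps; the support matrix CSV is still produced by the `!python ../gen_matrix.py` cell in `docs/support_matrix.qmd` (next file).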
2 changes: 1 addition & 1 deletion docs/support_matrix.qmd
@@ -36,7 +36,7 @@ The changes will show up in the dev docs when your PR is merged!
 ## Raw Data
 
 ```{python}
-#| echo: true
+#| echo: false
 !python ../gen_matrix.py
 ```
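Note that `#| echo: false` only hides the cell's source in the rendered page; the cell still executes during the Quarto render, so the raw CSV is still generated.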
1 change: 0 additions & 1 deletion gen_matrix.py
@@ -45,7 +45,6 @@ def main():
         "docs", "backends", "raw_support_matrix.csv"
     ).open(mode="w") as f:
         df.to_csv(f, index_label="FullOperation")
-        print(f"CSV output path: {f.name}")  # noqa: T201
 
 
 if __name__ == "__main__":
27 changes: 11 additions & 16 deletions ibis/backends/flink/__init__.py
@@ -12,7 +12,6 @@
 from ibis.backends.base import BaseBackend, CanCreateDatabase
 from ibis.backends.base.sql.ddl import fully_qualified_re, is_fully_qualified
 from ibis.backends.flink.compiler.core import FlinkCompiler
-from ibis.backends.flink.datatypes import FlinkRowSchema
 from ibis.backends.flink.ddl import (
     CreateDatabase,
     CreateTableFromConnector,
@@ -146,15 +145,14 @@ def list_tables(
         # but executing the SQL string directly yields a `TableResult` object
         return self._filter_with_like(tables, like)
 
-    def list_views(
+    def _list_views(
         self,
         like: str | None = None,
         temporary: bool = True,
     ) -> list[str]:
         """Return the list of view names.
 
-        Return the list of view names in the specified database and catalog.
-        or the default one if no database/catalog is specified.
+        Return the list of view names.
 
         Parameters
         ----------
@@ -311,12 +309,11 @@ def create_table(
         name
             Name of the new table.
         obj
-            An Ibis table expression or pandas table that will be used to
-            extract the schema and the data of the new table. If not provided,
-            `schema` must be given.
+            An Ibis table expression, pandas DataFrame, or PyArrow Table that will
+            be used to extract the schema and the data of the new table. An
+            optional `schema` can be used to override the schema.
         schema
-            The schema for the new table. Only one of `schema` or `obj` can be
-            provided.
+            The schema for the new table. Required if `obj` is not provided.
         database
             Name of the database where the table will be created, if not the
             default.
@@ -344,6 +341,7 @@ def create_table(
         import pyarrow_hotfix  # noqa: F401
 
         import ibis.expr.types as ir
+        from ibis.backends.flink.datatypes import FlinkRowSchema
 
         if obj is None and schema is None:
             raise exc.IbisError("The schema or obj parameter is required")
@@ -381,7 +379,7 @@ def create_table(
             catalog=catalog,
         )
         self._exec_sql(statement.compile())
-        return self.table(name, database=database)
+        return self.table(name, database=database, catalog=catalog)
 
     def drop_table(
         self,
@@ -419,7 +417,7 @@ def drop_table(
     def create_view(
         self,
         name: str,
-        obj: pd.DataFrame | ir.Table | None = None,
+        obj: pd.DataFrame | ir.Table,
         *,
         database: str | None = None,
         catalog: str | None = None,
@@ -446,19 +444,16 @@
         Table
             The view that was created.
         """
-        if obj is None:
-            raise exc.IbisError("The obj parameter is required")
-
         if isinstance(obj, ir.Table):
             # TODO(chloeh13q): implement CREATE VIEW for expressions
             raise NotImplementedError
 
-        if overwrite and self.list_views(name):
+        if overwrite and name in self._list_views():
             self.drop_view(name=name, catalog=catalog, database=database, force=True)
 
         qualified_name = self._fully_qualified_name(name, database, catalog)
         self._table_env.create_temporary_view(qualified_name, obj)
-        return self.table(name, database=database)
+        return self.table(name, database=database, catalog=catalog)
 
     def drop_view(
         self,
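Besides moving the `FlinkRowSchema` import into `create_table`, this file forwards `catalog` to the returned table handle in both `create_table` and `create_view`, makes `obj` a required argument of `create_view` (replacing the runtime `None` check), and renames `list_views` to the private `_list_views`.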
70 changes: 35 additions & 35 deletions ibis/backends/flink/datatypes.py
@@ -1,6 +1,6 @@
 from __future__ import annotations
 
-import pyflink.table.types as fl
+from pyflink.table.types import DataType, DataTypes, RowType
 
 import ibis.expr.datatypes as dt
 import ibis.expr.schema as sch
@@ -9,86 +9,86 @@
 
 class FlinkRowSchema(SchemaMapper):
     @classmethod
-    def from_ibis(cls, schema: sch.Schema | None) -> list[fl.RowType]:
+    def from_ibis(cls, schema: sch.Schema | None) -> list[RowType]:
         if schema is None:
             return None
 
-        return fl.DataTypes.ROW(
+        return DataTypes.ROW(
             [
-                fl.DataTypes.FIELD(k, FlinkType.from_ibis(v))
+                DataTypes.FIELD(k, FlinkType.from_ibis(v))
                 for k, v in schema.fields.items()
             ]
         )
 
 
 class FlinkType(TypeMapper):
     @classmethod
-    def to_ibis(cls, typ: fl.DataType, nullable=True) -> dt.DataType:
+    def to_ibis(cls, typ: DataType, nullable=True) -> dt.DataType:
         """Convert a flink type to an ibis type."""
-        if typ == fl.DataTypes.STRING():
+        if typ == DataTypes.STRING():
             return dt.String(nullable=nullable)
-        elif typ == fl.DataTypes.BOOLEAN():
+        elif typ == DataTypes.BOOLEAN():
             return dt.Boolean(nullable=nullable)
-        elif typ == fl.DataTypes.BYTES():
+        elif typ == DataTypes.BYTES():
             return dt.Binary(nullable=nullable)
-        elif typ == fl.DataTypes.TINYINT():
+        elif typ == DataTypes.TINYINT():
             return dt.Int8(nullable=nullable)
-        elif typ == fl.DataTypes.SMALLINT():
+        elif typ == DataTypes.SMALLINT():
             return dt.Int16(nullable=nullable)
-        elif typ == fl.DataTypes.INT():
+        elif typ == DataTypes.INT():
             return dt.Int32(nullable=nullable)
-        elif typ == fl.DataTypes.BIGINT():
+        elif typ == DataTypes.BIGINT():
             return dt.Int64(nullable=nullable)
-        elif typ == fl.DataTypes.FLOAT():
+        elif typ == DataTypes.FLOAT():
             return dt.Float32(nullable=nullable)
-        elif typ == fl.DataTypes.DOUBLE():
+        elif typ == DataTypes.DOUBLE():
             return dt.Float64(nullable=nullable)
-        elif typ == fl.DataTypes.DATE():
+        elif typ == DataTypes.DATE():
             return dt.Date(nullable=nullable)
-        elif typ == fl.DataTypes.TIME():
+        elif typ == DataTypes.TIME():
            return dt.Time(nullable=nullable)
-        elif typ == fl.DataTypes.TIMESTAMP():
+        elif typ == DataTypes.TIMESTAMP():
             return dt.Timestamp(nullable=nullable)
         else:
             return super().to_ibis(typ, nullable=nullable)
 
     @classmethod
-    def from_ibis(cls, dtype: dt.DataType) -> fl.DataType:
+    def from_ibis(cls, dtype: dt.DataType) -> DataType:
         """Convert an ibis type to a flink type."""
         if dtype.is_string():
-            return fl.DataTypes.STRING()
+            return DataTypes.STRING(nullable=dtype.nullable)
         elif dtype.is_boolean():
-            return fl.DataTypes.BOOLEAN()
+            return DataTypes.BOOLEAN(nullable=dtype.nullable)
         elif dtype.is_binary():
-            return fl.DataTypes.BYTES()
+            return DataTypes.BYTES(nullable=dtype.nullable)
         elif dtype.is_int8():
-            return fl.DataTypes.TINYINT()
+            return DataTypes.TINYINT(nullable=dtype.nullable)
         elif dtype.is_int16():
-            return fl.DataTypes.SMALLINT()
+            return DataTypes.SMALLINT(nullable=dtype.nullable)
         elif dtype.is_int32():
-            return fl.DataTypes.INT()
+            return DataTypes.INT(nullable=dtype.nullable)
         elif dtype.is_int64():
-            return fl.DataTypes.BIGINT()
+            return DataTypes.BIGINT(nullable=dtype.nullable)
         elif dtype.is_uint8():
-            return fl.DataTypes.TINYINT()
+            return DataTypes.TINYINT(nullable=dtype.nullable)
         elif dtype.is_uint16():
-            return fl.DataTypes.SMALLINT()
+            return DataTypes.SMALLINT(nullable=dtype.nullable)
         elif dtype.is_uint32():
-            return fl.DataTypes.INT()
+            return DataTypes.INT(nullable=dtype.nullable)
         elif dtype.is_uint64():
-            return fl.DataTypes.BIGINT()
+            return DataTypes.BIGINT(nullable=dtype.nullable)
         elif dtype.is_float16():
-            return fl.DataTypes.FLOAT()
+            return DataTypes.FLOAT(nullable=dtype.nullable)
         elif dtype.is_float32():
-            return fl.DataTypes.FLOAT()
+            return DataTypes.FLOAT(nullable=dtype.nullable)
         elif dtype.is_float64():
-            return fl.DataTypes.DOUBLE()
+            return DataTypes.DOUBLE(nullable=dtype.nullable)
         elif dtype.is_date():
-            return fl.DataTypes.DATE()
+            return DataTypes.DATE(nullable=dtype.nullable)
         elif dtype.is_time():
-            return fl.DataTypes.TIME()
+            return DataTypes.TIME(nullable=dtype.nullable)
         elif dtype.is_timestamp():
-            return fl.DataTypes.TIMESTAMP()
+            return DataTypes.TIMESTAMP(nullable=dtype.nullable)
         else:
             return super().from_ibis(dtype)
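Alongside the mechanical switch from the `fl` alias to direct imports, `FlinkType.from_ibis` now forwards nullability instead of always returning the default (nullable) Flink types. A small usage sketch, assuming pyflink is installed; the printed form is indicative:

```python
import ibis.expr.datatypes as dt

from ibis.backends.flink.datatypes import FlinkType

# Previously this produced a plain, nullable DataTypes.STRING(); now the
# ibis type's nullable=False is passed through to the Flink constructor.
not_null_string = FlinkType.from_ibis(dt.String(nullable=False))
print(not_null_string)  # a STRING/VARCHAR type marked NOT NULL
```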
3 changes: 2 additions & 1 deletion ibis/backends/flink/registry.py
@@ -9,7 +9,6 @@
     operation_registry as base_operation_registry,
 )
 from ibis.backends.base.sql.registry.main import varargs
-from ibis.backends.flink.datatypes import FlinkType
 from ibis.common.temporal import TimestampUnit
 
 if TYPE_CHECKING:
@@ -222,6 +221,8 @@ def _window(translator: ExprTranslator, op: ops.Node) -> str:
 
 
 def _clip(translator: ExprTranslator, op: ops.Node) -> str:
+    from ibis.backends.flink.datatypes import FlinkType
+
     arg = translator.translate(op.arg)
 
     if op.upper is not None:
11 changes: 4 additions & 7 deletions ibis/backends/flink/tests/test_ddl.py
@@ -178,15 +178,14 @@ def test_force_recreate_table_from_schema(
     ],
 )
 @pytest.mark.parametrize(
-    "schema_props", [(None, None), (_awards_players_schema, "awards_players")]
+    "schema, table_name", [(None, None), (_awards_players_schema, "awards_players")]
 )
 def test_recreate_in_mem_table(
-    con, employee_df, schema_props, temp_table, csv_source_configs
+    con, employee_df, schema, table_name, temp_table, csv_source_configs
 ):
     # create table once
-    schema = schema_props[0]
-    if schema_props[1] is not None:
-        tbl_properties = csv_source_configs(schema_props[1])
+    if table_name is not None:
+        tbl_properties = csv_source_configs(table_name)
     else:
         tbl_properties = None
@@ -242,7 +241,6 @@ def test_force_recreate_in_mem_table(
         tbl_properties=tbl_properties,
     )
     assert temp_table in con.list_tables()
-    assert temp_table in con.list_views()
     if schema is not None:
         assert new_table.schema() == schema
@@ -255,7 +253,6 @@ def test_force_recreate_in_mem_table(
         overwrite=True,
     )
     assert temp_table in con.list_tables()
-    assert temp_table in con.list_views()
     if schema is not None:
         assert new_table.schema() == schema
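The test cleanup relies on pytest unpacking each parameter tuple into the comma-separated argnames, removing the manual `schema_props[0]` indexing; the `con.list_views()` assertions go away because the method is now the private `_list_views`. A self-contained sketch of the unpacking pattern (a hypothetical test, not from the ibis suite):

```python
import pytest


@pytest.mark.parametrize(
    "schema, table_name",
    [(None, None), ({"player_id": "int64"}, "awards_players")],
)
def test_unpacked_params(schema, table_name):
    # pytest binds each tuple element to its own argument, so no manual
    # indexing into a combined `schema_props` tuple is needed.
    assert (schema is None) == (table_name is None)
```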
