diff --git a/.github/workflows/ibis-docs-lint.yml b/.github/workflows/ibis-docs-lint.yml index 7fdc8a1b1df0..760eaaac9564 100644 --- a/.github/workflows/ibis-docs-lint.yml +++ b/.github/workflows/ibis-docs-lint.yml @@ -186,9 +186,6 @@ jobs: - name: checkout uses: actions/checkout@v4 - - name: install pyflink - run: nix develop --ignore-environment --keep HOME -c pip install apache-flink - - name: run doctest # keep HOME because duckdb (which we use for doctests) wants to use # that for extensions @@ -203,21 +200,6 @@ jobs: - name: check that all frozen computations were done before push run: git diff --exit-code --stat - - name: ls links 1 - run: ls /home/runner/work/ibis/ibis/docs/_output/backends/ - - - name: ls links 2 - run: ls /home/runner/work/ibis/ibis/docs/_output/ - - - name: ls links 3 - run: ls /home/runner/work/ibis/ibis/docs/ - - - name: support - run: nix develop --ignore-environment -c python ./gen_matrix.py - - - name: redirect links - run: python ./gen_redirects.py - - name: verify internal links run: nix develop --ignore-environment '.#links' -c just checklinks --offline --no-progress diff --git a/docs/support_matrix.qmd b/docs/support_matrix.qmd index f1afa996c8d8..854126e029d8 100644 --- a/docs/support_matrix.qmd +++ b/docs/support_matrix.qmd @@ -36,7 +36,7 @@ The changes will show up in the dev docs when your PR is merged! ## Raw Data ```{python} -#| echo: true +#| echo: false !python ../gen_matrix.py ``` diff --git a/gen_matrix.py b/gen_matrix.py index e149e77d2df4..b850b8ca037f 100644 --- a/gen_matrix.py +++ b/gen_matrix.py @@ -45,7 +45,6 @@ def main(): "docs", "backends", "raw_support_matrix.csv" ).open(mode="w") as f: df.to_csv(f, index_label="FullOperation") - print(f"CSV output path: {f.name}") # noqa: T201 if __name__ == "__main__": diff --git a/ibis/backends/flink/__init__.py b/ibis/backends/flink/__init__.py index f7936b798354..b31f939425e5 100644 --- a/ibis/backends/flink/__init__.py +++ b/ibis/backends/flink/__init__.py @@ -12,7 +12,6 @@ from ibis.backends.base import BaseBackend, CanCreateDatabase from ibis.backends.base.sql.ddl import fully_qualified_re, is_fully_qualified from ibis.backends.flink.compiler.core import FlinkCompiler -from ibis.backends.flink.datatypes import FlinkRowSchema from ibis.backends.flink.ddl import ( CreateDatabase, CreateTableFromConnector, @@ -146,15 +145,14 @@ def list_tables( # but executing the SQL string directly yields a `TableResult` object return self._filter_with_like(tables, like) - def list_views( + def _list_views( self, like: str | None = None, temporary: bool = True, ) -> list[str]: """Return the list of view names. - Return the list of view names in the specified database and catalog. - or the default one if no database/catalog is specified. + Return the list of view names. Parameters ---------- @@ -311,12 +309,11 @@ def create_table( name Name of the new table. obj - An Ibis table expression or pandas table that will be used to - extract the schema and the data of the new table. If not provided, - `schema` must be given. + An Ibis table expression, pandas DataFrame, or PyArrow Table that will + be used to extract the schema and the data of the new table. An + optional `schema` can be used to override the schema. schema - The schema for the new table. Only one of `schema` or `obj` can be - provided. + The schema for the new table. Required if `obj` is not provided. database Name of the database where the table will be created, if not the default. @@ -344,6 +341,7 @@ def create_table( import pyarrow_hotfix # noqa: F401 import ibis.expr.types as ir + from ibis.backends.flink.datatypes import FlinkRowSchema if obj is None and schema is None: raise exc.IbisError("The schema or obj parameter is required") @@ -381,7 +379,7 @@ def create_table( catalog=catalog, ) self._exec_sql(statement.compile()) - return self.table(name, database=database) + return self.table(name, database=database, catalog=catalog) def drop_table( self, @@ -419,7 +417,7 @@ def drop_table( def create_view( self, name: str, - obj: pd.DataFrame | ir.Table | None = None, + obj: pd.DataFrame | ir.Table, *, database: str | None = None, catalog: str | None = None, @@ -446,19 +444,16 @@ def create_view( Table The view that was created. """ - if obj is None: - raise exc.IbisError("The obj parameter is required") - if isinstance(obj, ir.Table): # TODO(chloeh13q): implement CREATE VIEW for expressions raise NotImplementedError - if overwrite and self.list_views(name): + if overwrite and name in self._list_views(): self.drop_view(name=name, catalog=catalog, database=database, force=True) qualified_name = self._fully_qualified_name(name, database, catalog) self._table_env.create_temporary_view(qualified_name, obj) - return self.table(name, database=database) + return self.table(name, database=database, catalog=catalog) def drop_view( self, diff --git a/ibis/backends/flink/datatypes.py b/ibis/backends/flink/datatypes.py index 15e3df5215a4..8fb4dd281891 100644 --- a/ibis/backends/flink/datatypes.py +++ b/ibis/backends/flink/datatypes.py @@ -1,6 +1,6 @@ from __future__ import annotations -import pyflink.table.types as fl +from pyflink.table.types import DataType, DataTypes, RowType import ibis.expr.datatypes as dt import ibis.expr.schema as sch @@ -9,13 +9,13 @@ class FlinkRowSchema(SchemaMapper): @classmethod - def from_ibis(cls, schema: sch.Schema | None) -> list[fl.RowType]: + def from_ibis(cls, schema: sch.Schema | None) -> list[RowType]: if schema is None: return None - return fl.DataTypes.ROW( + return DataTypes.ROW( [ - fl.DataTypes.FIELD(k, FlinkType.from_ibis(v)) + DataTypes.FIELD(k, FlinkType.from_ibis(v)) for k, v in schema.fields.items() ] ) @@ -23,72 +23,72 @@ def from_ibis(cls, schema: sch.Schema | None) -> list[fl.RowType]: class FlinkType(TypeMapper): @classmethod - def to_ibis(cls, typ: fl.DataType, nullable=True) -> dt.DataType: + def to_ibis(cls, typ: DataType, nullable=True) -> dt.DataType: """Convert a flink type to an ibis type.""" - if typ == fl.DataTypes.STRING(): + if typ == DataTypes.STRING(): return dt.String(nullable=nullable) - elif typ == fl.DataTypes.BOOLEAN(): + elif typ == DataTypes.BOOLEAN(): return dt.Boolean(nullable=nullable) - elif typ == fl.DataTypes.BYTES(): + elif typ == DataTypes.BYTES(): return dt.Binary(nullable=nullable) - elif typ == fl.DataTypes.TINYINT(): + elif typ == DataTypes.TINYINT(): return dt.Int8(nullable=nullable) - elif typ == fl.DataTypes.SMALLINT(): + elif typ == DataTypes.SMALLINT(): return dt.Int16(nullable=nullable) - elif typ == fl.DataTypes.INT(): + elif typ == DataTypes.INT(): return dt.Int32(nullable=nullable) - elif typ == fl.DataTypes.BIGINT(): + elif typ == DataTypes.BIGINT(): return dt.Int64(nullable=nullable) - elif typ == fl.DataTypes.FLOAT(): + elif typ == DataTypes.FLOAT(): return dt.Float32(nullable=nullable) - elif typ == fl.DataTypes.DOUBLE(): + elif typ == DataTypes.DOUBLE(): return dt.Float64(nullable=nullable) - elif typ == fl.DataTypes.DATE(): + elif typ == DataTypes.DATE(): return dt.Date(nullable=nullable) - elif typ == fl.DataTypes.TIME(): + elif typ == DataTypes.TIME(): return dt.Time(nullable=nullable) - elif typ == fl.DataTypes.TIMESTAMP(): + elif typ == DataTypes.TIMESTAMP(): return dt.Timestamp(nullable=nullable) else: return super().to_ibis(typ, nullable=nullable) @classmethod - def from_ibis(cls, dtype: dt.DataType) -> fl.DataType: + def from_ibis(cls, dtype: dt.DataType) -> DataType: """Convert an ibis type to a flink type.""" if dtype.is_string(): - return fl.DataTypes.STRING() + return DataTypes.STRING(nullable=dtype.nullable) elif dtype.is_boolean(): - return fl.DataTypes.BOOLEAN() + return DataTypes.BOOLEAN(nullable=dtype.nullable) elif dtype.is_binary(): - return fl.DataTypes.BYTES() + return DataTypes.BYTES(nullable=dtype.nullable) elif dtype.is_int8(): - return fl.DataTypes.TINYINT() + return DataTypes.TINYINT(nullable=dtype.nullable) elif dtype.is_int16(): - return fl.DataTypes.SMALLINT() + return DataTypes.SMALLINT(nullable=dtype.nullable) elif dtype.is_int32(): - return fl.DataTypes.INT() + return DataTypes.INT(nullable=dtype.nullable) elif dtype.is_int64(): - return fl.DataTypes.BIGINT() + return DataTypes.BIGINT(nullable=dtype.nullable) elif dtype.is_uint8(): - return fl.DataTypes.TINYINT() + return DataTypes.TINYINT(nullable=dtype.nullable) elif dtype.is_uint16(): - return fl.DataTypes.SMALLINT() + return DataTypes.SMALLINT(nullable=dtype.nullable) elif dtype.is_uint32(): - return fl.DataTypes.INT() + return DataTypes.INT(nullable=dtype.nullable) elif dtype.is_uint64(): - return fl.DataTypes.BIGINT() + return DataTypes.BIGINT(nullable=dtype.nullable) elif dtype.is_float16(): - return fl.DataTypes.FLOAT() + return DataTypes.FLOAT(nullable=dtype.nullable) elif dtype.is_float32(): - return fl.DataTypes.FLOAT() + return DataTypes.FLOAT(nullable=dtype.nullable) elif dtype.is_float64(): - return fl.DataTypes.DOUBLE() + return DataTypes.DOUBLE(nullable=dtype.nullable) elif dtype.is_date(): - return fl.DataTypes.DATE() + return DataTypes.DATE(nullable=dtype.nullable) elif dtype.is_time(): - return fl.DataTypes.TIME() + return DataTypes.TIME(nullable=dtype.nullable) elif dtype.is_timestamp(): - return fl.DataTypes.TIMESTAMP() + return DataTypes.TIMESTAMP(nullable=dtype.nullable) else: return super().from_ibis(dtype) diff --git a/ibis/backends/flink/registry.py b/ibis/backends/flink/registry.py index 76ea1002fe43..91730917b793 100644 --- a/ibis/backends/flink/registry.py +++ b/ibis/backends/flink/registry.py @@ -9,7 +9,6 @@ operation_registry as base_operation_registry, ) from ibis.backends.base.sql.registry.main import varargs -from ibis.backends.flink.datatypes import FlinkType from ibis.common.temporal import TimestampUnit if TYPE_CHECKING: @@ -222,6 +221,8 @@ def _window(translator: ExprTranslator, op: ops.Node) -> str: def _clip(translator: ExprTranslator, op: ops.Node) -> str: + from ibis.backends.flink.datatypes import FlinkType + arg = translator.translate(op.arg) if op.upper is not None: diff --git a/ibis/backends/flink/tests/test_ddl.py b/ibis/backends/flink/tests/test_ddl.py index 65b2a1a81c23..e98256a17c91 100644 --- a/ibis/backends/flink/tests/test_ddl.py +++ b/ibis/backends/flink/tests/test_ddl.py @@ -178,15 +178,14 @@ def test_force_recreate_table_from_schema( ], ) @pytest.mark.parametrize( - "schema_props", [(None, None), (_awards_players_schema, "awards_players")] + "schema, table_name", [(None, None), (_awards_players_schema, "awards_players")] ) def test_recreate_in_mem_table( - con, employee_df, schema_props, temp_table, csv_source_configs + con, employee_df, schema, table_name, temp_table, csv_source_configs ): # create table once - schema = schema_props[0] - if schema_props[1] is not None: - tbl_properties = csv_source_configs(schema_props[1]) + if table_name is not None: + tbl_properties = csv_source_configs(table_name) else: tbl_properties = None @@ -242,7 +241,6 @@ def test_force_recreate_in_mem_table( tbl_properties=tbl_properties, ) assert temp_table in con.list_tables() - assert temp_table in con.list_views() if schema is not None: assert new_table.schema() == schema @@ -255,7 +253,6 @@ def test_force_recreate_in_mem_table( overwrite=True, ) assert temp_table in con.list_tables() - assert temp_table in con.list_views() if schema is not None: assert new_table.schema() == schema