diff --git a/ibis/backends/flink/__init__.py b/ibis/backends/flink/__init__.py index 4d07eca7b541..babe6521e786 100644 --- a/ibis/backends/flink/__init__.py +++ b/ibis/backends/flink/__init__.py @@ -419,7 +419,7 @@ def drop_table( def create_view( self, name: str, - obj: ir.Table, + obj: pd.DataFrame | ir.Table | None = None, *, database: str | None = None, catalog: str | None = None, @@ -449,6 +449,10 @@ def create_view( if obj is None: raise exc.IbisError("The obj parameter is required") + if isinstance(obj, ir.Table): + # TODO(chloeh13q): implement CREATE VIEW for expressions + raise NotImplementedError + if overwrite and self.list_views(name): self.drop_view(name=name, catalog=catalog, database=database, force=True) diff --git a/ibis/backends/flink/tests/test_ddl.py b/ibis/backends/flink/tests/test_ddl.py index 87c4e84a1c4e..2261a5dad481 100644 --- a/ibis/backends/flink/tests/test_ddl.py +++ b/ibis/backends/flink/tests/test_ddl.py @@ -17,6 +17,34 @@ except ImportError: Py4JJavaError = None +_awards_players_schema = sch.Schema( + { + "playerID": dt.string, + "awardID": dt.string, + "yearID": dt.int32, + "lgID": dt.string, + "tie": dt.string, + "notes": dt.string, + } +) + +_functiona_alltypes_schema = sch.Schema( + { + "id": dt.int32, + "bool_col": dt.bool, + "smallint_col": dt.int16, + "int_col": dt.int32, + "bigint_col": dt.int64, + "float_col": dt.float32, + "double_col": dt.float64, + "date_string_col": dt.string, + "string_col": dt.string, + "timestamp_col": dt.timestamp(scale=3), + "year": dt.int32, + "month": dt.int32, + } +) + @pytest.fixture(autouse=True) def reset_con(con): @@ -28,36 +56,12 @@ def reset_con(con): @pytest.fixture def awards_players_schema(): - return sch.Schema( - { - "playerID": dt.string, - "awardID": dt.string, - "yearID": dt.int32, - "lgID": dt.string, - "tie": dt.string, - "notes": dt.string, - } - ) + return _awards_players_schema @pytest.fixture def functiona_alltypes_schema(): - return sch.Schema( - { - "id": dt.int32, - "bool_col": dt.bool, - "smallint_col": dt.int16, - "int_col": dt.int32, - "bigint_col": dt.int64, - "float_col": dt.float32, - "double_col": dt.float64, - "date_string_col": dt.string, - "string_col": dt.string, - "timestamp_col": dt.timestamp(scale=3), - "year": dt.int32, - "month": dt.int32, - } - ) + return _functiona_alltypes_schema @pytest.fixture @@ -117,6 +121,143 @@ def test_create_table(con, awards_players_schema, temp_table, csv_source_configs assert temp_table not in con.list_tables() +def test_recreate_table_from_schema( + con, awards_players_schema, temp_table, csv_source_configs +): + # create table once + new_table = con.create_table( + temp_table, + schema=awards_players_schema, + tbl_properties=csv_source_configs("awards_players"), + ) + assert temp_table in con.list_tables() + assert new_table.schema() == awards_players_schema + + # create the same table a second time should fail + with pytest.raises( + Py4JJavaError, + match="org.apache.flink.table.catalog.exceptions.TableAlreadyExistException", + ): + new_table = con.create_table( + temp_table, + schema=awards_players_schema, + tbl_properties=csv_source_configs("awards_players"), + overwrite=False, + ) + + +def test_force_recreate_table_from_schema( + con, awards_players_schema, temp_table, csv_source_configs +): + # create table once + new_table = con.create_table( + temp_table, + schema=awards_players_schema, + tbl_properties=csv_source_configs("awards_players"), + ) + assert temp_table in con.list_tables() + assert new_table.schema() == awards_players_schema + + # force creating the same twice a second time + new_table = con.create_table( + temp_table, + schema=awards_players_schema, + tbl_properties=csv_source_configs("awards_players"), + overwrite=True, + ) + assert temp_table in con.list_tables() + assert new_table.schema() == awards_players_schema + + +@pytest.mark.parametrize( + "employee_df", + [ + pd.DataFrame( + [("fred flintstone", "award", 2002, "lg_id", "tie", "this is a note")] + ) + ], +) +@pytest.mark.parametrize( + "schema_props", [(None, None), (_awards_players_schema, "awards_players")] +) +def test_recreate_in_mem_table( + con, employee_df, schema_props, temp_table, csv_source_configs +): + # create table once + schema = schema_props[0] + if schema_props[1] is not None: + tbl_properties = csv_source_configs(schema_props[1]) + else: + tbl_properties = None + + new_table = con.create_table( + name=temp_table, + obj=employee_df, + schema=schema, + tbl_properties=tbl_properties, + ) + assert temp_table in con.list_tables() + if schema is not None: + assert new_table.schema() == schema + + # create the same table a second time should fail + with pytest.raises( + Py4JJavaError, + match="An error occurred while calling o8.createTemporaryView", + ): + new_table = con.create_table( + name=temp_table, + obj=employee_df, + schema=schema, + tbl_properties=tbl_properties, + overwrite=False, + ) + + +@pytest.mark.parametrize( + "employee_df", + [ + pd.DataFrame( + [("fred flintstone", "award", 2002, "lg_id", "tie", "this is a note")] + ) + ], +) +@pytest.mark.parametrize( + "schema_props", [(None, None), (_awards_players_schema, "awards_players")] +) +def test_force_recreate_in_mem_table( + con, employee_df, schema_props, temp_table, csv_source_configs +): + # create table once + schema = schema_props[0] + if schema_props[1] is not None: + tbl_properties = csv_source_configs(schema_props[1]) + else: + tbl_properties = None + + new_table = con.create_table( + name=temp_table, + obj=employee_df, + schema=schema, + tbl_properties=tbl_properties, + ) + assert temp_table in con.list_tables() + if schema is not None: + assert new_table.schema() == schema + + # force recreate the same table a second time should succeed + new_table = con.create_table( + name=temp_table, + obj=employee_df, + schema=schema, + tbl_properties=tbl_properties, + overwrite=True, + ) + assert temp_table in con.list_tables() + if schema is not None: + assert new_table.schema() == schema + + def test_create_source_table_with_watermark( con, functiona_alltypes_schema, temp_table, csv_source_configs ):