From 3eebc4150b8317e8464fc8619bcfda15aa0795dd Mon Sep 17 00:00:00 2001
From: Mehmet Fatih Aktas
Date: Sun, 21 Jan 2024 07:52:33 -0500
Subject: [PATCH] test(flink): deep dive on the tests marked for Flink in
 test_json.py (#7908)

Reviving https://github.com/ibis-project/ibis/pull/7777, which was closed
after switching to `main`.
---
 ibis/backends/flink/__init__.py       | 137 +++++++++++++++++++++++++-
 ibis/backends/flink/tests/test_ddl.py |  63 ++++++++++++
 ibis/backends/tests/test_param.py     |   2 -
 ibis/backends/tests/test_register.py  |  34 +++++--
 4 files changed, 225 insertions(+), 11 deletions(-)

diff --git a/ibis/backends/flink/__init__.py b/ibis/backends/flink/__init__.py
index d540114533e7..c71aee8f98e7 100644
--- a/ibis/backends/flink/__init__.py
+++ b/ibis/backends/flink/__init__.py
@@ -22,9 +22,11 @@
     InsertSelect,
     RenameTable,
 )
+from ibis.util import gen_name

 if TYPE_CHECKING:
     from collections.abc import Mapping
+    from pathlib import Path

     import pandas as pd
     import pyarrow as pa
@@ -119,9 +121,10 @@ def drop_database(

     def list_tables(
         self,
         like: str | None = None,
-        temp: bool = False,
+        *,
         database: str | None = None,
         catalog: str | None = None,
+        temp: bool = False,
     ) -> list[str]:
         """Return the list of table/view names.
@@ -198,7 +201,7 @@ def _fully_qualified_name(
         database: str | None = None,
         catalog: str | None = None,
     ) -> str:
-        if is_fully_qualified(name):
+        if name and is_fully_qualified(name):
             return name

         return sg.table(
@@ -642,6 +645,136 @@ def drop_view(
         sql = statement.compile()
         self._exec_sql(sql)

+    def _read_file(
+        self,
+        file_type: str,
+        path: str | Path,
+        schema: sch.Schema | None = None,
+        table_name: str | None = None,
+    ) -> ir.Table:
+        """Register a file as a table in the current database.
+
+        Parameters
+        ----------
+        file_type
+            File type, e.g., parquet, csv, json.
+        path
+            The data source.
+        schema
+            The schema for the new table.
+        table_name
+            An optional name to use for the created table. This defaults to
+            a sequentially generated name.
+
+        Returns
+        -------
+        ir.Table
+            The just-registered table
+
+        Raises
+        ------
+        ValueError
+            If `schema` is None.
+        """
+        if schema is None:
+            raise ValueError(
+                f"`schema` must be explicitly provided when calling `read_{file_type}`"
+            )
+
+        table_name = table_name or gen_name(f"read_{file_type}")
+        tbl_properties = {
+            "connector": "filesystem",
+            "path": path,
+            "format": file_type,
+        }
+
+        return self.create_table(
+            name=table_name,
+            schema=schema,
+            tbl_properties=tbl_properties,
+        )
+
+    def read_parquet(
+        self,
+        path: str | Path,
+        schema: sch.Schema | None = None,
+        table_name: str | None = None,
+    ) -> ir.Table:
+        """Register a parquet file as a table in the current database.
+
+        Parameters
+        ----------
+        path
+            The data source.
+        schema
+            The schema for the new table.
+        table_name
+            An optional name to use for the created table. This defaults to
+            a sequentially generated name.
+
+        Returns
+        -------
+        ir.Table
+            The just-registered table
+        """
+        return self._read_file(
+            file_type="parquet", path=path, schema=schema, table_name=table_name
+        )
+
+    def read_csv(
+        self,
+        path: str | Path,
+        schema: sch.Schema | None = None,
+        table_name: str | None = None,
+    ) -> ir.Table:
+        """Register a csv file as a table in the current database.
+
+        Parameters
+        ----------
+        path
+            The data source.
+        schema
+            The schema for the new table.
+        table_name
+            An optional name to use for the created table. This defaults to
+            a sequentially generated name.
+
+        Returns
+        -------
+        ir.Table
+            The just-registered table
+        """
+        return self._read_file(
+            file_type="csv", path=path, schema=schema, table_name=table_name
+        )
+
+    def read_json(
+        self,
+        path: str | Path,
+        schema: sch.Schema | None = None,
+        table_name: str | None = None,
+    ) -> ir.Table:
+        """Register a json file as a table in the current database.
+
+        Parameters
+        ----------
+        path
+            The data source.
+        schema
+            The schema for the new table.
+        table_name
+            An optional name to use for the created table. This defaults to
+            a sequentially generated name.
+
+        Returns
+        -------
+        ir.Table
+            The just-registered table
+        """
+        return self._read_file(
+            file_type="json", path=path, schema=schema, table_name=table_name
+        )
+
     @classmethod
     @lru_cache
     def _get_operations(cls):
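For reviewers, a minimal usage sketch of the readers added above. The pyflink
session setup is the stock TableEnvironment pattern; the file path and column
set are hypothetical. The one hard requirement the diff introduces is the
explicit `schema`:

    import ibis
    from pyflink.table import EnvironmentSettings, TableEnvironment

    # Stock pyflink session setup; any TableEnvironment works here.
    table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    con = ibis.flink.connect(table_env)

    # Flink's filesystem connector cannot infer schemas, so one is required.
    schema = ibis.schema({"player_id": "string", "year_id": "int32"})
    t = con.read_csv(
        path="/data/awards_players.csv",  # hypothetical path
        schema=schema,
        table_name="awards_players",  # optional; omitted -> generated name
    )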
diff --git a/ibis/backends/flink/tests/test_ddl.py b/ibis/backends/flink/tests/test_ddl.py
index c946b341b159..a7fdc596c974 100644
--- a/ibis/backends/flink/tests/test_ddl.py
+++ b/ibis/backends/flink/tests/test_ddl.py
@@ -2,6 +2,7 @@

 import os
 import tempfile
+from pathlib import Path

 import pandas as pd
 import pyarrow as pa
@@ -489,3 +490,65 @@ def test_insert_simple_select(con, tempdir_sink_configs):
     temporary_file = next(iter(os.listdir(tempdir)))
     with open(os.path.join(tempdir, temporary_file)) as f:
         assert f.read() == '"fred flintstone",35\n"barney rubble",32\n'
+
+
+@pytest.mark.parametrize("table_name", ["new_table", None])
+def test_read_csv(con, awards_players_schema, csv_source_configs, table_name):
+    source_configs = csv_source_configs("awards_players")
+    table = con.read_csv(
+        path=source_configs["path"],
+        schema=awards_players_schema,
+        table_name=table_name,
+    )
+
+    if table_name is None:
+        table_name = table.get_name()
+    assert table_name in con.list_tables()
+    assert table.schema() == awards_players_schema
+
+    con.drop_table(table_name)
+    assert table_name not in con.list_tables()
+
+
+@pytest.mark.parametrize("table_name", ["new_table", None])
+def test_read_parquet(con, data_dir, table_name):
+    fname = Path("functional_alltypes.parquet")
+    fname = Path(data_dir) / "parquet" / fname.name
+    table = con.read_parquet(
+        path=fname,
+        schema=_functional_alltypes_schema,
+        table_name=table_name,
+    )
+
+    if table_name is None:
+        table_name = table.get_name()
+    assert table_name in con.list_tables()
+    assert table.schema() == _functional_alltypes_schema
+
+    con.drop_table(table_name)
+    assert table_name not in con.list_tables()
+
+
+@pytest.mark.parametrize("table_name", ["new_table", None])
+def test_read_json(con, data_dir, tmp_path, table_name):
+    pq = pytest.importorskip("pyarrow.parquet")
+
+    pq_table = pq.read_table(
+        data_dir.joinpath("parquet", "functional_alltypes.parquet")
+    )
+    df = pq_table.to_pandas()
+
+    path = tmp_path / "functional_alltypes.json"
+    df.to_json(path, orient="records", lines=True, date_format="iso")
+    table = con.read_json(
+        path=path, schema=_functional_alltypes_schema, table_name=table_name
+    )
+
+    if table_name is None:
+        table_name = table.get_name()
+    assert table_name in con.list_tables()
+    assert table.schema() == _functional_alltypes_schema
+    assert table.count().execute() == len(pq_table)
+
+    con.drop_table(table_name)
+    assert table_name not in con.list_tables()
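For context on what these tests exercise: `_read_file` routes through
`create_table` with filesystem-connector properties, which the backend compiles
to a Flink `CREATE TABLE ... WITH (...)` statement. A hand-written
approximation for the `read_csv` case above (column list abridged; exact
quoting and type rendering are up to ibis's Flink DDL compiler):

    # Roughly the DDL produced for read_csv(path=..., schema=...).
    ddl = """
    CREATE TABLE `awards_players` (
        `player_id` STRING,
        `year_id` INT
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/data/awards_players.csv',
        'format' = 'csv'
    )
    """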
"flink", "postgres", "pyspark", "druid", @@ -244,7 +243,6 @@ def test_scalar_param_date(backend, alltypes, value): "exasol", ] ) -@pytest.mark.notimpl(["flink"], "WIP") def test_scalar_param_nested(con): param = ibis.param("struct>>>") value = OrderedDict([("x", [OrderedDict([("y", [1.0, 2.0, 3.0])])])]) diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py index 75a121f578d8..e9b2035741e5 100644 --- a/ibis/backends/tests/test_register.py +++ b/ibis/backends/tests/test_register.py @@ -395,8 +395,11 @@ def test_register_garbage(con, monkeypatch): ("functional_alltypes.parquet", "funk_all"), ], ) -@pytest.mark.notyet( - ["flink", "impala", "mssql", "mysql", "postgres", "sqlite", "trino"] +@pytest.mark.notyet(["impala", "mssql", "mysql", "postgres", "sqlite", "trino"]) +@pytest.mark.notimpl( + ["flink"], + raises=ValueError, + reason="read_parquet() missing required argument: 'schema'", ) def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name): pq = pytest.importorskip("pyarrow.parquet") @@ -427,7 +430,12 @@ def ft_data(data_dir): @pytest.mark.notyet( - ["flink", "impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"] + ["impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"] +) +@pytest.mark.notimpl( + ["flink"], + raises=ValueError, + reason="read_parquet() missing required argument: 'schema'", ) def test_read_parquet_glob(con, tmp_path, ft_data): pq = pytest.importorskip("pyarrow.parquet") @@ -446,7 +454,12 @@ def test_read_parquet_glob(con, tmp_path, ft_data): @pytest.mark.notyet( - ["flink", "impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"] + ["impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"] +) +@pytest.mark.notimpl( + ["flink"], + raises=ValueError, + reason="read_csv() missing required argument: 'schema'", ) def test_read_csv_glob(con, tmp_path, ft_data): pc = pytest.importorskip("pyarrow.csv") @@ -469,7 +482,6 @@ def test_read_csv_glob(con, tmp_path, ft_data): "clickhouse", "dask", "datafusion", - "flink", "impala", "mssql", "mysql", @@ -479,6 +491,11 @@ def test_read_csv_glob(con, tmp_path, ft_data): "trino", ] ) +@pytest.mark.notimpl( + ["flink"], + raises=ValueError, + reason="read_json() missing required argument: 'schema'", +) def test_read_json_glob(con, tmp_path, ft_data): nrows = len(ft_data) ntables = 2 @@ -522,8 +539,11 @@ def num_diamonds(data_dir): "in_table_name", [param(None, id="default"), param("fancy_stones", id="file_name")], ) -@pytest.mark.notyet( - ["flink", "impala", "mssql", "mysql", "postgres", "sqlite", "trino"] +@pytest.mark.notyet(["impala", "mssql", "mysql", "postgres", "sqlite", "trino"]) +@pytest.mark.notimpl( + ["flink"], + raises=ValueError, + reason="read_csv() missing required argument: 'schema'", ) def test_read_csv(con, data_dir, in_table_name, num_diamonds): fname = "diamonds.csv"