test(flink): deep dive on the tests marked for Flink in test_json.py (#7908)

Reviving #7777 which got closed
after switching to `main`.
mfatihaktas authored Jan 21, 2024
1 parent 52b2e6d commit 3eebc41
Showing 4 changed files with 225 additions and 11 deletions.
137 changes: 135 additions & 2 deletions ibis/backends/flink/__init__.py
@@ -22,9 +22,11 @@
InsertSelect,
RenameTable,
)
from ibis.util import gen_name

if TYPE_CHECKING:
from collections.abc import Mapping
from pathlib import Path

import pandas as pd
import pyarrow as pa
@@ -119,9 +121,10 @@ def drop_database(
def list_tables(
self,
like: str | None = None,
temp: bool = False,
*,
database: str | None = None,
catalog: str | None = None,
temp: bool = False,
) -> list[str]:
"""Return the list of table/view names.
@@ -198,7 +201,7 @@ def _fully_qualified_name(
database: str | None = None,
catalog: str | None = None,
) -> str:
if is_fully_qualified(name):
if name and is_fully_qualified(name):
return name

return sg.table(
@@ -642,6 +645,136 @@ def drop_view(
sql = statement.compile()
self._exec_sql(sql)

def _read_file(
self,
file_type: str,
path: str | Path,
schema: sch.Schema | None = None,
table_name: str | None = None,
) -> ir.Table:
"""Register a file as a table in the current database.
Parameters
----------
file_type
File type, e.g., parquet, csv, json.
path
The data source.
schema
The schema for the new table.
table_name
An optional name to use for the created table. This defaults to
a sequentially generated name.
Returns
-------
ir.Table
The just-registered table
Raises
------
ValueError
If `schema` is None.
"""
if schema is None:
raise ValueError(
f"`schema` must be explicitly provided when calling `read_{file_type}`"
)

table_name = table_name or gen_name(f"read_{file_type}")
tbl_properties = {
"connector": "filesystem",
"path": path,
"format": file_type,
}

return self.create_table(
name=table_name,
schema=schema,
tbl_properties=tbl_properties,
)

def read_parquet(
self,
path: str | Path,
schema: sch.Schema | None = None,
table_name: str | None = None,
) -> ir.Table:
"""Register a parquet file as a table in the current database.
Parameters
----------
path
The data source.
schema
The schema for the new table.
table_name
An optional name to use for the created table. This defaults to
a sequentially generated name.
Returns
-------
ir.Table
The just-registered table
"""
return self._read_file(
file_type="parquet", path=path, schema=schema, table_name=table_name
)

def read_csv(
self,
path: str | Path,
schema: sch.Schema | None = None,
table_name: str | None = None,
) -> ir.Table:
"""Register a csv file as a table in the current database.
Parameters
----------
path
The data source.
schema
The schema for the new table.
table_name
An optional name to use for the created table. This defaults to
a sequentially generated name.
Returns
-------
ir.Table
The just-registered table
"""
return self._read_file(
file_type="csv", path=path, schema=schema, table_name=table_name
)

def read_json(
self,
path: str | Path,
schema: sch.Schema | None = None,
table_name: str | None = None,
) -> ir.Table:
"""Register a json file as a table in the current database.
Parameters
----------
path
The data source.
schema
The schema for the new table.
table_name
An optional name to use for the created table. This defaults to
a sequentially generated name.
Returns
-------
ir.Table
The just-registered table
"""
return self._read_file(
file_type="json", path=path, schema=schema, table_name=table_name
)

@classmethod
@lru_cache
def _get_operations(cls):
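For readers of the diff above, a minimal usage sketch of the new `read_*` API follows. The connection setup, file path, and schema are illustrative assumptions, not part of this commit:

```python
# Sketch only: assumes a local PyFlink TableEnvironment and an existing CSV file.
import ibis
from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
con = ibis.flink.connect(table_env)

# The Flink backend cannot infer a schema from the file, so `schema` is required;
# omitting it raises ValueError (see `_read_file` above).
schema = ibis.schema({"player_id": "string", "year_id": "int64"})

players = con.read_csv(
    path="/tmp/awards_players.csv",  # hypothetical path
    schema=schema,
    table_name="awards_players",  # optional; defaults to a generated name
)
print(players.schema())
```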
63 changes: 63 additions & 0 deletions ibis/backends/flink/tests/test_ddl.py
@@ -2,6 +2,7 @@

import os
import tempfile
from pathlib import Path

import pandas as pd
import pyarrow as pa
@@ -489,3 +490,65 @@ def test_insert_simple_select(con, tempdir_sink_configs):
temporary_file = next(iter(os.listdir(tempdir)))
with open(os.path.join(tempdir, temporary_file)) as f:
assert f.read() == '"fred flintstone",35\n"barney rubble",32\n'


@pytest.mark.parametrize("table_name", ["new_table", None])
def test_read_csv(con, awards_players_schema, csv_source_configs, table_name):
source_configs = csv_source_configs("awards_players")
table = con.read_csv(
path=source_configs["path"],
schema=awards_players_schema,
table_name=table_name,
)

if table_name is None:
table_name = table.get_name()
assert table_name in con.list_tables()
assert table.schema() == awards_players_schema

con.drop_table(table_name)
assert table_name not in con.list_tables()


@pytest.mark.parametrize("table_name", ["new_table", None])
def test_read_parquet(con, data_dir, tmp_path, table_name):
fname = Path(data_dir) / "parquet" / "functional_alltypes.parquet"
table = con.read_parquet(
path=fname,
schema=_functional_alltypes_schema,
table_name=table_name,
)

if table_name is None:
table_name = table.get_name()
assert table_name in con.list_tables()
assert table.schema() == _functional_alltypes_schema

con.drop_table(table_name)
assert table_name not in con.list_tables()


@pytest.mark.parametrize("table_name", ["new_table", None])
def test_read_json(con, data_dir, tmp_path, table_name):
pq = pytest.importorskip("pyarrow.parquet")

pq_table = pq.read_table(
data_dir.joinpath("parquet", "functional_alltypes.parquet")
)
df = pq_table.to_pandas()

path = tmp_path / "functional_alltypes.json"
df.to_json(path, orient="records", lines=True, date_format="iso")
table = con.read_json(
path=path, schema=_functional_alltypes_schema, table_name=table_name
)

if table_name is None:
table_name = table.get_name()
assert table_name in con.list_tables()
assert table.schema() == _functional_alltypes_schema
assert table.count().execute() == len(pq_table)

con.drop_table(table_name)
assert table_name not in con.list_tables()
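The tests above go through the public `read_csv`/`read_parquet`/`read_json` wrappers; per `_read_file` in the first file of this diff, each call reduces to a `create_table` against the filesystem connector. A rough sketch of that equivalence, where `con`, the path, and the schema are placeholders:

```python
import ibis

# `con` is an ibis Flink backend connection (see the earlier sketch);
# the schema and path below are placeholders.
schema = ibis.schema({"id": "int64", "string_col": "string"})

tbl_properties = {
    "connector": "filesystem",
    "path": "/tmp/functional_alltypes.json",
    "format": "json",  # "parquet" / "csv" / "json" depending on the read_* call
}

# Roughly what read_json(path, schema=schema, table_name=...) registers.
table = con.create_table(
    "functional_alltypes_json",
    schema=schema,
    tbl_properties=tbl_properties,
)
```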
2 changes: 0 additions & 2 deletions ibis/backends/tests/test_param.py
@@ -77,7 +77,6 @@ def test_scalar_param_array(con):
[
"datafusion",
"impala",
"flink",
"postgres",
"pyspark",
"druid",
@@ -244,7 +243,6 @@ def test_scalar_param_date(backend, alltypes, value):
"exasol",
]
)
@pytest.mark.notimpl(["flink"], "WIP")
def test_scalar_param_nested(con):
param = ibis.param("struct<x: array<struct<y: array<double>>>>")
value = OrderedDict([("x", [OrderedDict([("y", [1.0, 2.0, 3.0])])])])
34 changes: 27 additions & 7 deletions ibis/backends/tests/test_register.py
@@ -395,8 +395,11 @@ def test_register_garbage(con, monkeypatch):
("functional_alltypes.parquet", "funk_all"),
],
)
@pytest.mark.notyet(
["flink", "impala", "mssql", "mysql", "postgres", "sqlite", "trino"]
@pytest.mark.notyet(["impala", "mssql", "mysql", "postgres", "sqlite", "trino"])
@pytest.mark.notimpl(
["flink"],
raises=ValueError,
reason="read_parquet() missing required argument: 'schema'",
)
def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name):
pq = pytest.importorskip("pyarrow.parquet")
@@ -427,7 +430,12 @@ def ft_data(data_dir):


@pytest.mark.notyet(
["flink", "impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"]
["impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"]
)
@pytest.mark.notimpl(
["flink"],
raises=ValueError,
reason="read_parquet() missing required argument: 'schema'",
)
def test_read_parquet_glob(con, tmp_path, ft_data):
pq = pytest.importorskip("pyarrow.parquet")
@@ -446,7 +454,12 @@ def test_read_parquet_glob(con, tmp_path, ft_data):


@pytest.mark.notyet(
["flink", "impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"]
["impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"]
)
@pytest.mark.notimpl(
["flink"],
raises=ValueError,
reason="read_csv() missing required argument: 'schema'",
)
def test_read_csv_glob(con, tmp_path, ft_data):
pc = pytest.importorskip("pyarrow.csv")
@@ -469,7 +482,6 @@ def test_read_csv_glob(con, tmp_path, ft_data):
"clickhouse",
"dask",
"datafusion",
"flink",
"impala",
"mssql",
"mysql",
@@ -479,6 +491,11 @@ def test_read_csv_glob(con, tmp_path, ft_data):
"trino",
]
)
@pytest.mark.notimpl(
["flink"],
raises=ValueError,
reason="read_json() missing required argument: 'schema'",
)
def test_read_json_glob(con, tmp_path, ft_data):
nrows = len(ft_data)
ntables = 2
@@ -522,8 +539,11 @@ def num_diamonds(data_dir):
"in_table_name",
[param(None, id="default"), param("fancy_stones", id="file_name")],
)
@pytest.mark.notyet(
["flink", "impala", "mssql", "mysql", "postgres", "sqlite", "trino"]
@pytest.mark.notyet(["impala", "mssql", "mysql", "postgres", "sqlite", "trino"])
@pytest.mark.notimpl(
["flink"],
raises=ValueError,
reason="read_csv() missing required argument: 'schema'",
)
def test_read_csv(con, data_dir, in_table_name, num_diamonds):
fname = "diamonds.csv"
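The `notimpl(raises=ValueError, ...)` markers added above encode the guard in `_read_file`: without an explicit `schema`, the Flink `read_*` methods refuse to register the file. A quick illustration of the expected failure, with `con` and the path as assumptions:

```python
import pytest

# Without a schema the Flink backend cannot build the table definition,
# so read_csv/read_parquet/read_json raise ValueError.
with pytest.raises(ValueError, match="`schema` must be explicitly provided"):
    con.read_csv("/tmp/diamonds.csv")  # hypothetical path, schema omitted
```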
