From ff831106b1beedab517e13a51fa631916032e5dd Mon Sep 17 00:00:00 2001
From: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
Date: Sun, 12 Nov 2023 11:23:40 -0500
Subject: [PATCH] feat(bigquery): add `read_csv`, `read_json`, `read_parquet`
 support

---
 ibis/backends/bigquery/__init__.py   | 139 +++++++++++++++++++++++++++
 ibis/backends/tests/test_register.py |  78 +++------------
 2 files changed, 155 insertions(+), 62 deletions(-)

diff --git a/ibis/backends/bigquery/__init__.py b/ibis/backends/bigquery/__init__.py
index 0b962cd39f90..99ab3d45c3fc 100644
--- a/ibis/backends/bigquery/__init__.py
+++ b/ibis/backends/bigquery/__init__.py
@@ -2,7 +2,10 @@
 
 from __future__ import annotations
 
+import concurrent.futures
 import contextlib
+import glob
+import os
 import re
 import warnings
 from functools import partial
@@ -40,6 +43,7 @@
 
 if TYPE_CHECKING:
     from collections.abc import Iterable, Mapping
+    from pathlib import Path
 
     import pyarrow as pa
     from google.cloud.bigquery.table import RowIterator
@@ -147,6 +151,141 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
             )
             load_job.result()
 
+    def _read_file(
+        self,
+        path: str | Path,
+        *,
+        table_name: str | None = None,
+        job_config: bq.LoadJobConfig,
+    ) -> ir.Table:
+        self._make_session()
+
+        if table_name is None:
+            table_name = util.gen_name(f"bq_read_{job_config.source_format}")
+
+        table_ref = self._session_dataset.table(table_name)
+
+        schema = self._session_dataset.dataset_id
+        database = self._session_dataset.project
+
+        # drop the table if it exists
+        #
+        # we could do this with write_disposition = WRITE_TRUNCATE but then the
+        # concurrent append jobs aren't possible
+        #
+        # dropping the table first means all write_dispositions can be
+        # WRITE_APPEND
+        self.drop_table(table_name, schema=schema, database=database, force=True)
+
+        if os.path.isdir(path):
+            raise NotImplementedError("Reading from a directory is not supported.")
+        elif str(path).startswith("gs://"):
+            load_job = self.client.load_table_from_uri(
+                path, table_ref, job_config=job_config
+            )
+            load_job.result()
+        else:
+
+            def load(file: str) -> None:
+                with open(file, mode="rb") as f:
+                    load_job = self.client.load_table_from_file(
+                        f, table_ref, job_config=job_config
+                    )
+                    load_job.result()
+
+            job_config.write_disposition = bq.WriteDisposition.WRITE_APPEND
+
+            with concurrent.futures.ThreadPoolExecutor() as executor:
+                for fut in concurrent.futures.as_completed(
+                    executor.submit(load, file) for file in glob.glob(str(path))
+                ):
+                    fut.result()
+
+        return self.table(table_name, schema=schema, database=database)
+
+    def read_parquet(
+        self, path: str | Path, table_name: str | None = None, **kwargs: Any
+    ):
+        """Read Parquet data into a BigQuery table.
+
+        Parameters
+        ----------
+        path
+            Path to a Parquet file on GCS or the local filesystem. Globs are supported.
+        table_name
+            Optional table name
+        kwargs
+            Additional keyword arguments passed to `google.cloud.bigquery.LoadJobConfig`.
+
+        Returns
+        -------
+        Table
+            An Ibis table expression
+        """
+        return self._read_file(
+            path,
+            table_name=table_name,
+            job_config=bq.LoadJobConfig(
+                source_format=bq.SourceFormat.PARQUET, **kwargs
+            ),
+        )
+
+    def read_csv(
+        self, path: str | Path, table_name: str | None = None, **kwargs: Any
+    ) -> ir.Table:
+        """Read CSV data into a BigQuery table.
+
+        Parameters
+        ----------
+        path
+            Path to a CSV file on GCS or the local filesystem. Globs are supported.
+        table_name
+            Optional table name
+        kwargs
+            Additional keyword arguments passed to
+            `google.cloud.bigquery.LoadJobConfig`.
+
+        Returns
+        -------
+        Table
+            An Ibis table expression
+        """
+        job_config = bq.LoadJobConfig(
+            source_format=bq.SourceFormat.CSV,
+            autodetect=True,
+            skip_leading_rows=1,
+            **kwargs,
+        )
+        return self._read_file(path, table_name=table_name, job_config=job_config)
+
+    def read_json(
+        self, path: str | Path, table_name: str | None = None, **kwargs: Any
+    ) -> ir.Table:
+        """Read newline-delimited JSON data into a BigQuery table.
+
+        Parameters
+        ----------
+        path
+            Path to a newline-delimited JSON file on GCS or the local
+            filesystem. Globs are supported.
+        table_name
+            Optional table name
+        kwargs
+            Additional keyword arguments passed to
+            `google.cloud.bigquery.LoadJobConfig`.
+
+        Returns
+        -------
+        Table
+            An Ibis table expression
+        """
+        job_config = bq.LoadJobConfig(
+            source_format=bq.SourceFormat.NEWLINE_DELIMITED_JSON,
+            autodetect=True,
+            **kwargs,
+        )
+        return self._read_file(path, table_name=table_name, job_config=job_config)
+
     def _from_url(self, url: str, **kwargs):
         result = urlparse(url)
         params = parse_qs(result.query)
diff --git a/ibis/backends/tests/test_register.py b/ibis/backends/tests/test_register.py
index 1d8759d153cf..4ac1a216d3cd 100644
--- a/ibis/backends/tests/test_register.py
+++ b/ibis/backends/tests/test_register.py
@@ -389,29 +389,16 @@ def test_register_garbage(con, monkeypatch):
 
 
 @pytest.mark.parametrize(
-    ("fname", "in_table_name", "out_table_name"),
+    ("fname", "in_table_name"),
     [
-        (
-            "functional_alltypes.parquet",
-            None,
-            "ibis_read_parquet",
-        ),
-        ("functional_alltypes.parquet", "funk_all", "funk_all"),
+        ("functional_alltypes.parquet", None),
+        ("functional_alltypes.parquet", "funk_all"),
     ],
 )
 @pytest.mark.notyet(
-    [
-        "bigquery",
-        "flink",
-        "impala",
-        "mssql",
-        "mysql",
-        "postgres",
-        "sqlite",
-        "trino",
-    ]
+    ["flink", "impala", "mssql", "mysql", "postgres", "sqlite", "trino"]
 )
-def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name, out_table_name):
+def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name):
     pq = pytest.importorskip("pyarrow.parquet")
 
     fname = Path(fname)
@@ -426,10 +413,9 @@ def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name, out_table_n
         fname = str(Path(fname).absolute())
 
     table = con.read_parquet(fname, table_name=in_table_name)
-    assert any(out_table_name in t for t in con.list_tables())
-
-    if con.name != "datafusion":
-        table.count().execute()
+    if in_table_name is not None:
+        assert table.op().name == in_table_name
+    assert table.count().execute()
 
 
 @pytest.fixture(scope="module")
 def ft_data(data_dir):
@@ -441,17 +427,7 @@
 
 
 @pytest.mark.notyet(
-    [
-        "bigquery",
-        "flink",
-        "impala",
-        "mssql",
-        "mysql",
-        "pandas",
-        "postgres",
-        "sqlite",
-        "trino",
-    ]
+    ["flink", "impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"]
 )
 def test_read_parquet_glob(con, tmp_path, ft_data):
     pq = pytest.importorskip("pyarrow.parquet")
@@ -470,17 +446,7 @@
 
 
 @pytest.mark.notyet(
-    [
-        "bigquery",
-        "flink",
-        "impala",
-        "mssql",
-        "mysql",
-        "pandas",
-        "postgres",
-        "sqlite",
-        "trino",
-    ]
+    ["flink", "impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"]
 )
 def test_read_csv_glob(con, tmp_path, ft_data):
     pc = pytest.importorskip("pyarrow.csv")
@@ -500,7 +466,6 @@
 
 @pytest.mark.notyet(
     [
-        "bigquery",
         "clickhouse",
         "dask",
         "datafusion",
@@ -554,25 +519,13 @@ def num_diamonds(data_dir):
 
 
 @pytest.mark.parametrize(
-    ("in_table_name", "out_table_name"),
-    [
-        param(None, "ibis_read_csv_", id="default"),
-        param("fancy_stones", "fancy_stones", id="file_name"),
-    ],
+    "in_table_name",
+    [param(None, id="default"), param("fancy_stones", id="file_name")],
 )
 @pytest.mark.notyet(
-    [
-        "bigquery",
-        "flink",
-        "impala",
-        "mssql",
-        "mysql",
-        "postgres",
-        "sqlite",
-        "trino",
-    ]
+    ["flink", "impala", "mssql", "mysql", "postgres", "sqlite", "trino"]
 )
-def test_read_csv(con, data_dir, in_table_name, out_table_name, num_diamonds):
+def test_read_csv(con, data_dir, in_table_name, num_diamonds):
     fname = "diamonds.csv"
     with pushd(data_dir / "csv"):
         if con.name == "pyspark":
@@ -580,7 +533,8 @@ def test_read_csv(con, data_dir, in_table_name, out_table_name, num_diamonds):
             fname = str(Path(fname).absolute())
 
     table = con.read_csv(fname, table_name=in_table_name)
-    assert any(out_table_name in t for t in con.list_tables())
+    if in_table_name is not None:
+        assert table.op().name == in_table_name
 
     special_types = DIAMONDS_COLUMN_TYPES.get(con.name, {})
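
Design note on `_read_file`: the target table is dropped up front with `drop_table(..., force=True)` instead of relying on `WRITE_TRUNCATE`, because truncation would conflict with issuing one load job per local file. With the table known to be empty, every job (a single GCS load or many concurrent local-file loads) can use `WRITE_APPEND`, as the inline comment in the patch explains. When `table_name` is omitted, the data lands in the backend's session dataset under a generated `bq_read_{source_format}` name, which is why the updated tests assert on `table.op().name` rather than scanning `con.list_tables()` for a hard-coded prefix.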
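
For anyone trying the branch out, here is a minimal usage sketch of the new methods. It is not part of the patch: the project ID, dataset ID, bucket, and file paths are placeholders, and `field_delimiter` is just one example of a `google.cloud.bigquery.LoadJobConfig` option that the docstrings say can be forwarded through `**kwargs`.

import ibis

# Hypothetical project and dataset IDs: substitute your own.
con = ibis.bigquery.connect(project_id="my-project", dataset_id="my_dataset")

# Local paths may be globs; every matching file is loaded concurrently into a
# single table in the session dataset (each load job uses WRITE_APPEND).
alltypes = con.read_parquet("data/functional_alltypes_*.parquet")

# gs:// URIs are handed directly to a load-from-URI job.
diamonds = con.read_csv("gs://my-bucket/diamonds.csv", table_name="diamonds")

# Extra keyword arguments are forwarded to google.cloud.bigquery.LoadJobConfig,
# e.g. a non-default delimiter for tab-separated files.
raw = con.read_csv("data/raw_*.tsv", field_delimiter="\t")

# Newline-delimited JSON works the same way.
events = con.read_json("gs://my-bucket/events/*.ndjson")

print(diamonds.count().execute())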