From 82a70e47c5774e13f253e21fa63d9959d87b0515 Mon Sep 17 00:00:00 2001 From: Jim Crist-Harif Date: Fri, 6 Sep 2024 17:20:21 -0500 Subject: [PATCH] fix(bigquery): handle column name mismatches and `_TABLE_SUFFIX` everywhere --- ibis/backends/bigquery/__init__.py | 138 ++++++++++-------- .../bigquery/tests/system/test_client.py | 16 +- 2 files changed, 93 insertions(+), 61 deletions(-) diff --git a/ibis/backends/bigquery/__init__.py b/ibis/backends/bigquery/__init__.py index 6b30c58b98dd..230795a9b034 100644 --- a/ibis/backends/bigquery/__init__.py +++ b/ibis/backends/bigquery/__init__.py @@ -147,6 +147,15 @@ def _force_quote_table(table: sge.Table) -> sge.Table: return table +def _postprocess_arrow( + table_or_batch: pa.Table | pa.RecordBatch, names: list[str] +) -> pa.Table | pa.RecordBatch: + """Drop `_TABLE_SUFFIX` if present in the results, then rename columns.""" + if "_TABLE_SUFFIX" in table_or_batch.column_names: + table_or_batch = table_or_batch.drop_columns(["_TABLE_SUFFIX"]) + return table_or_batch.rename_columns(names) + + class Backend(SQLBackend, CanCreateDatabase, CanCreateSchema): name = "bigquery" compiler = sc.bigquery.compiler @@ -702,50 +711,6 @@ def compile( self._log(sql) return sql - def execute(self, expr, params=None, limit="default", **kwargs): - """Compile and execute the given Ibis expression. - - Compile and execute Ibis expression using this backend client - interface, returning results in-memory in the appropriate object type - - Parameters - ---------- - expr - Ibis expression to execute - limit - Retrieve at most this number of values/rows. Overrides any limit - already set on the expression. - params - Query parameters - kwargs - Extra arguments specific to the backend - - Returns - ------- - pd.DataFrame | pd.Series | scalar - Output from execution - - """ - from ibis.backends.bigquery.converter import BigQueryPandasData - - self._run_pre_execute_hooks(expr) - - schema = expr.as_table().schema() - ibis.schema({"_TABLE_SUFFIX": "string"}) - - sql = self.compile(expr, limit=limit, params=params, **kwargs) - self._log(sql) - query = self.raw_sql(sql, params=params, **kwargs) - - arrow_t = query.to_arrow( - progress_bar_type=None, bqstorage_client=self.storage_client - ) - - result = BigQueryPandasData.convert_table( - arrow_t.to_pandas(timestamp_as_object=True), schema - ) - - return expr.__pandas_result__(result, schema=schema) - def insert( self, table_name: str, @@ -784,6 +749,21 @@ def insert( overwrite=overwrite, ) + def _to_query( + self, + table_expr: ir.Table, + *, + params: Mapping[ir.Scalar, Any] | None = None, + limit: int | str | None = None, + page_size: int | None = None, + **kwargs: Any, + ): + self._run_pre_execute_hooks(table_expr) + sql = self.compile(table_expr, limit=limit, params=params, **kwargs) + self._log(sql) + + return self.raw_sql(sql, params=params, page_size=page_size) + def to_pyarrow( self, expr: ir.Expr, @@ -793,15 +773,16 @@ def to_pyarrow( **kwargs: Any, ) -> pa.Table: self._import_pyarrow() - self._register_in_memory_tables(expr) - sql = self.compile(expr, limit=limit, params=params, **kwargs) - self._log(sql) - query = self.raw_sql(sql, params=params, **kwargs) + + table_expr = expr.as_table() + schema = table_expr.schema() - ibis.schema({"_TABLE_SUFFIX": "string"}) + + query = self._to_query(table_expr, params=params, limit=limit, **kwargs) table = query.to_arrow( progress_bar_type=None, bqstorage_client=self.storage_client ) - table = table.rename_columns(list(expr.as_table().schema().names)) - return expr.__pyarrow_result__(table) + table = _postprocess_arrow(table, list(schema.names)) + return expr.__pyarrow_result__(table, schema=schema) def to_pyarrow_batches( self, @@ -814,14 +795,55 @@ def to_pyarrow_batches( ): pa = self._import_pyarrow() - schema = expr.as_table().schema() + table_expr = expr.as_table() + schema = table_expr.schema() - ibis.schema({"_TABLE_SUFFIX": "string"}) + colnames = list(schema.names) - self._register_in_memory_tables(expr) - sql = self.compile(expr, limit=limit, params=params, **kwargs) - self._log(sql) - query = self.raw_sql(sql, params=params, page_size=chunk_size, **kwargs) + query = self._to_query( + table_expr, params=params, limit=limit, page_size=chunk_size, **kwargs + ) batch_iter = query.to_arrow_iterable(bqstorage_client=self.storage_client) - return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), batch_iter) + return pa.ipc.RecordBatchReader.from_batches( + schema.to_pyarrow(), + (_postprocess_arrow(b, colnames) for b in batch_iter), + ) + + def execute(self, expr, params=None, limit="default", **kwargs): + """Compile and execute the given Ibis expression. + + Compile and execute Ibis expression using this backend client + interface, returning results in-memory in the appropriate object type + + Parameters + ---------- + expr + Ibis expression to execute + limit + Retrieve at most this number of values/rows. Overrides any limit + already set on the expression. + params + Query parameters + kwargs + Extra arguments specific to the backend + + Returns + ------- + pd.DataFrame | pd.Series | scalar + Output from execution + + """ + from ibis.backends.bigquery.converter import BigQueryPandasData + + table_expr = expr.as_table() + schema = table_expr.schema() - ibis.schema({"_TABLE_SUFFIX": "string"}) + query = self._to_query(table_expr, params=params, limit=limit, **kwargs) + df = query.to_arrow( + progress_bar_type=None, bqstorage_client=self.storage_client + ).to_pandas(timestamp_as_object=True) + # Drop _TABLE_SUFFIX if present in the results, then rename columns + df = df.drop(columns="_TABLE_SUFFIX", errors="ignore") + df.columns = schema.names + return expr.__pandas_result__(df, schema=schema, data_mapper=BigQueryPandasData) def _gen_udf_name(self, name: str, schema: Optional[str]) -> str: func = ".".join(filter(None, (schema, name))) @@ -1010,7 +1032,7 @@ def create_table( obj = ibis.memtable(obj, schema=schema) if obj is not None: - self._register_in_memory_tables(obj) + self._run_pre_execute_hooks(obj) if temp: dataset = self._session_dataset.dataset_id @@ -1107,7 +1129,7 @@ def create_view( expression=self.compile(obj), replace=overwrite, ) - self._register_in_memory_tables(obj) + self._run_pre_execute_hooks(obj) self.raw_sql(stmt.sql(self.name)) return self.table(name, database=(catalog, database)) diff --git a/ibis/backends/bigquery/tests/system/test_client.py b/ibis/backends/bigquery/tests/system/test_client.py index 8dd5c6e3d747..c31a33bd8694 100644 --- a/ibis/backends/bigquery/tests/system/test_client.py +++ b/ibis/backends/bigquery/tests/system/test_client.py @@ -421,12 +421,22 @@ def test_create_table_from_scratch_with_spaces(project_id, dataset_id): con.drop_table(name) -def test_table_suffix(): +@pytest.mark.parametrize("ret_type", ["pandas", "pyarrow", "pyarrow_batches"]) +def test_table_suffix(ret_type): con = ibis.connect("bigquery://ibis-gbq") t = con.table("gsod*", database="bigquery-public-data.noaa_gsod") expr = t.filter(t._TABLE_SUFFIX == "1929", t.max != 9999.9).head(1) - result = expr.execute() - assert not result.empty + if ret_type == "pandas": + result = expr.to_pandas() + cols = list(result.columns) + elif ret_type == "pyarrow": + result = expr.to_pyarrow() + cols = result.column_names + elif ret_type == "pyarrow_batches": + result = pa.Table.from_batches(expr.to_pyarrow_batches()) + cols = result.column_names + assert len(result) + assert "_TABLE_PREFIX" not in cols def test_parameters_in_url_connect(mocker):