From ad1e915017a8b06f8824af174e8c24ad25004ac3 Mon Sep 17 00:00:00 2001
From: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
Date: Fri, 21 Jun 2024 14:38:02 -0400
Subject: [PATCH] perf(bigquery): use `query_and_wait` for better performance on queries with smaller result sets (#9418)

---
 ibis/backends/bigquery/__init__.py            | 84 ++++++-------------
 .../bigquery/tests/system/test_client.py      |  6 +-
 2 files changed, 30 insertions(+), 60 deletions(-)

diff --git a/ibis/backends/bigquery/__init__.py b/ibis/backends/bigquery/__init__.py
index b889bad9866e..eef174a0ce00 100644
--- a/ibis/backends/bigquery/__init__.py
+++ b/ibis/backends/bigquery/__init__.py
@@ -39,13 +39,12 @@
 from ibis.backends.sql.datatypes import BigQueryType
 
 if TYPE_CHECKING:
-    from collections.abc import Callable, Iterable, Mapping
+    from collections.abc import Iterable, Mapping
     from pathlib import Path
 
     import pandas as pd
     import polars as pl
     import pyarrow as pa
-    from google.cloud.bigquery.table import RowIterator
 
 
 SCOPES = ["https://www.googleapis.com/auth/bigquery"]
@@ -183,7 +182,6 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
             table_id = sg.table(
                 raw_name, db=dataset, catalog=project, quoted=False
             ).sql(dialect=self.name)
-
             bq_schema = BigQuerySchema.from_ibis(op.schema)
             load_job = self.client.load_table_from_dataframe(
                 op.data.to_frame(),
@@ -628,14 +626,6 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:
         )
         return BigQuerySchema.to_ibis(job.schema)
 
-    def _execute(self, stmt, query_parameters=None):
-        job_config = bq.job.QueryJobConfig(query_parameters=query_parameters or [])
-        query = self.client.query(
-            stmt, job_config=job_config, project=self.billing_project
-        )
-        query.result()  # blocks until finished
-        return query
-
     def _to_sqlglot(
         self,
         expr: ir.Expr,
@@ -674,7 +664,7 @@ def _to_sqlglot(
         ).transform(_remove_null_ordering_from_unsupported_window)
         return query
 
-    def raw_sql(self, query: str, params=None):
+    def raw_sql(self, query: str, params=None, page_size: int | None = None):
         query_parameters = [
             bigquery_param(
                 param.type(),
@@ -689,7 +679,14 @@ def raw_sql(self, query: str, params=None):
         ]
         with contextlib.suppress(AttributeError):
             query = query.sql(self.dialect)
-        return self._execute(query, query_parameters=query_parameters)
+
+        job_config = bq.job.QueryJobConfig(query_parameters=query_parameters or [])
+        return self.client.query_and_wait(
+            query,
+            job_config=job_config,
+            project=self.billing_project,
+            page_size=page_size,
+        )
 
     @property
     def current_catalog(self) -> str:
@@ -740,14 +737,23 @@ def execute(self, expr, params=None, limit="default", **kwargs):
             Output from execution
 
         """
+        from ibis.backends.bigquery.converter import BigQueryPandasData
+
         self._run_pre_execute_hooks(expr)
 
+        schema = expr.as_table().schema() - ibis.schema({"_TABLE_SUFFIX": "string"})
+
         sql = self.compile(expr, limit=limit, params=params, **kwargs)
         self._log(sql)
         query = self.raw_sql(sql, params=params, **kwargs)
 
-        schema = expr.as_table().schema() - ibis.schema({"_TABLE_SUFFIX": "string"})
-        result = self.fetch_from_query(query, schema)
+        arrow_t = query.to_arrow(
+            progress_bar_type=None, bqstorage_client=self.storage_client
+        )
+
+        result = BigQueryPandasData.convert_table(
+            arrow_t.to_pandas(timestamp_as_object=True), schema
+        )
 
         return expr.__pandas_result__(result, schema=schema)
 
@@ -789,40 +795,6 @@ def insert(
             overwrite=overwrite,
         )
 
-    def fetch_from_query(self, query, schema):
-        from ibis.backends.bigquery.converter import BigQueryPandasData
-
-        arrow_t = self._query_to_arrow(query)
-        df = arrow_t.to_pandas(timestamp_as_object=True)
-        return BigQueryPandasData.convert_table(
-            df, schema - ibis.schema({"_TABLE_SUFFIX": "string"})
-        )
-
-    def _query_to_arrow(
-        self,
-        query,
-        *,
-        method: (
-            Callable[[RowIterator], pa.Table | Iterable[pa.RecordBatch]] | None
-        ) = None,
-        chunk_size: int | None = None,
-    ):
-        if method is None:
-            method = lambda result: result.to_arrow(
-                progress_bar_type=None,
-                bqstorage_client=self.storage_client,
-            )
-        query_result = query.result(page_size=chunk_size)
-        # workaround potentially not having the ability to create read sessions
-        # in the dataset project
-        orig_project = query_result._project
-        query_result._project = self.billing_project
-        try:
-            arrow_obj = method(query_result)
-        finally:
-            query_result._project = orig_project
-        return arrow_obj
-
     def to_pyarrow(
         self,
         expr: ir.Expr,
@@ -836,7 +808,9 @@ def to_pyarrow(
         sql = self.compile(expr, limit=limit, params=params, **kwargs)
         self._log(sql)
         query = self.raw_sql(sql, params=params, **kwargs)
-        table = self._query_to_arrow(query)
+        table = query.to_arrow(
+            progress_bar_type=None, bqstorage_client=self.storage_client
+        )
         return expr.__pyarrow_result__(table)
 
     def to_pyarrow_batches(
@@ -855,14 +829,8 @@ def to_pyarrow_batches(
         self._register_in_memory_tables(expr)
         sql = self.compile(expr, limit=limit, params=params, **kwargs)
         self._log(sql)
-        query = self.raw_sql(sql, params=params, **kwargs)
-        batch_iter = self._query_to_arrow(
-            query,
-            method=lambda result: result.to_arrow_iterable(
-                bqstorage_client=self.storage_client
-            ),
-            chunk_size=chunk_size,
-        )
+        query = self.raw_sql(sql, params=params, page_size=chunk_size, **kwargs)
+        batch_iter = query.to_arrow_iterable(bqstorage_client=self.storage_client)
         return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), batch_iter)
 
     def _gen_udf_name(self, name: str, schema: Optional[str]) -> str:
diff --git a/ibis/backends/bigquery/tests/system/test_client.py b/ibis/backends/bigquery/tests/system/test_client.py
index 09c392a944a0..048b8fb5cdef 100644
--- a/ibis/backends/bigquery/tests/system/test_client.py
+++ b/ibis/backends/bigquery/tests/system/test_client.py
@@ -6,6 +6,7 @@
 
 import pandas as pd
 import pandas.testing as tm
+import pyarrow as pa
 import pytest
 import pytz
 
@@ -186,8 +187,9 @@ def test_repr_struct_of_array_of_struct():
 
 
 def test_raw_sql(con):
-    result = con.raw_sql("SELECT 1").result()
-    assert [row.values() for row in result] == [(1,)]
+    result = con.raw_sql("SELECT 1 as a").to_arrow()
+    expected = pa.Table.from_pydict({"a": [1]})
+    assert result.equals(expected)
 
 
 def test_parted_column_rename(parted_alltypes):
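
Note on the client pattern this patch adopts (a minimal sketch, separate from the diff above): `Client.query_and_wait` returns a `RowIterator` directly instead of a `QueryJob` that must be polled via `result()`, which is where the win for smaller result sets comes from. The snippet below assumes a google-cloud-bigquery release recent enough to provide `query_and_wait`, default application credentials, and a placeholder project id and query; it illustrates the client API, not ibis code.

    from google.cloud import bigquery

    client = bigquery.Client(project="my-project")  # placeholder project id
    job_config = bigquery.QueryJobConfig(query_parameters=[])
    sql = "SELECT 1 AS a"  # placeholder query

    # Old pattern (what the removed `_execute` helper did): always create a
    # QueryJob, then block on result() before any rows can be fetched.
    job = client.query(sql, job_config=job_config)
    old_rows = job.result()

    # New pattern (what `raw_sql` now does): query_and_wait returns a
    # RowIterator directly, skipping separate job polling for small results.
    new_rows = client.query_and_wait(sql, job_config=job_config, page_size=1000)

    # The RowIterator feeds the Arrow/pandas paths the same way the removed
    # `_query_to_arrow`/`fetch_from_query` helpers did.
    arrow_table = new_rows.to_arrow(progress_bar_type=None)
    df = arrow_table.to_pandas(timestamp_as_object=True)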