perf(bigquery): use query_and_wait for better performance on queries of small data with smaller result sets (#9418)
cpcloud authored Jun 21, 2024
1 parent fbc79d2 · commit ad1e915
Showing 2 changed files with 30 additions and 60 deletions.
84 changes: 26 additions & 58 deletions ibis/backends/bigquery/__init__.py
@@ -39,13 +39,12 @@
from ibis.backends.sql.datatypes import BigQueryType

if TYPE_CHECKING:
from collections.abc import Callable, Iterable, Mapping
from collections.abc import Iterable, Mapping
from pathlib import Path

import pandas as pd
import polars as pl
import pyarrow as pa
from google.cloud.bigquery.table import RowIterator


SCOPES = ["https://www.googleapis.com/auth/bigquery"]
@@ -183,7 +182,6 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
table_id = sg.table(
raw_name, db=dataset, catalog=project, quoted=False
).sql(dialect=self.name)

bq_schema = BigQuerySchema.from_ibis(op.schema)
load_job = self.client.load_table_from_dataframe(
op.data.to_frame(),
@@ -628,14 +626,6 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:
)
return BigQuerySchema.to_ibis(job.schema)

def _execute(self, stmt, query_parameters=None):
job_config = bq.job.QueryJobConfig(query_parameters=query_parameters or [])
query = self.client.query(
stmt, job_config=job_config, project=self.billing_project
)
query.result() # blocks until finished
return query

def _to_sqlglot(
self,
expr: ir.Expr,
@@ -674,7 +664,7 @@ def _to_sqlglot(
).transform(_remove_null_ordering_from_unsupported_window)
return query

def raw_sql(self, query: str, params=None):
def raw_sql(self, query: str, params=None, page_size: int | None = None):
query_parameters = [
bigquery_param(
param.type(),
@@ -689,7 +679,14 @@ def raw_sql(self, query: str, params=None):
]
with contextlib.suppress(AttributeError):
query = query.sql(self.dialect)
return self._execute(query, query_parameters=query_parameters)

job_config = bq.job.QueryJobConfig(query_parameters=query_parameters or [])
return self.client.query_and_wait(
query,
job_config=job_config,
project=self.billing_project,
page_size=page_size,
)

@property
def current_catalog(self) -> str:
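For context on the hunk above: Client.query_and_wait, available in recent versions of google-cloud-bigquery, collapses the two-step pattern from the deleted _execute (submit a job with client.query, then block on query.result()) into a single call that is designed to serve small result sets straight from the initial API response. A minimal standalone sketch of the before/after, assuming application-default credentials and a hypothetical project "my-project":

# Sketch: old vs. new execution; assumes a google-cloud-bigquery version
# that provides Client.query_and_wait. "my-project" is a hypothetical id.
from google.cloud import bigquery

client = bigquery.Client(project="my-project")
config = bigquery.QueryJobConfig(query_parameters=[])

# Old pattern (the removed _execute): create a job, then block until done.
job = client.query("SELECT 1 AS x", job_config=config)
rows_old = job.result()

# New pattern: submit and wait in one call; small results come back with
# the initial response, skipping an extra round trip to poll job status.
rows_new = client.query_and_wait("SELECT 1 AS x", job_config=config)
print(list(rows_new))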
@@ -740,14 +737,23 @@ def execute(self, expr, params=None, limit="default", **kwargs):
Output from execution
"""
from ibis.backends.bigquery.converter import BigQueryPandasData

self._run_pre_execute_hooks(expr)

schema = expr.as_table().schema() - ibis.schema({"_TABLE_SUFFIX": "string"})

sql = self.compile(expr, limit=limit, params=params, **kwargs)
self._log(sql)
query = self.raw_sql(sql, params=params, **kwargs)

schema = expr.as_table().schema() - ibis.schema({"_TABLE_SUFFIX": "string"})
result = self.fetch_from_query(query, schema)
arrow_t = query.to_arrow(
progress_bar_type=None, bqstorage_client=self.storage_client
)

result = BigQueryPandasData.convert_table(
arrow_t.to_pandas(timestamp_as_object=True), schema
)

return expr.__pandas_result__(result, schema=schema)
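The rewritten execute above (and to_pyarrow further down) converts the RowIterator returned by raw_sql to Arrow once and then to pandas, rather than routing through the deleted fetch_from_query helper. A rough sketch of that conversion path in isolation, with the BigQueryPandasData conversion and ibis schema arithmetic elided; "my-project" is again a hypothetical project id:

# Sketch: RowIterator -> Arrow -> pandas, mirroring the new execute() flow.
from google.cloud import bigquery

client = bigquery.Client(project="my-project")
rows = client.query_and_wait("SELECT 1 AS x")
arrow_t = rows.to_arrow(progress_bar_type=None)  # bqstorage_client is optional
# timestamp_as_object=True keeps out-of-range BigQuery timestamps from
# overflowing pandas' nanosecond datetime64 representation.
df = arrow_t.to_pandas(timestamp_as_object=True)
print(df)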

@@ -789,40 +795,6 @@ def insert(
overwrite=overwrite,
)

def fetch_from_query(self, query, schema):
from ibis.backends.bigquery.converter import BigQueryPandasData

arrow_t = self._query_to_arrow(query)
df = arrow_t.to_pandas(timestamp_as_object=True)
return BigQueryPandasData.convert_table(
df, schema - ibis.schema({"_TABLE_SUFFIX": "string"})
)

def _query_to_arrow(
self,
query,
*,
method: (
Callable[[RowIterator], pa.Table | Iterable[pa.RecordBatch]] | None
) = None,
chunk_size: int | None = None,
):
if method is None:
method = lambda result: result.to_arrow(
progress_bar_type=None,
bqstorage_client=self.storage_client,
)
query_result = query.result(page_size=chunk_size)
# workaround potentially not having the ability to create read sessions
# in the dataset project
orig_project = query_result._project
query_result._project = self.billing_project
try:
arrow_obj = method(query_result)
finally:
query_result._project = orig_project
return arrow_obj

def to_pyarrow(
self,
expr: ir.Expr,
@@ -836,7 +808,9 @@ def to_pyarrow(
sql = self.compile(expr, limit=limit, params=params, **kwargs)
self._log(sql)
query = self.raw_sql(sql, params=params, **kwargs)
table = self._query_to_arrow(query)
table = query.to_arrow(
progress_bar_type=None, bqstorage_client=self.storage_client
)
return expr.__pyarrow_result__(table)

def to_pyarrow_batches(
@@ -855,14 +829,8 @@
self._register_in_memory_tables(expr)
sql = self.compile(expr, limit=limit, params=params, **kwargs)
self._log(sql)
query = self.raw_sql(sql, params=params, **kwargs)
batch_iter = self._query_to_arrow(
query,
method=lambda result: result.to_arrow_iterable(
bqstorage_client=self.storage_client
),
chunk_size=chunk_size,
)
query = self.raw_sql(sql, params=params, page_size=chunk_size, **kwargs)
batch_iter = query.to_arrow_iterable(bqstorage_client=self.storage_client)
return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), batch_iter)

def _gen_udf_name(self, name: str, schema: Optional[str]) -> str:
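to_pyarrow_batches now threads chunk_size through as the page_size of query_and_wait and streams batches with to_arrow_iterable, replacing the deleted _query_to_arrow helper and its billing-project workaround. A minimal sketch of the same streaming pattern under the same assumptions as above; the Arrow schema is written by hand for this one-column query:

# Sketch: streaming Arrow record batches from a query result.
import pyarrow as pa
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # hypothetical project id
rows = client.query_and_wait("SELECT 1 AS x", page_size=1000)
batches = rows.to_arrow_iterable()  # lazily yields pyarrow.RecordBatch objects
schema = pa.schema([("x", pa.int64())])  # BigQuery INT64 maps to Arrow int64
reader = pa.ipc.RecordBatchReader.from_batches(schema, batches)
for batch in reader:
    print(batch.num_rows)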
6 changes: 4 additions & 2 deletions ibis/backends/bigquery/tests/system/test_client.py
@@ -6,6 +6,7 @@

import pandas as pd
import pandas.testing as tm
import pyarrow as pa
import pytest
import pytz

@@ -186,8 +187,9 @@ def test_repr_struct_of_array_of_struct():


def test_raw_sql(con):
result = con.raw_sql("SELECT 1").result()
assert [row.values() for row in result] == [(1,)]
result = con.raw_sql("SELECT 1 as a").to_arrow()
expected = pa.Table.from_pydict({"a": [1]})
assert result.equals(expected)


def test_parted_column_rename(parted_alltypes):
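The updated test captures the user-visible consequence of the change: raw_sql now returns the RowIterator produced by query_and_wait rather than a QueryJob, so callers convert results directly instead of calling .result() first. A short usage sketch, where con is assumed to be an already-connected ibis BigQuery backend:

# Sketch: consuming raw_sql's new return value (a single-use RowIterator).
rows = con.raw_sql("SELECT 1 AS a")
table = rows.to_arrow()  # or rows.to_dataframe() for pandas
assert table.column("a").to_pylist() == [1]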
