feat(bigquery): make to_pyarrow_batches() smarter
cpcloud committed Mar 24, 2023
Parent: 383c708 · Commit: 42f5987
Showing 1 changed file with 28 additions and 13 deletions.
ibis/backends/bigquery/__init__.py (28 additions, 13 deletions)
@@ -3,7 +3,7 @@
 from __future__ import annotations
 
 import contextlib
-from typing import TYPE_CHECKING, Any, Mapping
+from typing import TYPE_CHECKING, Any, Callable, Iterable, Mapping
 from urllib.parse import parse_qs, urlparse
 
 import google.auth.credentials
@@ -37,6 +37,7 @@
 
 if TYPE_CHECKING:
     import pyarrow as pa
+    from google.cloud.bigquery.table import RowIterator
 
 SCOPES = ["https://www.googleapis.com/auth/bigquery"]
 EXTERNAL_DATA_SCOPES = [
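
The new `RowIterator` import sits under the `TYPE_CHECKING` guard, so it is resolved by type checkers only and never at runtime. A minimal sketch of that pattern, assuming nothing beyond the standard library (the `consume` function is illustrative, not part of this diff):

    from __future__ import annotations

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # Seen by type checkers only; never imported at runtime.
        from google.cloud.bigquery.table import RowIterator

    def consume(rows: RowIterator) -> int:
        # With `from __future__ import annotations`, the annotation above
        # is a string at runtime, so this module imports cleanly even
        # without google-cloud-bigquery installed.
        return rows.total_rows or 0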
@@ -347,22 +348,31 @@ def fetch_from_cursor(self, cursor, schema):
         df = arrow_t.to_pandas(timestamp_as_object=True)
         return schema.apply_to(df)
 
-    def _cursor_to_arrow(self, cursor):
+    def _cursor_to_arrow(
+        self,
+        cursor,
+        *,
+        method: Callable[[RowIterator], pa.Table | Iterable[pa.RecordBatch]]
+        | None = None,
+        chunk_size: int | None = None,
+    ):
+        if method is None:
+            method = lambda result: result.to_arrow(
+                progress_bar_type=None,
+                bqstorage_client=None,
+                create_bqstorage_client=True,
+            )
         query = cursor.query
-        query_result = query.result()
+        query_result = query.result(page_size=chunk_size)
         # workaround potentially not having the ability to create read sessions
         # in the dataset project
         orig_project = query_result._project
         query_result._project = self.billing_project
         try:
-            arrow_table = query_result.to_arrow(
-                progress_bar_type=None,
-                bqstorage_client=None,
-                create_bqstorage_client=True,
-            )
+            arrow_obj = method(query_result)
         finally:
             query_result._project = orig_project
-        return arrow_table
+        return arrow_obj
 
     def to_pyarrow(
         self,
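
With this refactor, `_cursor_to_arrow` keeps the billing-project workaround in one place while callers choose how the `RowIterator` is consumed via the `method` hook. The two call patterns this enables, sketched against an assumed `backend` instance and open `cursor`:

    # Default: materialize the whole result as a single pyarrow.Table,
    # matching the old behavior (this is the path to_pyarrow() uses).
    table = backend._cursor_to_arrow(cursor)

    # Streaming: page through the result as pyarrow.RecordBatch objects;
    # chunk_size bounds how many rows each page holds.
    batches = backend._cursor_to_arrow(
        cursor,
        method=lambda result: result.to_arrow_iterable(),
        chunk_size=1_000_000,
    )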
@@ -395,14 +405,19 @@ def to_pyarrow_batches(
         chunk_size: int = 1_000_000,
         **kwargs: Any,
     ):
-        self._import_pyarrow()
+        pa = self._import_pyarrow()
 
-        # kind of pointless, but it'll work if there's enough memory
+        schema = expr.as_table().schema()
+
         query_ast = self.compiler.to_ast_ensure_limit(expr, limit, params=params)
         sql = query_ast.compile()
         cursor = self.raw_sql(sql, params=params, **kwargs)
-        table = self._cursor_to_arrow(cursor)
-        return table.to_reader(chunk_size)
+        batch_iter = self._cursor_to_arrow(
+            cursor,
+            method=lambda result: result.to_arrow_iterable(),
+            chunk_size=chunk_size,
+        )
+        return pa.RecordBatchReader.from_batches(schema.to_pyarrow(), batch_iter)
 
     def get_schema(self, name, database=None):
         table_id = self._fully_qualified_name(name, database)
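
The net effect: `to_pyarrow_batches()` now streams record batches through BigQuery's `RowIterator.to_arrow_iterable()` instead of materializing the entire result with `to_arrow()` and slicing it afterwards, so memory use is bounded by `chunk_size`. A usage sketch; the project, dataset, and table names here are assumptions, not from the diff:

    import ibis

    con = ibis.bigquery.connect(project_id="my-project", dataset_id="my_dataset")
    expr = con.table("events")  # assumed table name

    # Returns a pyarrow.RecordBatchReader that pulls ~100k-row pages lazily.
    reader = con.to_pyarrow_batches(expr, chunk_size=100_000)
    for batch in reader:
        print(batch.num_rows)  # each item is a pyarrow.RecordBatch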
