perf: use jobs.getQueryResults to download result sets
Since `getQueryResults` was already used to wait for the job to finish,
this avoids an additional call to `tabledata.list`. The first page of
results is cached in memory.

Additional changes will follow to avoid calling the BQ Storage API when
the cached results contain the full result set.
tswast committed Oct 27, 2020
1 parent 0dca76c commit 7364196
Showing 8 changed files with 239 additions and 148 deletions.
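
For context, here is a minimal sketch of the caller-side pattern this commit optimizes, assuming the `google-cloud-bigquery` client. The query text and column names are placeholders, and the comments describe the intended effect of the change rather than guaranteed behavior:

```python
from google.cloud import bigquery

client = bigquery.Client()
job = client.query(
    "SELECT name, state"
    " FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 100"
)

# result() already polls jobs.getQueryResults while waiting for the job to
# finish. With this change, the rows returned by that same response are kept
# in memory and served as the first page of the iterator, so small result
# sets no longer need a separate tabledata.list request.
for row in job.result():
    print(row["name"], row["state"])
```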
16 changes: 8 additions & 8 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -474,7 +474,7 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)


def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
def _row_iterator_page_to_arrow(page, column_names, arrow_types):
# Iterate over the page to force the API request to get the page data.
try:
next(iter(page))
@@ -490,8 +490,8 @@ def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)


def download_arrow_tabledata_list(pages, bq_schema):
"""Use tabledata.list to construct an iterable of RecordBatches.
def download_arrow_row_iterator(pages, bq_schema):
"""Use HTTP JSON RowIterator to construct an iterable of RecordBatches.
Args:
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
@@ -510,10 +510,10 @@ def download_arrow_tabledata_list(pages, bq_schema):
arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]

for page in pages:
yield _tabledata_list_page_to_arrow(page, column_names, arrow_types)
yield _row_iterator_page_to_arrow(page, column_names, arrow_types)


def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
def _row_iterator_page_to_dataframe(page, column_names, dtypes):
# Iterate over the page to force the API request to get the page data.
try:
next(iter(page))
@@ -528,8 +528,8 @@ def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
return pandas.DataFrame(columns, columns=column_names)


def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
"""Use (slower, but free) tabledata.list to construct a DataFrame.
def download_dataframe_row_iterator(pages, bq_schema, dtypes):
"""Use HTTP JSON RowIterator to construct a DataFrame.
Args:
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
@@ -549,7 +549,7 @@ def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
bq_schema = schema._to_schema_fields(bq_schema)
column_names = [field.name for field in bq_schema]
for page in pages:
yield _tabledata_list_page_to_dataframe(page, column_names, dtypes)
yield _row_iterator_page_to_dataframe(page, column_names, dtypes)


def _bqstorage_page_to_arrow(page):
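
The renamed helpers above are generators that yield one object per HTTP JSON results page. A hedged sketch of how they might be driven, using the private `_pandas_helpers` module from this diff; the `.pages` plumbing and the concatenation step are assumptions for illustration, not part of this change:

```python
import pandas

from google.cloud.bigquery import _pandas_helpers


def pages_to_dataframe(row_iterator, bq_schema, dtypes):
    # Each yielded frame corresponds to one page fetched over the HTTP JSON API.
    frames = _pandas_helpers.download_dataframe_row_iterator(
        row_iterator.pages, bq_schema, dtypes
    )
    return pandas.concat(list(frames), ignore_index=True)
```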
105 changes: 91 additions & 14 deletions google/cloud/bigquery/client.py
@@ -80,12 +80,12 @@
_MAX_MULTIPART_SIZE = 5 * 1024 * 1024
_DEFAULT_NUM_RETRIES = 6
_BASE_UPLOAD_TEMPLATE = (
u"https://bigquery.googleapis.com/upload/bigquery/v2/projects/"
u"{project}/jobs?uploadType="
"https://bigquery.googleapis.com/upload/bigquery/v2/projects/"
"{project}/jobs?uploadType="
)
_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u"multipart"
_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u"resumable"
_GENERIC_CONTENT_TYPE = u"*/*"
_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "multipart"
_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "resumable"
_GENERIC_CONTENT_TYPE = "*/*"
_READ_LESS_THAN_SIZE = (
"Size {:d} was specified but the file-like object only had " "{:d} bytes remaining."
)
@@ -293,7 +293,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

return page_iterator.HTTPIterator(
@@ -371,7 +371,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

return page_iterator.HTTPIterator(
@@ -1129,7 +1129,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

result = page_iterator.HTTPIterator(
@@ -1207,7 +1207,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

result = page_iterator.HTTPIterator(
@@ -1284,7 +1284,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

result = page_iterator.HTTPIterator(
@@ -1510,7 +1510,15 @@ def delete_table(
raise

def _get_query_results(
self, job_id, retry, project=None, timeout_ms=None, location=None, timeout=None
self,
job_id,
retry,
project=None,
timeout_ms=None,
location=None,
timeout=None,
max_results=None,
start_index=None,
):
"""Get the query results object for a query job.
@@ -1527,13 +1535,18 @@ def _get_query_results(
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
max_results (Optional[int]):
The maximum number of records to fetch per response page.
Defaults to unspecified (API default).
start_index (Optional[int]):
The zero-based index of the starting row to read.
Returns:
google.cloud.bigquery.query._QueryResults:
A new ``_QueryResults`` instance.
"""

extra_params = {"maxResults": 0}
extra_params = {}

if project is None:
project = self.project
@@ -1547,6 +1560,12 @@ def _get_query_results(
if location is not None:
extra_params["location"] = location

if max_results is not None:
extra_params["maxResults"] = max_results

if start_index is not None:
extra_params["startIndex"] = start_index

path = "/projects/{}/queries/{}".format(project, job_id)

# This call is typically made in a polling loop that checks whether the
@@ -1890,7 +1909,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

return page_iterator.HTTPIterator(
@@ -2374,7 +2393,7 @@ def load_table_from_json(

destination = _table_arg_to_table_ref(destination, default_project=self.project)

data_str = u"\n".join(json.dumps(item) for item in json_rows)
data_str = "\n".join(json.dumps(item) for item in json_rows)
encoded_str = data_str.encode()
data_file = io.BytesIO(encoded_str)
return self.load_table_from_file(
@@ -3172,6 +3191,64 @@ def list_rows(
)
return row_iterator

def _list_rows_from_query_results(
self,
query_results,
table,
max_results=None,
page_size=None,
retry=DEFAULT_RETRY,
timeout=None,
):
"""List the rows of a completed query.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults
Args:
query_results (google.cloud.bigquery.query._QueryResults):
A ``_QueryResults`` instance containing the first page of
results.
table (Union[ \
google.cloud.bigquery.table.Table, \
google.cloud.bigquery.table.TableListItem, \
google.cloud.bigquery.table.TableReference, \
str, \
]):
DEPRECATED: The table to list, or a reference to it.
To be removed in a future update, where the QueryJob class is
not reloaded by `result()` or `to_dataframe()`.
max_results (Optional[int]):
Maximum number of rows to return across the whole iterator.
page_size (Optional[int]):
The maximum number of rows in each page of results from this request.
Non-positive values are ignored. Defaults to a sensible value set by the API.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
If multiple requests are made under the hood, ``timeout``
applies to each individual request.
Returns:
google.cloud.bigquery.table.RowIterator:
Iterator of row data
:class:`~google.cloud.bigquery.table.Row`-s.
"""
row_iterator = RowIterator(
client=self,
api_request=functools.partial(self._call_api, retry, timeout=timeout),
path=f"/projects/{query_results.project}/queries/{query_results.job_id}",
schema=query_results.schema,
max_results=max_results,
page_size=page_size,
table=table,
first_page_response=query_results._properties,
)
return row_iterator

def _schema_from_json_file_object(self, file_obj):
"""Helper function for schema_from_json that takes a
file object that describes a table schema.
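
The core of the `client.py` change is that `_list_rows_from_query_results` hands the response already fetched by `jobs.getQueryResults` to the row iterator via `first_page_response`, so that page is served from memory. A simplified, library-independent illustration of that caching pattern (not the actual `RowIterator` implementation):

```python
class CachedFirstPageIterator:
    """Illustrative only: serve a pre-fetched first page, then page via HTTP."""

    def __init__(self, fetch_page, first_page_response=None):
        self._fetch_page = fetch_page        # callable(page_token) -> dict (API response)
        self._cached = first_page_response   # response already obtained while polling

    def __iter__(self):
        page_token = None
        while True:
            if self._cached is not None:
                # Serve the cached getQueryResults response; no extra HTTP call.
                response, self._cached = self._cached, None
            else:
                response = self._fetch_page(page_token)
            for row in response.get("rows", []):
                yield row
            page_token = response.get("pageToken")
            if not page_token:
                return
```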
40 changes: 29 additions & 11 deletions google/cloud/bigquery/job.py
@@ -2649,6 +2649,7 @@ def __init__(self, job_id, query, client, job_config=None):
)

self._query_results = None
self._get_query_results_kwargs = {}
self._done_timeout = None
self._transport_timeout = None

@@ -3121,14 +3122,16 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
# stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout

self._query_results = self._client._get_query_results(
self.job_id,
retry,
project=self.project,
timeout_ms=timeout_ms,
location=self.location,
timeout=transport_timeout,
)
if not self._query_results or not self._query_results.complete:
self._query_results = self._client._get_query_results(
self.job_id,
retry,
project=self.project,
location=self.location,
timeout_ms=timeout_ms,
timeout=transport_timeout,
**self._get_query_results_kwargs
)

# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
@@ -3244,6 +3247,18 @@ def result(
concurrent.futures.TimeoutError:
If the job did not complete in the given timeout.
"""
# Save arguments which are relevant for result and reset any cached
# _query_results so that the first page of results is fetched correctly
# in the done() method. The done() method is called by the super-class.
if page_size is None:
max_results_kwarg = max_results
elif max_results is None:
max_results_kwarg = page_size
else:
max_results_kwarg = min(page_size, max_results)
self._get_query_results_kwargs["max_results"] = max_results_kwarg
self._get_query_results_kwargs["start_index"] = start_index

try:
super(QueryJob, self).result(retry=retry, timeout=timeout)
except exceptions.GoogleCloudError as exc:
@@ -3255,7 +3270,7 @@

# If the query job is complete but there are no query results, this was
# special job, such as a DDL query. Return an empty result set to
# indicate success and avoid calling tabledata.list on a table which
# indicate success and avoid reading from a destination table which
# can't be read (such as a view table).
if self._query_results.total_rows is None:
return _EmptyRowIterator()
@@ -3264,11 +3279,14 @@
dest_table_ref = self.destination
dest_table = Table(dest_table_ref, schema=schema)
dest_table._properties["numRows"] = self._query_results.total_rows
rows = self._client.list_rows(

# Return an iterator instead of returning the job. Omit start_index
# because it's only needed for the first call to getQueryResults.
rows = self._client._list_rows_from_query_results(
self._query_results,
dest_table,
page_size=page_size,
max_results=max_results,
start_index=start_index,
retry=retry,
timeout=timeout,
)
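
The `page_size`/`max_results` bookkeeping added to `result()` above boils down to using the smaller of the two for the first `getQueryResults` page when both are set. Restated standalone for illustration (the helper name is hypothetical):

```python
def _first_page_max_results(page_size, max_results):
    # Mirrors the kwarg computation in QueryJob.result() shown in the diff.
    if page_size is None:
        return max_results
    if max_results is None:
        return page_size
    return min(page_size, max_results)


assert _first_page_max_results(None, None) is None
assert _first_page_max_results(1000, None) == 1000
assert _first_page_max_results(1000, 50) == 50
```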
