perf: use jobs.getQueryResults to download result sets #347

Closed
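
This pull request switches query result downloads to the jobs.getQueryResults API and reuses the first page of rows it returns, instead of always issuing a separate tabledata.list call. A rough sketch of the user-facing flow this enables, using only public calls that already exist in the library (the first-page caching itself is internal):

from google.cloud import bigquery

client = bigquery.Client()
job = client.query("SELECT 1 AS x")

# result() polls jobs.getQueryResults until the job finishes; with this change
# the rows carried on that same response seed the RowIterator
# (first_page_response), so the first page is not fetched a second time.
for row in job.result():
    print(row["x"])
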
16 changes: 8 additions & 8 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -474,7 +474,7 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)


def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
def _row_iterator_page_to_arrow(page, column_names, arrow_types):
# Iterate over the page to force the API request to get the page data.
try:
next(iter(page))
@@ -490,8 +490,8 @@ def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)


def download_arrow_tabledata_list(pages, bq_schema):
"""Use tabledata.list to construct an iterable of RecordBatches.
def download_arrow_row_iterator(pages, bq_schema):
"""Use HTTP JSON RowIterator to construct an iterable of RecordBatches.

Args:
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
@@ -510,10 +510,10 @@ def download_arrow_tabledata_list(pages, bq_schema):
arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]

for page in pages:
yield _tabledata_list_page_to_arrow(page, column_names, arrow_types)
yield _row_iterator_page_to_arrow(page, column_names, arrow_types)


def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
def _row_iterator_page_to_dataframe(page, column_names, dtypes):
# Iterate over the page to force the API request to get the page data.
try:
next(iter(page))
@@ -528,8 +528,8 @@ def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
return pandas.DataFrame(columns, columns=column_names)


def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
"""Use (slower, but free) tabledata.list to construct a DataFrame.
def download_dataframe_row_iterator(pages, bq_schema, dtypes):
"""Use HTTP JSON RowIterator to construct a DataFrame.

Args:
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
@@ -549,7 +549,7 @@ def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
bq_schema = schema._to_schema_fields(bq_schema)
column_names = [field.name for field in bq_schema]
for page in pages:
yield _tabledata_list_page_to_dataframe(page, column_names, dtypes)
yield _row_iterator_page_to_dataframe(page, column_names, dtypes)


def _bqstorage_page_to_arrow(page):
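For context, a minimal sketch of how the renamed download_arrow_row_iterator generator might be consumed; the call pattern below is an assumption for illustration (in the library the real caller sits behind RowIterator.to_arrow):

import pyarrow

# `pages` is an iterator of google.api_core.page_iterator.Page objects and
# `bq_schema` is a sequence of SchemaField, as described in the docstring above.
batches = list(download_arrow_row_iterator(pages, bq_schema))
if batches:
    arrow_table = pyarrow.Table.from_batches(batches)
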
105 changes: 91 additions & 14 deletions google/cloud/bigquery/client.py
@@ -80,12 +80,12 @@
_MAX_MULTIPART_SIZE = 5 * 1024 * 1024
_DEFAULT_NUM_RETRIES = 6
_BASE_UPLOAD_TEMPLATE = (
u"https://bigquery.googleapis.com/upload/bigquery/v2/projects/"
u"{project}/jobs?uploadType="
"https://bigquery.googleapis.com/upload/bigquery/v2/projects/"
"{project}/jobs?uploadType="
)
_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u"multipart"
_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u"resumable"
_GENERIC_CONTENT_TYPE = u"*/*"
_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "multipart"
_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "resumable"
_GENERIC_CONTENT_TYPE = "*/*"
_READ_LESS_THAN_SIZE = (
"Size {:d} was specified but the file-like object only had " "{:d} bytes remaining."
)
@@ -293,7 +293,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

return page_iterator.HTTPIterator(
@@ -371,7 +371,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

return page_iterator.HTTPIterator(
@@ -1129,7 +1129,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

result = page_iterator.HTTPIterator(
@@ -1207,7 +1207,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

result = page_iterator.HTTPIterator(
@@ -1284,7 +1284,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

result = page_iterator.HTTPIterator(
@@ -1510,7 +1510,15 @@ def delete_table(
raise

def _get_query_results(
self, job_id, retry, project=None, timeout_ms=None, location=None, timeout=None
self,
job_id,
retry,
project=None,
timeout_ms=None,
location=None,
timeout=None,
max_results=None,
start_index=None,
):
"""Get the query results object for a query job.

@@ -1527,13 +1535,18 @@ def _get_query_results(
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
max_results (Optional[int]):
The maximum number of records to fetch per response page.
Defaults to unspecified (API default).
start_index (Optional[int]):
The zero-based index of the starting row to read.

Returns:
google.cloud.bigquery.query._QueryResults:
A new ``_QueryResults`` instance.
"""

extra_params = {"maxResults": 0}
extra_params = {}

if project is None:
project = self.project
@@ -1547,6 +1560,12 @@ def _get_query_results(
if location is not None:
extra_params["location"] = location

if max_results is not None:
extra_params["maxResults"] = max_results

if start_index is not None:
extra_params["startIndex"] = start_index

path = "/projects/{}/queries/{}".format(project, job_id)

# This call is typically made in a polling loop that checks whether the
@@ -1890,7 +1909,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

return page_iterator.HTTPIterator(
@@ -2374,7 +2393,7 @@ def load_table_from_json(

destination = _table_arg_to_table_ref(destination, default_project=self.project)

data_str = u"\n".join(json.dumps(item) for item in json_rows)
data_str = "\n".join(json.dumps(item) for item in json_rows)
encoded_str = data_str.encode()
data_file = io.BytesIO(encoded_str)
return self.load_table_from_file(
@@ -3172,6 +3191,64 @@ def list_rows(
)
return row_iterator

def _list_rows_from_query_results(
self,
query_results,
table,
max_results=None,
page_size=None,
retry=DEFAULT_RETRY,
timeout=None,
):
"""List the rows of a completed query.

See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults

Args:
query_results (google.cloud.bigquery.query._QueryResults):
A ``_QueryResults`` instance containing the first page of
results.
table (Union[ \
google.cloud.bigquery.table.Table, \
google.cloud.bigquery.table.TableListItem, \
google.cloud.bigquery.table.TableReference, \
str, \
]):
DEPRECATED: The table to list, or a reference to it.

To be removed in a future update, where the QueryJob class is
not reloaded by `result()` or `to_dataframe()`.
max_results (Optional[int]):
Maximum number of rows to return across the whole iterator.
page_size (Optional[int]):
The maximum number of rows in each page of results from this request.
Non-positive values are ignored. Defaults to a sensible value set by the API.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
If multiple requests are made under the hood, ``timeout``
applies to each individual request.

Returns:
google.cloud.bigquery.table.RowIterator:
Iterator of row data
:class:`~google.cloud.bigquery.table.Row`-s.
"""
row_iterator = RowIterator(
Contributor Author commented:

Be sure to populate extra args with the field projection. We only need rows and page token.

client=self,
api_request=functools.partial(self._call_api, retry, timeout=timeout),
path=f"/projects/{query_results.project}/queries/{query_results.job_id}",
schema=query_results.schema,
max_results=max_results,
page_size=page_size,
table=table,
first_page_response=query_results._properties,
)
return row_iterator

def _schema_from_json_file_object(self, file_obj):
"""Helper function for schema_from_json that takes a
file object that describes a table schema.
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/bigquery/dbapi/cursor.py
@@ -190,7 +190,7 @@ def execute(self, operation, parameters=None, job_id=None, job_config=None):
except google.cloud.exceptions.GoogleCloudError as exc:
raise exceptions.DatabaseError(exc)

query_results = self._query_job._query_results
query_results = self._query_job._thread_local._query_results
self._set_rowcount(query_results)
self._set_description(query_results.schema)

@@ -239,7 +239,7 @@ def _try_fetch(self, size=None):

rows_iter = client.list_rows(
self._query_job.destination,
selected_fields=self._query_job._query_results.schema,
selected_fields=self._query_job._thread_local._query_results.schema,
page_size=self.arraysize,
)
self._query_data = iter(rows_iter)