Skip to content

Commit

Permalink
perf: use jobs.getQueryResults to download result sets (#363)
Browse files Browse the repository at this point in the history
* refactor: break job into multiple modules

Original paths are retained for backwards compatibility.

* perf: use `jobs.getQueryResults` to download result sets

Replaces `tabledata.list` when `RowIterator` is used for query results.
This likely also fixes a few edge cases around BigQuery scripting jobs.

* revert unnecessary changes to _get_query_results

* simplify RowIterator. no need to hack Table object

* fix tests for bqstorage warning

* populate location
  • Loading branch information
tswast authored Nov 4, 2020
1 parent 2849e56 commit 0c3476d
Show file tree
Hide file tree
Showing 11 changed files with 256 additions and 131 deletions.
16 changes: 8 additions & 8 deletions google/cloud/bigquery/_pandas_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ def dataframe_to_parquet(dataframe, bq_schema, filepath, parquet_compression="SN
pyarrow.parquet.write_table(arrow_table, filepath, compression=parquet_compression)


def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
def _row_iterator_page_to_arrow(page, column_names, arrow_types):
# Iterate over the page to force the API request to get the page data.
try:
next(iter(page))
Expand All @@ -490,8 +490,8 @@ def _tabledata_list_page_to_arrow(page, column_names, arrow_types):
return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)


def download_arrow_tabledata_list(pages, bq_schema):
"""Use tabledata.list to construct an iterable of RecordBatches.
def download_arrow_row_iterator(pages, bq_schema):
"""Use HTTP JSON RowIterator to construct an iterable of RecordBatches.
Args:
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
Expand All @@ -510,10 +510,10 @@ def download_arrow_tabledata_list(pages, bq_schema):
arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]

for page in pages:
yield _tabledata_list_page_to_arrow(page, column_names, arrow_types)
yield _row_iterator_page_to_arrow(page, column_names, arrow_types)


def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
def _row_iterator_page_to_dataframe(page, column_names, dtypes):
# Iterate over the page to force the API request to get the page data.
try:
next(iter(page))
Expand All @@ -528,8 +528,8 @@ def _tabledata_list_page_to_dataframe(page, column_names, dtypes):
return pandas.DataFrame(columns, columns=column_names)


def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
"""Use (slower, but free) tabledata.list to construct a DataFrame.
def download_dataframe_row_iterator(pages, bq_schema, dtypes):
"""Use HTTP JSON RowIterator to construct a DataFrame.
Args:
pages (Iterator[:class:`google.api_core.page_iterator.Page`]):
Expand All @@ -549,7 +549,7 @@ def download_dataframe_tabledata_list(pages, bq_schema, dtypes):
bq_schema = schema._to_schema_fields(bq_schema)
column_names = [field.name for field in bq_schema]
for page in pages:
yield _tabledata_list_page_to_dataframe(page, column_names, dtypes)
yield _row_iterator_page_to_dataframe(page, column_names, dtypes)


def _bqstorage_page_to_arrow(page):
Expand Down
104 changes: 91 additions & 13 deletions google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,19 @@
_MAX_MULTIPART_SIZE = 5 * 1024 * 1024
_DEFAULT_NUM_RETRIES = 6
_BASE_UPLOAD_TEMPLATE = (
u"https://bigquery.googleapis.com/upload/bigquery/v2/projects/"
u"{project}/jobs?uploadType="
"https://bigquery.googleapis.com/upload/bigquery/v2/projects/"
"{project}/jobs?uploadType="
)
_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u"multipart"
_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u"resumable"
_GENERIC_CONTENT_TYPE = u"*/*"
_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "multipart"
_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + "resumable"
_GENERIC_CONTENT_TYPE = "*/*"
_READ_LESS_THAN_SIZE = (
"Size {:d} was specified but the file-like object only had " "{:d} bytes remaining."
)
_NEED_TABLE_ARGUMENT = (
"The table argument should be a table ID string, Table, or TableReference"
)
_LIST_ROWS_FROM_QUERY_RESULTS_FIELDS = "jobReference,totalRows,pageToken,rows"


class Project(object):
Expand Down Expand Up @@ -293,7 +294,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

return page_iterator.HTTPIterator(
Expand Down Expand Up @@ -371,7 +372,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

return page_iterator.HTTPIterator(
Expand Down Expand Up @@ -1129,7 +1130,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

result = page_iterator.HTTPIterator(
Expand Down Expand Up @@ -1207,7 +1208,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

result = page_iterator.HTTPIterator(
Expand Down Expand Up @@ -1284,7 +1285,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

result = page_iterator.HTTPIterator(
Expand Down Expand Up @@ -1510,7 +1511,7 @@ def delete_table(
raise

def _get_query_results(
self, job_id, retry, project=None, timeout_ms=None, location=None, timeout=None
self, job_id, retry, project=None, timeout_ms=None, location=None, timeout=None,
):
"""Get the query results object for a query job.
Expand Down Expand Up @@ -1890,7 +1891,7 @@ def api_request(*args, **kwargs):
span_attributes=span_attributes,
*args,
timeout=timeout,
**kwargs
**kwargs,
)

return page_iterator.HTTPIterator(
Expand Down Expand Up @@ -2374,7 +2375,7 @@ def load_table_from_json(

destination = _table_arg_to_table_ref(destination, default_project=self.project)

data_str = u"\n".join(json.dumps(item) for item in json_rows)
data_str = "\n".join(json.dumps(item) for item in json_rows)
encoded_str = data_str.encode()
data_file = io.BytesIO(encoded_str)
return self.load_table_from_file(
Expand Down Expand Up @@ -3169,6 +3170,83 @@ def list_rows(
# Pass in selected_fields separately from schema so that full
# tables can be fetched without a column filter.
selected_fields=selected_fields,
total_rows=getattr(table, "num_rows", None),
)
return row_iterator

def _list_rows_from_query_results(
self,
job_id,
location,
project,
schema,
total_rows=None,
destination=None,
max_results=None,
start_index=None,
page_size=None,
retry=DEFAULT_RETRY,
timeout=None,
):
"""List the rows of a completed query.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults
Args:
job_id (str):
ID of a query job.
location (str): Location of the query job.
project (str):
ID of the project where the query job was run.
schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
The fields expected in these query results. Used to convert
from JSON to expected Python types.
total_rows (Optional[int]):
Total number of rows in the query results.
destination (Optional[Union[ \
google.cloud.bigquery.table.Table, \
google.cloud.bigquery.table.TableListItem, \
google.cloud.bigquery.table.TableReference, \
str, \
]]):
Destination table reference. Used to fetch the query results
with the BigQuery Storage API.
max_results (Optional[int]):
Maximum number of rows to return across the whole iterator.
start_index (Optional[int]):
The zero-based index of the starting row to read.
page_size (Optional[int]):
The maximum number of rows in each page of results from this request.
Non-positive values are ignored. Defaults to a sensible value set by the API.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
If multiple requests are made under the hood, ``timeout``
applies to each individual request.
Returns:
google.cloud.bigquery.table.RowIterator:
Iterator of row data
:class:`~google.cloud.bigquery.table.Row`-s.
"""
params = {
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": location,
}

if start_index is not None:
params["startIndex"] = start_index

row_iterator = RowIterator(
client=self,
api_request=functools.partial(self._call_api, retry, timeout=timeout),
path=f"/projects/{project}/queries/{job_id}",
schema=schema,
max_results=max_results,
page_size=page_size,
table=destination,
extra_params=params,
total_rows=total_rows,
)
return row_iterator

Expand Down
14 changes: 7 additions & 7 deletions google/cloud/bigquery/job/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from google.cloud.bigquery.table import _EmptyRowIterator
from google.cloud.bigquery.table import RangePartitioning
from google.cloud.bigquery.table import _table_arg_to_table_ref
from google.cloud.bigquery.table import Table
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import TimePartitioning

Expand Down Expand Up @@ -1159,12 +1158,13 @@ def result(
if self._query_results.total_rows is None:
return _EmptyRowIterator()

schema = self._query_results.schema
dest_table_ref = self.destination
dest_table = Table(dest_table_ref, schema=schema)
dest_table._properties["numRows"] = self._query_results.total_rows
rows = self._client.list_rows(
dest_table,
rows = self._client._list_rows_from_query_results(
self._query_results.job_id,
self.location,
self._query_results.project,
self._query_results.schema,
total_rows=self._query_results.total_rows,
destination=self.destination,
page_size=page_size,
max_results=max_results,
start_index=start_index,
Expand Down
17 changes: 10 additions & 7 deletions google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,8 @@ class RowIterator(HTTPIterator):
call the BigQuery Storage API to fetch rows.
selected_fields (Optional[Sequence[google.cloud.bigquery.schema.SchemaField]]):
A subset of columns to select from this table.
total_rows (Optional[int]):
Total number of rows in the table.
"""

Expand All @@ -1321,6 +1323,7 @@ def __init__(
extra_params=None,
table=None,
selected_fields=None,
total_rows=None,
):
super(RowIterator, self).__init__(
client,
Expand All @@ -1342,7 +1345,7 @@ def __init__(
self._schema = schema
self._selected_fields = selected_fields
self._table = table
self._total_rows = getattr(table, "num_rows", None)
self._total_rows = total_rows

def _get_next_page_response(self):
"""Requests the next page from the path provided.
Expand Down Expand Up @@ -1419,7 +1422,7 @@ def _to_arrow_iterable(self, bqstorage_client=None):
selected_fields=self._selected_fields,
)
tabledata_list_download = functools.partial(
_pandas_helpers.download_arrow_tabledata_list, iter(self.pages), self.schema
_pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema
)
return self._to_page_iterable(
bqstorage_download,
Expand Down Expand Up @@ -1496,7 +1499,7 @@ def to_arrow(
) and self.max_results is not None:
warnings.warn(
"Cannot use bqstorage_client if max_results is set, "
"reverting to fetching data with the tabledata.list endpoint.",
"reverting to fetching data with the REST endpoint.",
stacklevel=2,
)
create_bqstorage_client = False
Expand Down Expand Up @@ -1582,7 +1585,7 @@ def to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
selected_fields=self._selected_fields,
)
tabledata_list_download = functools.partial(
_pandas_helpers.download_dataframe_tabledata_list,
_pandas_helpers.download_dataframe_row_iterator,
iter(self.pages),
self.schema,
dtypes,
Expand Down Expand Up @@ -1680,7 +1683,7 @@ def to_dataframe(
) and self.max_results is not None:
warnings.warn(
"Cannot use bqstorage_client if max_results is set, "
"reverting to fetching data with the tabledata.list endpoint.",
"reverting to fetching data with the REST endpoint.",
stacklevel=2,
)
create_bqstorage_client = False
Expand Down Expand Up @@ -2167,7 +2170,7 @@ def _item_to_row(iterator, resource):
)


def _tabledata_list_page_columns(schema, response):
def _row_iterator_page_columns(schema, response):
"""Make a generator of all the columns in a page from tabledata.list.
This enables creating a :class:`pandas.DataFrame` and other
Expand Down Expand Up @@ -2197,7 +2200,7 @@ def _rows_page_start(iterator, page, response):
"""
# Make a (lazy) copy of the page in column-oriented format for use in data
# science packages.
page._columns = _tabledata_list_page_columns(iterator._schema, response)
page._columns = _row_iterator_page_columns(iterator._schema, response)

total_rows = response.get("totalRows")
if total_rows is not None:
Expand Down
10 changes: 8 additions & 2 deletions tests/unit/job/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def _make_job_resource(
endpoint="https://bigquery.googleapis.com",
job_type="load",
job_id="a-random-id",
location="US",
project_id="some-project",
user_email="bq-user@example.com",
):
Expand All @@ -69,7 +70,11 @@ def _make_job_resource(
"statistics": {"creationTime": creation_time_ms, job_type: {}},
"etag": etag,
"id": "{}:{}".format(project_id, job_id),
"jobReference": {"projectId": project_id, "jobId": job_id},
"jobReference": {
"projectId": project_id,
"jobId": job_id,
"location": location,
},
"selfLink": "{}/bigquery/v2/projects/{}/jobs/{}".format(
endpoint, project_id, job_id
),
Expand Down Expand Up @@ -130,7 +135,7 @@ def _table_ref(self, table_id):

return TableReference(self.DS_REF, table_id)

def _make_resource(self, started=False, ended=False):
def _make_resource(self, started=False, ended=False, location="US"):
self._setUpConstants()
return _make_job_resource(
creation_time_ms=int(self.WHEN_TS * 1000),
Expand All @@ -144,6 +149,7 @@ def _make_resource(self, started=False, ended=False):
job_id=self.JOB_ID,
project_id=self.PROJECT,
user_email=self.USER_EMAIL,
location=location,
)

def _verifyInitialReadonlyProperties(self, job):
Expand Down
Loading

0 comments on commit 0c3476d

Please sign in to comment.