Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: use the first page a results when query(api_method="QUERY") #1723

Merged
merged 8 commits into from
Nov 21, 2023
12 changes: 4 additions & 8 deletions google/cloud/bigquery/_job_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from google.api_core import retry as retries

from google.cloud.bigquery import job
import google.cloud.bigquery.query

# Avoid circular imports
if TYPE_CHECKING: # pragma: NO COVER
Expand Down Expand Up @@ -197,14 +198,9 @@ def _to_query_job(
job_complete = query_response.get("jobComplete")
if job_complete:
query_job._properties["status"]["state"] = "DONE"
# TODO: https://github.com/googleapis/python-bigquery/issues/589
# Set the first page of results if job is "complete" and there is
# only 1 page of results. Otherwise, use the existing logic that
# refreshes the job stats.
#
# This also requires updates to `to_dataframe` and the DB API connector
# so that they don't try to read from a destination table if all the
# results are present.
query_job._query_results = google.cloud.bigquery.query._QueryResults(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this change mean we also load the query results when job is complete, when query(api_method="INSERT")?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I just double-checked that this method is only called from query_jobs_query, so it won't affect when api_method="INSERT"

query_response
)
else:
query_job._properties["status"]["state"] = "PENDING"

Expand Down
9 changes: 9 additions & 0 deletions google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3862,6 +3862,7 @@ def _list_rows_from_query_results(
retry: retries.Retry = DEFAULT_RETRY,
timeout: TimeoutType = DEFAULT_TIMEOUT,
query_id: Optional[str] = None,
first_page_response: Optional[Dict[str, Any]] = None,
) -> RowIterator:
"""List the rows of a completed query.
See
Expand Down Expand Up @@ -3904,6 +3905,8 @@ def _list_rows_from_query_results(
query_id (Optional[str]):
[Preview] ID of a completed query. This ID is auto-generated
and not guaranteed to be populated.
first_page_response (Optional[dict]):
API response for the first page of results (if available).
Returns:
google.cloud.bigquery.table.RowIterator:
Iterator of row data
Expand All @@ -3923,6 +3926,11 @@ def _list_rows_from_query_results(
if start_index is not None:
params["startIndex"] = start_index

# We don't call jobs.query with a page size, so if the user explicitly
# requests a certain size, invalidate the cache.
if page_size is not None:
first_page_response = None

params["formatOptions.useInt64Timestamp"] = True
row_iterator = RowIterator(
client=self,
Expand All @@ -3938,6 +3946,7 @@ def _list_rows_from_query_results(
location=location,
job_id=job_id,
query_id=query_id,
first_page_response=first_page_response,
)
return row_iterator

Expand Down
13 changes: 12 additions & 1 deletion google/cloud/bigquery/job/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1586,7 +1586,8 @@ def do_get_result():
# Since the job could already be "done" (e.g. got a finished job
# via client.get_job), the superclass call to done() might not
# set the self._query_results cache.
self._reload_query_results(retry=retry, timeout=timeout)
if self._query_results is None or not self._query_results.complete:
self._reload_query_results(retry=retry, timeout=timeout)

if retry_do_query is not None and job_retry is not None:
do_get_result = job_retry(do_get_result)
Expand Down Expand Up @@ -1615,6 +1616,15 @@ def do_get_result():
query_id=self.query_id,
)

# We know that there's at least 1 row, so only treat the response from
# jobs.getQueryResults / jobs.query as the first page of the
# RowIterator response if there are any rows in it. This prevents us
# from stopping the iteration early because we're missing rows and
# there's no next page token.
first_page_response = self._query_results._properties
if "rows" not in first_page_response:
first_page_response = None

rows = self._client._list_rows_from_query_results(
self.job_id,
self.location,
Expand All @@ -1628,6 +1638,7 @@ def do_get_result():
retry=retry,
timeout=timeout,
query_id=self.query_id,
first_page_response=first_page_response,
)
rows._preserve_order = _contains_order_by(self.query)
return rows
Expand Down
8 changes: 0 additions & 8 deletions google/cloud/bigquery/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1005,14 +1005,6 @@ def _set_properties(self, api_response):
Args:
api_response (Dict): Response returned from an API call
"""
job_id_present = (
"jobReference" in api_response
and "jobId" in api_response["jobReference"]
and "projectId" in api_response["jobReference"]
)
if not job_id_present:
raise ValueError("QueryResult requires a job reference")

self._properties.clear()
self._properties.update(copy.deepcopy(api_response))

Expand Down
44 changes: 39 additions & 5 deletions google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@
"because the necessary `__from_arrow__` attribute is missing."
)

# How many of the total rows need to be downloaded already for us to skip
# calling the BQ Storage API?
ALMOST_COMPLETELY_CACHED_RATIO = 0.333


def _reference_getter(table):
"""A :class:`~google.cloud.bigquery.table.TableReference` pointing to
Expand Down Expand Up @@ -1625,16 +1629,31 @@ def query_id(self) -> Optional[str]:
"""
return self._query_id

def _is_completely_cached(self):
def _is_almost_completely_cached(self):
"""Check if all results are completely cached.

This is useful to know, because we can avoid alternative download
mechanisms.
"""
if self._first_page_response is None or self.next_page_token:
if self._first_page_response is None:
return False

return self._first_page_response.get(self._next_token) is None
total_cached_rows = len(self._first_page_response.get(self._items_key, []))
if self.max_results is not None and total_cached_rows >= self.max_results:
return True
tswast marked this conversation as resolved.
Show resolved Hide resolved

if (
self.next_page_token is None
and self._first_page_response.get(self._next_token) is None
):
return True

if self._total_rows is not None:
tswast marked this conversation as resolved.
Show resolved Hide resolved
almost_completely = self._total_rows * ALMOST_COMPLETELY_CACHED_RATIO
if total_cached_rows >= almost_completely:
return True

return False

def _validate_bqstorage(self, bqstorage_client, create_bqstorage_client):
"""Returns True if the BigQuery Storage API can be used.
Expand All @@ -1647,7 +1666,14 @@ def _validate_bqstorage(self, bqstorage_client, create_bqstorage_client):
if not using_bqstorage_api:
return False

if self._is_completely_cached():
if self._table is None:
return False

# The developer is manually paging through results if this is set.
if self.next_page_token is not None:
return False

if self._is_almost_completely_cached():
return False

if self.max_results is not None:
Expand All @@ -1671,7 +1697,15 @@ def _get_next_page_response(self):
The parsed JSON response of the next page's contents.
"""
if self._first_page_response:
response = self._first_page_response
rows = self._first_page_response.get(self._items_key, [])[
: self.max_results
]
response = {
self._items_key: rows,
}
if self._next_token in self._first_page_response:
response[self._next_token] = self._first_page_response[self._next_token]

self._first_page_response = None
return response

Expand Down
109 changes: 109 additions & 0 deletions tests/unit/job/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import requests

from google.cloud.bigquery.client import _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS
import google.cloud.bigquery._job_helpers
import google.cloud.bigquery.query
from google.cloud.bigquery.table import _EmptyRowIterator

Expand Down Expand Up @@ -1081,6 +1082,114 @@ def test_result_with_done_job_calls_get_query_results(self):
timeout=None,
)
conn.api_request.assert_has_calls([query_results_call, query_results_page_call])
assert conn.api_request.call_count == 2

def test_result_with_done_jobs_query_response_doesnt_call_get_query_results(self):
"""With a done result from jobs.query, we don't need to call
jobs.getQueryResults to wait for the query to finish.

jobs.get is still called because there is an assumption that after
QueryJob.result(), all job metadata is available locally.
"""
job_resource = self._make_resource(started=True, ended=True, location="EU")
conn = make_connection(job_resource)
client = _make_client(self.PROJECT, connection=conn)
query_resource_done = {
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"rows": [{"f": [{"v": "abc"}]}],
"totalRows": "1",
}
job = google.cloud.bigquery._job_helpers._to_query_job(
client,
"SELECT 'abc' AS col1",
request_config=None,
query_response=query_resource_done,
)
assert job.state == "DONE"

result = job.result()

rows = list(result)
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0].col1, "abc")
job_path = f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}"
conn.api_request.assert_called_once_with(
method="GET",
path=job_path,
query_params={},
timeout=None,
)

def test_result_with_done_jobs_query_response_and_page_size_invalidates_cache(self):
"""We don't call jobs.query with a page size, so if the user explicitly
requests a certain size, invalidate the cache.
"""
# Arrange
job_resource = self._make_resource(
started=True, ended=True, location="asia-northeast1"
)
query_resource_done = {
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"rows": [{"f": [{"v": "abc"}]}],
"pageToken": "initial-page-token-shouldnt-be-used",
"totalRows": "4",
}
query_page_resource = {
"totalRows": 4,
"pageToken": "some-page-token",
"rows": [
{"f": [{"v": "row1"}]},
{"f": [{"v": "row2"}]},
{"f": [{"v": "row3"}]},
],
}
query_page_resource_2 = {"totalRows": 4, "rows": [{"f": [{"v": "row4"}]}]}
conn = make_connection(job_resource, query_page_resource, query_page_resource_2)
client = _make_client(self.PROJECT, connection=conn)
job = google.cloud.bigquery._job_helpers._to_query_job(
client,
"SELECT col1 FROM table",
request_config=None,
query_response=query_resource_done,
)
assert job.state == "DONE"

# Act
result = job.result(page_size=3)

# Assert
actual_rows = list(result)
self.assertEqual(len(actual_rows), 4)

query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
query_page_1_call = mock.call(
method="GET",
path=query_results_path,
query_params={
"maxResults": 3,
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": "asia-northeast1",
"formatOptions.useInt64Timestamp": True,
},
timeout=None,
)
query_page_2_call = mock.call(
method="GET",
path=query_results_path,
query_params={
"pageToken": "some-page-token",
"maxResults": 3,
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": "asia-northeast1",
"formatOptions.useInt64Timestamp": True,
},
timeout=None,
)
conn.api_request.assert_has_calls([query_page_1_call, query_page_2_call])

def test_result_with_max_results(self):
from google.cloud.bigquery.table import RowIterator
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1362,13 +1362,13 @@ def test_errors_present(self):
self.assertEqual(query.errors, ERRORS)

def test_job_id_missing(self):
with self.assertRaises(ValueError):
self._make_one({})
query = self._make_one({})
self.assertIsNone(query.job_id)

def test_job_id_broken_job_reference(self):
resource = {"jobReference": {"bogus": "BOGUS"}}
with self.assertRaises(ValueError):
self._make_one(resource)
query = self._make_one(resource)
self.assertIsNone(query.job_id)

def test_job_id_present(self):
resource = self._make_resource()
Expand Down
Loading