perf: dont fetch rows when waiting for query to finish
When there are large result sets, fetching rows while waiting for the
query to finish can cause the API to hang indefinitely. (This may be due
to an interaction between connection timeout and API timeout.)

This reverts commit 86f6a51 (googleapis#374).
tswast committed Nov 24, 2020
1 parent: a1949ae · commit: f8887d3
Showing 6 changed files with 60 additions and 115 deletions.
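
For context, here is a minimal sketch (not part of this commit) of the polling pattern the change restores: jobs.getQueryResults is called with maxResults=0, so the server reports completion status and metadata without streaming any rows, and row data is fetched only after the job is done. The endpoint and parameters follow the public BigQuery REST API; the authorized session, error handling, and retries are assumed and omitted.

# Illustrative only: poll for query completion without transferring rows.
# Assumes `session` is an authorized requests-compatible session, e.g.
# google.auth.transport.requests.AuthorizedSession (setup not shown).
import time

def wait_for_query(session, project, job_id, location, poll_interval=1.0):
    url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project}/queries/{job_id}"
    # maxResults=0 requests job status and schema only, never row data,
    # so a large result set cannot stall the completion check.
    params = {"maxResults": 0, "location": location, "timeoutMs": 10000}
    while True:
        response = session.get(url, params=params).json()
        if response.get("jobComplete"):
            return response  # has totalRows and schema, but no rows
        time.sleep(poll_interval)  # back off before polling again
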
google/cloud/bigquery/client.py: 4 changes (1 addition & 3 deletions)
@@ -1534,7 +1534,7 @@ def _get_query_results(
A new ``_QueryResults`` instance.
"""

extra_params = {}
extra_params = {"maxResults": 0}

if project is None:
project = self.project
@@ -3187,7 +3187,6 @@ def _list_rows_from_query_results(
page_size=None,
retry=DEFAULT_RETRY,
timeout=None,
first_page_response=None,
):
"""List the rows of a completed query.
See
@@ -3248,7 +3247,6 @@ def _list_rows_from_query_results(
table=destination,
extra_params=params,
total_rows=total_rows,
first_page_response=first_page_response,
)
return row_iterator

google/cloud/bigquery/job/query.py: 85 changes (31 additions & 54 deletions)
@@ -991,22 +991,48 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
Returns:
bool: True if the job is complete, False otherwise.
"""
is_done = (
# Only consider a QueryJob complete when we know we have the final
# query results available.
self._query_results is not None
and self._query_results.complete
and self.state == _DONE_STATE
)
# Do not refresh if the state is already done, as the job will not
# change once complete.
is_done = self.state == _DONE_STATE
if not reload or is_done:
return is_done

self._reload_query_results(retry=retry, timeout=timeout)
# Since the API to getQueryResults can hang up to the timeout value
# (default of 10 seconds), set the timeout parameter to ensure that
# the timeout from the futures API is respected. See:
# https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4135
timeout_ms = None
if self._done_timeout is not None:
# Subtract a buffer for context switching, network latency, etc.
api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
api_timeout = max(min(api_timeout, 10), 0)
self._done_timeout -= api_timeout
self._done_timeout = max(0, self._done_timeout)
timeout_ms = int(api_timeout * 1000)

# If an explicit timeout is not given, fall back to the transport timeout
# stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout

self._query_results = self._client._get_query_results(
self.job_id,
retry,
project=self.project,
timeout_ms=timeout_ms,
location=self.location,
timeout=transport_timeout,
)

# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if self._query_results.complete:
if self._query_results.complete and self.state != _DONE_STATE:
self.reload(retry=retry, timeout=transport_timeout)

return self.state == _DONE_STATE
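
To illustrate the timeout budgeting in done() above: each getQueryResults call is capped at roughly 10 seconds, a small buffer is reserved for latency, and the rest of the futures timeout is kept for later polls. The helper below mirrors that arithmetic as a standalone sketch; the real _TIMEOUT_BUFFER_SECS constant is defined elsewhere in the package, and 0.5 seconds is only an assumed value here.

# Standalone sketch of the budget math above, not library code.
_TIMEOUT_BUFFER_SECS = 0.5  # assumed value, for illustration only

def next_api_timeout(done_timeout):
    """Return (timeout_ms for getQueryResults, remaining futures budget)."""
    # Cap each server-side wait at 10 seconds, minus a latency buffer.
    api_timeout = max(min(done_timeout - _TIMEOUT_BUFFER_SECS, 10), 0)
    remaining = max(0.0, done_timeout - api_timeout)
    return int(api_timeout * 1000), remaining

# e.g. a 25-second futures timeout yields a 10-second API wait (10000 ms)
# and leaves about 15 seconds of budget for subsequent polls.
print(next_api_timeout(25.0))  # -> (10000, 15.0)
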
@@ -1073,45 +1099,6 @@ def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
exc.query_job = self
raise

def _reload_query_results(self, retry=DEFAULT_RETRY, timeout=None):
"""Refresh the cached query results.
Args:
retry (Optional[google.api_core.retry.Retry]):
How to retry the call that retrieves query results.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
"""
if self._query_results and self._query_results.complete:
return

# Since the API to getQueryResults can hang up to the timeout value
# (default of 10 seconds), set the timeout parameter to ensure that
# the timeout from the futures API is respected. See:
# https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4135
timeout_ms = None
if self._done_timeout is not None:
# Subtract a buffer for context switching, network latency, etc.
api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
api_timeout = max(min(api_timeout, 10), 0)
self._done_timeout -= api_timeout
self._done_timeout = max(0, self._done_timeout)
timeout_ms = int(api_timeout * 1000)

# If an explicit timeout is not given, fall back to the transport timeout
# stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout

self._query_results = self._client._get_query_results(
self.job_id,
retry,
project=self.project,
timeout_ms=timeout_ms,
location=self.location,
timeout=transport_timeout,
)

def result(
self,
page_size=None,
@@ -1158,11 +1145,6 @@ def result(
"""
try:
super(QueryJob, self).result(retry=retry, timeout=timeout)

# Since the job could already be "done" (e.g. got a finished job
# via client.get_job), the superclass call to done() might not
# set the self._query_results cache.
self._reload_query_results(retry=retry, timeout=timeout)
except exceptions.GoogleAPICallError as exc:
exc.message += self._format_for_exception(self.query, self.job_id)
exc.query_job = self
@@ -1177,14 +1159,10 @@ def result(
if self._query_results.total_rows is None:
return _EmptyRowIterator()

first_page_response = None
if max_results is None and page_size is None and start_index is None:
first_page_response = self._query_results._properties

rows = self._client._list_rows_from_query_results(
self.job_id,
self._query_results.job_id,
self.location,
self.project,
self._query_results.project,
self._query_results.schema,
total_rows=self._query_results.total_rows,
destination=self.destination,
@@ -1193,7 +1171,6 @@ def result(
start_index=start_index,
retry=retry,
timeout=timeout,
first_page_response=first_page_response,
)
rows._preserve_order = _contains_order_by(self.query)
return rows
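
A hedged usage sketch of the public flow these internals serve: result() blocks via done(), which now polls getQueryResults with maxResults=0, and the returned RowIterator requests every page of rows, including the first, only when iterated. The query and public dataset below are illustrative.

from google.cloud import bigquery

client = bigquery.Client()  # assumes default credentials and project
job = client.query(
    "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10"
)
# result() waits for completion; rows are fetched only during iteration.
for row in job.result(page_size=5):
    print(row["name"])
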
google/cloud/bigquery/table.py: 11 changes (1 addition & 10 deletions)
@@ -1301,9 +1301,7 @@ class RowIterator(HTTPIterator):
A subset of columns to select from this table.
total_rows (Optional[int]):
Total number of rows in the table.
first_page_response (Optional[dict]):
API response for the first page of results. These are returned when
the first page is requested.
"""

def __init__(
@@ -1319,7 +1317,6 @@ def __init__(
table=None,
selected_fields=None,
total_rows=None,
first_page_response=None,
):
super(RowIterator, self).__init__(
client,
@@ -1342,7 +1339,6 @@ def __init__(
self._selected_fields = selected_fields
self._table = table
self._total_rows = total_rows
self._first_page_response = first_page_response

def _is_completely_cached(self):
"""Check if all results are completely cached.
@@ -1386,11 +1382,6 @@ def _get_next_page_response(self):
Dict[str, object]:
The parsed JSON response of the next page's contents.
"""
if self._first_page_response:
response = self._first_page_response
self._first_page_response = None
return response

params = self._get_query_params()
if self._page_size is not None:
if self.page_number and "startIndex" in params:
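
With the cached first page removed, RowIterator obtains every page from the API. The loop below is a minimal sketch of that token-driven paging; the fetch_page callable and names are illustrative, not the library's API.

def iter_row_pages(fetch_page):
    """Yield row lists; fetch_page(page_token) returns one parsed JSON page."""
    token = None
    while True:
        response = fetch_page(token)
        yield response.get("rows", [])
        token = response.get("pageToken")
        if not token:  # no more pages to fetch
            break
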
tests/unit/job/test_query.py: 55 changes (13 additions & 42 deletions)
@@ -787,9 +787,7 @@ def test_result(self):
"location": "EU",
},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "3",
"rows": [{"f": [{"v": "abc"}]}],
"pageToken": "next-page",
"totalRows": "2",
}
job_resource = self._make_resource(started=True, location="EU")
job_resource_done = self._make_resource(started=True, ended=True, location="EU")
@@ -801,9 +799,9 @@
query_page_resource = {
# Explicitly set totalRows to be different from the initial
# response to test update during iteration.
"totalRows": "2",
"totalRows": "1",
"pageToken": None,
"rows": [{"f": [{"v": "def"}]}],
"rows": [{"f": [{"v": "abc"}]}],
}
conn = _make_connection(
query_resource, query_resource_done, job_resource_done, query_page_resource
@@ -814,20 +812,19 @@
result = job.result()

self.assertIsInstance(result, RowIterator)
self.assertEqual(result.total_rows, 3)
self.assertEqual(result.total_rows, 2)
rows = list(result)
self.assertEqual(len(rows), 2)
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0].col1, "abc")
self.assertEqual(rows[1].col1, "def")
# Test that the total_rows property has changed during iteration, based
# on the response from tabledata.list.
self.assertEqual(result.total_rows, 2)
self.assertEqual(result.total_rows, 1)

query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
query_results_call = mock.call(
method="GET",
path=query_results_path,
query_params={"location": "EU"},
query_params={"maxResults": 0, "location": "EU"},
timeout=None,
)
reload_call = mock.call(
@@ -842,7 +839,6 @@ def test_result(self):
query_params={
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": "EU",
"pageToken": "next-page",
},
timeout=None,
)
@@ -855,9 +851,7 @@ def test_result_with_done_job_calls_get_query_results(self):
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "2",
"rows": [{"f": [{"v": "abc"}]}],
"pageToken": "next-page",
"totalRows": "1",
}
job_resource = self._make_resource(started=True, ended=True, location="EU")
job_resource["configuration"]["query"]["destinationTable"] = {
@@ -866,9 +860,9 @@
"tableId": "dest_table",
}
results_page_resource = {
"totalRows": "2",
"totalRows": "1",
"pageToken": None,
"rows": [{"f": [{"v": "def"}]}],
"rows": [{"f": [{"v": "abc"}]}],
}
conn = _make_connection(query_resource_done, results_page_resource)
client = _make_client(self.PROJECT, connection=conn)
@@ -877,15 +871,14 @@
result = job.result()

rows = list(result)
self.assertEqual(len(rows), 2)
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0].col1, "abc")
self.assertEqual(rows[1].col1, "def")

query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
query_results_call = mock.call(
method="GET",
path=query_results_path,
query_params={"location": "EU"},
query_params={"maxResults": 0, "location": "EU"},
timeout=None,
)
query_results_page_call = mock.call(
@@ -894,7 +887,6 @@ def test_result_with_done_job_calls_get_query_results(self):
query_params={
"fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
"location": "EU",
"pageToken": "next-page",
},
timeout=None,
)
@@ -908,12 +900,6 @@ def test_result_with_max_results(self):
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "5",
# These rows are discarded because max_results is set.
"rows": [
{"f": [{"v": "xyz"}]},
{"f": [{"v": "uvw"}]},
{"f": [{"v": "rst"}]},
],
}
query_page_resource = {
"totalRows": "5",
@@ -939,7 +925,6 @@ def test_result_with_max_results(self):
rows = list(result)

self.assertEqual(len(rows), 3)
self.assertEqual(rows[0].col1, "abc")
self.assertEqual(len(connection.api_request.call_args_list), 2)
query_page_request = connection.api_request.call_args_list[1]
self.assertEqual(
@@ -994,7 +979,7 @@ def test_result_w_retry(self):
query_results_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
query_params={"location": "asia-northeast1"},
query_params={"maxResults": 0, "location": "asia-northeast1"},
timeout=None,
)
reload_call = mock.call(
@@ -1094,12 +1079,6 @@ def test_result_w_page_size(self):
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "4",
# These rows are discarded because page_size is set.
"rows": [
{"f": [{"v": "xyz"}]},
{"f": [{"v": "uvw"}]},
{"f": [{"v": "rst"}]},
],
}
job_resource = self._make_resource(started=True, ended=True, location="US")
q_config = job_resource["configuration"]["query"]
@@ -1130,7 +1109,6 @@ def test_result_w_page_size(self):
# Assert
actual_rows = list(result)
self.assertEqual(len(actual_rows), 4)
self.assertEqual(actual_rows[0].col1, "row1")

query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
query_page_1_call = mock.call(
@@ -1164,12 +1142,6 @@ def test_result_with_start_index(self):
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": [{"name": "col1", "type": "STRING"}]},
"totalRows": "5",
# These rows are discarded because start_index is set.
"rows": [
{"f": [{"v": "xyz"}]},
{"f": [{"v": "uvw"}]},
{"f": [{"v": "rst"}]},
],
}
tabledata_resource = {
"totalRows": "5",
@@ -1196,7 +1168,6 @@ def test_result_with_start_index(self):
rows = list(result)

self.assertEqual(len(rows), 4)
self.assertEqual(rows[0].col1, "abc")
self.assertEqual(len(connection.api_request.call_args_list), 2)
tabledata_list_request = connection.api_request.call_args_list[1]
self.assertEqual(
(Diffs for the remaining two changed files are not shown.)
