Skip to content

Commit

Permalink
feat(bigquery): add timeout to QueryJob.done()
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed Dec 9, 2019
1 parent 80f5295 commit 2f37317
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 10 deletions.
7 changes: 5 additions & 2 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,7 @@ def delete_table(self, table, retry=DEFAULT_RETRY, not_found_ok=False):
raise

def _get_query_results(
self, job_id, retry, project=None, timeout_ms=None, location=None
self, job_id, retry, project=None, timeout_ms=None, location=None, timeout=None,
):
"""Get the query results object for a query job.
Expand All @@ -1083,6 +1083,9 @@ def _get_query_results(
(Optional) number of milliseconds the the API call should
wait for the query to complete before the request times out.
location (str): Location of the query job.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before retrying the HTTP request.
Returns:
google.cloud.bigquery.query._QueryResults:
Expand All @@ -1109,7 +1112,7 @@ def _get_query_results(
# job is complete (from QueryJob.done(), called ultimately from
# QueryJob.result()). So we don't need to poll here.
resource = self._call_api(
retry, method="GET", path=path, query_params=extra_params
retry, method="GET", path=path, query_params=extra_params, timeout=timeout
)
return _QueryResults.from_api_repr(resource)

Expand Down
37 changes: 29 additions & 8 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ def exists(self, client=None, retry=DEFAULT_RETRY):
else:
return True

def reload(self, client=None, retry=DEFAULT_RETRY):
def reload(self, client=None, retry=DEFAULT_RETRY, timeout=None):
"""API call: refresh job properties via a GET request.
See
Expand All @@ -675,6 +675,9 @@ def reload(self, client=None, retry=DEFAULT_RETRY):
``client`` stored on the current dataset.
retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before retrying the HTTP request.
"""
client = self._require_client(client)

Expand All @@ -683,7 +686,11 @@ def reload(self, client=None, retry=DEFAULT_RETRY):
extra_params["location"] = self.location

api_response = client._call_api(
retry, method="GET", path=self.path, query_params=extra_params
retry,
method="GET",
path=self.path,
query_params=extra_params,
timeout=timeout,
)
self._set_properties(api_response)

Expand Down Expand Up @@ -2994,9 +3001,16 @@ def estimated_bytes_processed(self):
result = int(result)
return result

def done(self, retry=DEFAULT_RETRY):
def done(self, retry=DEFAULT_RETRY, timeout=None):
"""Refresh the job and checks if it is complete.
Args:
retry (Optional[google.api_core.retry.Retry]):
How to retry the call that retrieves query results.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before retrying the HTTP request.
Returns:
bool: True if the job is complete, False otherwise.
"""
Expand All @@ -3007,11 +3021,17 @@ def done(self, retry=DEFAULT_RETRY):
timeout_ms = None
if self._done_timeout is not None:
# Subtract a buffer for context switching, network latency, etc.
timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
timeout = max(min(timeout, 10), 0)
self._done_timeout -= timeout
api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
api_timeout = max(min(api_timeout, 10), 0)
self._done_timeout -= api_timeout
self._done_timeout = max(0, self._done_timeout)
timeout_ms = int(timeout * 1000)
timeout_ms = int(api_timeout * 1000)

if timeout_ms is not None and timeout_ms > 0:
if timeout is not None:
timeout = min(timeout_ms, timeout)
else:
timeout = timeout_ms

# Do not refresh is the state is already done, as the job will not
# change once complete.
Expand All @@ -3022,13 +3042,14 @@ def done(self, retry=DEFAULT_RETRY):
project=self.project,
timeout_ms=timeout_ms,
location=self.location,
timeout=timeout,
)

# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if self._query_results.complete:
self.reload(retry=retry)
self.reload(retry=retry, timeout=timeout)

return self.state == _DONE_STATE

Expand Down

0 comments on commit 2f37317

Please sign in to comment.