Skip to content

Commit

Permalink
Add timeout parameter to QueryJob.done() method.
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed Nov 21, 2019
1 parent 57bb505 commit e19ba16
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 13 deletions.
30 changes: 23 additions & 7 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
from google.cloud.bigquery.model import ModelReference
from google.cloud.bigquery.query import _QueryResults
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.retry import timeoutable_http
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
Expand Down Expand Up @@ -1056,7 +1057,13 @@ def delete_table(self, table, retry=DEFAULT_RETRY, not_found_ok=False):
raise

def _get_query_results(
self, job_id, retry, project=None, timeout_ms=None, location=None
self,
job_id,
retry,
project=None,
timeout_ms=None,
location=None,
http_timeout=None,
):
"""Get the query results object for a query job.
Expand All @@ -1068,9 +1075,12 @@ def _get_query_results(
(Optional) project ID for the query job (defaults to the
project of the client).
timeout_ms (int):
(Optional) number of milliseconds the the API call should
(Optional) the number of milliseconds the API call should
wait for the query to complete before the request times out.
location (str): Location of the query job.
http_timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
each time before retrying the HTTP request.
Returns:
google.cloud.bigquery.query._QueryResults:
Expand All @@ -1093,12 +1103,18 @@ def _get_query_results(

path = "/projects/{}/queries/{}".format(project, job_id)

# This call is typically made in a polling loop that checks whether the
# job is complete (from QueryJob.done(), called ultimately from
# QueryJob.result()). So we don't need to poll here.
resource = self._call_api(
retry, method="GET", path=path, query_params=extra_params
# The following call is typically made in a polling loop that checks
# whether the job is complete (from QueryJob.done(), called ultimately
# from QueryJob.result()). So we don't need to poll here.
call_api = functools.partial(
self._call_api, method="GET", path=path, query_params=extra_params
)
if http_timeout is not None:
with timeoutable_http(self._http, http_timeout, retry) as adjusted_retry:
resource = call_api(adjusted_retry)
else:
resource = call_api(retry)

return _QueryResults.from_api_repr(resource)

def job_from_resource(self, resource):
Expand Down
17 changes: 11 additions & 6 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2994,7 +2994,7 @@ def estimated_bytes_processed(self):
result = int(result)
return result

def done(self, retry=DEFAULT_RETRY):
def done(self, retry=DEFAULT_RETRY, timeout=None):
"""Refresh the job and checks if it is complete.
Returns:
Expand All @@ -3007,11 +3007,11 @@ def done(self, retry=DEFAULT_RETRY):
timeout_ms = None
if self._done_timeout is not None:
# Subtract a buffer for context switching, network latency, etc.
timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
timeout = max(min(timeout, 10), 0)
self._done_timeout -= timeout
api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
api_timeout = max(min(api_timeout, 10), 0)
self._done_timeout -= api_timeout
self._done_timeout = max(0, self._done_timeout)
timeout_ms = int(timeout * 1000)
timeout_ms = int(api_timeout * 1000)

# Do not refresh is the state is already done, as the job will not
# change once complete.
Expand All @@ -3022,6 +3022,7 @@ def done(self, retry=DEFAULT_RETRY):
project=self.project,
timeout_ms=timeout_ms,
location=self.location,
http_timeout=timeout,
)

# Only reload the job once we know the query is complete.
Expand Down Expand Up @@ -3126,7 +3127,11 @@ def result(
# Return an iterator instead of returning the job.
if not self._query_results:
self._query_results = self._client._get_query_results(
self.job_id, retry, project=self.project, location=self.location
self.job_id,
retry,
project=self.project,
location=self.location,
http_timeout=timeout,
)
except exceptions.GoogleCloudError as exc:
exc.message += self._format_for_exception(self.query, self.job_id)
Expand Down
55 changes: 55 additions & 0 deletions bigquery/google/cloud/bigquery/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib

import requests

from google.api_core import exceptions
from google.api_core import retry

Expand All @@ -27,6 +31,50 @@
)


class _TimeoutableHTTPAdapter(requests.adapters.HTTPAdapter):
"""An HTTP adapter that allows sending HTTP requests with a timeout."""

def __init__(self, timeout=None, *args, **kwargs):
self._timeout = timeout
super(_TimeoutableHTTPAdapter, self).__init__(*args, **kwargs)

def send(self, *args, **kwargs):
kwargs["timeout"] = self._timeout
return super(_TimeoutableHTTPAdapter, self).send(*args, **kwargs)


@contextlib.contextmanager
def timeoutable_http(http, timeout, retry):
"""Add timeout to the ``http`` transport object.
Args:
http (requests.Session): A transport objects for making HTTP requests.
timeout (float): The number of seconds to wait for the transport's HTTP
request before retrying it.
retry (api_core.retry.Retry): A Retry object that will be used for
retrying the requests.
Yields:
(api_core.retry.Retry) An adjusted Retry object that also retries on the
underlying transport errors, and **done** retrying at ``timeout``.
"""
orig_http_adapter = http.get_adapter("http://")
orig_https_adapter = http.get_adapter("https://")

http.mount("http://", _TimeoutableHTTPAdapter(timeout=timeout, max_retries=0))
http.mount("https://", _TimeoutableHTTPAdapter(timeout=timeout, max_retries=0))

adjusted_retry = retry.with_deadline(timeout, strict_deadline=True).with_predicate(
_should_retry_with_http
)

try:
yield adjusted_retry
finally:
http.mount("http://", orig_http_adapter)
http.mount("https://", orig_https_adapter)


def _should_retry(exc):
"""Predicate for determining when to retry.
Expand All @@ -44,6 +92,13 @@ def _should_retry(exc):
return reason in _RETRYABLE_REASONS


def _should_retry_with_http(exc):
# TODO: narrow down the list of retriable Requests exceptions?
if isinstance(exc, requests.exceptions.RequestException):
return True
return _should_retry(exc)


DEFAULT_RETRY = retry.Retry(predicate=_should_retry)
"""The default retry object.
Expand Down

0 comments on commit e19ba16

Please sign in to comment.