feat: make QueryJob.done() method more performant
plamut committed Mar 10, 2021
1 parent 3ce826e · commit 0663a95
Showing 2 changed files with 45 additions and 100 deletions.
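In short, the diff below removes the hand-rolled QueryJob.done() override and instead implements _done_or_raise(), the hook that the polling machinery in google.api_core.future.polling drives: while the query is still running, the job raises _OperationNotComplete, and the base future's retry loop keeps polling until the call returns cleanly. A rough sketch of that contract follows; it is illustrative only (the SketchPollingFuture class and its plain sleep loop are inventions for this sketch), and the real loop in google.api_core.future.polling.PollingFuture uses a Retry object with backoff and a deadline rather than a sleep.

# Illustrative sketch of the polling contract this commit plugs into; not the
# actual google.api_core source.
import time


class _OperationNotComplete(Exception):
    """Raised by _done_or_raise() to signal 'poll again'."""


class SketchPollingFuture:
    def _done_or_raise(self, retry=None, timeout=None):
        # Subclasses (e.g. QueryJob after this commit) override this hook:
        # return normally once the operation has finished, raise otherwise.
        raise NotImplementedError

    def _blocking_poll(self, timeout=None, interval=1.0):
        # Keep calling the subclass hook until it stops raising
        # _OperationNotComplete, or until the optional deadline passes.
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            try:
                self._done_or_raise(timeout=timeout)
                return
            except _OperationNotComplete:
                if deadline is not None and time.monotonic() >= deadline:
                    raise TimeoutError("Operation did not complete in time.")
                time.sleep(interval)
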
91 changes: 35 additions & 56 deletions google/cloud/bigquery/job/query.py
@@ -19,6 +19,7 @@
import re

from google.api_core import exceptions
from google.api_core.future import polling as polling_future
import requests

from google.cloud.bigquery.dataset import Dataset
@@ -42,7 +43,6 @@
from google.cloud.bigquery._tqdm_helpers import wait_for_query

from google.cloud.bigquery.job.base import _AsyncJob
from google.cloud.bigquery.job.base import _DONE_STATE
from google.cloud.bigquery.job.base import _JobConfig
from google.cloud.bigquery.job.base import _JobReference

@@ -974,61 +974,6 @@ def estimated_bytes_processed(self):
result = int(result)
return result

def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
"""Refresh the job and checks if it is complete.
Args:
retry (Optional[google.api_core.retry.Retry]):
How to retry the call that retrieves query results. If the job state is
``DONE``, retrying is aborted early, as the job will not change anymore.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
reload (Optional[bool]):
If ``True``, make an API call to refresh the job state of
unfinished jobs before checking. Default ``True``.
Returns:
bool: ``True`` if the job is complete or if fetching its status resulted in
an error, ``False`` otherwise.
"""
# Do not refresh if the state is already done, as the job will not
# change once complete.
is_done = self.state == _DONE_STATE
if not reload or is_done:
return is_done

# If an explicit timeout is not given, fall back to the transport timeout
# stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout

try:
self._reload_query_results(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
# Reloading also updates error details on self, thus no need for an
# explicit self.set_exception() call if reloading succeeds.
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError:
# Use the query results reload exception, as it generally contains
# much more useful error information.
self.set_exception(exc)
return True
else:
return self.state == _DONE_STATE

# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if self._query_results.complete:
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
self.set_exception(exc)
return True

return self.state == _DONE_STATE

def _blocking_poll(self, timeout=None, **kwargs):
self._done_timeout = timeout
self._transport_timeout = timeout
@@ -1130,6 +1075,40 @@ def _reload_query_results(self, retry=DEFAULT_RETRY, timeout=None):
timeout=transport_timeout,
)

def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None):
"""Check if the query has finished running and raise if it's not.
If the query has finished, also reload the job itself.
"""
# If an explicit timeout is not given, fall back to the transport timeout
# stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout

try:
self._reload_query_results(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
# Reloading also updates error details on self, thus no need for an
# explicit self.set_exception() call if reloading succeeds.
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError:
# Use the query results reload exception, as it generally contains
# much more useful error information.
self.set_exception(exc)
finally:
return

# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if not self._query_results.complete:
raise polling_future._OperationNotComplete()
else:
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
self.set_exception(exc)

def result(
self,
page_size=None,
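The public surface is unchanged by this: with the done() override gone, completion checks presumably fall back to the base _AsyncJob implementation, and result() still blocks through _blocking_poll(), which is now expected to poll the new _done_or_raise() shown above. A minimal, hypothetical usage example follows; the query text and the 60-second timeout are placeholders.

# Hypothetical usage sketch; the query text and timeout are placeholders.
from google.cloud import bigquery

client = bigquery.Client()
job = client.query("SELECT 1 AS x")

# result() blocks via the polling loop, which after this commit exercises
# QueryJob._done_or_raise() rather than a custom done() override.
rows = job.result(timeout=60)
for row in rows:
    print(row.x)
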
54 changes: 10 additions & 44 deletions tests/unit/job/test_query.py
@@ -309,24 +309,15 @@ def test_cancelled(self):

self.assertTrue(job.cancelled())

def test_done_job_complete(self):
client = _make_client(project=self.PROJECT)
resource = self._make_resource(ended=True)
job = self._get_target_class().from_api_repr(resource, client)
job._query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
{"jobComplete": True, "jobReference": resource["jobReference"]}
)
self.assertTrue(job.done())

def test_done_w_timeout(self):
def test__done_or_raise_w_timeout(self):
client = _make_client(project=self.PROJECT)
resource = self._make_resource(ended=False)
job = self._get_target_class().from_api_repr(resource, client)

with mock.patch.object(
client, "_get_query_results"
) as fake_get_results, mock.patch.object(job, "reload") as fake_reload:
job.done(timeout=42)
job._done_or_raise(timeout=42)

fake_get_results.assert_called_once()
call_args = fake_get_results.call_args
@@ -335,7 +326,7 @@ def test_done_w_timeout(self):
call_args = fake_reload.call_args
self.assertEqual(call_args.kwargs.get("timeout"), 42)

def test_done_w_timeout_and_longer_internal_api_timeout(self):
def test__done_or_raise_w_timeout_and_longer_internal_api_timeout(self):
client = _make_client(project=self.PROJECT)
resource = self._make_resource(ended=False)
job = self._get_target_class().from_api_repr(resource, client)
@@ -344,7 +335,7 @@ def test_done_w_timeout_and_longer_internal_api_timeout(self):
with mock.patch.object(
client, "_get_query_results"
) as fake_get_results, mock.patch.object(job, "reload") as fake_reload:
job.done(timeout=5.5)
job._done_or_raise(timeout=5.5)

# The expected timeout used is simply the given timeout, as the latter
# is shorter than the job's internal done timeout.
@@ -357,7 +348,7 @@ def test_done_w_query_results_error_reload_ok_job_finished(self):
call_args = fake_reload.call_args
self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout)

def test_done_w_query_results_error_reload_ok_job_finished(self):
def test__done_or_raise_w_query_results_error_reload_ok(self):
client = _make_client(project=self.PROJECT)
bad_request_error = exceptions.BadRequest("Error in query")
client._get_query_results = mock.Mock(side_effect=bad_request_error)
@@ -373,32 +364,11 @@ def fake_reload(self, *args, **kwargs):
fake_reload_method = types.MethodType(fake_reload, job)

with mock.patch.object(job, "reload", new=fake_reload_method):
is_done = job.done()
job._done_or_raise()

assert is_done
assert isinstance(job._exception, exceptions.BadRequest)

def test_done_w_query_results_error_reload_ok_job_still_running(self):
client = _make_client(project=self.PROJECT)
retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError)
client._get_query_results = mock.Mock(side_effect=retry_error)

resource = self._make_resource(ended=False)
job = self._get_target_class().from_api_repr(resource, client)
job._exception = None

def fake_reload(self, *args, **kwargs):
self._properties["status"]["state"] = "RUNNING"

fake_reload_method = types.MethodType(fake_reload, job)

with mock.patch.object(job, "reload", new=fake_reload_method):
is_done = job.done()

assert not is_done
assert job._exception is None

def test_done_w_query_results_error_reload_error(self):
def test__done_or_raise_w_query_results_error_reload_error(self):
client = _make_client(project=self.PROJECT)
bad_request_error = exceptions.BadRequest("Error in query")
client._get_query_results = mock.Mock(side_effect=bad_request_error)
@@ -409,12 +379,11 @@ def test_done_w_query_results_error_reload_error(self):
job.reload = mock.Mock(side_effect=reload_error)
job._exception = None

is_done = job.done()
job._done_or_raise()

assert is_done
assert job._exception is bad_request_error

def test_done_w_job_query_results_ok_reload_error(self):
def test__done_or_raise_w_job_query_results_ok_reload_error(self):
client = _make_client(project=self.PROJECT)
query_results = google.cloud.bigquery.query._QueryResults(
properties={
@@ -430,9 +399,8 @@ def test_done_w_job_query_results_ok_reload_error(self):
job.reload = mock.Mock(side_effect=retry_error)
job._exception = None

is_done = job.done()
job._done_or_raise()

assert is_done
assert job._exception is retry_error

def test_query_plan(self):
@@ -1905,8 +1873,6 @@ def test_reload_w_timeout(self):
)

def test_iter(self):
import types

begun_resource = self._make_resource()
query_resource = {
"jobComplete": True,
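The renamed tests above exercise the timeout and error paths; the case where the query results report the job as still running is not visible in this diff. A sketch of such a test, mirroring the helpers the surrounding tests use (_make_client, _make_resource, mock); it is illustrative only and not part of the commit:

    def test__done_or_raise_w_query_results_in_progress(self):
        # Sketch only: not in this commit; mirrors the style of the tests above.
        from google.api_core.future import polling

        client = _make_client(project=self.PROJECT)
        query_results = google.cloud.bigquery.query._QueryResults(
            properties={
                "jobComplete": False,
                "jobReference": {"projectId": self.PROJECT, "jobId": "query_job"},
            }
        )
        client._get_query_results = mock.Mock(return_value=query_results)

        resource = self._make_resource(ended=False)
        job = self._get_target_class().from_api_repr(resource, client)

        with mock.patch.object(job, "reload") as fake_reload:
            with self.assertRaises(polling._OperationNotComplete):
                job._done_or_raise()

        # The job itself must not be reloaded while the query is still running.
        fake_reload.assert_not_called()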
