feat: retry failed query jobs in result() #837

Merged (32 commits) on Aug 11, 2021
The diff below shows changes from 11 of the 32 commits.

Commits
cc62448
initial stab
jimfulton Jul 28, 2021
61e1c38
test start
jimfulton Jul 28, 2021
093f9b4
Test high-level retry behavior.
jimfulton Jul 28, 2021
b2ce74b
Don't retry here
jimfulton Jul 28, 2021
d930c83
reworked the retry logic.
jimfulton Jul 28, 2021
bf7b4c6
if, when retrying, the new query job is complete and errorer, stop ri…
jimfulton Jul 29, 2021
0598197
Added test (and to existing test) to make sure we can call result mul…
jimfulton Jul 29, 2021
c9644e9
Keep carrying retry_do_query, even though it shouldn't be necessary.
jimfulton Jul 29, 2021
2d0e067
blacken
jimfulton Jul 29, 2021
0dcac01
removed unecessary condition
jimfulton Jul 29, 2021
026edba
System test that demonstrates the retry behavior as applied to the or…
jimfulton Jul 29, 2021
0e764d2
Added missing copyright
jimfulton Jul 29, 2021
c96d8b3
Added missing copyright
jimfulton Jul 29, 2021
5058cb4
Merge branch 'master' into fix-retry
tswast Aug 4, 2021
3c53172
Added a leading _ to the retry_do_query query-jov attribute to make i…
jimfulton Aug 4, 2021
eb51432
Merge branch 'master' into fix-retry
tswast Aug 5, 2021
d6d958f
fixed copyright
jimfulton Aug 5, 2021
a05f75f
Merge branch 'fix-retry' of github.com:jimfulton/python-bigquery into…
jimfulton Aug 5, 2021
32e7050
why sleep?
jimfulton Aug 5, 2021
3361a82
use DEFAULT_RETRY for low-level requests, to retry API errors
jimfulton Aug 5, 2021
4c6ef5b
Separate job retry into separate option for client query and query-jo…
jimfulton Aug 9, 2021
25b44a0
Merge branch 'master' into fix-retry
jimfulton Aug 9, 2021
9ab84e4
Use None job_retry to disable retry
jimfulton Aug 10, 2021
d2bf840
Use a 10-minute deadline for job retry by default
jimfulton Aug 10, 2021
4c004a5
Merge branch 'fix-retry' of github.com:jimfulton/python-bigquery into…
jimfulton Aug 10, 2021
be49c4b
Merge branch 'master' into fix-retry
jimfulton Aug 10, 2021
9e4a011
Merge branch 'master' into fix-retry
jimfulton Aug 11, 2021
204641b
Merge branch 'master' into fix-retry
jimfulton Aug 11, 2021
bf051b0
Added another default reason to retry jobs
jimfulton Aug 11, 2021
b47244f
Merge branch 'fix-retry' of github.com:jimfulton/python-bigquery into…
jimfulton Aug 11, 2021
2377a1b
Merge branch 'master' into fix-retry
jimfulton Aug 11, 2021
6b790e8
Update tests/unit/test_job_retry.py
tswast Aug 11, 2021
69 changes: 44 additions & 25 deletions google/cloud/bigquery/client.py
@@ -3206,16 +3206,14 @@ def query(
class.
"""
job_id_given = job_id is not None
job_id = _make_job_id(job_id, job_id_prefix)
job_id_save = job_id

if project is None:
project = self.project

if location is None:
location = self.location

job_config = copy.deepcopy(job_config)

if self._default_query_job_config:
if job_config:
_verify_job_config_type(
@@ -3225,6 +3223,8 @@ def query(
# that is in the default,
# should be filled in with the default
# the incoming therefore has precedence
#
# Note that _fill_from_default doesn't mutate the receiver
job_config = job_config._fill_from_default(
self._default_query_job_config
)
@@ -3233,34 +3233,53 @@ def query(
self._default_query_job_config,
google.cloud.bigquery.job.QueryJobConfig,
)
job_config = copy.deepcopy(self._default_query_job_config)
job_config = self._default_query_job_config

job_ref = job._JobReference(job_id, project=project, location=location)
query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config)
# Note that we haven't modified the original job_config (or
# _default_query_job_config) up to this point.
job_config_save = job_config

try:
query_job._begin(retry=retry, timeout=timeout)
except core_exceptions.Conflict as create_exc:
# The thought is if someone is providing their own job IDs and they get
# their job ID generation wrong, this could end up returning results for
# the wrong query. We thus only try to recover if job ID was not given.
if job_id_given:
raise create_exc
def do_query():
# Make a copy now, so that original doesn't get changed by the process
# below and to facilitate retry
job_config = copy.deepcopy(job_config_save)

job_id = _make_job_id(job_id_save, job_id_prefix)
job_ref = job._JobReference(job_id, project=project, location=location)
query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config)

try:
query_job = self.get_job(
job_id,
project=project,
location=location,
retry=retry,
timeout=timeout,
)
except core_exceptions.GoogleAPIError: # (includes RetryError)
raise create_exc
query_job._begin(retry=retry, timeout=timeout)
except core_exceptions.Conflict as create_exc:
# The thought is if someone is providing their own job IDs and they get
# their job ID generation wrong, this could end up returning results for
# the wrong query. We thus only try to recover if job ID was not given.
if job_id_given:
raise create_exc

try:
query_job = self.get_job(
job_id,
project=project,
location=location,
retry=retry,
timeout=timeout,
)
except core_exceptions.GoogleAPIError: # (includes RetryError)
raise create_exc
else:
return query_job
else:
return query_job
else:
return query_job

future = do_query()
# The future might be in a failed state now, but if it's
# unrecoverable, we'll find out when we ask for its result, at which
# point, we may retry.
if not job_id_given:
future.retry_do_query = do_query # in case we have to retry later

return future

def insert_rows(
self,

45 changes: 40 additions & 5 deletions google/cloud/bigquery/job/query.py
@@ -1300,12 +1300,47 @@ def result(
If the job did not complete in the given timeout.
"""
try:
super(QueryJob, self).result(retry=retry, timeout=timeout)
retry_do_query = getattr(self, "retry_do_query", None)
first = True
sub_retry = retry if retry_do_query is None else None

def do_get_result():
nonlocal first

if first:
first = False
else:
# Note that we won't get here if retry_do_query is
# None, because we won't use a retry.

# The original job has failed. Create a new one.
job = retry_do_query()

# If it's already failed, we might as well stop:
if job.done() and job.exception() is not None:
raise job.exception()

# Become the new job:
self.__dict__.clear()
self.__dict__.update(job.__dict__)

# This shouldn't be necessary, because once we have a good
# job, it should stay good, and we shouldn't have to retry.
# But let's be paranoid. :)
self.retry_do_query = retry_do_query

super(QueryJob, self).result(retry=sub_retry, timeout=timeout)

# Since the job could already be "done" (e.g. got a finished job
# via client.get_job), the superclass call to done() might not
# set the self._query_results cache.
self._reload_query_results(retry=sub_retry, timeout=timeout)

if retry is not None and retry_do_query is not None:
do_get_result = retry(do_get_result)

do_get_result()

# Since the job could already be "done" (e.g. got a finished job
# via client.get_job), the superclass call to done() might not
# set the self._query_results cache.
self._reload_query_results(retry=retry, timeout=timeout)
except exceptions.GoogleAPICallError as exc:
exc.message += self._format_for_exception(self.query, self.job_id)
exc.query_job = self

46 changes: 46 additions & 0 deletions tests/system/test_query_retry.py
@@ -0,0 +1,46 @@
import contextlib
Contributor:
license headers :-)

Contributor Author:
Fixed, here and in unit test. Sorry.

Contributor:
Needs to be the Google header in this repo. https://opensource.google/docs/releasing/preparing/#Apache-header

Contributor Author:
Ouch. Sorry.

Fixed.

import threading
import time

import google.cloud.bigquery


def thread(func):
thread = threading.Thread(target=func, daemon=True)
thread.start()
return thread


def test_query_retry_539(bigquery_client, dataset_id):
"""
Test semantic retry

See: https://github.com/googleapis/python-bigquery/issues/539
"""
from google.api_core import exceptions
from google.api_core.retry import if_exception_type, Retry

table_name = f"{dataset_id}.t539"
job = bigquery_client.query(f"select count(*) from {table_name}")
job_id = job.job_id

# We can already know that the job failed, but we're not supposed
# to find out until we call result, which is where retry happens
assert job.done()
assert job.exception() is not None

@thread
def create_table():
Contributor:
Neat! Maybe add a comment explaining that the sleep is so that the first query fails.

Contributor Author:
done

time.sleep(1)
with contextlib.closing(google.cloud.bigquery.Client()) as client:
client.query(f"create table {table_name} (id int64)")

retry_policy = Retry(predicate=if_exception_type(exceptions.NotFound))
[[count]] = list(job.result(retry=retry_policy))
Contributor:
Oh, is the retry used to determine if we should re-issue a query? I think we should have a separate object for that, since retrying individual API requests can still be useful in cases where we don't want to retry the whole query if its failed.

Contributor Author:
OK, I changed it to use DEFAULT_RETRY for the 2 sub-requests, however...

There's a problem here. I don't think users understand retry. They expect it to deal with transient semantic errors, like NotFound and rate limiting. They don't get that network errors and application errors are handled separately. They probably aren't aware of what network errors they should worry about. It took me a while to grok the difference between errors raised by api_request and other errors that cause a job to be in a failed state. I'm not 100% sure I understand the difference now. 0.5 ;)

IMO, it would not help to let them supply two different retries.

Some problems with this PR as it stands ATM:

  1. The meaning of the retry argument to result() depends on whether a job id was supplied to query. This is bad. :)
  2. Users probably expect to specify application-level retry when calling query, not when calling result.

A fundamental problem, IMO, is that there are two kinds of retry and the distinction between them is subtle.

Maybe, a breaking change we could make, since we're making breaking changes :), would be to:

  1. Define and use a new retry for network retry. The user doesn't influence this.
  2. Use the retry argument to query and result to specify how failed jobs should be retried.
    • Default them to a new sensible default, perhaps limited to rate-limit errors.
    • The retry passed to query is only used as the default retry for result.
  3. If a user supplies a job id to query, then error if they pass a retry argument to query or result, to the effect that their job isn't retryable, because they supplied a custom job id.

I suspect we should brainstorm this a bit.
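
For concreteness, here is a minimal sketch of the two failure modes being discussed. It is illustrative only: the dataset and table names are made up, and the handling shown is not this PR's implementation.

```python
from google.api_core import exceptions
from google.cloud import bigquery

client = bigquery.Client()

try:
    # Failure mode 1: the API invocation itself fails. The HTTP call made
    # here (or while polling) raises a GoogleAPICallError. This is what the
    # existing per-request `retry` argument governs.
    job = client.query("SELECT COUNT(*) FROM my_dataset.not_created_yet")
except exceptions.GoogleAPICallError as exc:
    print("API-invocation failure:", exc)
else:
    # Failure mode 2: every API call succeeds, but the *job* finishes in an
    # error state (e.g. notFound or rateLimitExceeded recorded on the job).
    if job.done() and job.error_result is not None:
        print("Job-level failure:", job.error_result.get("reason"))

    # result() surfaces the job error as an exception. Retrying the request
    # at this point doesn't help: the job itself has already failed, so only
    # issuing a brand-new job (what this PR does) can recover.
    try:
        job.result()
    except exceptions.GoogleAPICallError as exc:
        print("Raised from result():", exc)
```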

Contributor:
I'd hate for us to retry the query if there was just a transient API error, so I do think we need separate logic.

> A fundamental problem, IMO, is that there are two kinds of retry and the distinction between them is subtle.

I agree. I'm hoping that with good defaults, most users won't have to worry about it. But I also don't think it's something we should hide from them. It's not intuitive and requires some detailed knowledge about how the API works to get right, but I think that's true for custom retry logic in general.

Contributor:
> The retry passed to query is only used as the default retry for result.

This could work. Especially if we use the jobs.query API (#589). This is all way too complicated. 😭

Contributor Author:
> I'd hate for us to retry the query if there was just a transient API error, so I do think we need separate logic.

I'm not suggesting we don't retry API/network errors. I was suggesting we don't give users control over them.

However, in the interest of backward compatibility, I propose:

  • Document the existing retry argument as API retry. In the documentation, I'd say something to the effect of "this isn't the retry you're looking for."
  • Add a new job_retry argument to query and result to retry failed jobs, as sketched below. (The job_retry passed to query is simply the default for result.)
  • Error if job_retry is provided for non-retryable jobs, due to a user-supplied job id.
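
A rough sketch of what that split could look like from the caller's side. The job_retry keyword follows the proposal above (see 4c6ef5b); the predicate and deadline values are illustrative assumptions, not the library's shipped defaults.

```python
from google.api_core import exceptions
from google.api_core.retry import Retry, if_exception_type
from google.cloud import bigquery
from google.cloud.bigquery.retry import DEFAULT_RETRY

client = bigquery.Client()

# Retry the *job* (i.e. re-issue the query) only for errors treated as
# transient at the query level. Predicate and deadline are example choices.
job_retry = Retry(
    predicate=if_exception_type(exceptions.TooManyRequests, exceptions.NotFound),
    deadline=600,
)

job = client.query(
    "SELECT COUNT(*) FROM my_dataset.my_table",
    retry=DEFAULT_RETRY,   # existing argument: retries individual API requests
    job_retry=job_retry,   # proposed argument: re-issues the query if the job fails
)

# The job_retry given to query() is just the default; result() can override it.
rows = list(job.result())
```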

Contributor:
I like this proposal.

Contributor Author:
Done, see 4c6ef5b.

assert count == 0

# The job was retried, and thus got a new job id
assert job.job_id != job_id

# Make sure we don't leave a thread behind:
create_table.join()
134 changes: 134 additions & 0 deletions tests/unit/test_query_retry.py
@@ -0,0 +1,134 @@
import datetime

import mock
import pytest

import google.api_core.exceptions

from .helpers import make_connection


@mock.patch("time.sleep")
def test_retry_failed_jobs(sleep, client):
"""
Test retry of job failures, as opposed to API-invocation failures.
"""
err = dict(reason="rateLimitExceeded")
responses = [
dict(status=dict(state="DONE", errors=[err], errorResult=err)),
dict(status=dict(state="DONE", errors=[err], errorResult=err)),
dict(status=dict(state="DONE", errors=[err], errorResult=err)),
dict(status=dict(state="DONE")),
dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"),
]

def api_request(method, path, query_params=None, data=None, **kw):
response = responses.pop(0)
if data:
response["jobReference"] = data["jobReference"]
else:
response["jobReference"] = dict(
jobId=path.split("/")[-1], projectId="PROJECT"
)
return response

conn = client._connection = make_connection()
conn.api_request.side_effect = api_request

job = client.query("select 1")

orig_job_id = job.job_id
result = job.result()
assert result.total_rows == 1
assert not responses # We made all the calls we expected to.

# The job adjusts its job id based on the id of the last attempt.
assert job.job_id != orig_job_id
assert job.job_id == conn.mock_calls[3][2]["data"]["jobReference"]["jobId"]

# We had to sleep three times
assert len(sleep.mock_calls) == 3

# Sleeps are random, however they're more than 0
assert min(c[1][0] for c in sleep.mock_calls) > 0

# They're at most 2 * (multiplier**(number of sleeps - 1)) * initial
# The default multiplier is 2
assert max(c[1][0] for c in sleep.mock_calls) <= 8
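
# (Illustration: assuming the api_core defaults of initial=1.0 and
# multiplier=2.0, the third sleep is at most 2 * (2 ** 2) * 1.0 = 8 seconds,
# which is where the bound above comes from.)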

# We can ask for the result again:
responses = [
dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"),
]
orig_job_id = job.job_id
result = job.result()
assert result.total_rows == 1
assert not responses # We made all the calls we expected to.

# We wouldn't (and didn't) fail, because we're dealing with a successful job.
# So the job id hasn't changed.
assert job.job_id == orig_job_id


@mock.patch("google.api_core.retry.datetime_helpers")
@mock.patch("time.sleep")
def test_retry_failed_jobs_after_retry_failed(sleep, datetime_helpers, client):
"""
If at first you don't succeed, maybe you will later. :)
"""
conn = client._connection = make_connection()

datetime_helpers.utcnow.return_value = datetime.datetime(2021, 7, 29, 10, 43, 2)

err = dict(reason="rateLimitExceeded")

def api_request(method, path, query_params=None, data=None, **kw):
calls = sleep.mock_calls
if calls:
datetime_helpers.utcnow.return_value += datetime.timedelta(
seconds=calls[-1][1][0]
)
response = dict(status=dict(state="DONE", errors=[err], errorResult=err))
response["jobReference"] = data["jobReference"]
return response

conn.api_request.side_effect = api_request

job = client.query("select 1")
orig_job_id = job.job_id

with pytest.raises(google.api_core.exceptions.RetryError):
job.result()

# We never got a successful job, so the job id never changed:
assert job.job_id == orig_job_id

# We failed because we couldn't succeed after 120 seconds.
# But we can try again:
responses = [
dict(status=dict(state="DONE", errors=[err], errorResult=err)),
dict(status=dict(state="DONE", errors=[err], errorResult=err)),
dict(status=dict(state="DONE", errors=[err], errorResult=err)),
dict(status=dict(state="DONE")),
dict(rows=[{"f": [{"v": "1"}]}], totalRows="1"),
]

def api_request(method, path, query_params=None, data=None, **kw):
calls = sleep.mock_calls
datetime_helpers.utcnow.return_value += datetime.timedelta(
seconds=calls[-1][1][0]
)
response = responses.pop(0)
if data:
response["jobReference"] = data["jobReference"]
else:
response["jobReference"] = dict(
jobId=path.split("/")[-1], projectId="PROJECT"
)
return response

conn.api_request.side_effect = api_request
result = job.result()
assert result.total_rows == 1
assert not responses # We made all the calls we expected to.
assert job.job_id != orig_job_id