fix: avoid possible job already exists error #751

Merged (2 commits) on Jul 14, 2021

26 changes: 24 additions & 2 deletions google/cloud/bigquery/client.py
@@ -3190,6 +3190,7 @@ def query(
                 If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.QueryJobConfig`
                 class.
         """
+        job_id_given = job_id is not None
         job_id = _make_job_id(job_id, job_id_prefix)

         if project is None:
@@ -3221,9 +3222,30 @@ def query(

         job_ref = job._JobReference(job_id, project=project, location=location)
         query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config)
-        query_job._begin(retry=retry, timeout=timeout)

-        return query_job
+        try:
+            query_job._begin(retry=retry, timeout=timeout)
+        except core_exceptions.Conflict as create_exc:
+            # The thought is if someone is providing their own job IDs and they get
+            # their job ID generation wrong, this could end up returning results for
+            # the wrong query. We thus only try to recover if job ID was not given.
+            if job_id_given:
+                raise create_exc
+
+            try:
+                query_job = self.get_job(

Hi, there is a slight problem with this change - self.get_job has a different return type to this function. It can return LoadJob, etc as well as the QueryJob we're expecting so the actual return type doesn't match what is declared for this function.

I don't understand the situations that could result in this code being called, but presumably in reality this would always be a QueryJob? Unfortunately this is causing me problems when running pylint over some code that calls this, because it thinks the function can return LoadJob, and that has a different set of members to QueryJob.

Many thanks,
Andrew Wilkinson

Contributor Author

Indeed, in this context self.get_job() returns a QueryJob, because job_id is the same ID that was used a few lines above when constructing a new query job (and then starting it).

This project uses pytype for static type checks and it did not complain, but apparently pylint could not deduce the same and reported a false issue.

Could you tell pylint to ignore the return type on that specific line where query() is called? IMHO that's justifiable, because pylint is wrong there.

Having looked into this a bit further, I agree that pylint is wrong. It's a bit of a pain to have to disable this check every time we call query, but I think this is a sign that pylint is aging and not keeping up with modern Python's type syntax.

Sorry for the noise.

Cheers,
Andrew

Contributor Author

No worries, it was a perfectly valid comment.

Ideally, pylint would allow ignoring particular warnings for lines matching a regex, but I'm not sure if that's currently supported? It would make disabling those false positives much cleaner compared to spamming the # pylint: disable=... comments all around the code.

@andrewjw, Jul 27, 2021

Sadly the error isn't raised on the call to query, but when you try to use the return value. In my case this is accessing num_dml_affected_rows, which only exists on QueryJob, and not on LoadJob. Even if pylint did support disabling errors using a regex, I'm not sure it would be practical to create one.

It's been bugging me why this wouldn't be picked up by the type checker. I think I've tracked it down to the fact that LoadJob, QueryJob, etc. all derive from _AsyncJob, which in turn derives from google.api_core.future.polling.PollingFuture. The problem is that google.api_core.future.polling.PollingFuture is not typable, so it gets turned into an Any type, which makes all the job types equivalent and therefore doesn't generate an error. When testing with mypy you have to add # type: ignore to the PollingFuture import line explicitly, but I guess pytype is more forgiving.

I've created the attached file demonstrating the problem (annoyingly, GitHub won't let me attach the file as a .py). As currently written it'll generate an error in both mypy and pytype, but swap the comments on lines 5 and 6 and the error goes away.

Anyway, I have a reasonable workaround, so if you want to leave this that's absolutely fine. If in future the python-api-core library adds typing then I expect this to break though. Adding an assert isinstance(query_job, job.QueryJob) will resolve the issue.

Cheers,
Andrew
invalid_return_union.txt
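
The narrowing suggested in the comment above can be sketched as follows. This is a minimal, self-contained illustration and not the library's code or the attached file: `LoadJob`, `QueryJob`, `get_job`, and `query` below are hypothetical stand-ins for the real `google.cloud.bigquery` names. It shows how an `assert isinstance(...)` lets a checker treat a `Union` value as the narrower declared return type.

```python
from typing import Union


class LoadJob:
    """Hypothetical stand-in for a job type without DML statistics."""

    output_rows = 0


class QueryJob:
    """Hypothetical stand-in for a query job."""

    num_dml_affected_rows = 3


def get_job(job_id: str) -> Union[LoadJob, QueryJob]:
    # Declared to return any job type, as a generic lookup would be.
    return QueryJob() if job_id.startswith("query") else LoadJob()


def query(job_id: str) -> QueryJob:
    job = get_job(job_id)
    # Without this assert, a checker sees Union[LoadJob, QueryJob] being
    # returned from a function that is declared to return QueryJob.
    assert isinstance(job, QueryJob)
    return job


print(query("query-1").num_dml_affected_rows)
```

If the job classes inherit from a base that the checker resolves to `Any`, as described above for `PollingFuture`, the checker would likely accept `query` even without the assert, which would be consistent with pytype not complaining.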

+                    job_id,
+                    project=project,
+                    location=location,
+                    retry=retry,
+                    timeout=timeout,
+                )
+            except core_exceptions.GoogleAPIError:  # (includes RetryError)
+                raise create_exc
+            else:
+                return query_job
+        else:
+            return query_job

     def insert_rows(
         self,
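
The recovery logic in this hunk can be sketched in isolation. The example below uses no BigQuery code: `Conflict`, `FakeBackend`, and `start_job` are hypothetical names made up for illustration. The in-memory backend imitates one plausible cause of the conflict (an assumption, not something stated in the diff): the create request succeeds, the response is lost, and the retried request finds that the job already exists.

```python
import uuid
from typing import Dict, Optional


class Conflict(Exception):
    """Hypothetical stand-in for an 'already exists' API error."""


class FakeBackend:
    """In-memory job store; can imitate a create whose response was lost."""

    def __init__(self, lose_first_response: bool = False) -> None:
        self.jobs: Dict[str, str] = {}
        self.lose_first_response = lose_first_response

    def create(self, job_id: str, query: str) -> None:
        if job_id in self.jobs:
            raise Conflict(f"Job {job_id} already exists.")
        self.jobs[job_id] = query
        if self.lose_first_response:
            # The job was stored, but the caller's retry sees a conflict.
            raise Conflict(f"Job {job_id} already exists.")

    def get(self, job_id: str) -> str:
        return self.jobs[job_id]


def start_job(backend: FakeBackend, query: str, job_id: Optional[str] = None) -> str:
    job_id_given = job_id is not None
    if job_id is None:
        job_id = uuid.uuid4().hex  # random ID, so a clash means "our" job

    try:
        backend.create(job_id, query)
    except Conflict as create_exc:
        # A caller-supplied ID might belong to an unrelated job, so only
        # recover when the ID was generated here.
        if job_id_given:
            raise
        try:
            return backend.get(job_id)
        except KeyError:
            # Fetching the existing job failed: surface the original error.
            raise create_exc
    else:
        return backend.get(job_id)


print(start_job(FakeBackend(lose_first_response=True), "SELECT 1;"))
```

With a generated ID the conflict is absorbed and the stored job is returned, while the same situation with a caller-supplied ID still raises `Conflict`, mirroring the `job_id_given` check in the diff.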
75 changes: 75 additions & 0 deletions tests/unit/test_client.py
@@ -4617,6 +4617,81 @@ def test_query_w_query_parameters(self):
             },
         )

+    def test_query_job_rpc_fail_w_random_error(self):
+        from google.api_core.exceptions import Unknown
+        from google.cloud.bigquery.job import QueryJob
+
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
+
+        job_create_error = Unknown("Not sure what went wrong.")
+        job_begin_patcher = mock.patch.object(
+            QueryJob, "_begin", side_effect=job_create_error
+        )
+        with job_begin_patcher:
+            with pytest.raises(Unknown, match="Not sure what went wrong."):
+                client.query("SELECT 1;", job_id="123")

+    def test_query_job_rpc_fail_w_conflict_job_id_given(self):
+        from google.api_core.exceptions import Conflict
+        from google.cloud.bigquery.job import QueryJob
+
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
+
+        job_create_error = Conflict("Job already exists.")
+        job_begin_patcher = mock.patch.object(
+            QueryJob, "_begin", side_effect=job_create_error
+        )
+        with job_begin_patcher:
+            with pytest.raises(Conflict, match="Job already exists."):
+                client.query("SELECT 1;", job_id="123")

+    def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_fails(self):
+        from google.api_core.exceptions import Conflict
+        from google.api_core.exceptions import DataLoss
+        from google.cloud.bigquery.job import QueryJob
+
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
+
+        job_create_error = Conflict("Job already exists.")
+        job_begin_patcher = mock.patch.object(
+            QueryJob, "_begin", side_effect=job_create_error
+        )
+        get_job_patcher = mock.patch.object(
+            client, "get_job", side_effect=DataLoss("we lost yor job, sorry")
+        )
+
+        with job_begin_patcher, get_job_patcher:
+            # If get job request fails, the original exception should be raised.
+            with pytest.raises(Conflict, match="Job already exists."):
+                client.query("SELECT 1;", job_id=None)

+    def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_succeeds(self):
+        from google.api_core.exceptions import Conflict
+        from google.cloud.bigquery.job import QueryJob
+
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
+
+        job_create_error = Conflict("Job already exists.")
+        job_begin_patcher = mock.patch.object(
+            QueryJob, "_begin", side_effect=job_create_error
+        )
+        get_job_patcher = mock.patch.object(
+            client, "get_job", return_value=mock.sentinel.query_job
+        )
+
+        with job_begin_patcher, get_job_patcher:
+            result = client.query("SELECT 1;", job_id=None)
+
+        assert result is mock.sentinel.query_job
+
     def test_insert_rows_w_timeout(self):
         from google.cloud.bigquery.schema import SchemaField
         from google.cloud.bigquery.table import Table