diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py
index 3dea55654..45b68ee09 100644
--- a/google/cloud/bigquery/client.py
+++ b/google/cloud/bigquery/client.py
@@ -57,6 +57,7 @@
 from google.cloud.bigquery._helpers import _record_field_to_json
 from google.cloud.bigquery._helpers import _str_or_none
 from google.cloud.bigquery._helpers import _verify_job_config_type
+from google.cloud.bigquery._helpers import _del_sub_prop
 from google.cloud.bigquery._http import Connection
 from google.cloud.bigquery import _pandas_helpers
 from google.cloud.bigquery.dataset import Dataset
@@ -1338,9 +1339,7 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
             load_job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
                 job_config
             )
-            destination = TableReference.from_api_repr(
-                job_config["load"]["destinationTable"]
-            )
+            destination = _get_sub_prop(job_config, ["load", "destinationTable"])
             source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
             return self.load_table_from_uri(
                 source_uris, destination, job_config=load_job_config, retry=retry
@@ -1349,14 +1348,12 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
             copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
                 job_config
             )
-            copy_resource = job_config["copy"]
-            destination = TableReference.from_api_repr(
-                copy_resource["destinationTable"]
-            )
+            destination = _get_sub_prop(job_config, ["copy", "destinationTable"])
             sources = []
-            source_configs = copy_resource.get("sourceTables")
+            source_configs = _get_sub_prop(job_config, ["copy", "sourceTables"])
+
             if source_configs is None:
-                source_configs = [copy_resource["sourceTable"]]
+                source_configs = [_get_sub_prop(job_config, ["copy", "sourceTable"])]
             for source_config in source_configs:
                 table_ref = TableReference.from_api_repr(source_config)
                 sources.append(table_ref)
@@ -1367,13 +1364,13 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
             extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
                 job_config
             )
-            source = TableReference.from_api_repr(job_config["extract"]["sourceTable"])
+            source = _get_sub_prop(job_config, ["extract", "sourceTable"])
             destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"])
             return self.extract_table(
                 source, destination_uris, job_config=extract_job_config, retry=retry
             )
         elif "query" in job_config:
-            del job_config["query"]["destinationTable"]
+            _del_sub_prop(job_config, ["query", "destinationTable"])
             query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
                 job_config
             )
diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py
index 5b480fb13..63325e95e 100644
--- a/tests/unit/test_client.py
+++ b/tests/unit/test_client.py
@@ -2893,6 +2893,68 @@ def test_create_job_query_config(self):
             configuration, "google.cloud.bigquery.client.Client.query",
         )
 
+    def test_create_job_query_config_w_rateLimitExceeded_error(self):
+        from google.cloud.exceptions import Forbidden
+        from google.cloud.bigquery.retry import DEFAULT_RETRY
+
+        query = "select count(*) from persons"
+        configuration = {
+            "query": {
+                "query": query,
+                "useLegacySql": False,
+                "destinationTable": {"tableId": "table_id"},
+            }
+        }
+        resource = {
+            "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
+            "configuration": {
+                "query": {
+                    "query": query,
+                    "useLegacySql": False,
+                    "destinationTable": {
+                        "projectId": self.PROJECT,
+                        "datasetId": self.DS_ID,
+                        "tableId": "query_destination_table",
+                    },
+                }
+            },
+        }
+        data_without_destination = {
+            "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
+            "configuration": {"query": {"query": query, "useLegacySql": False}},
+        }
+
+        creds = _make_credentials()
+        http = object()
+        retry = DEFAULT_RETRY.with_deadline(1).with_predicate(
+            lambda exc: isinstance(exc, Forbidden)
+        )
+        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
+
+        api_request_patcher = mock.patch.object(
+            client._connection,
+            "api_request",
+            side_effect=[
+                Forbidden("", errors=[{"reason": "rateLimitExceeded"}]),
+                resource,
+            ],
+        )
+
+        with api_request_patcher as fake_api_request:
+            job = client.create_job(job_config=configuration, retry=retry)
+
+        self.assertEqual(job.destination.table_id, "query_destination_table")
+        self.assertEqual(len(fake_api_request.call_args_list), 2)  # was retried once
+        self.assertEqual(
+            fake_api_request.call_args_list[1],
+            mock.call(
+                method="POST",
+                path="/projects/PROJECT/jobs",
+                data=data_without_destination,
+                timeout=None,
+            ),
+        )
+
     def test_create_job_w_invalid_job_config(self):
         configuration = {"unknown": {}}
         creds = _make_credentials()