Skip to content

Commit

Permalink
fix: use explicitly given project over the client's default project f…
Browse files Browse the repository at this point in the history
…or load jobs (#482)

* fix: use project parameter if given for load jobs

* blacken client tests

* Refactor string concatenations in client tests

* Silence invalid coverage complaint
  • Loading branch information
plamut authored Jan 22, 2021
1 parent cac9062 commit 530e1e8
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 50 deletions.
41 changes: 33 additions & 8 deletions google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2136,11 +2136,11 @@ def load_table_from_file(
try:
if size is None or size >= _MAX_MULTIPART_SIZE:
response = self._do_resumable_upload(
file_obj, job_resource, num_retries, timeout
file_obj, job_resource, num_retries, timeout, project=project
)
else:
response = self._do_multipart_upload(
file_obj, job_resource, size, num_retries, timeout
file_obj, job_resource, size, num_retries, timeout, project=project
)
except resumable_media.InvalidResponse as exc:
raise exceptions.from_http_response(exc.response)
Expand Down Expand Up @@ -2475,7 +2475,9 @@ def load_table_from_json(
timeout=timeout,
)

def _do_resumable_upload(self, stream, metadata, num_retries, timeout):
def _do_resumable_upload(
self, stream, metadata, num_retries, timeout, project=None
):
"""Perform a resumable upload.
Args:
Expand All @@ -2491,21 +2493,27 @@ def _do_resumable_upload(self, stream, metadata, num_retries, timeout):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
project (Optional[str]):
Project ID of the project of where to run the upload. Defaults
to the client's project.
Returns:
requests.Response:
The "200 OK" response object returned after the final chunk
is uploaded.
"""
upload, transport = self._initiate_resumable_upload(
stream, metadata, num_retries, timeout
stream, metadata, num_retries, timeout, project=project
)

while not upload.finished:
response = upload.transmit_next_chunk(transport)

return response

def _initiate_resumable_upload(self, stream, metadata, num_retries, timeout):
def _initiate_resumable_upload(
self, stream, metadata, num_retries, timeout, project=None
):
"""Initiate a resumable upload.
Args:
Expand All @@ -2521,6 +2529,10 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries, timeout):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
project (Optional[str]):
Project ID of the project of where to run the upload. Defaults
to the client's project.
Returns:
Tuple:
Pair of
Expand All @@ -2532,7 +2544,11 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries, timeout):
chunk_size = _DEFAULT_CHUNKSIZE
transport = self._http
headers = _get_upload_headers(self._connection.user_agent)
upload_url = _RESUMABLE_URL_TEMPLATE.format(project=self.project)

if project is None:
project = self.project
upload_url = _RESUMABLE_URL_TEMPLATE.format(project=project)

# TODO: modify ResumableUpload to take a retry.Retry object
# that it can use for the initial RPC.
upload = ResumableUpload(upload_url, chunk_size, headers=headers)
Expand All @@ -2553,7 +2569,9 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries, timeout):

return upload, transport

def _do_multipart_upload(self, stream, metadata, size, num_retries, timeout):
def _do_multipart_upload(
self, stream, metadata, size, num_retries, timeout, project=None
):
"""Perform a multipart upload.
Args:
Expand All @@ -2574,6 +2592,10 @@ def _do_multipart_upload(self, stream, metadata, size, num_retries, timeout):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
project (Optional[str]):
Project ID of the project of where to run the upload. Defaults
to the client's project.
Returns:
requests.Response:
The "200 OK" response object returned after the multipart
Expand All @@ -2591,7 +2613,10 @@ def _do_multipart_upload(self, stream, metadata, size, num_retries, timeout):

headers = _get_upload_headers(self._connection.user_agent)

upload_url = _MULTIPART_URL_TEMPLATE.format(project=self.project)
if project is None:
project = self.project

upload_url = _MULTIPART_URL_TEMPLATE.format(project=project)
upload = MultipartUpload(upload_url, headers=headers)

if num_retries is not None:
Expand Down
Loading

0 comments on commit 530e1e8

Please sign in to comment.