Skip to content

Commit

Permalink
feat: Enable custom predicates for media operations (#1385)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewsg committed Dec 11, 2024
1 parent 5375fa0 commit f3517bf
Show file tree
Hide file tree
Showing 24 changed files with 292 additions and 907 deletions.
7 changes: 6 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ Preview Release

Python Storage 3.0 is currently in a preview state. If you experience that
backwards compatibility for your application is broken with this release for any
reason, please let us know through the GitHub issues system. Thank you.
reason, please let us know through the GitHub issues system. While some breaks
of backwards compatibility may be unavoidable due to new features in the major
version release, we will do our best to minimize them. Thank you.

Exception Handling
~~~~~~~~~~~~~~~~~~
Expand Down Expand Up @@ -88,6 +90,9 @@ Miscellaneous

- The BlobWriter class now attempts to terminate an ongoing resumable upload if
the writer exits with an exception.
- Retry behavior is now identical between media operations (uploads and
downloads) and other operations, and custom predicates are now supported for
media operations as well.

Quick Start
-----------
Expand Down
24 changes: 0 additions & 24 deletions google/cloud/storage/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from uuid import uuid4

from google.auth import environment_vars
from google.cloud.storage import _media
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED
Expand Down Expand Up @@ -628,29 +627,6 @@ def _bucket_bound_hostname_url(host, scheme=None):
return f"{scheme}://{host}"


def _api_core_retry_to_resumable_media_retry(retry):
    """Translate a google.api_core Retry into a _media RetryStrategy.

    Only the backoff settings (initial delay, multiplier, maximum sleep and
    deadline) are carried over. Custom predicates are not translated.

    :type retry: google.api_core.Retry
    :param retry: (Optional) The google.api_core.Retry object to translate.
        When ``None``, the returned strategy is built with ``max_retries=0``.

    :rtype: google.cloud.storage._media.RetryStrategy
    :returns: A RetryStrategy with all applicable attributes copied from input.
    """
    if retry is None:
        # No retry object supplied: build a strategy that allows zero retries.
        return _media.RetryStrategy(max_retries=0)

    # Copy the backoff configuration from the Retry object's private fields.
    backoff_settings = {
        "max_sleep": retry._maximum,
        "max_cumulative_retry": retry._deadline,
        "initial_delay": retry._initial,
        "multiplier": retry._multiplier,
    }
    return _media.RetryStrategy(**backoff_settings)


def _get_invocation_id():
return "gccl-invocation-id/" + str(uuid4())

Expand Down
2 changes: 0 additions & 2 deletions google/cloud/storage/_media/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@
.. _requests: http://docs.python-requests.org/
"""

from google.cloud.storage._media.common import RetryStrategy
from google.cloud.storage._media.common import UPLOAD_CHUNK_SIZE


__all__ = [
"RetryStrategy",
"UPLOAD_CHUNK_SIZE",
]
60 changes: 54 additions & 6 deletions google/cloud/storage/_media/_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import re

from google.cloud.storage._media import _helpers
from google.cloud.storage._media import common
from google.cloud.storage.exceptions import InvalidResponse
from google.cloud.storage.retry import DEFAULT_RETRY


_CONTENT_RANGE_RE = re.compile(
Expand All @@ -45,14 +45,30 @@ class DownloadBase(object):
end (int): The last byte in a range to be downloaded.
headers (Optional[Mapping[str, str]]): Extra headers that should
be sent with the request, e.g. headers for encrypted data.
retry (Optional[google.api_core.retry.Retry]): How to retry the RPC.
A None value will disable retries. A google.api_core.retry.Retry
value will enable retries, and the object will configure backoff and
timeout options.
See the retry.py source code and docstrings in this package
(google.cloud.storage.retry) for information on retry types and how
to configure them.
Attributes:
media_url (str): The URL containing the media to be downloaded.
start (Optional[int]): The first byte in a range to be downloaded.
end (Optional[int]): The last byte in a range to be downloaded.
"""

def __init__(self, media_url, stream=None, start=None, end=None, headers=None):
def __init__(
self,
media_url,
stream=None,
start=None,
end=None,
headers=None,
retry=DEFAULT_RETRY,
):
self.media_url = media_url
self._stream = stream
self.start = start
Expand All @@ -61,7 +77,7 @@ def __init__(self, media_url, stream=None, start=None, end=None, headers=None):
headers = {}
self._headers = headers
self._finished = False
self._retry_strategy = common.RetryStrategy()
self._retry_strategy = retry

@property
def finished(self):
Expand Down Expand Up @@ -133,6 +149,15 @@ class Download(DownloadBase):
values are "md5", "crc32c", "auto" and None. The default is "auto",
which will try to detect if the C extension for crc32c is installed
and fall back to md5 otherwise.
retry (Optional[google.api_core.retry.Retry]): How to retry the
RPC. A None value will disable retries. A
google.api_core.retry.Retry value will enable retries, and the
object will configure backoff and timeout options.
See the retry.py source code and docstrings in this package
(google.cloud.storage.retry) for information on retry types and how
to configure them.
"""

def __init__(
Expand All @@ -143,9 +168,10 @@ def __init__(
end=None,
headers=None,
checksum="auto",
retry=DEFAULT_RETRY,
):
super(Download, self).__init__(
media_url, stream=stream, start=start, end=end, headers=headers
media_url, stream=stream, start=start, end=end, headers=headers, retry=retry
)
self.checksum = checksum
if self.checksum == "auto":
Expand Down Expand Up @@ -242,6 +268,14 @@ class ChunkedDownload(DownloadBase):
headers (Optional[Mapping[str, str]]): Extra headers that should
be sent with each request, e.g. headers for data encryption
key headers.
retry (Optional[google.api_core.retry.Retry]): How to retry the
RPC. A None value will disable retries. A
google.api_core.retry.Retry value will enable retries, and the
object will configure backoff and timeout options.
See the retry.py source code and docstrings in this package
(google.cloud.storage.retry) for information on retry types and how
to configure them.
Attributes:
media_url (str): The URL containing the media to be downloaded.
Expand All @@ -253,13 +287,27 @@ class ChunkedDownload(DownloadBase):
ValueError: If ``start`` is negative.
"""

def __init__(self, media_url, chunk_size, stream, start=0, end=None, headers=None):
def __init__(
self,
media_url,
chunk_size,
stream,
start=0,
end=None,
headers=None,
retry=DEFAULT_RETRY,
):
if start < 0:
raise ValueError(
"On a chunked download the starting " "value cannot be negative."
)
super(ChunkedDownload, self).__init__(
media_url, stream=stream, start=start, end=end, headers=headers
media_url,
stream=stream,
start=start,
end=end,
headers=headers,
retry=retry,
)
self.chunk_size = chunk_size
self._bytes_downloaded = 0
Expand Down
33 changes: 2 additions & 31 deletions google/cloud/storage/_media/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
import base64
import hashlib
import logging
import random

from urllib.parse import parse_qs
from urllib.parse import urlencode
from urllib.parse import urlsplit
from urllib.parse import urlunsplit

from google.cloud.storage._media import common
from google.cloud.storage import retry
from google.cloud.storage.exceptions import InvalidResponse


Expand Down Expand Up @@ -101,7 +100,7 @@ def require_status_code(response, status_codes, get_status_code, callback=do_not
"""
status_code = get_status_code(response)
if status_code not in status_codes:
if status_code not in common.RETRYABLE:
if status_code not in retry._RETRYABLE_STATUS_CODES:
callback()
raise InvalidResponse(
response,
Expand All @@ -113,34 +112,6 @@ def require_status_code(response, status_codes, get_status_code, callback=do_not
return status_code


def calculate_retry_wait(base_wait, max_sleep, multiplier=2.0):
    """Compute the next base wait and the jittered wait for a retry attempt.

    The base wait is scaled by ``multiplier`` and capped at ``max_sleep``.
    A random jitter of 0 to 1000 milliseconds is then added to the capped
    value to spread out retry attempts from different clients.

    Args:
        base_wait (float): The "base" wait time (i.e. without any jitter)
            that will be multiplied until it reaches the maximum sleep.
        max_sleep (float): Maximum value that a sleep time is allowed to be.
        multiplier (float): Multiplier to apply to the base wait.

    Returns:
        Tuple[float, float]: The new base wait time as well as the wait time
        to be applied (with a random amount of jitter between 0 and 1 seconds
        added).
    """
    scaled_wait = multiplier * base_wait
    # Clamp the scaled value so the base wait never exceeds the ceiling.
    capped_wait = max_sleep if scaled_wait > max_sleep else scaled_wait

    jitter_millis = random.randint(0, 1000)
    return capped_wait, capped_wait + 0.001 * jitter_millis


def _get_metadata_key(checksum_type):
if checksum_type == "md5":
return "md5Hash"
Expand Down
Loading

0 comments on commit f3517bf

Please sign in to comment.