diff --git a/README.rst b/README.rst index f80b52d37..402029ea7 100644 --- a/README.rst +++ b/README.rst @@ -45,7 +45,9 @@ Preview Release Python Storage 3.0 is currently in a preview state. If you experience that backwards compatibility for your application is broken with this release for any -reason, please let us know through the Github issues system. Thank you. +reason, please let us know through the Github issues system. While some breaks +of backwards compatibility may be unavoidable due to new features in the major +version release, we will do our best to minimize them. Thank you. Exception Handling ~~~~~~~~~~~~~~~~~~ @@ -88,6 +90,9 @@ Miscellaneous - The BlobWriter class now attempts to terminate an ongoing resumable upload if the writer exits with an exception. +- Retry behavior is now identical between media operations (uploads and + downloads) and other operations, and custom predicates are now supported for + media operations as well. Quick Start ----------- diff --git a/google/cloud/storage/_helpers.py b/google/cloud/storage/_helpers.py index 4c40a0ad8..ea2345719 100644 --- a/google/cloud/storage/_helpers.py +++ b/google/cloud/storage/_helpers.py @@ -26,7 +26,6 @@ from uuid import uuid4 from google.auth import environment_vars -from google.cloud.storage import _media from google.cloud.storage.constants import _DEFAULT_TIMEOUT from google.cloud.storage.retry import DEFAULT_RETRY from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED @@ -628,29 +627,6 @@ def _bucket_bound_hostname_url(host, scheme=None): return f"{scheme}://{host}" -def _api_core_retry_to_resumable_media_retry(retry): - """Convert google.api.core.Retry to google.cloud.storage._media.RetryStrategy. - - Custom predicates are not translated. - - :type retry: google.api_core.Retry - :param retry: (Optional) The google.api_core.Retry object to translate. - - :rtype: google.cloud.storage._media.RetryStrategy - :returns: A RetryStrategy with all applicable attributes copied from input. - """ - - if retry is not None: - return _media.RetryStrategy( - max_sleep=retry._maximum, - max_cumulative_retry=retry._deadline, - initial_delay=retry._initial, - multiplier=retry._multiplier, - ) - else: - return _media.RetryStrategy(max_retries=0) - - def _get_invocation_id(): return "gccl-invocation-id/" + str(uuid4()) diff --git a/google/cloud/storage/_media/__init__.py b/google/cloud/storage/_media/__init__.py index 3fc9dfb68..edab8f51d 100644 --- a/google/cloud/storage/_media/__init__.py +++ b/google/cloud/storage/_media/__init__.py @@ -26,11 +26,9 @@ .. _requests: http://docs.python-requests.org/ """ -from google.cloud.storage._media.common import RetryStrategy from google.cloud.storage._media.common import UPLOAD_CHUNK_SIZE __all__ = [ - "RetryStrategy", "UPLOAD_CHUNK_SIZE", ] diff --git a/google/cloud/storage/_media/_download.py b/google/cloud/storage/_media/_download.py index 27b74502d..349ddf30c 100644 --- a/google/cloud/storage/_media/_download.py +++ b/google/cloud/storage/_media/_download.py @@ -19,8 +19,8 @@ import re from google.cloud.storage._media import _helpers -from google.cloud.storage._media import common from google.cloud.storage.exceptions import InvalidResponse +from google.cloud.storage.retry import DEFAULT_RETRY _CONTENT_RANGE_RE = re.compile( @@ -45,6 +45,14 @@ class DownloadBase(object): end (int): The last byte in a range to be downloaded. headers (Optional[Mapping[str, str]]): Extra headers that should be sent with the request, e.g. headers for encrypted data. + retry (Optional[google.api_core.retry.Retry]): How to retry the RPC. + A None value will disable retries. A google.api_core.retry.Retry + value will enable retries, and the object will configure backoff and + timeout options. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. Attributes: media_url (str): The URL containing the media to be downloaded. @@ -52,7 +60,15 @@ class DownloadBase(object): end (Optional[int]): The last byte in a range to be downloaded. """ - def __init__(self, media_url, stream=None, start=None, end=None, headers=None): + def __init__( + self, + media_url, + stream=None, + start=None, + end=None, + headers=None, + retry=DEFAULT_RETRY, + ): self.media_url = media_url self._stream = stream self.start = start @@ -61,7 +77,7 @@ def __init__(self, media_url, stream=None, start=None, end=None, headers=None): headers = {} self._headers = headers self._finished = False - self._retry_strategy = common.RetryStrategy() + self._retry_strategy = retry @property def finished(self): @@ -133,6 +149,15 @@ class Download(DownloadBase): values are "md5", "crc32c", "auto" and None. The default is "auto", which will try to detect if the C extension for crc32c is installed and fall back to md5 otherwise. + retry (Optional[google.api_core.retry.Retry]): How to retry the + RPC. A None value will disable retries. A + google.api_core.retry.Retry value will enable retries, and the + object will configure backoff and timeout options. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + """ def __init__( @@ -143,9 +168,10 @@ def __init__( end=None, headers=None, checksum="auto", + retry=DEFAULT_RETRY, ): super(Download, self).__init__( - media_url, stream=stream, start=start, end=end, headers=headers + media_url, stream=stream, start=start, end=end, headers=headers, retry=retry ) self.checksum = checksum if self.checksum == "auto": @@ -242,6 +268,14 @@ class ChunkedDownload(DownloadBase): headers (Optional[Mapping[str, str]]): Extra headers that should be sent with each request, e.g. headers for data encryption key headers. + retry (Optional[google.api_core.retry.Retry]): How to retry the + RPC. A None value will disable retries. A + google.api_core.retry.Retry value will enable retries, and the + object will configure backoff and timeout options. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. Attributes: media_url (str): The URL containing the media to be downloaded. @@ -253,13 +287,27 @@ class ChunkedDownload(DownloadBase): ValueError: If ``start`` is negative. """ - def __init__(self, media_url, chunk_size, stream, start=0, end=None, headers=None): + def __init__( + self, + media_url, + chunk_size, + stream, + start=0, + end=None, + headers=None, + retry=DEFAULT_RETRY, + ): if start < 0: raise ValueError( "On a chunked download the starting " "value cannot be negative." ) super(ChunkedDownload, self).__init__( - media_url, stream=stream, start=start, end=end, headers=headers + media_url, + stream=stream, + start=start, + end=end, + headers=headers, + retry=retry, ) self.chunk_size = chunk_size self._bytes_downloaded = 0 diff --git a/google/cloud/storage/_media/_helpers.py b/google/cloud/storage/_media/_helpers.py index 35deb8cf7..c07101eda 100644 --- a/google/cloud/storage/_media/_helpers.py +++ b/google/cloud/storage/_media/_helpers.py @@ -19,14 +19,13 @@ import base64 import hashlib import logging -import random from urllib.parse import parse_qs from urllib.parse import urlencode from urllib.parse import urlsplit from urllib.parse import urlunsplit -from google.cloud.storage._media import common +from google.cloud.storage import retry from google.cloud.storage.exceptions import InvalidResponse @@ -101,7 +100,7 @@ def require_status_code(response, status_codes, get_status_code, callback=do_not """ status_code = get_status_code(response) if status_code not in status_codes: - if status_code not in common.RETRYABLE: + if status_code not in retry._RETRYABLE_STATUS_CODES: callback() raise InvalidResponse( response, @@ -113,34 +112,6 @@ def require_status_code(response, status_codes, get_status_code, callback=do_not return status_code -def calculate_retry_wait(base_wait, max_sleep, multiplier=2.0): - """Calculate the amount of time to wait before a retry attempt. - - Wait time grows exponentially with the number of attempts, until - ``max_sleep``. - - A random amount of jitter (between 0 and 1 seconds) is added to spread out - retry attempts from different clients. - - Args: - base_wait (float): The "base" wait time (i.e. without any jitter) - that will be multiplied until it reaches the maximum sleep. - max_sleep (float): Maximum value that a sleep time is allowed to be. - multiplier (float): Multiplier to apply to the base wait. - - Returns: - Tuple[float, float]: The new base wait time as well as the wait time - to be applied (with a random amount of jitter between 0 and 1 seconds - added). - """ - new_base_wait = multiplier * base_wait - if new_base_wait > max_sleep: - new_base_wait = max_sleep - - jitter_ms = random.randint(0, 1000) - return new_base_wait, new_base_wait + 0.001 * jitter_ms - - def _get_metadata_key(checksum_type): if checksum_type == "md5": return "md5Hash" diff --git a/google/cloud/storage/_media/_upload.py b/google/cloud/storage/_media/_upload.py index ddf4ecd15..8d89ee5b2 100644 --- a/google/cloud/storage/_media/_upload.py +++ b/google/cloud/storage/_media/_upload.py @@ -30,10 +30,10 @@ import urllib.parse from google.cloud.storage._media import _helpers -from google.cloud.storage._media import common from google.cloud.storage._media import UPLOAD_CHUNK_SIZE from google.cloud.storage.exceptions import InvalidResponse from google.cloud.storage.exceptions import DataCorruption +from google.cloud.storage.retry import DEFAULT_RETRY from xml.etree import ElementTree @@ -87,18 +87,26 @@ class UploadBase(object): upload_url (str): The URL where the content will be uploaded. headers (Optional[Mapping[str, str]]): Extra headers that should be sent with the request, e.g. headers for encrypted data. + retry (Optional[google.api_core.retry.Retry]): How to retry the + RPC. A None value will disable retries. A + google.api_core.retry.Retry value will enable retries, and the + object will configure backoff and timeout options. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. Attributes: upload_url (str): The URL where the content will be uploaded. """ - def __init__(self, upload_url, headers=None): + def __init__(self, upload_url, headers=None, retry=DEFAULT_RETRY): self.upload_url = upload_url if headers is None: headers = {} self._headers = headers self._finished = False - self._retry_strategy = common.RetryStrategy() + self._retry_strategy = retry @property def finished(self): @@ -173,6 +181,14 @@ class SimpleUpload(UploadBase): upload_url (str): The URL where the content will be uploaded. headers (Optional[Mapping[str, str]]): Extra headers that should be sent with the request, e.g. headers for encrypted data. + retry (Optional[google.api_core.retry.Retry]): How to retry the + RPC. A None value will disable retries. A + google.api_core.retry.Retry value will enable retries, and the + object will configure backoff and timeout options. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. Attributes: upload_url (str): The URL where the content will be uploaded. @@ -256,13 +272,21 @@ class MultipartUpload(UploadBase): "crc32c", "auto", and None. The default is "auto", which will try to detect if the C extension for crc32c is installed and fall back to md5 otherwise. + retry (Optional[google.api_core.retry.Retry]): How to retry the + RPC. A None value will disable retries. A + google.api_core.retry.Retry value will enable retries, and the + object will configure backoff and timeout options. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. Attributes: upload_url (str): The URL where the content will be uploaded. """ - def __init__(self, upload_url, headers=None, checksum="auto"): - super(MultipartUpload, self).__init__(upload_url, headers=headers) + def __init__(self, upload_url, headers=None, checksum="auto", retry=DEFAULT_RETRY): + super(MultipartUpload, self).__init__(upload_url, headers=headers, retry=retry) self._checksum_type = checksum if self._checksum_type == "auto": self._checksum_type = ( @@ -369,7 +393,14 @@ class ResumableUpload(UploadBase): host automatically. Supported values are "md5", "crc32c", "auto", and None. The default is "auto", which will try to detect if the C extension for crc32c is installed and fall back to md5 otherwise. + retry (Optional[google.api_core.retry.Retry]): How to retry the + RPC. A None value will disable retries. A + google.api_core.retry.Retry value will enable retries, and the + object will configure backoff and timeout options. + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. Attributes: upload_url (str): The URL where the content will be uploaded. @@ -379,8 +410,10 @@ class ResumableUpload(UploadBase): :data:`.UPLOAD_CHUNK_SIZE`. """ - def __init__(self, upload_url, chunk_size, checksum="auto", headers=None): - super(ResumableUpload, self).__init__(upload_url, headers=headers) + def __init__( + self, upload_url, chunk_size, checksum="auto", headers=None, retry=DEFAULT_RETRY + ): + super(ResumableUpload, self).__init__(upload_url, headers=headers, retry=retry) if chunk_size % UPLOAD_CHUNK_SIZE != 0: raise ValueError( "{} KB must divide chunk size".format(UPLOAD_CHUNK_SIZE / 1024) @@ -906,6 +939,14 @@ class XMLMPUContainer(UploadBase): filename (str): The name (path) of the file to upload. headers (Optional[Mapping[str, str]]): Extra headers that should be sent with every request. + retry (Optional[google.api_core.retry.Retry]): How to retry the + RPC. A None value will disable retries. A + google.api_core.retry.Retry value will enable retries, and the + object will configure backoff and timeout options. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. Attributes: upload_url (str): The URL where the content will be uploaded. @@ -913,8 +954,10 @@ class XMLMPUContainer(UploadBase): response. """ - def __init__(self, upload_url, filename, headers=None, upload_id=None): - super().__init__(upload_url, headers=headers) + def __init__( + self, upload_url, filename, headers=None, upload_id=None, retry=DEFAULT_RETRY + ): + super().__init__(upload_url, headers=headers, retry=retry) self._filename = filename self._upload_id = upload_id self._parts = {} @@ -1201,6 +1244,15 @@ class XMLMPUPart(UploadBase): "auto" and None. The default is "auto", which will try to detect if the C extension for crc32c is installed and fall back to md5 otherwise. + retry (Optional[google.api_core.retry.Retry]): How to retry the + RPC. A None value will disable retries. A + google.api_core.retry.Retry value will enable retries, and the + object will configure backoff and timeout options. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + Attributes: upload_url (str): The URL of the object (without query parameters). upload_id (str): The ID of the upload from the initialization response. @@ -1222,8 +1274,9 @@ def __init__( part_number, headers=None, checksum="auto", + retry=DEFAULT_RETRY, ): - super().__init__(upload_url, headers=headers) + super().__init__(upload_url, headers=headers, retry=retry) self._filename = filename self._start = start self._end = end diff --git a/google/cloud/storage/_media/common.py b/google/cloud/storage/_media/common.py index 2baafa568..2917ea53d 100644 --- a/google/cloud/storage/_media/common.py +++ b/google/cloud/storage/_media/common.py @@ -17,108 +17,5 @@ Includes custom exception types, useful constants and shared helpers. """ -import http.client - -_SLEEP_RETRY_ERROR_MSG = ( - "At most one of `max_cumulative_retry` and `max_retries` " "can be specified." -) - UPLOAD_CHUNK_SIZE = 262144 # 256 * 1024 """int: Chunks in a resumable upload must come in multiples of 256 KB.""" - -MAX_SLEEP = 64.0 -"""float: Maximum amount of time allowed between requests. - -Used during the retry process for sleep after a failed request. -Chosen since it is the power of two nearest to one minute. -""" - -MAX_CUMULATIVE_RETRY = 600.0 -"""float: Maximum total sleep time allowed during retry process. - -This is provided (10 minutes) as a default. When the cumulative sleep -exceeds this limit, no more retries will occur. -""" - -RETRYABLE = ( - http.client.TOO_MANY_REQUESTS, # 429 - http.client.REQUEST_TIMEOUT, # 408 - http.client.INTERNAL_SERVER_ERROR, # 500 - http.client.BAD_GATEWAY, # 502 - http.client.SERVICE_UNAVAILABLE, # 503 - http.client.GATEWAY_TIMEOUT, # 504 -) -"""iterable: HTTP status codes that indicate a retryable error. - -Connection errors are also retried, but are not listed as they are -exceptions, not status codes. -""" - - -class RetryStrategy(object): - """Configuration class for retrying failed requests. - - At most one of ``max_cumulative_retry`` and ``max_retries`` can be - specified (they are both caps on the total number of retries). If - neither are specified, then ``max_cumulative_retry`` is set as - :data:`MAX_CUMULATIVE_RETRY`. - - Args: - max_sleep (Optional[float]): The maximum amount of time to sleep after - a failed request. Default is :attr:`MAX_SLEEP`. - max_cumulative_retry (Optional[float]): The maximum **total** amount of - time to sleep during retry process. - max_retries (Optional[int]): The number of retries to attempt. - initial_delay (Optional[float]): The initial delay. Default 1.0 second. - muiltiplier (Optional[float]): Exponent of the backoff. Default is 2.0. - - Attributes: - max_sleep (float): Maximum amount of time allowed between requests. - max_cumulative_retry (Optional[float]): Maximum total sleep time - allowed during retry process. - max_retries (Optional[int]): The number retries to attempt. - initial_delay (Optional[float]): The initial delay. Default 1.0 second. - muiltiplier (Optional[float]): Exponent of the backoff. Default is 2.0. - - Raises: - ValueError: If both of ``max_cumulative_retry`` and ``max_retries`` - are passed. - """ - - def __init__( - self, - max_sleep=MAX_SLEEP, - max_cumulative_retry=None, - max_retries=None, - initial_delay=1.0, - multiplier=2.0, - ): - if max_cumulative_retry is not None and max_retries is not None: - raise ValueError(_SLEEP_RETRY_ERROR_MSG) - if max_cumulative_retry is None and max_retries is None: - max_cumulative_retry = MAX_CUMULATIVE_RETRY - - self.max_sleep = max_sleep - self.max_cumulative_retry = max_cumulative_retry - self.max_retries = max_retries - self.initial_delay = initial_delay - self.multiplier = multiplier - - def retry_allowed(self, total_sleep, num_retries): - """Check if another retry is allowed. - - Args: - total_sleep (float): With another retry, the amount of sleep that - will be accumulated by the caller. - num_retries (int): With another retry, the number of retries that - will be attempted by the caller. - - Returns: - bool: Indicating if another retry is allowed (depending on either - the cumulative sleep allowed or the maximum number of retries - allowed. - """ - if self.max_cumulative_retry is None: - return num_retries <= self.max_retries - else: - return total_sleep <= self.max_cumulative_retry diff --git a/google/cloud/storage/_media/requests/_request_helpers.py b/google/cloud/storage/_media/requests/_request_helpers.py index ecb0ea4e7..604ffc313 100644 --- a/google/cloud/storage/_media/requests/_request_helpers.py +++ b/google/cloud/storage/_media/requests/_request_helpers.py @@ -17,17 +17,6 @@ This utilities are explicitly catered to ``requests``-like transports. """ -import http.client -import requests.exceptions -import urllib3.exceptions # type: ignore - -import time - -from google.cloud.storage.exceptions import InvalidResponse -from google.cloud.storage._media import common -from google.cloud.storage._media import _helpers - -_DEFAULT_RETRY_STRATEGY = common.RetryStrategy() _SINGLE_GET_CHUNK_SIZE = 8192 # The number of seconds to wait to establish a connection # (connect() call on socket). Avoid setting this to a multiple of 3 to not @@ -36,20 +25,6 @@ # The number of seconds to wait between bytes sent from the server. _DEFAULT_READ_TIMEOUT = 60 -_CONNECTION_ERROR_CLASSES = ( - http.client.BadStatusLine, - http.client.IncompleteRead, - http.client.ResponseNotReady, - requests.exceptions.ConnectionError, - requests.exceptions.ChunkedEncodingError, - requests.exceptions.Timeout, - urllib3.exceptions.PoolError, - urllib3.exceptions.ProtocolError, - urllib3.exceptions.SSLError, - urllib3.exceptions.TimeoutError, - ConnectionError, # Python 3.x only, superclass of ConnectionResetError. -) - class RequestsMixin(object): """Mix-in class implementing ``requests``-specific behavior. @@ -115,67 +90,18 @@ def _get_body(response): return response._content -def wait_and_retry(func, get_status_code, retry_strategy): +def wait_and_retry(func, retry_strategy): """Attempts to retry a call to ``func`` until success. - Expects ``func`` to return an HTTP response and uses ``get_status_code`` - to check if the response is retry-able. - - ``func`` is expected to raise a failure status code as an - InvalidResponse, at which point this method will check the code - against the common.RETRIABLE list of retriable status codes. - - Will retry until :meth:`~.RetryStrategy.retry_allowed` (on the current - ``retry_strategy``) returns :data:`False`. Uses - :func:`_helpers.calculate_retry_wait` to double the wait time (with jitter) - after each attempt. - Args: func (Callable): A callable that takes no arguments and produces an HTTP response which will be checked as retry-able. - get_status_code (Callable[Any, int]): Helper to get a status code - from a response. - retry_strategy (~google.cloud.storage._media.common.RetryStrategy): The + retry_strategy (Optional[google.api_core.retry.Retry]): The strategy to use if the request fails and must be retried. Returns: object: The return value of ``func``. """ - total_sleep = 0.0 - num_retries = 0 - # base_wait will be multiplied by the multiplier on the first retry. - base_wait = float(retry_strategy.initial_delay) / retry_strategy.multiplier - - # Set the retriable_exception_type if possible. We expect requests to be - # present here and the transport to be using requests.exceptions errors, - # but due to loose coupling with the transport layer we can't guarantee it. - - while True: # return on success or when retries exhausted. - error = None - try: - response = func() - except _CONNECTION_ERROR_CLASSES as e: - error = e # Fall through to retry, if there are retries left. - except InvalidResponse as e: - # An InvalidResponse is only retriable if its status code matches. - # The `process_response()` method on a Download or Upload method - # will convert the status code into an exception. - if get_status_code(e.response) in common.RETRYABLE: - error = e # Fall through to retry, if there are retries left. - else: - raise # If the status code is not retriable, raise w/o retry. - else: - return response - - base_wait, wait_time = _helpers.calculate_retry_wait( - base_wait, retry_strategy.max_sleep, retry_strategy.multiplier - ) - num_retries += 1 - total_sleep += wait_time - - # Check if (another) retry is allowed. If retries are exhausted and - # no acceptable response was received, raise the retriable error. - if not retry_strategy.retry_allowed(total_sleep, num_retries): - raise error - - time.sleep(wait_time) + if retry_strategy: + func = retry_strategy(func) + return func() diff --git a/google/cloud/storage/_media/requests/download.py b/google/cloud/storage/_media/requests/download.py index 1396935e3..2c1b9392c 100644 --- a/google/cloud/storage/_media/requests/download.py +++ b/google/cloud/storage/_media/requests/download.py @@ -72,6 +72,14 @@ class Download(_request_helpers.RequestsMixin, _download.Download): values are "md5", "crc32c", "auto" and None. The default is "auto", which will try to detect if the C extension for crc32c is installed and fall back to md5 otherwise. + retry (Optional[google.api_core.retry.Retry]): How to retry the + RPC. A None value will disable retries. A + google.api_core.retry.Retry value will enable retries, and the + object will configure backoff and timeout options. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. Attributes: media_url (str): The URL containing the media to be downloaded. @@ -235,9 +243,7 @@ def retriable_request(): return result - return _request_helpers.wait_and_retry( - retriable_request, self._get_status_code, self._retry_strategy - ) + return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) class RawDownload(_request_helpers.RawRequestsMixin, _download.Download): @@ -268,6 +274,15 @@ class RawDownload(_request_helpers.RawRequestsMixin, _download.Download): values are "md5", "crc32c", "auto" and None. The default is "auto", which will try to detect if the C extension for crc32c is installed and fall back to md5 otherwise. + retry (Optional[google.api_core.retry.Retry]): How to retry the + RPC. A None value will disable retries. A + google.api_core.retry.Retry value will enable retries, and the + object will configure backoff and timeout options. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + Attributes: media_url (str): The URL containing the media to be downloaded. start (Optional[int]): The first byte in a range to be downloaded. @@ -425,9 +440,7 @@ def retriable_request(): return result - return _request_helpers.wait_and_retry( - retriable_request, self._get_status_code, self._retry_strategy - ) + return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) class ChunkedDownload(_request_helpers.RequestsMixin, _download.ChunkedDownload): @@ -447,6 +460,14 @@ class ChunkedDownload(_request_helpers.RequestsMixin, _download.ChunkedDownload) headers (Optional[Mapping[str, str]]): Extra headers that should be sent with each request, e.g. headers for data encryption key headers. + retry (Optional[google.api_core.retry.Retry]): How to retry the + RPC. A None value will disable retries. A + google.api_core.retry.Retry value will enable retries, and the + object will configure backoff and timeout options. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. Attributes: media_url (str): The URL containing the media to be downloaded. @@ -500,9 +521,7 @@ def retriable_request(): self._process_response(result) return result - return _request_helpers.wait_and_retry( - retriable_request, self._get_status_code, self._retry_strategy - ) + return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) class RawChunkedDownload(_request_helpers.RawRequestsMixin, _download.ChunkedDownload): @@ -522,6 +541,14 @@ class RawChunkedDownload(_request_helpers.RawRequestsMixin, _download.ChunkedDow headers (Optional[Mapping[str, str]]): Extra headers that should be sent with each request, e.g. headers for data encryption key headers. + retry (Optional[google.api_core.retry.Retry]): How to retry the + RPC. A None value will disable retries. A + google.api_core.retry.Retry value will enable retries, and the + object will configure backoff and timeout options. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. Attributes: media_url (str): The URL containing the media to be downloaded. @@ -576,9 +603,7 @@ def retriable_request(): self._process_response(result) return result - return _request_helpers.wait_and_retry( - retriable_request, self._get_status_code, self._retry_strategy - ) + return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) def _add_decoder(response_raw, checksum): diff --git a/google/cloud/storage/_media/requests/upload.py b/google/cloud/storage/_media/requests/upload.py index 7bdea99f5..75d4c53da 100644 --- a/google/cloud/storage/_media/requests/upload.py +++ b/google/cloud/storage/_media/requests/upload.py @@ -79,9 +79,7 @@ def retriable_request(): return result - return _request_helpers.wait_and_retry( - retriable_request, self._get_status_code, self._retry_strategy - ) + return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) class MultipartUpload(_request_helpers.RequestsMixin, _upload.MultipartUpload): @@ -101,6 +99,14 @@ class MultipartUpload(_request_helpers.RequestsMixin, _upload.MultipartUpload): "crc32c", "auto", and None. The default is "auto", which will try to detect if the C extension for crc32c is installed and fall back to md5 otherwise. + retry (Optional[google.api_core.retry.Retry]): How to retry the + RPC. A None value will disable retries. A + google.api_core.retry.Retry value will enable retries, and the + object will configure backoff and timeout options. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. Attributes: upload_url (str): The URL where the content will be uploaded. @@ -152,9 +158,7 @@ def retriable_request(): return result - return _request_helpers.wait_and_retry( - retriable_request, self._get_status_code, self._retry_strategy - ) + return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) class ResumableUpload(_request_helpers.RequestsMixin, _upload.ResumableUpload): @@ -339,6 +343,14 @@ class ResumableUpload(_request_helpers.RequestsMixin, _upload.ResumableUpload): host automatically. Supported values are "md5", "crc32c", "auto", and None. The default is "auto", which will try to detect if the C extension for crc32c is installed and fall back to md5 otherwise. + retry (Optional[google.api_core.retry.Retry]): How to retry the + RPC. A None value will disable retries. A + google.api_core.retry.Retry value will enable retries, and the + object will configure backoff and timeout options. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. Attributes: upload_url (str): The URL where the content will be uploaded. @@ -420,9 +432,7 @@ def retriable_request(): return result - return _request_helpers.wait_and_retry( - retriable_request, self._get_status_code, self._retry_strategy - ) + return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) def transmit_next_chunk( self, @@ -515,9 +525,7 @@ def retriable_request(): return result - return _request_helpers.wait_and_retry( - retriable_request, self._get_status_code, self._retry_strategy - ) + return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) def recover(self, transport): """Recover from a failure and check the status of the current upload. @@ -555,9 +563,7 @@ def retriable_request(): return result - return _request_helpers.wait_and_retry( - retriable_request, self._get_status_code, self._retry_strategy - ) + return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) class XMLMPUContainer(_request_helpers.RequestsMixin, _upload.XMLMPUContainer): @@ -587,6 +593,14 @@ class XMLMPUContainer(_request_helpers.RequestsMixin, _upload.XMLMPUContainer): be sent with the :meth:`initiate` request, e.g. headers for encrypted data. These headers will be propagated to individual XMLMPUPart objects spawned from this container as well. + retry (Optional[google.api_core.retry.Retry]): How to retry the + RPC. A None value will disable retries. A + google.api_core.retry.Retry value will enable retries, and the + object will configure backoff and timeout options. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. Attributes: upload_url (str): The URL where the content will be uploaded. @@ -636,9 +650,7 @@ def retriable_request(): return result - return _request_helpers.wait_and_retry( - retriable_request, self._get_status_code, self._retry_strategy - ) + return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) def finalize( self, @@ -676,9 +688,7 @@ def retriable_request(): return result - return _request_helpers.wait_and_retry( - retriable_request, self._get_status_code, self._retry_strategy - ) + return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) def cancel( self, @@ -718,9 +728,7 @@ def retriable_request(): return result - return _request_helpers.wait_and_retry( - retriable_request, self._get_status_code, self._retry_strategy - ) + return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) class XMLMPUPart(_request_helpers.RequestsMixin, _upload.XMLMPUPart): @@ -760,6 +768,4 @@ def retriable_request(): return result - return _request_helpers.wait_and_retry( - retriable_request, self._get_status_code, self._retry_strategy - ) + return _request_helpers.wait_and_retry(retriable_request, self._retry_strategy) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index fc4db8de2..713cde864 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -54,7 +54,6 @@ from google.cloud.storage._helpers import _scalar_property from google.cloud.storage._helpers import _bucket_bound_hostname_url from google.cloud.storage._helpers import _raise_if_more_than_one_set -from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry from google.cloud.storage._helpers import _get_default_headers from google.cloud.storage._helpers import _get_default_storage_base_url from google.cloud.storage._signing import generate_signed_url_v2 @@ -979,7 +978,7 @@ def _do_download( raw_download=False, timeout=_DEFAULT_TIMEOUT, checksum="auto", - retry=None, + retry=DEFAULT_RETRY, ): """Perform a download without any error handling. @@ -1031,9 +1030,7 @@ def _do_download( :type retry: google.api_core.retry.Retry :param retry: (Optional) How to retry the RPC. A None value will disable retries. A google.api_core.retry.Retry value will enable retries, - and the object will configure backoff and timeout options. Custom - predicates (customizable error codes) are not supported for media - operations such as this one. + and the object will configure backoff and timeout options. This private method does not accept ConditionalRetryPolicy values because the information necessary to evaluate the policy is instead @@ -1044,8 +1041,6 @@ def _do_download( to configure them. """ - retry_strategy = _api_core_retry_to_resumable_media_retry(retry) - extra_attributes = { "url.full": download_url, "download.chunk_size": f"{self.chunk_size}", @@ -1069,8 +1064,8 @@ def _do_download( start=start, end=end, checksum=checksum, + retry=retry, ) - download._retry_strategy = retry_strategy with create_trace_span( name=f"Storage.{download_class}/consume", attributes=extra_attributes, @@ -1097,9 +1092,9 @@ def _do_download( headers=headers, start=start if start else 0, end=end, + retry=retry, ) - download._retry_strategy = retry_strategy with create_trace_span( name=f"Storage.{download_class}/consumeNextChunk", attributes=extra_attributes, @@ -1220,11 +1215,6 @@ def download_to_file( (google.cloud.storage.retry) for information on retry types and how to configure them. - Media operations (downloads and uploads) do not support non-default - predicates in a Retry object. The default will always be used. Other - configuration changes for Retry objects such as delays and deadlines - are respected. - :raises: :class:`google.cloud.exceptions.NotFound` """ @@ -1374,11 +1364,6 @@ def download_to_filename( (google.cloud.storage.retry) for information on retry types and how to configure them. - Media operations (downloads and uploads) do not support non-default - predicates in a Retry object. The default will always be used. Other - configuration changes for Retry objects such as delays and deadlines - are respected. - :raises: :class:`google.cloud.exceptions.NotFound` """ @@ -1494,11 +1479,6 @@ def download_as_bytes( (google.cloud.storage.retry) for information on retry types and how to configure them. - Media operations (downloads and uploads) do not support non-default - predicates in a Retry object. The default will always be used. Other - configuration changes for Retry objects such as delays and deadlines - are respected. - :rtype: bytes :returns: The data stored in this blob. @@ -1610,11 +1590,6 @@ def download_as_string( (google.cloud.storage.retry) for information on retry types and how to configure them. - Media operations (downloads and uploads) do not support non-default - predicates in a Retry object. The default will always be used. Other - configuration changes for Retry objects such as delays and deadlines - are respected. - :rtype: bytes :returns: The data stored in this blob. @@ -1726,11 +1701,6 @@ def download_as_text( (google.cloud.storage.retry) for information on retry types and how to configure them. - Media operations (downloads and uploads) do not support non-default - predicates in a Retry object. The default will always be used. Other - configuration changes for Retry objects such as delays and deadlines - are respected. - :rtype: text :returns: The data stored in this blob, decoded to text. """ @@ -1938,9 +1908,7 @@ def _do_multipart_upload( :type retry: google.api_core.retry.Retry :param retry: (Optional) How to retry the RPC. A None value will disable retries. A google.api_core.retry.Retry value will enable retries, - and the object will configure backoff and timeout options. Custom - predicates (customizable error codes) are not supported for media - operations such as this one. + and the object will configure backoff and timeout options. This private method does not accept ConditionalRetryPolicy values because the information necessary to evaluate the policy is instead @@ -2015,9 +1983,9 @@ def _do_multipart_upload( ) upload_url = _add_query_parameters(base_url, name_value_pairs) - upload = MultipartUpload(upload_url, headers=headers, checksum=checksum) - - upload._retry_strategy = _api_core_retry_to_resumable_media_retry(retry) + upload = MultipartUpload( + upload_url, headers=headers, checksum=checksum, retry=retry + ) extra_attributes = { "url.full": upload_url, @@ -2132,9 +2100,7 @@ def _initiate_resumable_upload( :type retry: google.api_core.retry.Retry :param retry: (Optional) How to retry the RPC. A None value will disable retries. A google.api_core.retry.Retry value will enable retries, - and the object will configure backoff and timeout options. Custom - predicates (customizable error codes) are not supported for media - operations such as this one. + and the object will configure backoff and timeout options. This private method does not accept ConditionalRetryPolicy values because the information necessary to evaluate the policy is instead @@ -2211,11 +2177,9 @@ def _initiate_resumable_upload( upload_url = _add_query_parameters(base_url, name_value_pairs) upload = ResumableUpload( - upload_url, chunk_size, headers=headers, checksum=checksum + upload_url, chunk_size, headers=headers, checksum=checksum, retry=retry ) - upload._retry_strategy = _api_core_retry_to_resumable_media_retry(retry) - upload.initiate( transport, stream, @@ -2312,9 +2276,7 @@ def _do_resumable_upload( :type retry: google.api_core.retry.Retry :param retry: (Optional) How to retry the RPC. A None value will disable retries. A google.api_core.retry.Retry value will enable retries, - and the object will configure backoff and timeout options. Custom - predicates (customizable error codes) are not supported for media - operations such as this one. + and the object will configure backoff and timeout options. This private method does not accept ConditionalRetryPolicy values because the information necessary to evaluate the policy is instead @@ -2472,11 +2434,6 @@ def _do_upload( (google.cloud.storage.retry) for information on retry types and how to configure them. - Media operations (downloads and uploads) do not support non-default - predicates in a Retry object. The default will always be used. Other - configuration changes for Retry objects such as delays and deadlines - are respected. - :type command: str :param command: (Optional) Information about which interface for upload was used, @@ -2660,11 +2617,6 @@ def _prep_and_do_upload( (google.cloud.storage.retry) for information on retry types and how to configure them. - Media operations (downloads and uploads) do not support non-default - predicates in a Retry object. The default will always be used. Other - configuration changes for Retry objects such as delays and deadlines - are respected. - :type command: str :param command: (Optional) Information about which interface for upload was used, @@ -2813,10 +2765,6 @@ def upload_from_file( to enable retries regardless of generation precondition setting. See [Configuring Retries](https://cloud.google.com/python/docs/reference/storage/latest/retry_timeout). - Media operations (downloads and uploads) do not support non-default - predicates in a Retry object. Other configuration changes for Retry objects - such as delays and deadlines are respected. - :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the upload response returns an error status. """ @@ -2960,10 +2908,6 @@ def upload_from_filename( Change the value to ``DEFAULT_RETRY`` or another `google.api_core.retry.Retry` object to enable retries regardless of generation precondition setting. See [Configuring Retries](https://cloud.google.com/python/docs/reference/storage/latest/retry_timeout). - - Media operations (downloads and uploads) do not support non-default - predicates in a Retry object. Other configuration changes for Retry objects - such as delays and deadlines are respected. """ self._handle_filename_and_upload( @@ -3072,10 +3016,6 @@ def upload_from_string( Change the value to ``DEFAULT_RETRY`` or another `google.api_core.retry.Retry` object to enable retries regardless of generation precondition setting. See [Configuring Retries](https://cloud.google.com/python/docs/reference/storage/latest/retry_timeout). - - Media operations (downloads and uploads) do not support non-default - predicates in a Retry object. Other configuration changes for Retry objects - such as delays and deadlines are respected. """ data = _to_bytes(data, encoding="utf-8") string_buffer = BytesIO(data) @@ -3212,10 +3152,6 @@ def create_resumable_upload_session( to enable retries regardless of generation precondition setting. See [Configuring Retries](https://cloud.google.com/python/docs/reference/storage/latest/retry_timeout). - Media operations (downloads and uploads) do not support non-default - predicates in a Retry object. Other configuration changes for Retry objects - such as delays and deadlines are respected. - :rtype: str :returns: The resumable upload session URL. The upload can be completed by making an HTTP PUT request with the @@ -4319,11 +4255,6 @@ def _prep_and_do_download( (google.cloud.storage.retry) for information on retry types and how to configure them. - Media operations (downloads and uploads) do not support non-default - predicates in a Retry object. The default will always be used. Other - configuration changes for Retry objects such as delays and deadlines - are respected. - :type command: str :param command: (Optional) Information about which interface for download was used, diff --git a/google/cloud/storage/client.py b/google/cloud/storage/client.py index e2621356a..57fa7043b 100644 --- a/google/cloud/storage/client.py +++ b/google/cloud/storage/client.py @@ -1196,11 +1196,6 @@ def download_blob_to_file( See the retry.py source code and docstrings in this package (google.cloud.storage.retry) for information on retry types and how to configure them. - - Media operations (downloads and uploads) do not support non-default - predicates in a Retry object. The default will always be used. Other - configuration changes for Retry objects such as delays and deadlines - are respected. """ if not isinstance(blob_or_uri, Blob): diff --git a/google/cloud/storage/retry.py b/google/cloud/storage/retry.py index 3ea3ae4a0..d1d5a7686 100644 --- a/google/cloud/storage/retry.py +++ b/google/cloud/storage/retry.py @@ -17,12 +17,16 @@ See [Retry Strategy for Google Cloud Storage](https://cloud.google.com/storage/docs/retry-strategy#client-libraries) """ +import http + import requests import requests.exceptions as requests_exceptions +import urllib3 from google.api_core import exceptions as api_exceptions from google.api_core import retry from google.auth import exceptions as auth_exceptions +from google.cloud.storage.exceptions import InvalidResponse _RETRYABLE_TYPES = ( @@ -35,11 +39,24 @@ requests.ConnectionError, requests_exceptions.ChunkedEncodingError, requests_exceptions.Timeout, + http.client.BadStatusLine, + http.client.IncompleteRead, + http.client.ResponseNotReady, + urllib3.exceptions.PoolError, + urllib3.exceptions.ProtocolError, + urllib3.exceptions.SSLError, + urllib3.exceptions.TimeoutError, ) -# Some retriable errors don't have their own custom exception in api_core. -_ADDITIONAL_RETRYABLE_STATUS_CODES = (408,) +_RETRYABLE_STATUS_CODES = ( + http.client.TOO_MANY_REQUESTS, # 429 + http.client.REQUEST_TIMEOUT, # 408 + http.client.INTERNAL_SERVER_ERROR, # 500 + http.client.BAD_GATEWAY, # 502 + http.client.SERVICE_UNAVAILABLE, # 503 + http.client.GATEWAY_TIMEOUT, # 504 +) def _should_retry(exc): @@ -47,7 +64,9 @@ def _should_retry(exc): if isinstance(exc, _RETRYABLE_TYPES): return True elif isinstance(exc, api_exceptions.GoogleAPICallError): - return exc.code in _ADDITIONAL_RETRYABLE_STATUS_CODES + return exc.code in _RETRYABLE_STATUS_CODES + elif isinstance(exc, InvalidResponse): + return exc.response.status_code in _RETRYABLE_STATUS_CODES elif isinstance(exc, auth_exceptions.TransportError): return _should_retry(exc.args[0]) else: diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index 752b80de0..fafe68f1c 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -32,7 +32,6 @@ from google.cloud.storage.blob import _get_host_name from google.cloud.storage.blob import _quote from google.cloud.storage.constants import _DEFAULT_TIMEOUT -from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry from google.cloud.storage.retry import DEFAULT_RETRY import google_crc32c @@ -1107,8 +1106,7 @@ def upload_chunks_concurrently( if blob.kms_key_name is not None and "cryptoKeyVersions" not in blob.kms_key_name: headers["x-goog-encryption-kms-key-name"] = blob.kms_key_name - container = XMLMPUContainer(url, filename, headers=headers) - container._retry_strategy = _api_core_retry_to_resumable_media_retry(retry) + container = XMLMPUContainer(url, filename, headers=headers, retry=retry) container.initiate(transport=transport, content_type=content_type) upload_id = container.upload_id @@ -1190,8 +1188,8 @@ def _upload_part( part_number=part_number, checksum=checksum, headers=headers, + retry=retry, ) - part._retry_strategy = _api_core_retry_to_resumable_media_retry(retry) part.upload(client._http) return (part_number, part.etag) diff --git a/tests/resumable_media/unit/requests/test__helpers.py b/tests/resumable_media/unit/requests/test__helpers.py index 98515a2fb..132172bbb 100644 --- a/tests/resumable_media/unit/requests/test__helpers.py +++ b/tests/resumable_media/unit/requests/test__helpers.py @@ -15,14 +15,8 @@ import http.client from unittest import mock -import pytest # type: ignore -import requests.exceptions -import urllib3.exceptions # type: ignore - -from google.cloud.storage._media import common from google.cloud.storage._media.requests import _request_helpers -from google.cloud.storage.exceptions import InvalidResponse EXPECTED_TIMEOUT = (61, 60) @@ -63,345 +57,3 @@ def test__get_body_w_content_consumed(self): def _make_response(status_code): return mock.Mock(status_code=status_code, spec=["status_code"]) - - -def _get_status_code(response): - return response.status_code - - -class Test_wait_and_retry(object): - def test_success_no_retry(self): - truthy = http.client.OK - assert truthy not in common.RETRYABLE - response = _make_response(truthy) - - func = mock.Mock(return_value=response, spec=[]) - retry_strategy = common.RetryStrategy() - ret_val = _request_helpers.wait_and_retry( - func, _get_status_code, retry_strategy - ) - - assert ret_val is response - func.assert_called_once_with() - - @mock.patch("time.sleep") - @mock.patch("random.randint") - def test_success_with_retry(self, randint_mock, sleep_mock): - randint_mock.side_effect = [125, 625, 375] - - status_codes = ( - http.client.INTERNAL_SERVER_ERROR, - http.client.BAD_GATEWAY, - http.client.SERVICE_UNAVAILABLE, - http.client.NOT_FOUND, - ) - responses = [_make_response(status_code) for status_code in status_codes] - - def raise_response(): - raise InvalidResponse(responses.pop(0)) - - func = mock.Mock(side_effect=raise_response) - - retry_strategy = common.RetryStrategy() - try: - _request_helpers.wait_and_retry(func, _get_status_code, retry_strategy) - except InvalidResponse as e: - ret_val = e.response - - assert ret_val.status_code == status_codes[-1] - assert status_codes[-1] not in common.RETRYABLE - - assert func.call_count == 4 - assert func.mock_calls == [mock.call()] * 4 - - assert randint_mock.call_count == 3 - assert randint_mock.mock_calls == [mock.call(0, 1000)] * 3 - - assert sleep_mock.call_count == 3 - sleep_mock.assert_any_call(1.125) - sleep_mock.assert_any_call(2.625) - sleep_mock.assert_any_call(4.375) - - @mock.patch("time.sleep") - @mock.patch("random.randint") - def test_success_with_retry_custom_delay(self, randint_mock, sleep_mock): - randint_mock.side_effect = [125, 625, 375] - - status_codes = ( - http.client.INTERNAL_SERVER_ERROR, - http.client.BAD_GATEWAY, - http.client.SERVICE_UNAVAILABLE, - http.client.NOT_FOUND, - ) - responses = [_make_response(status_code) for status_code in status_codes] - - def raise_response(): - raise InvalidResponse(responses.pop(0)) - - func = mock.Mock(side_effect=raise_response) - - retry_strategy = common.RetryStrategy(initial_delay=3.0, multiplier=4) - try: - _request_helpers.wait_and_retry(func, _get_status_code, retry_strategy) - except InvalidResponse as e: - ret_val = e.response - - assert ret_val.status_code == status_codes[-1] - assert status_codes[-1] not in common.RETRYABLE - - assert func.call_count == 4 - assert func.mock_calls == [mock.call()] * 4 - - assert randint_mock.call_count == 3 - assert randint_mock.mock_calls == [mock.call(0, 1000)] * 3 - - assert sleep_mock.call_count == 3 - sleep_mock.assert_any_call(3.125) # initial delay 3 + jitter 0.125 - sleep_mock.assert_any_call( - 12.625 - ) # previous delay 3 * multiplier 4 + jitter 0.625 - sleep_mock.assert_any_call( - 48.375 - ) # previous delay 12 * multiplier 4 + jitter 0.375 - - @mock.patch("time.sleep") - @mock.patch("random.randint") - def test_retry_success_http_standard_lib_connection_errors( - self, randint_mock, sleep_mock - ): - randint_mock.side_effect = [125, 625, 500, 875, 375] - - status_code = int(http.client.OK) - response = _make_response(status_code) - responses = [ - http.client.BadStatusLine(""), - http.client.IncompleteRead(""), - http.client.ResponseNotReady, - ConnectionError, - response, - ] - func = mock.Mock(side_effect=responses, spec=[]) - - retry_strategy = common.RetryStrategy() - ret_val = _request_helpers.wait_and_retry( - func, _get_status_code, retry_strategy - ) - - assert ret_val == responses[-1] - assert func.call_count == 5 - assert func.mock_calls == [mock.call()] * 5 - assert randint_mock.call_count == 4 - assert randint_mock.mock_calls == [mock.call(0, 1000)] * 4 - assert sleep_mock.call_count == 4 - sleep_mock.assert_any_call(1.125) - sleep_mock.assert_any_call(2.625) - sleep_mock.assert_any_call(4.500) - sleep_mock.assert_any_call(8.875) - - @mock.patch("time.sleep") - @mock.patch("random.randint") - def test_retry_success_requests_lib_connection_errors( - self, randint_mock, sleep_mock - ): - randint_mock.side_effect = [125, 625, 500, 875] - - status_code = int(http.client.OK) - response = _make_response(status_code) - responses = [ - requests.exceptions.ConnectionError, - requests.exceptions.ChunkedEncodingError, - requests.exceptions.Timeout, - response, - ] - func = mock.Mock(side_effect=responses, spec=[]) - - retry_strategy = common.RetryStrategy() - ret_val = _request_helpers.wait_and_retry( - func, _get_status_code, retry_strategy - ) - - assert ret_val == responses[-1] - assert func.call_count == 4 - assert func.mock_calls == [mock.call()] * 4 - assert randint_mock.call_count == 3 - assert randint_mock.mock_calls == [mock.call(0, 1000)] * 3 - assert sleep_mock.call_count == 3 - sleep_mock.assert_any_call(1.125) - sleep_mock.assert_any_call(2.625) - sleep_mock.assert_any_call(4.500) - - @mock.patch("time.sleep") - @mock.patch("random.randint") - def test_retry_success_urllib3_connection_errors(self, randint_mock, sleep_mock): - randint_mock.side_effect = [125, 625, 500, 875, 375] - - status_code = int(http.client.OK) - response = _make_response(status_code) - responses = [ - urllib3.exceptions.PoolError(None, ""), - urllib3.exceptions.ProtocolError, - urllib3.exceptions.SSLError, - urllib3.exceptions.TimeoutError, - response, - ] - func = mock.Mock(side_effect=responses, spec=[]) - - retry_strategy = common.RetryStrategy() - ret_val = _request_helpers.wait_and_retry( - func, _get_status_code, retry_strategy - ) - - assert ret_val == responses[-1] - assert func.call_count == 5 - assert func.mock_calls == [mock.call()] * 5 - assert randint_mock.call_count == 4 - assert randint_mock.mock_calls == [mock.call(0, 1000)] * 4 - assert sleep_mock.call_count == 4 - sleep_mock.assert_any_call(1.125) - sleep_mock.assert_any_call(2.625) - sleep_mock.assert_any_call(4.500) - sleep_mock.assert_any_call(8.875) - - @mock.patch("time.sleep") - @mock.patch("random.randint") - def test_retry_exceeds_max_cumulative(self, randint_mock, sleep_mock): - randint_mock.side_effect = [875, 0, 375, 500, 500, 250, 125] - - status_codes = ( - http.client.SERVICE_UNAVAILABLE, - http.client.GATEWAY_TIMEOUT, - http.client.TOO_MANY_REQUESTS, - http.client.INTERNAL_SERVER_ERROR, - http.client.SERVICE_UNAVAILABLE, - http.client.BAD_GATEWAY, - http.client.TOO_MANY_REQUESTS, - ) - responses = [_make_response(status_code) for status_code in status_codes] - - def raise_response(): - raise InvalidResponse(responses.pop(0)) - - func = mock.Mock(side_effect=raise_response) - - retry_strategy = common.RetryStrategy(max_cumulative_retry=100.0) - try: - _request_helpers.wait_and_retry(func, _get_status_code, retry_strategy) - except InvalidResponse as e: - ret_val = e.response - - assert ret_val.status_code == status_codes[-1] - assert status_codes[-1] in common.RETRYABLE - - assert func.call_count == 7 - assert func.mock_calls == [mock.call()] * 7 - - assert randint_mock.call_count == 7 - assert randint_mock.mock_calls == [mock.call(0, 1000)] * 7 - - assert sleep_mock.call_count == 6 - sleep_mock.assert_any_call(1.875) - sleep_mock.assert_any_call(2.0) - sleep_mock.assert_any_call(4.375) - sleep_mock.assert_any_call(8.5) - sleep_mock.assert_any_call(16.5) - sleep_mock.assert_any_call(32.25) - - @mock.patch("time.sleep") - @mock.patch("random.randint") - def test_retry_exceeds_max_retries(self, randint_mock, sleep_mock): - randint_mock.side_effect = [875, 0, 375, 500, 500, 250, 125] - - status_codes = ( - http.client.SERVICE_UNAVAILABLE, - http.client.GATEWAY_TIMEOUT, - http.client.TOO_MANY_REQUESTS, - http.client.INTERNAL_SERVER_ERROR, - http.client.SERVICE_UNAVAILABLE, - http.client.BAD_GATEWAY, - http.client.TOO_MANY_REQUESTS, - ) - responses = [_make_response(status_code) for status_code in status_codes] - - def raise_response(): - raise InvalidResponse(responses.pop(0)) - - func = mock.Mock(side_effect=raise_response) - - retry_strategy = common.RetryStrategy(max_retries=6) - try: - _request_helpers.wait_and_retry(func, _get_status_code, retry_strategy) - except InvalidResponse as e: - ret_val = e.response - - assert ret_val.status_code == status_codes[-1] - assert status_codes[-1] in common.RETRYABLE - - assert func.call_count == 7 - assert func.mock_calls == [mock.call()] * 7 - - assert randint_mock.call_count == 7 - assert randint_mock.mock_calls == [mock.call(0, 1000)] * 7 - - assert sleep_mock.call_count == 6 - sleep_mock.assert_any_call(1.875) - sleep_mock.assert_any_call(2.0) - sleep_mock.assert_any_call(4.375) - sleep_mock.assert_any_call(8.5) - sleep_mock.assert_any_call(16.5) - sleep_mock.assert_any_call(32.25) - - @mock.patch("time.sleep") - @mock.patch("random.randint") - def test_retry_zero_max_retries(self, randint_mock, sleep_mock): - randint_mock.side_effect = [875, 0, 375] - - status_codes = ( - http.client.SERVICE_UNAVAILABLE, - http.client.GATEWAY_TIMEOUT, - http.client.TOO_MANY_REQUESTS, - ) - responses = [_make_response(status_code) for status_code in status_codes] - - def raise_response(): - raise InvalidResponse(responses.pop(0)) - - func = mock.Mock(side_effect=raise_response) - - retry_strategy = common.RetryStrategy(max_retries=0) - try: - _request_helpers.wait_and_retry(func, _get_status_code, retry_strategy) - except InvalidResponse as e: - ret_val = e.response - - assert func.call_count == 1 - assert func.mock_calls == [mock.call()] * 1 - assert ret_val.status_code == status_codes[0] - - assert randint_mock.call_count == 1 - assert sleep_mock.call_count == 0 - - @mock.patch("time.sleep") - @mock.patch("random.randint") - def test_retry_exceeded_reraises_connection_error(self, randint_mock, sleep_mock): - randint_mock.side_effect = [875, 0, 375, 500, 500, 250, 125] - - responses = [requests.exceptions.ConnectionError] * 7 - func = mock.Mock(side_effect=responses, spec=[]) - - retry_strategy = common.RetryStrategy(max_cumulative_retry=100.0) - with pytest.raises(requests.exceptions.ConnectionError): - _request_helpers.wait_and_retry(func, _get_status_code, retry_strategy) - - assert func.call_count == 7 - assert func.mock_calls == [mock.call()] * 7 - - assert randint_mock.call_count == 7 - assert randint_mock.mock_calls == [mock.call(0, 1000)] * 7 - - assert sleep_mock.call_count == 6 - sleep_mock.assert_any_call(1.875) - sleep_mock.assert_any_call(2.0) - sleep_mock.assert_any_call(4.375) - sleep_mock.assert_any_call(8.5) - sleep_mock.assert_any_call(16.5) - sleep_mock.assert_any_call(32.25) diff --git a/tests/resumable_media/unit/test__download.py b/tests/resumable_media/unit/test__download.py index c6a383db5..54559e45e 100644 --- a/tests/resumable_media/unit/test__download.py +++ b/tests/resumable_media/unit/test__download.py @@ -19,8 +19,8 @@ import pytest # type: ignore from google.cloud.storage._media import _download -from google.cloud.storage._media import common from google.cloud.storage.exceptions import InvalidResponse +from google.cloud.storage.retry import DEFAULT_RETRY EXAMPLE_URL = ( @@ -748,8 +748,4 @@ def _fix_up_virtual(download): def _check_retry_strategy(download): - retry_strategy = download._retry_strategy - assert isinstance(retry_strategy, common.RetryStrategy) - assert retry_strategy.max_sleep == common.MAX_SLEEP - assert retry_strategy.max_cumulative_retry == common.MAX_CUMULATIVE_RETRY - assert retry_strategy.max_retries is None + assert download._retry_strategy == DEFAULT_RETRY diff --git a/tests/resumable_media/unit/test__helpers.py b/tests/resumable_media/unit/test__helpers.py index b4e9f4972..2f7ae0f72 100644 --- a/tests/resumable_media/unit/test__helpers.py +++ b/tests/resumable_media/unit/test__helpers.py @@ -21,7 +21,7 @@ import pytest # type: ignore from google.cloud.storage._media import _helpers -from google.cloud.storage._media import common +from google.cloud.storage.retry import _RETRYABLE_STATUS_CODES from google.cloud.storage.exceptions import InvalidResponse import google_crc32c @@ -130,7 +130,7 @@ def test_failure_with_callback(self): def test_retryable_failure_without_callback(self): status_codes = (http.client.OK,) retryable_responses = [ - _make_response(status_code) for status_code in common.RETRYABLE + _make_response(status_code) for status_code in _RETRYABLE_STATUS_CODES ] callback = mock.Mock(spec=[]) for retryable_response in retryable_responses: @@ -150,40 +150,6 @@ def test_retryable_failure_without_callback(self): callback.assert_not_called() -class Test_calculate_retry_wait(object): - @mock.patch("random.randint", return_value=125) - def test_past_limit(self, randint_mock): - base_wait, wait_time = _helpers.calculate_retry_wait(70.0, 64.0) - - assert base_wait == 64.0 - assert wait_time == 64.125 - randint_mock.assert_called_once_with(0, 1000) - - @mock.patch("random.randint", return_value=250) - def test_at_limit(self, randint_mock): - base_wait, wait_time = _helpers.calculate_retry_wait(50.0, 50.0) - - assert base_wait == 50.0 - assert wait_time == 50.25 - randint_mock.assert_called_once_with(0, 1000) - - @mock.patch("random.randint", return_value=875) - def test_under_limit(self, randint_mock): - base_wait, wait_time = _helpers.calculate_retry_wait(16.0, 33.0) - - assert base_wait == 32.0 - assert wait_time == 32.875 - randint_mock.assert_called_once_with(0, 1000) - - @mock.patch("random.randint", return_value=875) - def test_custom_multiplier(self, randint_mock): - base_wait, wait_time = _helpers.calculate_retry_wait(16.0, 64.0, 3) - - assert base_wait == 48.0 - assert wait_time == 48.875 - randint_mock.assert_called_once_with(0, 1000) - - def _make_response(status_code): return mock.Mock(status_code=status_code, spec=["status_code"]) diff --git a/tests/resumable_media/unit/test__upload.py b/tests/resumable_media/unit/test__upload.py index 438335d44..faabc0f56 100644 --- a/tests/resumable_media/unit/test__upload.py +++ b/tests/resumable_media/unit/test__upload.py @@ -22,9 +22,9 @@ from google.cloud.storage._media import _helpers from google.cloud.storage._media import _upload -from google.cloud.storage._media import common from google.cloud.storage.exceptions import InvalidResponse from google.cloud.storage.exceptions import DataCorruption +from google.cloud.storage.retry import DEFAULT_RETRY URL_PREFIX = "https://www.googleapis.com/upload/storage/v1/b/{BUCKET}/o" @@ -1573,8 +1573,4 @@ def _fix_up_virtual(upload): def _check_retry_strategy(upload): - retry_strategy = upload._retry_strategy - assert isinstance(retry_strategy, common.RetryStrategy) - assert retry_strategy.max_sleep == common.MAX_SLEEP - assert retry_strategy.max_cumulative_retry == common.MAX_CUMULATIVE_RETRY - assert retry_strategy.max_retries is None + assert upload._retry_strategy == DEFAULT_RETRY diff --git a/tests/resumable_media/unit/test_common.py b/tests/resumable_media/unit/test_common.py deleted file mode 100644 index ec3fbefec..000000000 --- a/tests/resumable_media/unit/test_common.py +++ /dev/null @@ -1,86 +0,0 @@ -# Copyright 2017 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from unittest import mock -import pytest # type: ignore - -from google.cloud.storage._media import common -from google.cloud.storage.exceptions import InvalidResponse - - -class TestInvalidResponse(object): - def test_constructor(self): - response = mock.sentinel.response - error = InvalidResponse(response, 1, "a", [b"m"], True) - - assert error.response is response - assert error.args == (1, "a", [b"m"], True) - - -class TestRetryStrategy(object): - def test_constructor_defaults(self): - retry_strategy = common.RetryStrategy() - assert retry_strategy.max_sleep == common.MAX_SLEEP - assert retry_strategy.max_cumulative_retry == common.MAX_CUMULATIVE_RETRY - assert retry_strategy.max_retries is None - - def test_constructor_failure(self): - with pytest.raises(ValueError) as exc_info: - common.RetryStrategy(max_cumulative_retry=600.0, max_retries=12) - - exc_info.match(common._SLEEP_RETRY_ERROR_MSG) - - def test_constructor_custom_delay_and_multiplier(self): - retry_strategy = common.RetryStrategy(initial_delay=3.0, multiplier=4) - assert retry_strategy.max_sleep == common.MAX_SLEEP - assert retry_strategy.max_cumulative_retry == common.MAX_CUMULATIVE_RETRY - assert retry_strategy.max_retries is None - assert retry_strategy.initial_delay == 3.0 - assert retry_strategy.multiplier == 4 - - def test_constructor_explicit_bound_cumulative(self): - max_sleep = 10.0 - max_cumulative_retry = 100.0 - retry_strategy = common.RetryStrategy( - max_sleep=max_sleep, max_cumulative_retry=max_cumulative_retry - ) - - assert retry_strategy.max_sleep == max_sleep - assert retry_strategy.max_cumulative_retry == max_cumulative_retry - assert retry_strategy.max_retries is None - - def test_constructor_explicit_bound_retries(self): - max_sleep = 13.75 - max_retries = 14 - retry_strategy = common.RetryStrategy( - max_sleep=max_sleep, max_retries=max_retries - ) - - assert retry_strategy.max_sleep == max_sleep - assert retry_strategy.max_cumulative_retry is None - assert retry_strategy.max_retries == max_retries - - def test_retry_allowed_bound_cumulative(self): - retry_strategy = common.RetryStrategy(max_cumulative_retry=100.0) - assert retry_strategy.retry_allowed(50.0, 10) - assert retry_strategy.retry_allowed(99.0, 7) - assert retry_strategy.retry_allowed(100.0, 4) - assert not retry_strategy.retry_allowed(101.0, 11) - assert not retry_strategy.retry_allowed(200.0, 6) - - def test_retry_allowed_bound_retries(self): - retry_strategy = common.RetryStrategy(max_retries=6) - assert retry_strategy.retry_allowed(1000.0, 5) - assert retry_strategy.retry_allowed(99.0, 6) - assert not retry_strategy.retry_allowed(625.5, 7) diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py index eb796adef..224f4841b 100644 --- a/tests/unit/test__helpers.py +++ b/tests/unit/test__helpers.py @@ -716,19 +716,6 @@ def test_hostname_and_scheme(self): self.assertEqual(self._call_fut(host=HOST, scheme=SCHEME), EXPECTED_URL) -class Test__api_core_retry_to_resumable_media_retry(unittest.TestCase): - def test_retry(self): - from google.cloud.storage._helpers import ( - _api_core_retry_to_resumable_media_retry, - ) - - retry_strategy = _api_core_retry_to_resumable_media_retry(retry=DEFAULT_RETRY) - self.assertEqual(retry_strategy.max_sleep, DEFAULT_RETRY._maximum) - self.assertEqual(retry_strategy.max_cumulative_retry, DEFAULT_RETRY._deadline) - self.assertEqual(retry_strategy.initial_delay, DEFAULT_RETRY._initial) - self.assertEqual(retry_strategy.multiplier, DEFAULT_RETRY._multiplier) - - class _MD5Hash(object): def __init__(self, digest_val): self.digest_val = digest_val diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index 74efcfca9..7616c2dc1 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -1251,6 +1251,8 @@ def _do_download_helper_wo_chunks( extra_kwargs.update(timeout_kwarg) + retry = extra_kwargs.get("retry", DEFAULT_RETRY) + with patch as patched: if w_range: blob._do_download( @@ -1281,6 +1283,7 @@ def _do_download_helper_wo_chunks( start=1, end=3, checksum="auto", + retry=retry, ) else: patched.assert_called_once_with( @@ -1290,19 +1293,13 @@ def _do_download_helper_wo_chunks( start=None, end=None, checksum="auto", + retry=retry, ) patched.return_value.consume.assert_called_once_with( transport, timeout=expected_timeout ) - retry_strategy = patched.return_value._retry_strategy - retry = extra_kwargs.get("retry", None) - if retry is None: - self.assertEqual(retry_strategy.max_retries, 0) - else: - self.assertEqual(retry_strategy.max_sleep, retry._maximum) - def test__do_download_wo_chunks_wo_range_wo_raw(self): self._do_download_helper_wo_chunks(w_range=False, raw_download=False) @@ -1414,11 +1411,23 @@ def side_effect(*args, **kwargs): if w_range: patched.assert_called_once_with( - download_url, chunk_size, file_obj, headers=headers, start=1, end=3 + download_url, + chunk_size, + file_obj, + headers=headers, + start=1, + end=3, + retry=DEFAULT_RETRY, ) else: patched.assert_called_once_with( - download_url, chunk_size, file_obj, headers=headers, start=0, end=None + download_url, + chunk_size, + file_obj, + headers=headers, + start=0, + end=None, + retry=DEFAULT_RETRY, ) download.consume_next_chunk.assert_called_once_with( transport, timeout=expected_timeout @@ -2793,12 +2802,7 @@ def _initiate_resumable_helper( self.assertEqual(upload._content_type, content_type) self.assertEqual(upload.resumable_url, resumable_url) retry_strategy = upload._retry_strategy - if retry is None: - self.assertEqual(retry_strategy.max_retries, 0) - else: - self.assertEqual(retry_strategy.max_sleep, 60.0) - self.assertEqual(retry_strategy.max_cumulative_retry, 120.0) - self.assertIsNone(retry_strategy.max_retries) + self.assertEqual(retry_strategy, retry) self.assertIs(client._http, transport) # Make sure we never read from the stream. self.assertEqual(stream.tell(), 0) diff --git a/tests/unit/test_exceptions.py b/tests/unit/test_exceptions.py index 3e718d87e..beaa775bc 100644 --- a/tests/unit/test_exceptions.py +++ b/tests/unit/test_exceptions.py @@ -14,6 +14,7 @@ from importlib import reload from unittest.mock import Mock +from unittest.mock import sentinel import sys @@ -69,3 +70,13 @@ def __init__(self, response, *args): finally: del sys.modules["google.resumable_media"] reload(exceptions) + + +def test_InvalidResponse(): + from google.cloud.storage import exceptions + + response = sentinel.response + error = exceptions.InvalidResponse(response, 1, "a", [b"m"], True) + + assert error.response is response + assert error.args == (1, "a", [b"m"], True) diff --git a/tests/unit/test_retry.py b/tests/unit/test_retry.py index 8ebe405d3..04581c06c 100644 --- a/tests/unit/test_retry.py +++ b/tests/unit/test_retry.py @@ -15,6 +15,7 @@ import unittest from google.cloud.storage import _helpers +from google.cloud.storage.exceptions import InvalidResponse import mock @@ -38,7 +39,12 @@ def test_w_retryable_types(self): from google.cloud.storage import retry for exc_type in retry._RETRYABLE_TYPES: - exc = exc_type("testing") + # Some of the types need one positional argument, some two. + # The easiest way to accommodate both is just to use a try/except. + try: + exc = exc_type("testing") + except TypeError: + exc = exc_type("testing", "testing") self.assertTrue(self._call_fut(exc)) def test_w_google_api_call_error_hit(self): @@ -55,6 +61,18 @@ def test_w_google_api_call_error_miss(self): exc.code = 999 self.assertFalse(self._call_fut(exc)) + def test_w_InvalidResponse_hit(self): + response = mock.Mock() + response.status_code = 408 + exc = InvalidResponse(response, "testing") + self.assertTrue(self._call_fut(exc)) + + def test_w_InvalidResponse_miss(self): + response = mock.Mock() + response.status_code = 999 + exc = InvalidResponse(response, "testing") + self.assertFalse(self._call_fut(exc)) + def test_w_stdlib_error_miss(self): exc = ValueError("testing") self.assertFalse(self._call_fut(exc)) diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py index 7b562786c..37f1ab92d 100644 --- a/tests/unit/test_transfer_manager.py +++ b/tests/unit/test_transfer_manager.py @@ -17,6 +17,7 @@ from google.cloud.storage import Blob from google.cloud.storage import Client from google.cloud.storage import transfer_manager +from google.cloud.storage.retry import DEFAULT_RETRY from google.api_core import exceptions @@ -782,10 +783,6 @@ def test_upload_chunks_concurrently(): container_mock.register_part.assert_any_call(2, ETAG) container_mock.finalize.assert_called_once_with(bucket.client._http) - assert container_mock._retry_strategy.max_sleep == 60.0 - assert container_mock._retry_strategy.max_cumulative_retry == 120.0 - assert container_mock._retry_strategy.max_retries is None - part_mock.upload.assert_called_with(transport) @@ -879,7 +876,6 @@ def test_upload_chunks_concurrently_passes_concurrency_options(): # Conveniently, that gives us a chance to test the auto-delete # exception handling feature. container_mock.cancel.assert_called_once_with(transport) - assert container_mock._retry_strategy.max_retries == 0 pool_patch.assert_called_with(max_workers=MAX_WORKERS) wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY) @@ -974,7 +970,7 @@ def test_upload_chunks_concurrently_with_metadata_and_encryption(): **custom_headers, } container_cls_mock.assert_called_once_with( - URL, FILENAME, headers=expected_headers + URL, FILENAME, headers=expected_headers, retry=DEFAULT_RETRY ) container_mock.initiate.assert_called_once_with( transport=transport, content_type=blob.content_type @@ -1121,9 +1117,6 @@ def test__upload_part(): retry=DEFAULT_RETRY, ) part.upload.assert_called_once() - assert part._retry_strategy.max_sleep == 60.0 - assert part._retry_strategy.max_cumulative_retry == 120.0 - assert part._retry_strategy.max_retries is None assert result == (1, ETAG)