diff --git a/sdk/storage/azure-storage-blob/azure/storage/blob/_download.py b/sdk/storage/azure-storage-blob/azure/storage/blob/_download.py
index 9ccc79601baf..d17a211dd380 100644
--- a/sdk/storage/azure-storage-blob/azure/storage/blob/_download.py
+++ b/sdk/storage/azure-storage-blob/azure/storage/blob/_download.py
@@ -6,11 +6,15 @@
 import sys
 import threading
+import time
+
 import warnings
 from io import BytesIO
-
 from typing import Iterator
-
-from azure.core.exceptions import HttpResponseError
+
+import requests
+from azure.core.exceptions import HttpResponseError, ServiceResponseError
+
 from azure.core.tracing.common import with_current_context
 from ._shared.encryption import decrypt_blob
 from ._shared.request_handlers import validate_and_format_range_headers
@@ -44,10 +48,9 @@ def process_range_and_offset(start_range, end_range, length, encryption):
 def process_content(data, start_offset, end_offset, encryption):
     if data is None:
         raise ValueError("Response cannot be None.")
-    try:
-        content = b"".join(list(data))
-    except Exception as error:
-        raise HttpResponseError(message="Download stream interrupted.", response=data.response, error=error)
+
+    content = b"".join(list(data))
+
     if content and encryption.get("key") is not None or encryption.get("resolver") is not None:
         try:
             return decrypt_blob(
@@ -189,19 +192,29 @@ def _download_chunk(self, chunk_start, chunk_end):
             check_content_md5=self.validate_content
         )

-        try:
-            _, response = self.client.download(
-                range=range_header,
-                range_get_content_md5=range_validation,
-                validate_content=self.validate_content,
-                data_stream_total=self.total_size,
-                download_stream_current=self.progress_total,
-                **self.request_options
-            )
-        except HttpResponseError as error:
-            process_storage_error(error)
+        retry_active = True
+        retry_total = 3
+        while retry_active:
+            try:
+                _, response = self.client.download(
+                    range=range_header,
+                    range_get_content_md5=range_validation,
+                    validate_content=self.validate_content,
+                    data_stream_total=self.total_size,
+                    download_stream_current=self.progress_total,
+                    **self.request_options
+                )
+            except HttpResponseError as error:
+                process_storage_error(error)

-        chunk_data = process_content(response, offset[0], offset[1], self.encryption_options)
+            try:
+                chunk_data = process_content(response, offset[0], offset[1], self.encryption_options)
+                retry_active = False
+            except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as error:
+                retry_total -= 1
+                if retry_total <= 0:
+                    raise ServiceResponseError(error, error=error)
+                time.sleep(1)

         # This makes sure that if_match is set so that we can validate
         # that subsequent downloads are to an unmodified blob
@@ -354,16 +367,6 @@ def __init__(
         # TODO: Set to the stored MD5 when the service returns this
         self.properties.content_md5 = None

-        if self.size == 0:
-            self._current_content = b""
-        else:
-            self._current_content = process_content(
-                self._response,
-                self._initial_offset[0],
-                self._initial_offset[1],
-                self._encryption_options
-            )
-
     def __len__(self):
         return self.size

@@ -376,51 +379,71 @@ def _initial_request(self):
             check_content_md5=self._validate_content
         )

-        try:
-            location_mode, response = self._clients.blob.download(
-                range=range_header,
-                range_get_content_md5=range_validation,
-                validate_content=self._validate_content,
-                data_stream_total=None,
-                download_stream_current=0,
-                **self._request_options
-            )
+        retry_active = True
+        retry_total = 3
+        while retry_active:
+            try:
+                location_mode, response = self._clients.blob.download(
+                    range=range_header,
+                    range_get_content_md5=range_validation,
+                    validate_content=self._validate_content,
+                    data_stream_total=None,
+                    download_stream_current=0,
+                    **self._request_options
+                )

-            # Check the location we read from to ensure we use the same one
-            # for subsequent requests.
-            self._location_mode = location_mode
+                # Check the location we read from to ensure we use the same one
+                # for subsequent requests.
+                self._location_mode = location_mode
+
+                # Parse the total file size and adjust the download size if ranges
+                # were specified
+                self._file_size = parse_length_from_content_range(response.properties.content_range)
+                if self._end_range is not None:
+                    # Use the end range index unless it is over the end of the file
+                    self.size = min(self._file_size, self._end_range - self._start_range + 1)
+                elif self._start_range is not None:
+                    self.size = self._file_size - self._start_range
+                else:
+                    self.size = self._file_size

-            # Parse the total file size and adjust the download size if ranges
-            # were specified
-            self._file_size = parse_length_from_content_range(response.properties.content_range)
-            if self._end_range is not None:
-                # Use the end range index unless it is over the end of the file
-                self.size = min(self._file_size, self._end_range - self._start_range + 1)
-            elif self._start_range is not None:
-                self.size = self._file_size - self._start_range
-            else:
-                self.size = self._file_size
-
-        except HttpResponseError as error:
-            if self._start_range is None and error.response.status_code == 416:
-                # Get range will fail on an empty file. If the user did not
-                # request a range, do a regular get request in order to get
-                # any properties.
-                try:
-                    _, response = self._clients.blob.download(
-                        validate_content=self._validate_content,
-                        data_stream_total=0,
-                        download_stream_current=0,
-                        **self._request_options
-                    )
-                except HttpResponseError as error:
+            except HttpResponseError as error:
+                if self._start_range is None and error.response.status_code == 416:
+                    # Get range will fail on an empty file. If the user did not
+                    # request a range, do a regular get request in order to get
+                    # any properties.
+                    try:
+                        _, response = self._clients.blob.download(
+                            validate_content=self._validate_content,
+                            data_stream_total=0,
+                            download_stream_current=0,
+                            **self._request_options
+                        )
+                    except HttpResponseError as error:
+                        process_storage_error(error)
+
+                    # Set the download size to empty
+                    self.size = 0
+                    self._file_size = 0
+                else:
                     process_storage_error(error)

-                # Set the download size to empty
-                self.size = 0
-                self._file_size = 0
-            else:
-                process_storage_error(error)
+            try:
+                if self.size == 0:
+                    self._current_content = b""
+                else:
+                    self._current_content = process_content(
+                        response,
+                        self._initial_offset[0],
+                        self._initial_offset[1],
+                        self._encryption_options
+                    )
+                retry_active = False
+            except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as error:
+                retry_total -= 1
+                if retry_total <= 0:
+                    raise ServiceResponseError(error, error=error)
+                time.sleep(1)

         # get page ranges to optimize downloading sparse page blob
         if response.properties.blob_type == 'PageBlob':
diff --git a/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_download_async.py b/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_download_async.py
index 1ad8dc3181c1..1f0530955b1b 100644
--- a/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_download_async.py
+++ b/sdk/storage/azure-storage-blob/azure/storage/blob/aio/_download_async.py
@@ -10,9 +10,10 @@
 from io import BytesIO
 from itertools import islice
 import warnings
-
 from typing import AsyncIterator
-from azure.core.exceptions import HttpResponseError
+
+from aiohttp import ClientPayloadError
+from azure.core.exceptions import HttpResponseError, ServiceResponseError
 from .._shared.encryption import decrypt_blob
 from .._shared.request_handlers import validate_and_format_range_headers
 from .._shared.response_handlers import process_storage_error, parse_length_from_content_range
@@ -22,10 +23,7 @@
 async def process_content(data, start_offset, end_offset, encryption):
     if data is None:
         raise ValueError("Response cannot be None.")
-    try:
-        content = data.response.body()
-    except Exception as error:
-        raise HttpResponseError(message="Download stream interrupted.", response=data.response, error=error)
+    content = data.response.body()
     if encryption.get('key') is not None or encryption.get('resolver') is not None:
         try:
             return decrypt_blob(
@@ -91,20 +89,31 @@ async def _download_chunk(self, chunk_start, chunk_end):
             download_range[1],
             check_content_md5=self.validate_content
         )

-        try:
-            _, response = await self.client.download(
-                range=range_header,
-                range_get_content_md5=range_validation,
-                validate_content=self.validate_content,
-                data_stream_total=self.total_size,
-                download_stream_current=self.progress_total,
-                **self.request_options
-            )
-        except HttpResponseError as error:
-            process_storage_error(error)
+        retry_active = True
+        retry_total = 3
+        while retry_active:
+            try:
+                _, response = await self.client.download(
+                    range=range_header,
+                    range_get_content_md5=range_validation,
+                    validate_content=self.validate_content,
+                    data_stream_total=self.total_size,
+                    download_stream_current=self.progress_total,
+                    **self.request_options
+                )
+                retry_active = False
+
+            except HttpResponseError as error:
+                process_storage_error(error)
+            except ClientPayloadError as error:
+                retry_total -= 1
+                if retry_total <= 0:
+                    raise ServiceResponseError(error, error=error)
+                await asyncio.sleep(1)

         chunk_data = await process_content(response, offset[0], offset[1], self.encryption_options)
+
         # This makes sure that if_match is set so that we can validate
         # that subsequent downloads are to an unmodified blob
         if self.request_options.get('modified_access_conditions'):
@@ -277,49 +286,60 @@ async def _initial_request(self):
             end_range_required=False,
             check_content_md5=self._validate_content)

-        try:
-            location_mode, response = await self._clients.blob.download(
-                range=range_header,
-                range_get_content_md5=range_validation,
-                validate_content=self._validate_content,
-                data_stream_total=None,
-                download_stream_current=0,
-                **self._request_options)
-
-            # Check the location we read from to ensure we use the same one
-            # for subsequent requests.
-            self._location_mode = location_mode
-
-            # Parse the total file size and adjust the download size if ranges
-            # were specified
-            self._file_size = parse_length_from_content_range(response.properties.content_range)
-            if self._end_range is not None:
-                # Use the length unless it is over the end of the file
-                self.size = min(self._file_size, self._end_range - self._start_range + 1)
-            elif self._start_range is not None:
-                self.size = self._file_size - self._start_range
-            else:
-                self.size = self._file_size
+        retry_active = True
+        retry_total = 3
+        while retry_active:
+            try:
+                location_mode, response = await self._clients.blob.download(
+                    range=range_header,
+                    range_get_content_md5=range_validation,
+                    validate_content=self._validate_content,
+                    data_stream_total=None,
+                    download_stream_current=0,
+                    **self._request_options)
+
+                # Check the location we read from to ensure we use the same one
+                # for subsequent requests.
+                self._location_mode = location_mode
+
+                # Parse the total file size and adjust the download size if ranges
+                # were specified
+                self._file_size = parse_length_from_content_range(response.properties.content_range)
+                if self._end_range is not None:
+                    # Use the length unless it is over the end of the file
+                    self.size = min(self._file_size, self._end_range - self._start_range + 1)
+                elif self._start_range is not None:
+                    self.size = self._file_size - self._start_range
+                else:
+                    self.size = self._file_size
+                retry_active = False

-        except HttpResponseError as error:
-            if self._start_range is None and error.response.status_code == 416:
-                # Get range will fail on an empty file. If the user did not
-                # request a range, do a regular get request in order to get
-                # any properties.
-                try:
-                    _, response = await self._clients.blob.download(
-                        validate_content=self._validate_content,
-                        data_stream_total=0,
-                        download_stream_current=0,
-                        **self._request_options)
-                except HttpResponseError as error:
+            except HttpResponseError as error:
+                if self._start_range is None and error.response.status_code == 416:
+                    # Get range will fail on an empty file. If the user did not
+                    # request a range, do a regular get request in order to get
+                    # any properties.
+                    try:
+                        _, response = await self._clients.blob.download(
+                            validate_content=self._validate_content,
+                            data_stream_total=0,
+                            download_stream_current=0,
+                            **self._request_options)
+                        retry_active = False
+                    except HttpResponseError as error:
+                        process_storage_error(error)
+
+                    # Set the download size to empty
+                    self.size = 0
+                    self._file_size = 0
+                else:
                     process_storage_error(error)

-                # Set the download size to empty
-                self.size = 0
-                self._file_size = 0
-            else:
-                process_storage_error(error)
+            except ClientPayloadError as error:
+                retry_total -= 1
+                if retry_total <= 0:
+                    raise ServiceResponseError(error, error=error)
+                await asyncio.sleep(1)

         # get page ranges to optimize downloading sparse page blob
         if response.properties.blob_type == 'PageBlob':
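
Note: the sync and async changes above share one retry shape: re-issue the request when the response body stream is cut off mid-read, sleep for a second between attempts, and surface a ServiceResponseError once three attempts are exhausted, while HTTP-level failures still go straight to process_storage_error. Below is a minimal standalone sketch of that pattern for the sync (requests-based) transport; the helper name read_with_retry and its send_request/read_body callbacks are illustrative stand-ins, not part of the SDK.

import time

import requests
from azure.core.exceptions import ServiceResponseError


def read_with_retry(send_request, read_body, retry_total=3, backoff=1):
    """Retry when the response body stream dies mid-read.

    send_request and read_body are hypothetical stand-ins for the generated
    client's download call and for process_content in the diff above.
    """
    while True:
        # Service-side failures propagate from here immediately, as before.
        response = send_request()
        try:
            # Draining the stream is where the transport errors can surface;
            # only these two requests exceptions are retried.
            return read_body(response)
        except (requests.exceptions.ChunkedEncodingError,
                requests.exceptions.ConnectionError) as error:
            retry_total -= 1
            if retry_total <= 0:
                # Report exhausted retries as an azure-core transport error.
                raise ServiceResponseError(error, error=error)
            time.sleep(backoff)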