[Fix]Add retry for streaming download (#18164)
* [Fix]Add retry for streaming download

* [Fix]Add retry for streaming download

* Update _download.py

* Update _download_async.py
xiafu-msft authored Apr 20, 2021
1 parent f746a2a commit 7e74cb3
Showing 2 changed files with 169 additions and 126 deletions.
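The fix wraps each download request and its streaming read in a bounded retry loop: a connection dropped mid-stream (requests' ChunkedEncodingError or ConnectionError on the sync path, aiohttp's ClientPayloadError on the async path) now gets up to three attempts with a one-second pause between them before being surfaced as ServiceResponseError. A minimal sketch of the shared pattern, outside the SDK and with hypothetical helper names (fetch_chunk, read_body):

import time

import requests
from azure.core.exceptions import ServiceResponseError


def download_with_retry(fetch_chunk, read_body, attempts=3):
    # fetch_chunk issues the ranged GET and read_body drains its streaming
    # body; both are hypothetical stand-ins for the SDK's client.download()
    # call and its process_content() helper.
    while True:
        response = fetch_chunk()
        try:
            return read_body(response)
        except (requests.exceptions.ChunkedEncodingError,
                requests.exceptions.ConnectionError) as error:
            attempts -= 1
            if attempts <= 0:
                # Mirror the commit: give up and report a response-level failure.
                raise ServiceResponseError(error, error=error)
            time.sleep(1)  # brief pause, then re-issue the request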
161 changes: 92 additions & 69 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_download.py
@@ -6,11 +6,15 @@

import sys
import threading
import time

import warnings
from io import BytesIO

from typing import Iterator
from azure.core.exceptions import HttpResponseError

import requests
from azure.core.exceptions import HttpResponseError, ServiceResponseError

from azure.core.tracing.common import with_current_context
from ._shared.encryption import decrypt_blob
from ._shared.request_handlers import validate_and_format_range_headers
@@ -44,10 +48,9 @@ def process_range_and_offset(start_range, end_range, length, encryption):
def process_content(data, start_offset, end_offset, encryption):
if data is None:
raise ValueError("Response cannot be None.")
try:
content = b"".join(list(data))
except Exception as error:
raise HttpResponseError(message="Download stream interrupted.", response=data.response, error=error)

content = b"".join(list(data))

if content and encryption.get("key") is not None or encryption.get("resolver") is not None:
try:
return decrypt_blob(
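Removing the try/except here means a failure while draining the streaming body is no longer re-wrapped as HttpResponseError inside process_content; the underlying requests exception now propagates to _download_chunk and _initial_request, whose new retry loops handle it. A small standalone illustration of the exception that escapes (the URL is a placeholder):

import requests

# Placeholder URL; any streamed HTTP body behaves the same way.
response = requests.get("https://example.com/large-file", stream=True, timeout=10)
try:
    # Draining the iterator can fail part-way through if the connection drops.
    content = b"".join(response.iter_content(chunk_size=4096))
except (requests.exceptions.ChunkedEncodingError,
        requests.exceptions.ConnectionError) as error:
    # Before this change, process_content() converted this into
    # HttpResponseError("Download stream interrupted."); now the caller
    # sees the original exception and can retry.
    print(f"stream interrupted: {error}")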
@@ -189,19 +192,29 @@ def _download_chunk(self, chunk_start, chunk_end):
check_content_md5=self.validate_content
)

try:
_, response = self.client.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
data_stream_total=self.total_size,
download_stream_current=self.progress_total,
**self.request_options
)
except HttpResponseError as error:
process_storage_error(error)
retry_active = True
retry_total = 3
while retry_active:
try:
_, response = self.client.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
data_stream_total=self.total_size,
download_stream_current=self.progress_total,
**self.request_options
)
except HttpResponseError as error:
process_storage_error(error)

chunk_data = process_content(response, offset[0], offset[1], self.encryption_options)
try:
chunk_data = process_content(response, offset[0], offset[1], self.encryption_options)
retry_active = False
except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as error:
retry_total -= 1
if retry_total <= 0:
raise ServiceResponseError(error, error=error)
time.sleep(1)

# This makes sure that if_match is set so that we can validate
# that subsequent downloads are to an unmodified blob
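From the caller's side nothing changes on success; a transient drop while reading a chunk is retried transparently, and only after three failed attempts on the same chunk does the downloader raise ServiceResponseError. A hedged usage sketch (the connection string, container, and blob names are placeholders):

from azure.core.exceptions import ServiceResponseError
from azure.storage.blob import BlobClient

# Placeholder connection details.
blob = BlobClient.from_connection_string(
    "<connection-string>", container_name="mycontainer", blob_name="big.bin"
)

try:
    downloader = blob.download_blob()  # returns a StorageStreamDownloader
    data = downloader.readall()        # each chunk gets its own retry budget
except ServiceResponseError as error:
    # Raised only once the per-chunk attempts are exhausted.
    print(f"download failed after retries: {error}")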
@@ -354,16 +367,6 @@ def __init__(
# TODO: Set to the stored MD5 when the service returns this
self.properties.content_md5 = None

if self.size == 0:
self._current_content = b""
else:
self._current_content = process_content(
self._response,
self._initial_offset[0],
self._initial_offset[1],
self._encryption_options
)

def __len__(self):
return self.size

@@ -376,51 +379,71 @@ def _initial_request(self):
check_content_md5=self._validate_content
)

try:
location_mode, response = self._clients.blob.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self._validate_content,
data_stream_total=None,
download_stream_current=0,
**self._request_options
)
retry_active = True
retry_total = 3
while retry_active:
try:
location_mode, response = self._clients.blob.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self._validate_content,
data_stream_total=None,
download_stream_current=0,
**self._request_options
)

# Check the location we read from to ensure we use the same one
# for subsequent requests.
self._location_mode = location_mode
# Check the location we read from to ensure we use the same one
# for subsequent requests.
self._location_mode = location_mode

# Parse the total file size and adjust the download size if ranges
# were specified
self._file_size = parse_length_from_content_range(response.properties.content_range)
if self._end_range is not None:
# Use the end range index unless it is over the end of the file
self.size = min(self._file_size, self._end_range - self._start_range + 1)
elif self._start_range is not None:
self.size = self._file_size - self._start_range
else:
self.size = self._file_size

# Parse the total file size and adjust the download size if ranges
# were specified
self._file_size = parse_length_from_content_range(response.properties.content_range)
if self._end_range is not None:
# Use the end range index unless it is over the end of the file
self.size = min(self._file_size, self._end_range - self._start_range + 1)
elif self._start_range is not None:
self.size = self._file_size - self._start_range
else:
self.size = self._file_size

except HttpResponseError as error:
if self._start_range is None and error.response.status_code == 416:
# Get range will fail on an empty file. If the user did not
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = self._clients.blob.download(
validate_content=self._validate_content,
data_stream_total=0,
download_stream_current=0,
**self._request_options
)
except HttpResponseError as error:
except HttpResponseError as error:
if self._start_range is None and error.response.status_code == 416:
# Get range will fail on an empty file. If the user did not
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = self._clients.blob.download(
validate_content=self._validate_content,
data_stream_total=0,
download_stream_current=0,
**self._request_options
)
except HttpResponseError as error:
process_storage_error(error)

# Set the download size to empty
self.size = 0
self._file_size = 0
else:
process_storage_error(error)

# Set the download size to empty
self.size = 0
self._file_size = 0
else:
process_storage_error(error)
try:
if self.size == 0:
self._current_content = b""
else:
self._current_content = process_content(
response,
self._initial_offset[0],
self._initial_offset[1],
self._encryption_options
)
retry_active = False
except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as error:
retry_total -= 1
if retry_total <= 0:
raise ServiceResponseError(error, error=error)
time.sleep(1)

# get page ranges to optimize downloading sparse page blob
if response.properties.blob_type == 'PageBlob':
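Reading the body of the first response now happens inside the same loop as the request itself (it used to live in __init__), so an interrupted initial read re-issues the request. The 416 fallback for zero-length blobs is unchanged: the un-ranged GET only picks up properties, the size is set to 0, and there is no body left to retry. A small usage sketch of that empty-blob case (connection details are placeholders):

from azure.storage.blob import BlobClient

# Placeholder connection details; "empty.bin" is assumed to be a 0-byte blob.
blob = BlobClient.from_connection_string(
    "<connection-string>", container_name="mycontainer", blob_name="empty.bin"
)

downloader = blob.download_blob()  # the ranged GET returns 416 for a 0-byte blob
print(downloader.size)             # 0 -- only properties were fetched
print(downloader.readall())        # b"" -- nothing is streamed, nothing to retry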
134 changes: 77 additions & 57 deletions sdk/storage/azure-storage-blob/azure/storage/blob/aio/_download_async.py
@@ -10,9 +10,10 @@
from io import BytesIO
from itertools import islice
import warnings

from typing import AsyncIterator
from azure.core.exceptions import HttpResponseError

from aiohttp import ClientPayloadError
from azure.core.exceptions import HttpResponseError, ServiceResponseError
from .._shared.encryption import decrypt_blob
from .._shared.request_handlers import validate_and_format_range_headers
from .._shared.response_handlers import process_storage_error, parse_length_from_content_range
@@ -22,10 +23,7 @@
async def process_content(data, start_offset, end_offset, encryption):
if data is None:
raise ValueError("Response cannot be None.")
try:
content = data.response.body()
except Exception as error:
raise HttpResponseError(message="Download stream interrupted.", response=data.response, error=error)
content = data.response.body()
if encryption.get('key') is not None or encryption.get('resolver') is not None:
try:
return decrypt_blob(
@@ -91,20 +89,31 @@ async def _download_chunk(self, chunk_start, chunk_end):
download_range[1],
check_content_md5=self.validate_content
)
try:
_, response = await self.client.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
data_stream_total=self.total_size,
download_stream_current=self.progress_total,
**self.request_options
)
except HttpResponseError as error:
process_storage_error(error)
retry_active = True
retry_total = 3
while retry_active:
try:
_, response = await self.client.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
data_stream_total=self.total_size,
download_stream_current=self.progress_total,
**self.request_options
)
retry_active = False

except HttpResponseError as error:
process_storage_error(error)
except ClientPayloadError as error:
retry_total -= 1
if retry_total <= 0:
raise ServiceResponseError(error, error=error)
await asyncio.sleep(1)

chunk_data = await process_content(response, offset[0], offset[1], self.encryption_options)


# This makes sure that if_match is set so that we can validate
# that subsequent downloads are to an unmodified blob
if self.request_options.get('modified_access_conditions'):
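On the async transport the analogous mid-stream failure surfaces as aiohttp.ClientPayloadError, which is what the new except clause catches before sleeping and retrying. A standalone illustration of where that exception comes from (the URL is a placeholder):

import asyncio

import aiohttp


async def read_body(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            try:
                # Reading the payload can fail part-way through if the
                # connection is closed prematurely.
                return await response.read()
            except aiohttp.ClientPayloadError as error:
                # This is the error the new download loop catches and retries.
                print(f"payload interrupted: {error}")
                raise


asyncio.run(read_body("https://example.com/large-file"))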
@@ -277,49 +286,60 @@ async def _initial_request(self):
end_range_required=False,
check_content_md5=self._validate_content)

try:
location_mode, response = await self._clients.blob.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self._validate_content,
data_stream_total=None,
download_stream_current=0,
**self._request_options)

# Check the location we read from to ensure we use the same one
# for subsequent requests.
self._location_mode = location_mode

# Parse the total file size and adjust the download size if ranges
# were specified
self._file_size = parse_length_from_content_range(response.properties.content_range)
if self._end_range is not None:
# Use the length unless it is over the end of the file
self.size = min(self._file_size, self._end_range - self._start_range + 1)
elif self._start_range is not None:
self.size = self._file_size - self._start_range
else:
self.size = self._file_size
retry_active = True
retry_total = 3
while retry_active:
try:
location_mode, response = await self._clients.blob.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self._validate_content,
data_stream_total=None,
download_stream_current=0,
**self._request_options)

# Check the location we read from to ensure we use the same one
# for subsequent requests.
self._location_mode = location_mode

# Parse the total file size and adjust the download size if ranges
# were specified
self._file_size = parse_length_from_content_range(response.properties.content_range)
if self._end_range is not None:
# Use the length unless it is over the end of the file
self.size = min(self._file_size, self._end_range - self._start_range + 1)
elif self._start_range is not None:
self.size = self._file_size - self._start_range
else:
self.size = self._file_size
retry_active = False

except HttpResponseError as error:
if self._start_range is None and error.response.status_code == 416:
# Get range will fail on an empty file. If the user did not
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = await self._clients.blob.download(
validate_content=self._validate_content,
data_stream_total=0,
download_stream_current=0,
**self._request_options)
except HttpResponseError as error:
except HttpResponseError as error:
if self._start_range is None and error.response.status_code == 416:
# Get range will fail on an empty file. If the user did not
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = await self._clients.blob.download(
validate_content=self._validate_content,
data_stream_total=0,
download_stream_current=0,
**self._request_options)
retry_active = False
except HttpResponseError as error:
process_storage_error(error)

# Set the download size to empty
self.size = 0
self._file_size = 0
else:
process_storage_error(error)

# Set the download size to empty
self.size = 0
self._file_size = 0
else:
process_storage_error(error)
except ClientPayloadError as error:
retry_total -= 1
if retry_total <= 0:
raise ServiceResponseError(error, error=error)
await asyncio.sleep(1)

# get page ranges to optimize downloading sparse page blob
if response.properties.blob_type == 'PageBlob':
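The async initial request mirrors the sync change: the request and the read of its body sit in one bounded loop, ClientPayloadError triggers a retry, and ServiceResponseError is raised once the three attempts are used up. A hedged end-to-end sketch with the aio client (connection details are placeholders):

import asyncio

from azure.core.exceptions import ServiceResponseError
from azure.storage.blob.aio import BlobClient


async def main():
    # Placeholder connection details.
    blob = BlobClient.from_connection_string(
        "<connection-string>", container_name="mycontainer", blob_name="big.bin"
    )
    async with blob:
        try:
            downloader = await blob.download_blob()
            data = await downloader.readall()
            print(f"downloaded {len(data)} bytes")
        except ServiceResponseError as error:
            # Raised only after the built-in retries are exhausted.
            print(f"download failed after retries: {error}")


asyncio.run(main())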
