Fix S3 streaming download corruption
jamesls committed Jan 12, 2015
1 parent 6afb7c5 commit ed9000b
Showing 2 changed files with 48 additions and 4 deletions.
27 changes: 23 additions & 4 deletions awscli/customizations/s3/tasks.py
@@ -401,10 +401,31 @@ def _queue_writes(self, body):
                      self._part_number, self._filename.dest)
         iterate_chunk_size = self.ITERATE_CHUNK_SIZE
         body.set_socket_timeout(self.READ_TIMEOUT)
+        if self._filename.is_stream:
+            self._queue_writes_for_stream(body)
+        else:
+            self._queue_writes_in_chunks(body, iterate_chunk_size)
+
+    def _queue_writes_for_stream(self, body):
+        # We have to handle an output stream differently.  The main
+        # reason is that we cannot seek() in the output stream, so we
+        # need to queue the writes in order.  If we queued IO writes in
+        # chunks smaller than the part size, a retry would need a
+        # ranged GET for just the bytes not yet written.  Instead we
+        # read the entire chunk and queue it as a single write.
+        self._context.wait_for_turn(self._part_number)
+        chunk = body.read(self._chunk_size)
+        offset = self._part_number * self._chunk_size
+        LOGGER.debug("Submitting IORequest to write queue.")
+        self._io_queue.put(
+            IORequest(self._filename.dest, offset, chunk,
+                      self._filename.is_stream)
+        )
+        self._context.done_with_turn()
+
+    def _queue_writes_in_chunks(self, body, iterate_chunk_size):
         amount_read = 0
         current = body.read(iterate_chunk_size)
-        if self._filename.is_stream:
-            self._context.wait_for_turn(self._part_number)
         while current:
             offset = self._part_number * self._chunk_size + amount_read
             LOGGER.debug("Submitting IORequest to write queue.")
@@ -418,8 +439,6 @@ def _queue_writes(self, body):
             # Change log message.
             LOGGER.debug("Done queueing writes for part number %s to file: %s",
                          self._part_number, self._filename.dest)
-        if self._filename.is_stream:
-            self._context.done_with_turn()
 
 
 class CreateMultipartUploadTask(BasicTask):
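The comment in _queue_writes_for_stream carries the reasoning behind the fix: an output stream cannot seek(), so parts must be written in order, and each part must queue all of its bytes or none of them. As a rough illustration of the turn-taking that wait_for_turn()/done_with_turn() suggest, here is a minimal sketch; TurnContext is an invented name and a simplified assumption, not awscli's actual download context:

    import threading

    class TurnContext:
        """Stand-in for the download context's turn-taking: each part
        blocks until every lower-numbered part has queued its write,
        which keeps stream writes ordered without seek()."""

        def __init__(self):
            self._cond = threading.Condition()
            self._turn = 0

        def wait_for_turn(self, part_number):
            # Block until all earlier parts have queued their writes.
            with self._cond:
                while self._turn != part_number:
                    self._cond.wait()

        def done_with_turn(self):
            # Hand the turn to the next part number.
            with self._cond:
                self._turn += 1
                self._cond.notify_all()

Under that contract the stream path is retry-safe: the full-chunk read happens before the single io_queue.put(), so a socket timeout during the read leaves the queue untouched and the retried attempt starts clean.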
25 changes: 25 additions & 0 deletions tests/unit/customizations/s3/test_tasks.py
@@ -18,6 +18,7 @@
 
 from botocore.exceptions import IncompleteReadError
 
+from awscli.customizations.s3 import constants
 from awscli.customizations.s3.tasks import CreateLocalFileTask
 from awscli.customizations.s3.tasks import CompleteDownloadTask
 from awscli.customizations.s3.tasks import DownloadPartTask
@@ -395,6 +396,30 @@ def test_incomplete_read_is_retried(self):
         self.assertEqual(DownloadPartTask.TOTAL_ATTEMPTS,
                          self.service.get_operation.call_count)
 
+    def test_retried_requests_dont_enqueue_writes_twice(self):
+        error_body = mock.Mock()
+        error_body.read.side_effect = socket.timeout
+        success_body = mock.Mock()
+        success_body.read.side_effect = [b'foobar', b'']
+
+        incomplete_read = (mock.Mock(), {'Body': error_body})
+        success_read = (mock.Mock(), {'Body': success_body})
+        self.service.get_operation.return_value.call.side_effect = [
+            # The first request fails while we read the response body.
+            incomplete_read,
+            success_read,
+        ]
+        self.filename.is_stream = True
+        task = DownloadPartTask(0, constants.CHUNKSIZE, self.result_queue,
+                                self.service, self.filename, self.context,
+                                self.io_queue)
+        task()
+        call_args_list = self.io_queue.put.call_args_list
+        self.assertEqual(len(call_args_list), 1)
+        self.assertEqual(call_args_list[0],
+                         mock.call(('local/file', 0, b'foobar', True)))
+        success_body.read.assert_called_with(constants.CHUNKSIZE)
+
 
 class TestMultipartDownloadContext(unittest.TestCase):
     def setUp(self):
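The new test pins the behavior down: with is_stream=True, a GET whose body read times out must leave nothing in the IO queue, so the retried part produces exactly one write. Below is a toy reproduction of that contract under hypothetical names (download_part_to_stream and FlakyBody are invented for illustration, not awscli APIs):

    import queue
    import socket

    def download_part_to_stream(get_part, io_queue, part_number,
                                chunk_size, attempts=3):
        # Read the ENTIRE chunk first; only then queue exactly one
        # write. If read() raises, nothing was queued, so a retry
        # cannot duplicate bytes in the output stream.
        for _ in range(attempts):
            body = get_part(part_number)
            try:
                chunk = body.read(chunk_size)
            except socket.timeout:
                continue
            io_queue.put((part_number * chunk_size, chunk))
            return
        raise RuntimeError("part %s not downloaded" % part_number)

    reads = iter([socket.timeout(), b'foobar'])

    class FlakyBody:
        # First read() times out; the retried GET's read() succeeds.
        def read(self, size):
            result = next(reads)
            if isinstance(result, Exception):
                raise result
            return result

    io_queue = queue.Queue()
    download_part_to_stream(lambda part: FlakyBody(), io_queue, 0, 1024)
    assert io_queue.qsize() == 1   # exactly one write, as in the test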
