Shutdown task executor before aborting multipart uploads
There's a race condition in which multipart uploads can still be
kept alive because aborting a multipart upload can happen while
existing tasks are still being executed.  We must shut down the
executor before we abort multipart uploads.

This order was correct in the normal shutdown case, but in
the case where a user Ctrl-C's the process or where unexpected
exceptions propagate back to the S3 handler, the order was reversed.

In other words, the race condition exposed two possibilities:

Proper cleanup:

    t1 ---|start_part|-------------|end_part|---------------------
                                                                 |  join
    t2 ---|start_part|-------------|end_part|---------------------
                                                                 |  join
  main -----------------|ctrl-c|---------------|abort_upload|-----

Bad cleanup:

    t1 ---|start_part|--------------------------------|end_part|--
                                                                 |  join
    t2 ---|start_part|--------------------------------|end_part|--
                                                                 |  join
  main -----------------|ctrl-c|--|abort_upload|------------------
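
To make the ordering concrete, here's a minimal, self-contained sketch of
the "join workers first, then abort" discipline this commit enforces. It is
not the aws-cli implementation; FakeUploader and every other name below is
an illustrative stand-in.

    import threading

    class FakeUploader:
        """Stand-in for an in-progress S3 multipart upload."""
        def __init__(self):
            self.lock = threading.Lock()
            self.parts_in_flight = 0
            self.aborted = False

        def upload_part(self):
            # Simulates one worker task uploading one part.
            with self.lock:
                self.parts_in_flight += 1
            # ... network I/O would happen here ...
            with self.lock:
                self.parts_in_flight -= 1

        def abort(self):
            # Aborting while parts are still in flight is exactly the
            # race shown in the "Bad cleanup" timeline above.
            with self.lock:
                assert self.parts_in_flight == 0
                self.aborted = True

    def shutdown(workers, uploader):
        # Correct order: join every worker (the executor's
        # wait_until_shutdown()), *then* abort pending uploads.
        for w in workers:
            w.join()
        uploader.abort()

    uploader = FakeUploader()
    workers = [threading.Thread(target=uploader.upload_part)
               for _ in range(2)]
    for w in workers:
        w.start()
    shutdown(workers, uploader)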

There's also a gap in test coverage here, so I've added a test case
for this scenario.
jamesls committed Apr 11, 2016
1 parent 6836411 commit c449942
Showing 2 changed files with 51 additions and 20 deletions.
21 changes: 14 additions & 7 deletions awscli/customizations/s3/s3handler.py
@@ -101,28 +101,35 @@ def call(self, files):
             self.executor.print_thread.set_total_files(total_files)
             self.executor.print_thread.set_total_parts(total_parts)
             self.executor.initiate_shutdown()
-            self.executor.wait_until_shutdown()
-            self._shutdown()
+            self._finalize_shutdown()
         except Exception as e:
             LOGGER.debug('Exception caught during task execution: %s',
                          str(e), exc_info=True)
             self.result_queue.put(PrintTask(message=str(e), error=True))
             self.executor.initiate_shutdown(
                 priority=self.executor.IMMEDIATE_PRIORITY)
-            self._shutdown()
-            self.executor.wait_until_shutdown()
+            self._finalize_shutdown()
         except KeyboardInterrupt:
             self.result_queue.put(PrintTask(message=("Cleaning up. "
                                                      "Please wait..."),
                                             error=True))
             self.executor.initiate_shutdown(
                 priority=self.executor.IMMEDIATE_PRIORITY)
-            self._shutdown()
-            self.executor.wait_until_shutdown()
+            self._finalize_shutdown()
         return CommandResult(self.executor.num_tasks_failed,
                              self.executor.num_tasks_warned)
 
-    def _shutdown(self):
+    def _finalize_shutdown(self):
+        # Run all remaining tasks needed to completely shutdown the
+        # S3 handler.  This method will block until shutdown is complete.
+        # The order here is important.  We need to wait until all the
+        # tasks have been completed before we can cleanup.  Otherwise
+        # we can have race conditions where we're trying to cleanup
+        # uploads/downloads that are still in progress.
+        self.executor.wait_until_shutdown()
+        self._cleanup()
+
+    def _cleanup(self):
         # And finally we need to make a pass through all the existing
         # multipart uploads and abort any pending multipart uploads.
         self._abort_pending_multipart_uploads()
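
The cleanup pass ultimately rests on S3's AbortMultipartUpload API. Note
that S3 itself does not guarantee an abort wins a race against in-flight
part uploads (parts uploaded concurrently with the abort may still
succeed), which is why the handler must join its workers before calling
_abort_pending_multipart_uploads(). As a rough sketch of what such a
cleanup pass can look like, here is a boto3 version (an assumption for
illustration; the real handler aborts only the upload IDs it recorded,
rather than listing the whole bucket):

    import boto3

    def abort_pending_multipart_uploads(bucket_name):
        # List every in-progress multipart upload in the bucket and
        # abort it, freeing the storage held by already-uploaded parts.
        client = boto3.client('s3')
        response = client.list_multipart_uploads(Bucket=bucket_name)
        for upload in response.get('Uploads', []):
            client.abort_multipart_upload(
                Bucket=bucket_name,
                Key=upload['Key'],
                UploadId=upload['UploadId'])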
50 changes: 37 additions & 13 deletions tests/integration/customizations/s3/test_plugin.py
@@ -28,8 +28,6 @@
 import shutil
 import copy
 
-import botocore.session
-from botocore.exceptions import ClientError
 from awscli.compat import six
 from nose.plugins.attrib import attr
 
@@ -62,6 +60,19 @@ def aws(command, collect_memory=False, env_vars=None, wait_for_finish=True,
                 input_file=input_file)
 
 
+def wait_for_process_exit(process, timeout=60):
+    deadline = time.time() + timeout
+    while time.time() < deadline:
+        rc = process.poll()
+        if rc is not None:
+            break
+        time.sleep(1)
+    else:
+        process.kill()
+        raise AssertionError("CLI did not exit within %s seconds of "
+                             "receiving a Ctrl+C" % timeout)
+
+
 def _running_on_rhel():
     return (
         hasattr(platform, 'linux_distribution') and
@@ -351,26 +362,39 @@ def test_download_ctrl_c_does_not_hang(self):
         process = aws('s3 cp s3://%s/foo.txt %s' %
                       (bucket_name, local_foo_txt), wait_for_finish=False)
         # Give it some time to start up and enter its main task loop.
-        time.sleep(1)
+        time.sleep(2)
         # The process has 60 seconds to finish after being sent a Ctrl+C,
         # otherwise the test fails.
         process.send_signal(signal.SIGINT)
-        deadline = time.time() + 60
-        while time.time() < deadline:
-            rc = process.poll()
-            if rc is not None:
-                break
-            time.sleep(1)
-        else:
-            process.kill()
-            self.fail("CLI did not exist within 30 seconds of "
-                      "receiving a Ctrl+C")
+        wait_for_process_exit(process, timeout=60)
         # A Ctrl+C should have a non-zero RC.
         # We either caught the process in
         # its main polling loop (rc=1), or it was successfully terminated by
         # the SIGINT (rc=-2).
         self.assertIn(process.returncode, [1, -2])
 
+    @attr('slow')
+    @skip_if_windows('SIGINT not supported on Windows.')
+    def test_cleans_up_aborted_uploads(self):
+        bucket_name = self.create_bucket()
+        foo_txt = self.files.create_file('foo.txt', '')
+        with open(foo_txt, 'wb') as f:
+            for i in range(20):
+                f.write(b'a' * 1024 * 1024)
+
+        process = aws('s3 cp %s s3://%s/' % (foo_txt, bucket_name),
+                      wait_for_finish=False)
+        time.sleep(3)
+        # The process has 60 seconds to finish after being sent a Ctrl+C,
+        # otherwise the test fails.
+        process.send_signal(signal.SIGINT)
+        wait_for_process_exit(process, timeout=60)
+        uploads_after = self.client.list_multipart_uploads(
+            Bucket=bucket_name).get('Uploads', [])
+        self.assertEqual(uploads_after, [],
+                         "Not all multipart uploads were properly "
+                         "aborted after receiving Ctrl-C: %s" % uploads_after)
+
     def test_cp_to_nonexistent_bucket(self):
         foo_txt = self.files.create_file('foo.txt', 'this is foo.txt')
         p = aws('s3 cp %s s3://noexist-bucket-foo-bar123/foo.txt' % (foo_txt,))
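
The new test's Ctrl-C simulation is plain POSIX signalling against a child
process, the same pattern wait_for_process_exit() wraps. A self-contained
sketch (the 'sleep 30' child is a hypothetical stand-in for the 'aws s3 cp'
process under test):

    import signal
    import subprocess
    import time

    process = subprocess.Popen(['sleep', '30'])
    time.sleep(1)                       # let it reach its main loop
    process.send_signal(signal.SIGINT)  # simulate Ctrl+C

    deadline = time.time() + 60
    while time.time() < deadline:
        if process.poll() is not None:  # child has exited
            break
        time.sleep(1)
    else:
        process.kill()                  # don't leave it running forever
        raise AssertionError("process did not exit after SIGINT")

    print("exit code:", process.returncode)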
