From db24830b84022b5a2ff0c9f9e692ff7bdd1cbd18 Mon Sep 17 00:00:00 2001 From: James Saryerwinnie Date: Tue, 3 Feb 2015 12:40:50 -0800 Subject: [PATCH] Plumb through S3 runtime config This plumbs the RuntimeConfig into the S3SubCommand, CommandArchitecture, and S3Handler classes. The S3Handler __init__ method was changed to remove the multi_threshold and chunksize params, as these values are now specified via the newly added runtime_config param. Given the interface change, several unit tests needed updating. --- awscli/customizations/s3/executor.py | 3 + awscli/customizations/s3/s3handler.py | 20 +++-- awscli/customizations/s3/subcommands.py | 10 ++- awscli/customizations/s3/transferconfig.py | 9 -- .../customizations/s3/test_plugin.py | 8 +- tests/unit/customizations/s3/__init__.py | 13 +-- .../unit/customizations/s3/test_s3handler.py | 90 +++++++++++++++---- tests/unit/customizations/s3/test_tasks.py | 7 +- 8 files changed, 105 insertions(+), 55 deletions(-) diff --git a/awscli/customizations/s3/executor.py b/awscli/customizations/s3/executor.py index 94261821ed08..de74eb73deb9 100644 --- a/awscli/customizations/s3/executor.py +++ b/awscli/customizations/s3/executor.py @@ -44,6 +44,8 @@ class Executor(object): def __init__(self, num_threads, result_queue, quiet, only_show_errors, max_queue_size, write_queue): self._max_queue_size = max_queue_size + LOGGER.debug("Using max queue size for s3 tasks of: %s", + self._max_queue_size) self.queue = StablePriorityQueue(maxsize=self._max_queue_size, max_priority=20) self.num_threads = num_threads @@ -78,6 +80,7 @@ def start(self): # explicit about it rather than relying on the threads_list order. # See .join() for more info. self.print_thread.start() + LOGGER.debug("Using a threadpool size of: %s", self.num_threads) for i in range(self.num_threads): worker = Worker(queue=self.queue) worker.setDaemon(True) diff --git a/awscli/customizations/s3/s3handler.py b/awscli/customizations/s3/s3handler.py index 2f336dae0e6c..2e9420f45817 100644 --- a/awscli/customizations/s3/s3handler.py +++ b/awscli/customizations/s3/s3handler.py @@ -16,12 +16,11 @@ import os import sys -from awscli.customizations.s3.transferconfig import MULTI_THRESHOLD, \ - CHUNKSIZE, NUM_THREADS, MAX_QUEUE_SIZE from awscli.customizations.s3.utils import find_chunksize, \ operate, find_bucket_key, relative_path, PrintTask, create_warning from awscli.customizations.s3.executor import Executor from awscli.customizations.s3 import tasks +from awscli.customizations.s3.transferconfig import RuntimeConfig from awscli.compat import six from awscli.compat import queue @@ -42,12 +41,13 @@ class S3Handler(object): class pull tasks from to complete. """ MAX_IO_QUEUE_SIZE = 20 - MAX_EXECUTOR_QUEUE_SIZE = MAX_QUEUE_SIZE - EXECUTOR_NUM_THREADS = NUM_THREADS def __init__(self, session, params, result_queue=None, - multi_threshold=MULTI_THRESHOLD, chunksize=CHUNKSIZE): + runtime_config=None): self.session = session + if runtime_config is None: + runtime_config = RuntimeConfig.defaults() + self._runtime_config = runtime_config # The write_queue has potential for optimizations, so the constant # for maxsize is scoped to this class (as opposed to constants.py) # so we have the ability to change this value later. 
@@ -68,14 +68,16 @@ def __init__(self, session, params, result_queue=None, for key in self.params.keys(): if key in params: self.params[key] = params[key] - self.multi_threshold = multi_threshold - self.chunksize = chunksize + self.multi_threshold = self._runtime_config['multipart_threshold'] + self.chunksize = self._runtime_config['multipart_chunksize'] + LOGGER.debug("Using a multipart threshold of %s and a part size of %s", + self.multi_threshold, self.chunksize) self.executor = Executor( - num_threads=self.EXECUTOR_NUM_THREADS, + num_threads=self._runtime_config['max_concurrent_requests'], result_queue=self.result_queue, quiet=self.params['quiet'], only_show_errors=self.params['only_show_errors'], - max_queue_size=self.MAX_EXECUTOR_QUEUE_SIZE, + max_queue_size=self._runtime_config['max_queue_size'], write_queue=self.write_queue ) self._multipart_uploads = [] diff --git a/awscli/customizations/s3/subcommands.py b/awscli/customizations/s3/subcommands.py index eadc29d23af2..ef3a31e9299e 100644 --- a/awscli/customizations/s3/subcommands.py +++ b/awscli/customizations/s3/subcommands.py @@ -30,6 +30,7 @@ AppendFilter, find_dest_path_comp_key, human_readable_size from awscli.customizations.s3.syncstrategy.base import MissingFileSync, \ SizeAndLastModifiedSync, NeverSync +from awscli.customizations.s3 import transferconfig RECURSIVE = {'name': 'recursive', 'action': 'store_true', 'dest': 'dir_op', @@ -468,8 +469,11 @@ def _run_main(self, parsed_args, parsed_globals): cmd_params.add_page_size(parsed_args) cmd_params.add_paths(parsed_args.paths) self._handle_rm_force(parsed_globals, cmd_params.parameters) + runtime_config = transferconfig.RuntimeConfig().build_config( + **self._session.get_scoped_config().get('s3', {})) cmd = CommandArchitecture(self._session, self.NAME, - cmd_params.parameters) + cmd_params.parameters, + runtime_config) cmd.set_endpoints() cmd.create_instructions() return cmd.run() @@ -585,11 +589,12 @@ class CommandArchitecture(object): lsit of instructions to wire together an assortment of generators to perform the command. """ - def __init__(self, session, cmd, parameters): + def __init__(self, session, cmd, parameters, runtime_config=None): self.session = session self.cmd = cmd self.parameters = parameters self.instructions = [] + self._runtime_config = runtime_config self._service = self.session.get_service('s3') self._endpoint = None self._source_endpoint = None @@ -731,6 +736,7 @@ def run(self): self._service, self._endpoint, self._source_endpoint, self.parameters) s3handler = S3Handler(self.session, self.parameters, + runtime_config=self._runtime_config, result_queue=result_queue) s3_stream_handler = S3StreamHandler(self.session, self.parameters, result_queue=result_queue) diff --git a/awscli/customizations/s3/transferconfig.py b/awscli/customizations/s3/transferconfig.py index 01b314518ee7..0de538a1dfd7 100644 --- a/awscli/customizations/s3/transferconfig.py +++ b/awscli/customizations/s3/transferconfig.py @@ -19,16 +19,7 @@ 'multipart_chunksize': 8 * (1024 ** 2), 'max_concurrent_requests': 10, 'max_queue_size': 1000, - # TODO: do we need this now that we have sentinels? Need to double check. - # I think this was exposed before because you'd have a 0.2s delay when - # running some of the S3 tests. 
- 'queue_timeout_wait': 0.2, } -MULTI_THRESHOLD = 8 * (1024 ** 2) -CHUNKSIZE = 7 * (1024 ** 2) -NUM_THREADS = 10 -QUEUE_TIMEOUT_WAIT = 0.2 -MAX_QUEUE_SIZE = 1000 class InvalidConfigError(Exception): diff --git a/tests/integration/customizations/s3/test_plugin.py b/tests/integration/customizations/s3/test_plugin.py index dc64de2888dd..286ab22621d4 100644 --- a/tests/integration/customizations/s3/test_plugin.py +++ b/tests/integration/customizations/s3/test_plugin.py @@ -33,7 +33,7 @@ from awscli.testutils import unittest, FileCreator, get_stdout_encoding from awscli.testutils import aws as _aws from tests.unit.customizations.s3 import create_bucket as _create_bucket -from awscli.customizations.s3 import constants +from awscli.customizations.s3.transferconfig import DEFAULTS @contextlib.contextmanager @@ -1177,7 +1177,9 @@ def test_dryrun_download_large_file(self): class TestMemoryUtilization(BaseS3CLICommand): # These tests verify the memory utilization and growth are what we expect. def extra_setup(self): - expected_memory_usage = constants.NUM_THREADS * constants.CHUNKSIZE + self.num_threads = DEFAULTS['max_concurrent_requests'] + self.chunk_size = DEFAULTS['multipart_chunksize'] + expected_memory_usage = self.num_threads * self.chunk_size # margin for things like python VM overhead, botocore service # objects, etc. 1.5 is really generous, perhaps over time this can be # lowered. @@ -1234,7 +1236,7 @@ def test_stream_large_file(self): # is increased by two chunksizes because that is the maximum # amount of chunks that will be queued while not being operated on # by a thread when performing a streaming multipart upload. - max_mem_allowed = self.max_mem_allowed + 2 * constants.CHUNKSIZE + max_mem_allowed = self.max_mem_allowed + 2 * self.chunk_size full_command = 's3 cp - s3://%s/foo.txt' % bucket_name with open(foo_txt, 'rb') as f: diff --git a/tests/unit/customizations/s3/__init__.py b/tests/unit/customizations/s3/__init__.py index 41190af0e904..d552adeadbd1 100644 --- a/tests/unit/customizations/s3/__init__.py +++ b/tests/unit/customizations/s3/__init__.py @@ -20,18 +20,7 @@ class S3HandlerBaseTest(unittest.TestCase): - """ - This class is used to patch the wait() calls used by the queues. - This makes the tests much faster because the wait is a significantly - shorter amount of time. - """ - def setUp(self): - wait = 'awscli.customizations.s3.transferconfig.QUEUE_TIMEOUT_WAIT' - self.wait_timeout_patch = patch(wait, 0.01) - self.mock_wait = self.wait_timeout_patch.start() - - def tearDown(self): - self.wait_timeout_patch.stop() + pass def make_loc_files(): diff --git a/tests/unit/customizations/s3/test_s3handler.py b/tests/unit/customizations/s3/test_s3handler.py index d97a060306e8..7122f715e9c1 100644 --- a/tests/unit/customizations/s3/test_s3handler.py +++ b/tests/unit/customizations/s3/test_s3handler.py @@ -24,12 +24,17 @@ from awscli.customizations.s3.tasks import CreateMultipartUploadTask, \ UploadPartTask, CreateLocalFileTask from awscli.customizations.s3.utils import MAX_PARTS +from awscli.customizations.s3.transferconfig import RuntimeConfig from tests.unit.customizations.s3.fake_session import FakeSession from tests.unit.customizations.s3 import make_loc_files, clean_loc_files, \ make_s3_files, s3_cleanup, create_bucket, list_contents, list_buckets, \ S3HandlerBaseTest, MockStdIn +def runtime_config(**kwargs): + return RuntimeConfig().build_config(**kwargs) + + class S3HandlerTestDeleteList(S3HandlerBaseTest): """ This tests the ability to delete both files locally and in s3. 
@@ -147,9 +152,10 @@ def setUp(self): self.endpoint = self.service.get_endpoint('us-east-1') params = {'region': 'us-east-1', 'acl': ['private'], 'quiet': True} self.s3_handler = S3Handler(self.session, params) - self.s3_handler_multi = S3Handler(self.session, multi_threshold=10, - chunksize=2, - params=params) + self.s3_handler_multi = S3Handler( + self.session, params=params, + runtime_config=runtime_config( + multipart_threshold=10, multipart_chunksize=2)) self.bucket = create_bucket(self.session) self.loc_files = make_loc_files() self.s3_files = [self.bucket + '/text1.txt', @@ -281,8 +287,10 @@ def setUp(self): self.service = self.session.get_service('s3') self.endpoint = self.service.get_endpoint('us-east-1') params = {'region': 'us-east-1', 'quiet': True} - self.s3_handler_multi = S3Handler(self.session, params, - multi_threshold=10, chunksize=2) + self.s3_handler_multi = S3Handler( + self.session, params, + runtime_config=runtime_config( + multipart_threshold=10, multipart_chunksize=2)) self.bucket = create_bucket(self.session) self.loc_files = make_loc_files() self.s3_files = [self.bucket + '/text1.txt', @@ -472,8 +480,10 @@ def setUp(self): self.endpoint = self.service.get_endpoint('us-east-1') params = {'region': 'us-east-1'} self.s3_handler = S3Handler(self.session, params) - self.s3_handler_multi = S3Handler(self.session, params, - multi_threshold=10, chunksize=2) + self.s3_handler_multi = S3Handler( + self.session, params, + runtime_config=runtime_config(multipart_threshold=10, + multipart_chunksize=2)) self.bucket = make_s3_files(self.session) self.s3_files = [self.bucket + '/text1.txt', self.bucket + '/another_directory/text2.txt'] @@ -485,9 +495,11 @@ def setUp(self): self.fail_session = FakeSession(connection_error=True) self.fail_session.s3 = self.session.s3 - self.s3_handler_multi_except = S3Handler(self.fail_session, params, - multi_threshold=10, - chunksize=2) + self.s3_handler_multi_except = S3Handler( + self.fail_session, params, + runtime_config=runtime_config( + multipart_threshold=10, + multipart_chunksize=2)) def tearDown(self): super(S3HandlerTestDownload, self).tearDown() @@ -626,7 +638,9 @@ def setUp(self): self.params = {'is_stream': True, 'region': 'us-east-1'} def test_pull_from_stream(self): - s3handler = S3StreamHandler(self.session, self.params, chunksize=2) + s3handler = S3StreamHandler( + self.session, self.params, + runtime_config=runtime_config(multipart_chunksize=2)) input_to_stdin = b'This is a test' size = len(input_to_stdin) # Retrieve the entire string. @@ -667,8 +681,9 @@ def test_upload_stream_not_multipart_task(self): b'bar') def test_upload_stream_is_multipart_task(self): - s3handler = S3StreamHandler(self.session, self.params, - multi_threshold=1) + s3handler = S3StreamHandler( + self.session, self.params, + runtime_config=runtime_config(multipart_threshold=1)) s3handler.executor = mock.Mock() fileinfos = [FileInfo('filename', operation_name='upload', is_stream=True, size=0)] @@ -687,7 +702,9 @@ def test_upload_stream_with_expected_size(self): self.params['expected_size'] = 100000 # With this large of expected size, the chunksize of 2 will have # to change. 
- s3handler = S3StreamHandler(self.session, self.params, chunksize=2) + s3handler = S3StreamHandler( + self.session, self.params, + runtime_config=runtime_config(multipart_chunksize=2)) s3handler.executor = mock.Mock() fileinfo = FileInfo('filename', operation_name='upload', is_stream=True) @@ -745,8 +762,9 @@ def test_enqueue_multipart_download_stream(self): This test ensures the right calls are made in ``_enqueue_tasks()`` if the file should be a multipart download. """ - s3handler = S3StreamHandler(self.session, self.params, - multi_threshold=5) + s3handler = S3StreamHandler( + self.session, self.params, + runtime_config=runtime_config(multipart_threshold=5)) s3handler.executor = mock.Mock() fileinfo = FileInfo('filename', operation_name='download', is_stream=True) @@ -768,7 +786,9 @@ def test_enqueue_multipart_download_stream(self): self.assertTrue(mock_enqueue_range_tasks.called) def test_enqueue_range_download_tasks_stream(self): - s3handler = S3StreamHandler(self.session, self.params, chunksize=100) + s3handler = S3StreamHandler( + self.session, self.params, + runtime_config=runtime_config(multipart_chunksize=100)) s3handler.executor = mock.Mock() fileinfo = FileInfo('filename', operation_name='download', is_stream=True, size=100) @@ -779,5 +799,41 @@ def test_enqueue_range_download_tasks_stream(self): CreateLocalFileTask) +class TestS3HandlerInitialization(unittest.TestCase): + def setUp(self): + self.arbitrary_params = {'region': 'us-west-2'} + + def test_num_threads_is_plumbed_through(self): + num_threads_override = 20 + + config = runtime_config(max_concurrent_requests=num_threads_override) + handler = S3Handler(session=None, params=self.arbitrary_params, + runtime_config=config) + + self.assertEqual(handler.executor.num_threads, num_threads_override) + + def test_queue_size_is_plumbed_through(self): + max_queue_size_override = 10000 + + config = runtime_config(max_queue_size=max_queue_size_override) + handler = S3Handler(session=None, params=self.arbitrary_params, + runtime_config=config) + + self.assertEqual(handler.executor.queue.maxsize, + max_queue_size_override) + + def test_runtime_config_from_attrs(self): + # These are attrs that are set directly on S3Handler, + # not on some dependent object + config = runtime_config( + multipart_chunksize=1000, + multipart_threshold=10000) + handler = S3Handler(session=None, params=self.arbitrary_params, + runtime_config=config) + + self.assertEqual(handler.chunksize, 1000) + self.assertEqual(handler.multi_threshold, 10000) + + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/customizations/s3/test_tasks.py b/tests/unit/customizations/s3/test_tasks.py index 7faf4e21cf11..cde6ad27511a 100644 --- a/tests/unit/customizations/s3/test_tasks.py +++ b/tests/unit/customizations/s3/test_tasks.py @@ -410,9 +410,10 @@ def test_retried_requests_dont_enqueue_writes_twice(self): success_read, ] self.filename.is_stream = True - task = DownloadPartTask(0, transferconfig.CHUNKSIZE, self.result_queue, - self.service, self.filename, self.context, - self.io_queue) + task = DownloadPartTask( + 0, transferconfig.DEFAULTS['multipart_chunksize'], + self.result_queue, self.service, + self.filename, self.context, self.io_queue) task() call_args_list = self.io_queue.put.call_args_list self.assertEqual(len(call_args_list), 1)
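
For reviewers, a minimal sketch (not part of the patch) of how the new runtime_config parameter is plumbed through, mirroring what the new TestS3HandlerInitialization tests exercise. The override values, and constructing the handler with session=None and a minimal params dict, are illustrative assumptions only; in the CLI the values come from the "s3" section of the scoped config via S3SubCommand._run_main.

    from awscli.customizations.s3.s3handler import S3Handler
    from awscli.customizations.s3.transferconfig import RuntimeConfig

    # Build a RuntimeConfig the same way S3SubCommand._run_main now does,
    # except the kwargs here are hypothetical overrides rather than values
    # read from session.get_scoped_config().get('s3', {}).
    config = RuntimeConfig().build_config(
        multipart_threshold=16 * (1024 ** 2),
        multipart_chunksize=16 * (1024 ** 2),
        max_concurrent_requests=20,
        max_queue_size=2000)

    # As in TestS3HandlerInitialization, session=None and a minimal params
    # dict are enough to observe how the values are plumbed through.
    handler = S3Handler(session=None, params={'region': 'us-west-2'},
                        runtime_config=config)
    assert handler.multi_threshold == 16 * (1024 ** 2)
    assert handler.chunksize == 16 * (1024 ** 2)
    assert handler.executor.num_threads == 20
    assert handler.executor.queue.maxsize == 2000

    # Omitting runtime_config falls back to RuntimeConfig.defaults().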