diff --git a/awscli/customizations/s3/constants.py b/awscli/customizations/s3/constants.py deleted file mode 100644 index d0877eed26b2..000000000000 --- a/awscli/customizations/s3/constants.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"). You -# may not use this file except in compliance with the License. A copy of -# the License is located at -# -# http://aws.amazon.com/apache2.0/ -# -# or in the "license" file accompanying this file. This file is -# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF -# ANY KIND, either express or implied. See the License for the specific -# language governing permissions and limitations under the License. -MULTI_THRESHOLD = 8 * (1024 ** 2) -CHUNKSIZE = 7 * (1024 ** 2) -NUM_THREADS = 10 -QUEUE_TIMEOUT_WAIT = 0.2 -MAX_PARTS = 950 -MAX_SINGLE_UPLOAD_SIZE = 5 * (1024 ** 3) -MAX_UPLOAD_SIZE = 5 * (1024 ** 4) -MAX_QUEUE_SIZE = 1000 diff --git a/awscli/customizations/s3/executor.py b/awscli/customizations/s3/executor.py index 94261821ed08..de74eb73deb9 100644 --- a/awscli/customizations/s3/executor.py +++ b/awscli/customizations/s3/executor.py @@ -44,6 +44,8 @@ class Executor(object): def __init__(self, num_threads, result_queue, quiet, only_show_errors, max_queue_size, write_queue): self._max_queue_size = max_queue_size + LOGGER.debug("Using max queue size for s3 tasks of: %s", + self._max_queue_size) self.queue = StablePriorityQueue(maxsize=self._max_queue_size, max_priority=20) self.num_threads = num_threads @@ -78,6 +80,7 @@ def start(self): # explicit about it rather than relying on the threads_list order. # See .join() for more info. self.print_thread.start() + LOGGER.debug("Using a threadpool size of: %s", self.num_threads) for i in range(self.num_threads): worker = Worker(queue=self.queue) worker.setDaemon(True) diff --git a/awscli/customizations/s3/s3handler.py b/awscli/customizations/s3/s3handler.py index cb0d26f765c5..583502b74af6 100644 --- a/awscli/customizations/s3/s3handler.py +++ b/awscli/customizations/s3/s3handler.py @@ -16,20 +16,22 @@ import os import sys -from awscli.customizations.s3.constants import MULTI_THRESHOLD, CHUNKSIZE, \ - NUM_THREADS, MAX_UPLOAD_SIZE, MAX_QUEUE_SIZE from awscli.customizations.s3.utils import find_chunksize, \ operate, find_bucket_key, relative_path, PrintTask, create_warning from awscli.customizations.s3.executor import Executor from awscli.customizations.s3 import tasks +from awscli.customizations.s3.transferconfig import RuntimeConfig from awscli.compat import six from awscli.compat import queue LOGGER = logging.getLogger(__name__) +# Maximum object size allowed in S3. +# See: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html +MAX_UPLOAD_SIZE = 5 * (1024 ** 4) CommandResult = namedtuple('CommandResult', - ['num_tasks_failed', 'num_tasks_warned']) + ['num_tasks_failed', 'num_tasks_warned']) class S3Handler(object): @@ -39,12 +41,13 @@ class S3Handler(object): class pull tasks from to complete. 
""" MAX_IO_QUEUE_SIZE = 20 - MAX_EXECUTOR_QUEUE_SIZE = MAX_QUEUE_SIZE - EXECUTOR_NUM_THREADS = NUM_THREADS def __init__(self, session, params, result_queue=None, - multi_threshold=MULTI_THRESHOLD, chunksize=CHUNKSIZE): + runtime_config=None): self.session = session + if runtime_config is None: + runtime_config = RuntimeConfig.defaults() + self._runtime_config = runtime_config # The write_queue has potential for optimizations, so the constant # for maxsize is scoped to this class (as opposed to constants.py) # so we have the ability to change this value later. @@ -65,14 +68,16 @@ def __init__(self, session, params, result_queue=None, for key in self.params.keys(): if key in params: self.params[key] = params[key] - self.multi_threshold = multi_threshold - self.chunksize = chunksize + self.multi_threshold = self._runtime_config['multipart_threshold'] + self.chunksize = self._runtime_config['multipart_chunksize'] + LOGGER.debug("Using a multipart threshold of %s and a part size of %s", + self.multi_threshold, self.chunksize) self.executor = Executor( - num_threads=self.EXECUTOR_NUM_THREADS, + num_threads=self._runtime_config['max_concurrent_requests'], result_queue=self.result_queue, quiet=self.params['quiet'], only_show_errors=self.params['only_show_errors'], - max_queue_size=self.MAX_EXECUTOR_QUEUE_SIZE, + max_queue_size=self._runtime_config['max_queue_size'], write_queue=self.write_queue ) self._multipart_uploads = [] @@ -111,7 +116,7 @@ def call(self, files): priority=self.executor.IMMEDIATE_PRIORITY) self._shutdown() self.executor.wait_until_shutdown() - + return CommandResult(self.executor.num_tasks_failed, self.executor.num_tasks_warned) @@ -350,12 +355,23 @@ class S3StreamHandler(S3Handler): involves a stream since the logic is different when uploading and downloading streams. """ - # This ensures that the number of multipart chunks waiting in the # executor queue and in the threads is limited. MAX_EXECUTOR_QUEUE_SIZE = 2 EXECUTOR_NUM_THREADS = 6 + def __init__(self, session, params, result_queue=None, + runtime_config=None): + if runtime_config is None: + # Rather than using the .defaults(), streaming + # has different default values so that it does not + # consume large amounts of memory. 
+ runtime_config = RuntimeConfig().build_config( + max_queue_size=self.MAX_EXECUTOR_QUEUE_SIZE, + max_concurrent_requests=self.EXECUTOR_NUM_THREADS) + super(S3StreamHandler, self).__init__(session, params, result_queue, + runtime_config) + def _enqueue_tasks(self, files): total_files = 0 total_parts = 0 @@ -490,7 +506,7 @@ def _enqueue_upload_tasks(self, num_uploads, chunksize, upload_context, task_class=task_class, payload=payload ) - num_uploads += 1 + num_uploads += 1 if not is_remaining: break # Once there is no more data left, announce to the context how diff --git a/awscli/customizations/s3/subcommands.py b/awscli/customizations/s3/subcommands.py index 4e276d633ecc..ef3a31e9299e 100644 --- a/awscli/customizations/s3/subcommands.py +++ b/awscli/customizations/s3/subcommands.py @@ -30,7 +30,7 @@ AppendFilter, find_dest_path_comp_key, human_readable_size from awscli.customizations.s3.syncstrategy.base import MissingFileSync, \ SizeAndLastModifiedSync, NeverSync - +from awscli.customizations.s3 import transferconfig RECURSIVE = {'name': 'recursive', 'action': 'store_true', 'dest': 'dir_op', @@ -38,26 +38,34 @@ "Command is performed on all files or objects " "under the specified directory or prefix.")} + HUMAN_READABLE = {'name': 'human-readable', 'action': 'store_true', 'help_text': "Displays file sizes in human readable format."} -SUMMARIZE = {'name': 'summarize', 'action': 'store_true', 'help_text': ( - "Displays summary information (number of objects, total size).")} + +SUMMARIZE = {'name': 'summarize', 'action': 'store_true', + 'help_text': ( + "Displays summary information " + "(number of objects, total size).")} + DRYRUN = {'name': 'dryrun', 'action': 'store_true', 'help_text': ( "Displays the operations that would be performed using the " "specified command without actually running them.")} + QUIET = {'name': 'quiet', 'action': 'store_true', 'help_text': ( "Does not display the operations performed from the specified " "command.")} + FORCE = {'name': 'force', 'action': 'store_true', 'help_text': ( "Deletes all objects in the bucket including the bucket itself.")} + FOLLOW_SYMLINKS = {'name': 'follow-symlinks', 'action': 'store_true', 'default': True, 'group_name': 'follow_symlinks', 'help_text': ( @@ -69,10 +77,12 @@ "nor ``--no-follow-symlinks`` is specifed, the default " "is to follow symlinks.")} + NO_FOLLOW_SYMLINKS = {'name': 'no-follow-symlinks', 'action': 'store_false', 'dest': 'follow_symlinks', 'default': True, 'group_name': 'follow_symlinks'} + NO_GUESS_MIME_TYPE = {'name': 'no-guess-mime-type', 'action': 'store_false', 'dest': 'guess_mime_type', 'default': True, 'help_text': ( @@ -80,23 +90,27 @@ "uploaded files. By default the mime type of a " "file is guessed when it is uploaded.")} + CONTENT_TYPE = {'name': 'content-type', 'nargs': 1, 'help_text': ( "Specify an explicit content type for this operation. 
" "This value overrides any guessed mime types.")} + EXCLUDE = {'name': 'exclude', 'action': AppendFilter, 'nargs': 1, 'dest': 'filters', 'help_text': ( "Exclude all files or objects from the command that matches " "the specified pattern.")} + INCLUDE = {'name': 'include', 'action': AppendFilter, 'nargs': 1, 'dest': 'filters', 'help_text': ( "Don't exclude files or objects " "in the command that match the specified pattern")} + ACL = {'name': 'acl', 'nargs': 1, 'choices': ['private', 'public-read', 'public-read-write', 'authenticated-read', 'bucket-owner-read', @@ -108,32 +122,38 @@ "``bucket-owner-read``, ``bucket-owner-full-control`` and " "``log-delivery-write``.")} -GRANTS = {'name': 'grants', 'nargs': '+', - 'help_text': ( - "Grant specific permissions to individual users or groups. You " - "can supply a list of grants of the form::

--grants " - "Permission=Grantee_Type=Grantee_ID [Permission=Grantee_Type=" - "Grantee_ID ...]

Each value contains the following elements:" - "

The " - "Grantee_ID value can be one of:" - "" - "For more information on Amazon S3 access control, see " - 'Access Control')} + +GRANTS = { + 'name': 'grants', 'nargs': '+', + 'help_text': ( + 'Grant specific permissions to individual users or groups. You ' + 'can supply a list of grants of the form::

--grants ' + 'Permission=Grantee_Type=Grantee_ID [Permission=Grantee_Type=' + 'Grantee_ID ...]

Each value contains the following elements:' + '

The ' + 'Grantee_ID value can be one of:' + '' + 'For more information on Amazon S3 access control, see ' + 'Access Control')} + SSE = {'name': 'sse', 'action': 'store_true', 'help_text': ( "Enable Server Side Encryption of the object in S3")} + STORAGE_CLASS = {'name': 'storage-class', 'nargs': 1, 'choices': ['STANDARD', 'REDUCED_REDUNDANCY'], 'help_text': ( @@ -141,6 +161,7 @@ "Valid choices are: STANDARD | REDUCED_REDUNDANCY. " "Defaults to 'STANDARD'")} + WEBSITE_REDIRECT = {'name': 'website-redirect', 'nargs': 1, 'help_text': ( "If the bucket is configured as a website, " @@ -149,16 +170,19 @@ "stores the value of this header in the object " "metadata.")} + CACHE_CONTROL = {'name': 'cache-control', 'nargs': 1, 'help_text': ( "Specifies caching behavior along the " "request/reply chain.")} + CONTENT_DISPOSITION = {'name': 'content-disposition', 'nargs': 1, 'help_text': ( "Specifies presentational information " "for the object.")} + CONTENT_ENCODING = {'name': 'content-encoding', 'nargs': 1, 'help_text': ( "Specifies what content encodings have been " @@ -166,9 +190,11 @@ "mechanisms must be applied to obtain the media-type " "referenced by the Content-Type header field.")} + CONTENT_LANGUAGE = {'name': 'content-language', 'nargs': 1, 'help_text': ("The language the content is in.")} + SOURCE_REGION = {'name': 'source-region', 'nargs': 1, 'help_text': ( "When transferring objects from an s3 bucket to an s3 " @@ -179,8 +205,13 @@ "specified the region of the source will be the same " "as the region of the destination bucket.")} -EXPIRES = {'name': 'expires', 'nargs': 1, 'help_text': ("The date and time at " - "which the object is no longer cacheable.")} + +EXPIRES = { + 'name': 'expires', 'nargs': 1, + 'help_text': ( + "The date and time at which the object is no longer cacheable.") +} + INDEX_DOCUMENT = {'name': 'index-document', 'help_text': ( @@ -192,16 +223,19 @@ 'images/index.html) The suffix must not be empty and ' 'must not include a slash character.')} + ERROR_DOCUMENT = {'name': 'error-document', 'help_text': ( 'The object key name to use when ' 'a 4XX class error occurs.')} + ONLY_SHOW_ERRORS = {'name': 'only-show-errors', 'action': 'store_true', 'help_text': ( 'Only errors and warnings are displayed. All other ' 'output is suppressed.')} + EXPECTED_SIZE = {'name': 'expected-size', 'help_text': ( 'This argument specifies the expected size of a stream ' @@ -213,10 +247,10 @@ PAGE_SIZE = {'name': 'page-size', 'cli_type_name': 'integer', - 'help_text': ( - 'The number of results to return in each response to a list ' - 'operation. The default value is 1000 (the maximum allowed). ' - 'Using a lower value may help if an operation times out.')} + 'help_text': ( + 'The number of results to return in each response to a list ' + 'operation. The default value is 1000 (the maximum allowed). ' + 'Using a lower value may help if an operation times out.')} TRANSFER_ARGS = [DRYRUN, QUIET, RECURSIVE, INCLUDE, EXCLUDE, ACL, @@ -355,7 +389,7 @@ def _make_last_mod_str(self, last_mod): str(last_mod.day).zfill(2), str(last_mod.hour).zfill(2), str(last_mod.minute).zfill(2), - str(last_mod.second).zfill(2)) + str(last_mod.second).zfill(2)) last_mod_str = "%s-%s-%s %s:%s:%s" % last_mod_tup return last_mod_str.ljust(19, ' ') @@ -363,7 +397,10 @@ def _make_size_str(self, size): """ This function creates the size string when objects are being listed. 
""" - size_str = human_readable_size(size) if self._human_readable else str(size) + if self._human_readable: + size_str = human_readable_size(size) + else: + size_str = str(size) return size_str.rjust(10, ' ') def _print_summary(self): @@ -372,7 +409,10 @@ def _print_summary(self): """ print_str = str(self._total_objects) uni_print("\nTotal Objects: ".rjust(15, ' ') + print_str + "\n") - print_str = human_readable_size(self._size_accumulator) if self._human_readable else str(self._size_accumulator) + if self._human_readable: + print_str = human_readable_size(self._size_accumulator) + else: + print_str = str(self._size_accumulator) uni_print("Total Size: ".rjust(15, ' ') + print_str + "\n") @@ -421,16 +461,19 @@ def _run_main(self, parsed_args, parsed_globals): super(S3TransferCommand, self)._run_main(parsed_args, parsed_globals) self._convert_path_args(parsed_args) params = self._build_call_parameters(parsed_args, {}) - cmd_params = CommandParameters(self._session, self.NAME, params, + cmd_params = CommandParameters(self.NAME, params, self.USAGE) cmd_params.add_region(parsed_globals) cmd_params.add_endpoint_url(parsed_globals) cmd_params.add_verify_ssl(parsed_globals) cmd_params.add_page_size(parsed_args) cmd_params.add_paths(parsed_args.paths) - cmd_params.check_force(parsed_globals) + self._handle_rm_force(parsed_globals, cmd_params.parameters) + runtime_config = transferconfig.RuntimeConfig().build_config( + **self._session.get_scoped_config().get('s3', {})) cmd = CommandArchitecture(self._session, self.NAME, - cmd_params.parameters) + cmd_params.parameters, + runtime_config) cmd.set_endpoints() cmd.create_instructions() return cmd.run() @@ -455,6 +498,26 @@ def _convert_path_args(self, parsed_args): new_path = enc_path.decode('utf-8') parsed_args.paths[i] = new_path + def _handle_rm_force(self, parsed_globals, parameters): + """ + This function recursive deletes objects in a bucket if the force + parameters was thrown when using the remove bucket command. + """ + # XXX: This shouldn't really be here. This was originally moved from + # the CommandParameters class to here, but this is still not the ideal + # place for this code. This should be moved + # to either the CommandArchitecture class, or the RbCommand class where + # the actual operations against S3 are performed. This may require + # some refactoring though to move this to either of those classes. + # For now, moving this out of CommandParameters allows for that class + # to be kept simple. + if 'force' in parameters: + if parameters['force']: + bucket = find_bucket_key(parameters['src'][5:])[0] + path = 's3://' + bucket + del_objects = RmCommand(self._session) + del_objects([path, '--recursive'], parsed_globals) + class CpCommand(S3TransferCommand): NAME = 'cp' @@ -526,11 +589,12 @@ class CommandArchitecture(object): lsit of instructions to wire together an assortment of generators to perform the command. 
""" - def __init__(self, session, cmd, parameters): + def __init__(self, session, cmd, parameters, runtime_config=None): self.session = session self.cmd = cmd self.parameters = parameters self.instructions = [] + self._runtime_config = runtime_config self._service = self.session.get_service('s3') self._endpoint = None self._source_endpoint = None @@ -668,9 +732,11 @@ def run(self): service=self._service, endpoint=self._endpoint, is_stream=True)] - file_info_builder = FileInfoBuilder(self._service, self._endpoint, - self._source_endpoint, self.parameters) + file_info_builder = FileInfoBuilder( + self._service, self._endpoint, + self._source_endpoint, self.parameters) s3handler = S3Handler(self.session, self.parameters, + runtime_config=self._runtime_config, result_queue=result_queue) s3_stream_handler = S3StreamHandler(self.session, self.parameters, result_queue=result_queue) @@ -689,7 +755,7 @@ def run(self): 's3_handler': [s3handler]} elif self.cmd == 'cp' and self.parameters['is_stream']: command_dict = {'setup': [stream_file_info], - 's3_handler': [s3_stream_handler]} + 's3_handler': [s3_stream_handler]} elif self.cmd == 'cp': command_dict = {'setup': [files], 'file_generator': [file_generator], @@ -749,12 +815,16 @@ class CommandParameters(object): This class is used to do some initial error based on the parameters and arguments passed to the command line. """ - def __init__(self, session, cmd, parameters, usage): + def __init__(self, cmd, parameters, usage): """ Stores command name and parameters. Ensures that the ``dir_op`` flag is true if a certain command is being used. + + :param cmd: The name of the command, e.g. "rm". + :param parameters: A dictionary of parameters. + :param usage: A usage string + """ - self.session = session self.cmd = cmd self.parameters = parameters self.usage = usage @@ -822,14 +892,6 @@ def _normalize_s3_trailing_slash(self, paths): path += '/' paths[i] = path - def _verify_bucket_exists(self, bucket_name): - session = self.session - service = session.get_service('s3') - endpoint = service.get_endpoint(self.parameters['region']) - operation = service.get_operation('ListObjects') - # This will raise an exception if the bucket does not exist. - operation.call(endpoint, bucket=bucket_name, max_keys=0) - def check_path_type(self, paths): """ This initial check ensures that the path types for the specified @@ -882,21 +944,6 @@ def check_src_path(self, paths): else: raise Exception("Error: Local path does not exist") - def check_force(self, parsed_globals): - """ - This function recursive deletes objects in a bucket if the force - parameters was thrown when using the remove bucket command. - """ - if 'force' in self.parameters: - if self.parameters['force']: - bucket = find_bucket_key(self.parameters['src'][5:])[0] - path = 's3://' + bucket - try: - del_objects = RmCommand(self.session) - del_objects([path, '--recursive'], parsed_globals) - except: - pass - def add_region(self, parsed_globals): self.parameters['region'] = parsed_globals.region diff --git a/awscli/customizations/s3/transferconfig.py b/awscli/customizations/s3/transferconfig.py new file mode 100644 index 000000000000..aee99763f4d5 --- /dev/null +++ b/awscli/customizations/s3/transferconfig.py @@ -0,0 +1,76 @@ +# Copyright 2013-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. 
A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. +from awscli.customizations.s3.utils import human_readable_to_bytes +# If the user does not specify any overrides, +# these are the default values we use for the s3 transfer +# commands. +DEFAULTS = { + 'multipart_threshold': 8 * (1024 ** 2), + 'multipart_chunksize': 8 * (1024 ** 2), + 'max_concurrent_requests': 10, + 'max_queue_size': 1000, +} + + +class InvalidConfigError(Exception): + pass + + +class RuntimeConfig(object): + + POSITIVE_INTEGERS = ['multipart_chunksize', 'multipart_threshold', + 'max_concurrent_requests', 'max_queue_size'] + HUMAN_READABLE_SIZES = ['multipart_chunksize', 'multipart_threshold'] + + @staticmethod + def defaults(): + return DEFAULTS.copy() + + def build_config(self, **kwargs): + """Create and convert a runtime config dictionary. + + This method will merge and convert S3 runtime configuration + data into a single dictionary that can then be passed to classes + that use this runtime config. + + :param kwargs: Any key in the ``DEFAULTS`` dict. + :return: A dictionary of the merged and converted values. + + """ + runtime_config = DEFAULTS.copy() + if kwargs: + runtime_config.update(kwargs) + self._convert_human_readable_sizes(runtime_config) + self._validate_config(runtime_config) + return runtime_config + + def _convert_human_readable_sizes(self, runtime_config): + for attr in self.HUMAN_READABLE_SIZES: + value = runtime_config.get(attr) + if value is not None and not isinstance(value, int): + runtime_config[attr] = human_readable_to_bytes(value) + + def _validate_config(self, runtime_config): + for attr in self.POSITIVE_INTEGERS: + value = runtime_config.get(attr) + if value is not None: + try: + runtime_config[attr] = int(value) + if not runtime_config[attr] > 0: + self._error_positive_value(attr, value) + except ValueError: + self._error_positive_value(attr, value) + + def _error_positive_value(self, name, value): + raise InvalidConfigError( + "Value for %s must be a positive integer: %s" % (name, value)) diff --git a/awscli/customizations/s3/utils.py b/awscli/customizations/s3/utils.py index a47dc2df2b1f..f54d184786bb 100644 --- a/awscli/customizations/s3/utils.py +++ b/awscli/customizations/s3/utils.py @@ -24,14 +24,28 @@ from dateutil.tz import tzlocal from botocore.compat import unquote_str -from awscli.customizations.s3.constants import MAX_PARTS -from awscli.customizations.s3.constants import MAX_SINGLE_UPLOAD_SIZE from awscli.compat import six from awscli.compat import PY3 from awscli.compat import queue -humanize_suffixes = ('KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB') +HUMANIZE_SUFFIXES = ('KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB') +MAX_PARTS = 10000 +# The maximum file size you can upload via S3 per request. 
+# See: http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html +# and: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html +MAX_SINGLE_UPLOAD_SIZE = 5 * (1024 ** 3) +SIZE_SUFFIX = { + 'kb': 1024, + 'mb': 1024 ** 2, + 'gb': 1024 ** 3, + 'tb': 1024 ** 4, + 'kib': 1024, + 'mib': 1024 ** 2, + 'gib': 1024 ** 3, + 'tib': 1024 ** 4, +} + def human_readable_size(value): @@ -61,12 +75,39 @@ def human_readable_size(value): elif bytes_int < base: return '%d Bytes' % bytes_int - for i, suffix in enumerate(humanize_suffixes): + for i, suffix in enumerate(HUMANIZE_SUFFIXES): unit = base ** (i+2) if round((bytes_int / unit) * base) < base: return '%.1f %s' % ((base * bytes_int / unit), suffix) +def human_readable_to_bytes(value): + """Converts a human readable size to bytes. + + :param value: A string such as "10MB". If a suffix is not included, + then the value is assumed to be an integer representing the size + in bytes. + :returns: The converted value in bytes as an integer + + """ + value = value.lower() + if value[-2:] == 'ib': + # Assume IEC suffix. + suffix = value[-3:].lower() + else: + suffix = value[-2:].lower() + has_size_identifier = ( + len(value) >= 2 and suffix in SIZE_SUFFIX) + if not has_size_identifier: + try: + return int(value) + except ValueError: + raise ValueError("Invalid size value: %s" % value) + else: + multiplier = SIZE_SUFFIX[suffix] + return int(value[:-len(suffix)]) * multiplier + + class AppendFilter(argparse.Action): """ This class is used as an action when parsing the parameters. diff --git a/tests/integration/customizations/s3/test_plugin.py b/tests/integration/customizations/s3/test_plugin.py index dc64de2888dd..286ab22621d4 100644 --- a/tests/integration/customizations/s3/test_plugin.py +++ b/tests/integration/customizations/s3/test_plugin.py @@ -33,7 +33,7 @@ from awscli.testutils import unittest, FileCreator, get_stdout_encoding from awscli.testutils import aws as _aws from tests.unit.customizations.s3 import create_bucket as _create_bucket -from awscli.customizations.s3 import constants +from awscli.customizations.s3.transferconfig import DEFAULTS @contextlib.contextmanager @@ -1177,7 +1177,9 @@ def test_dryrun_download_large_file(self): class TestMemoryUtilization(BaseS3CLICommand): # These tests verify the memory utilization and growth are what we expect. def extra_setup(self): - expected_memory_usage = constants.NUM_THREADS * constants.CHUNKSIZE + self.num_threads = DEFAULTS['max_concurrent_requests'] + self.chunk_size = DEFAULTS['multipart_chunksize'] + expected_memory_usage = self.num_threads * self.chunk_size # margin for things like python VM overhead, botocore service # objects, etc. 1.5 is really generous, perhaps over time this can be # lowered. @@ -1234,7 +1236,7 @@ def test_stream_large_file(self): # is increased by two chunksizes because that is the maximum # amount of chunks that will be queued while not being operated on # by a thread when performing a streaming multipart upload. 
-        max_mem_allowed = self.max_mem_allowed + 2 * constants.CHUNKSIZE
+        max_mem_allowed = self.max_mem_allowed + 2 * self.chunk_size
         full_command = 's3 cp - s3://%s/foo.txt' % bucket_name
         with open(foo_txt, 'rb') as f:
diff --git a/tests/unit/customizations/s3/__init__.py b/tests/unit/customizations/s3/__init__.py
index 02b9e2ebad19..d552adeadbd1 100644
--- a/tests/unit/customizations/s3/__init__.py
+++ b/tests/unit/customizations/s3/__init__.py
@@ -20,18 +20,7 @@
 class S3HandlerBaseTest(unittest.TestCase):
-    """
-    This class is used to patch the wait() calls used by the queues.
-    This makes the tests much faster because the wait is a significantly
-    shorter amount of time.
-    """
-    def setUp(self):
-        wait = 'awscli.customizations.s3.constants.QUEUE_TIMEOUT_WAIT'
-        self.wait_timeout_patch = patch(wait, 0.01)
-        self.mock_wait = self.wait_timeout_patch.start()
-
-    def tearDown(self):
-        self.wait_timeout_patch.stop()
+    pass
 
 
 def make_loc_files():
diff --git a/tests/unit/customizations/s3/test_s3handler.py b/tests/unit/customizations/s3/test_s3handler.py
index 2105d3495770..7122f715e9c1 100644
--- a/tests/unit/customizations/s3/test_s3handler.py
+++ b/tests/unit/customizations/s3/test_s3handler.py
@@ -23,12 +23,18 @@
 from awscli.customizations.s3.fileinfo import FileInfo
 from awscli.customizations.s3.tasks import CreateMultipartUploadTask, \
     UploadPartTask, CreateLocalFileTask
+from awscli.customizations.s3.utils import MAX_PARTS
+from awscli.customizations.s3.transferconfig import RuntimeConfig
 from tests.unit.customizations.s3.fake_session import FakeSession
 from tests.unit.customizations.s3 import make_loc_files, clean_loc_files, \
     make_s3_files, s3_cleanup, create_bucket, list_contents, list_buckets, \
     S3HandlerBaseTest, MockStdIn
 
 
+def runtime_config(**kwargs):
+    return RuntimeConfig().build_config(**kwargs)
+
+
 class S3HandlerTestDeleteList(S3HandlerBaseTest):
     """
     This tests the ability to delete both files locally and in s3.
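
For reference, a quick sketch (outside the diff itself) of what the new runtime configuration does: the runtime_config(**kwargs) helper added above is a thin wrapper over RuntimeConfig().build_config(), the same call that S3TransferCommand._run_main now feeds from the "s3" section of the scoped config. Assuming the transferconfig module introduced earlier in this patch, build_config merges any overrides into DEFAULTS and normalizes string and human-readable values:

    from awscli.customizations.s3.transferconfig import DEFAULTS, RuntimeConfig

    # No overrides: build_config() hands back a copy of the module DEFAULTS.
    assert RuntimeConfig().build_config() == DEFAULTS

    # Overrides arrive as strings when they come from the config file;
    # integers are coerced and human-readable sizes become byte counts.
    config = RuntimeConfig().build_config(
        max_concurrent_requests='20',
        multipart_threshold='64MB')
    assert config['max_concurrent_requests'] == 20
    assert config['multipart_threshold'] == 64 * 1024 ** 2

    # Anything not overridden keeps its default.
    assert config['max_queue_size'] == DEFAULTS['max_queue_size']

Values that are not positive integers after conversion raise InvalidConfigError, which the new test_transferconfig.py tests below exercise.
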
@@ -146,9 +152,10 @@ def setUp(self): self.endpoint = self.service.get_endpoint('us-east-1') params = {'region': 'us-east-1', 'acl': ['private'], 'quiet': True} self.s3_handler = S3Handler(self.session, params) - self.s3_handler_multi = S3Handler(self.session, multi_threshold=10, - chunksize=2, - params=params) + self.s3_handler_multi = S3Handler( + self.session, params=params, + runtime_config=runtime_config( + multipart_threshold=10, multipart_chunksize=2)) self.bucket = create_bucket(self.session) self.loc_files = make_loc_files() self.s3_files = [self.bucket + '/text1.txt', @@ -280,8 +287,10 @@ def setUp(self): self.service = self.session.get_service('s3') self.endpoint = self.service.get_endpoint('us-east-1') params = {'region': 'us-east-1', 'quiet': True} - self.s3_handler_multi = S3Handler(self.session, params, - multi_threshold=10, chunksize=2) + self.s3_handler_multi = S3Handler( + self.session, params, + runtime_config=runtime_config( + multipart_threshold=10, multipart_chunksize=2)) self.bucket = create_bucket(self.session) self.loc_files = make_loc_files() self.s3_files = [self.bucket + '/text1.txt', @@ -471,8 +480,10 @@ def setUp(self): self.endpoint = self.service.get_endpoint('us-east-1') params = {'region': 'us-east-1'} self.s3_handler = S3Handler(self.session, params) - self.s3_handler_multi = S3Handler(self.session, params, - multi_threshold=10, chunksize=2) + self.s3_handler_multi = S3Handler( + self.session, params, + runtime_config=runtime_config(multipart_threshold=10, + multipart_chunksize=2)) self.bucket = make_s3_files(self.session) self.s3_files = [self.bucket + '/text1.txt', self.bucket + '/another_directory/text2.txt'] @@ -484,9 +495,11 @@ def setUp(self): self.fail_session = FakeSession(connection_error=True) self.fail_session.s3 = self.session.s3 - self.s3_handler_multi_except = S3Handler(self.fail_session, params, - multi_threshold=10, - chunksize=2) + self.s3_handler_multi_except = S3Handler( + self.fail_session, params, + runtime_config=runtime_config( + multipart_threshold=10, + multipart_chunksize=2)) def tearDown(self): super(S3HandlerTestDownload, self).tearDown() @@ -625,7 +638,9 @@ def setUp(self): self.params = {'is_stream': True, 'region': 'us-east-1'} def test_pull_from_stream(self): - s3handler = S3StreamHandler(self.session, self.params, chunksize=2) + s3handler = S3StreamHandler( + self.session, self.params, + runtime_config=runtime_config(multipart_chunksize=2)) input_to_stdin = b'This is a test' size = len(input_to_stdin) # Retrieve the entire string. @@ -666,8 +681,9 @@ def test_upload_stream_not_multipart_task(self): b'bar') def test_upload_stream_is_multipart_task(self): - s3handler = S3StreamHandler(self.session, self.params, - multi_threshold=1) + s3handler = S3StreamHandler( + self.session, self.params, + runtime_config=runtime_config(multipart_threshold=1)) s3handler.executor = mock.Mock() fileinfos = [FileInfo('filename', operation_name='upload', is_stream=True, size=0)] @@ -686,7 +702,9 @@ def test_upload_stream_with_expected_size(self): self.params['expected_size'] = 100000 # With this large of expected size, the chunksize of 2 will have # to change. - s3handler = S3StreamHandler(self.session, self.params, chunksize=2) + s3handler = S3StreamHandler( + self.session, self.params, + runtime_config=runtime_config(multipart_chunksize=2)) s3handler.executor = mock.Mock() fileinfo = FileInfo('filename', operation_name='upload', is_stream=True) @@ -697,7 +715,7 @@ def test_upload_stream_with_expected_size(self): # UploadPartTasks. 
changed_chunk_size = submitted_tasks[1][0][0]._chunk_size # New chunksize should have a total parts under 1000. - self.assertTrue(100000/changed_chunk_size < 1000) + self.assertTrue(100000 / float(changed_chunk_size) <= MAX_PARTS) def test_upload_stream_enqueue_upload_task(self): s3handler = S3StreamHandler(self.session, self.params) @@ -744,8 +762,9 @@ def test_enqueue_multipart_download_stream(self): This test ensures the right calls are made in ``_enqueue_tasks()`` if the file should be a multipart download. """ - s3handler = S3StreamHandler(self.session, self.params, - multi_threshold=5) + s3handler = S3StreamHandler( + self.session, self.params, + runtime_config=runtime_config(multipart_threshold=5)) s3handler.executor = mock.Mock() fileinfo = FileInfo('filename', operation_name='download', is_stream=True) @@ -767,7 +786,9 @@ def test_enqueue_multipart_download_stream(self): self.assertTrue(mock_enqueue_range_tasks.called) def test_enqueue_range_download_tasks_stream(self): - s3handler = S3StreamHandler(self.session, self.params, chunksize=100) + s3handler = S3StreamHandler( + self.session, self.params, + runtime_config=runtime_config(multipart_chunksize=100)) s3handler.executor = mock.Mock() fileinfo = FileInfo('filename', operation_name='download', is_stream=True, size=100) @@ -778,5 +799,41 @@ def test_enqueue_range_download_tasks_stream(self): CreateLocalFileTask) +class TestS3HandlerInitialization(unittest.TestCase): + def setUp(self): + self.arbitrary_params = {'region': 'us-west-2'} + + def test_num_threads_is_plumbed_through(self): + num_threads_override = 20 + + config = runtime_config(max_concurrent_requests=num_threads_override) + handler = S3Handler(session=None, params=self.arbitrary_params, + runtime_config=config) + + self.assertEqual(handler.executor.num_threads, num_threads_override) + + def test_queue_size_is_plumbed_through(self): + max_queue_size_override = 10000 + + config = runtime_config(max_queue_size=max_queue_size_override) + handler = S3Handler(session=None, params=self.arbitrary_params, + runtime_config=config) + + self.assertEqual(handler.executor.queue.maxsize, + max_queue_size_override) + + def test_runtime_config_from_attrs(self): + # These are attrs that are set directly on S3Handler, + # not on some dependent object + config = runtime_config( + multipart_chunksize=1000, + multipart_threshold=10000) + handler = S3Handler(session=None, params=self.arbitrary_params, + runtime_config=config) + + self.assertEqual(handler.chunksize, 1000) + self.assertEqual(handler.multi_threshold, 10000) + + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/customizations/s3/test_subcommands.py b/tests/unit/customizations/s3/test_subcommands.py index f99ab2504e07..fd334ace6f38 100644 --- a/tests/unit/customizations/s3/test_subcommands.py +++ b/tests/unit/customizations/s3/test_subcommands.py @@ -20,7 +20,8 @@ import botocore.session from awscli.customizations.s3.s3 import S3 from awscli.customizations.s3.subcommands import CommandParameters, \ - CommandArchitecture, CpCommand, SyncCommand, ListCommand, get_endpoint + CommandArchitecture, CpCommand, SyncCommand, ListCommand, get_endpoint, \ + RbCommand from awscli.customizations.s3.syncstrategy.base import \ SizeAndLastModifiedSync, NeverSync, MissingFileSync from awscli.testutils import unittest, BaseAWSHelpOutputTest @@ -34,6 +35,9 @@ class FakeArgs(object): def __init__(self, **kwargs): self.__dict__.update(kwargs) + def __contains__(self, key): + return key in self.__dict__ + class 
TestGetEndpoint(unittest.TestCase): def test_endpoint(self): @@ -47,6 +51,28 @@ def test_endpoint(self): self.assertTrue(endpoint.verify) +class TestRbCommand(unittest.TestCase): + def test_rb_command_with_force_deletes_objects_in_bucket(self): + self.session = mock.Mock() + self.session.get_scoped_config.return_value = {} + rb_command = RbCommand(self.session) + parsed_args = FakeArgs(paths='s3://mybucket/', + force=True, + dir_op=False) + parsed_globals = FakeArgs(region=None, endpoint_url=None, + verify_ssl=None) + cmd_name = 'awscli.customizations.s3.subcommands.RmCommand' + arch_name = 'awscli.customizations.s3.subcommands.CommandArchitecture' + with mock.patch(cmd_name) as rm_command: + with mock.patch(arch_name): + rb_command._run_main(parsed_args, + parsed_globals=parsed_globals) + # Because of --force we should have called the + # rm_command with the --recursive option. + rm_command.return_value.assert_called_with( + ['s3://mybucket', '--recursive'], mock.ANY) + + class TestLSCommand(unittest.TestCase): def setUp(self): self.session = mock.Mock() @@ -447,16 +473,14 @@ def setUp(self): self.environ = {} self.environ_patch = patch('os.environ', self.environ) self.environ_patch.start() - self.session = FakeSession() self.mock = MagicMock() self.mock.get_config = MagicMock(return_value={'region': None}) self.loc_files = make_loc_files() - self.bucket = make_s3_files(self.session) + self.bucket = 's3testbucket' def tearDown(self): self.environ_patch.stop() clean_loc_files(self.loc_files) - s3_cleanup(self.bucket, self.session) def test_check_path_type_pass(self): # This tests the class's ability to determine whether the correct @@ -477,7 +501,7 @@ def test_check_path_type_pass(self): 'locallocal': [local_file, local_file]} for cmd in cmds.keys(): - cmd_param = CommandParameters(self.session, cmd, {}, '') + cmd_param = CommandParameters(cmd, {}, '') cmd_param.add_region(mock.Mock()) correct_paths = cmds[cmd] for path_args in correct_paths: @@ -505,7 +529,7 @@ def test_check_path_type_fail(self): 'locallocal': [local_file, local_file]} for cmd in cmds.keys(): - cmd_param = CommandParameters(self.session, cmd, {}, '') + cmd_param = CommandParameters(cmd, {}, '') cmd_param.add_region(mock.Mock()) wrong_paths = cmds[cmd] for path_args in wrong_paths: @@ -531,22 +555,13 @@ def test_check_src_path_pass(self): parameters = {} for filename in files: parameters['dir_op'] = filename[1] - cmd_parameter = CommandParameters(self.session, 'put', - parameters, '') + cmd_parameter = CommandParameters('put', parameters, '') cmd_parameter.add_region(mock.Mock()) cmd_parameter.check_src_path(filename[0]) - def test_check_force(self): - # This checks to make sure that the force parameter is run. If - # successful. The delete command will fail as the bucket is empty - # and be caught by the exception. 
- cmd_params = CommandParameters(self.session, 'rb', {'force': True},'') - cmd_params.parameters['src'] = 's3://mybucket' - cmd_params.check_force(None) - def test_validate_streaming_paths_upload(self): parameters = {'src': '-', 'dest': 's3://bucket'} - cmd_params = CommandParameters(self.session, 'cp', parameters, '') + cmd_params = CommandParameters('cp', parameters, '') cmd_params._validate_streaming_paths() self.assertTrue(cmd_params.parameters['is_stream']) self.assertTrue(cmd_params.parameters['only_show_errors']) @@ -554,7 +569,7 @@ def test_validate_streaming_paths_upload(self): def test_validate_streaming_paths_download(self): parameters = {'src': 'localfile', 'dest': '-'} - cmd_params = CommandParameters(self.session, 'cp', parameters, '') + cmd_params = CommandParameters('cp', parameters, '') cmd_params._validate_streaming_paths() self.assertTrue(cmd_params.parameters['is_stream']) self.assertTrue(cmd_params.parameters['only_show_errors']) @@ -562,13 +577,13 @@ def test_validate_streaming_paths_download(self): def test_validate_no_streaming_paths(self): parameters = {'src': 'localfile', 'dest': 's3://bucket'} - cmd_params = CommandParameters(self.session, 'cp', parameters, '') + cmd_params = CommandParameters('cp', parameters, '') cmd_params._validate_streaming_paths() self.assertFalse(cmd_params.parameters['is_stream']) def test_validate_streaming_paths_error(self): parameters = {'src': '-', 'dest': 's3://bucket'} - cmd_params = CommandParameters(self.session, 'sync', parameters, '') + cmd_params = CommandParameters('sync', parameters, '') with self.assertRaises(ValueError): cmd_params._validate_streaming_paths() diff --git a/tests/unit/customizations/s3/test_tasks.py b/tests/unit/customizations/s3/test_tasks.py index 96f65dd63fa8..cde6ad27511a 100644 --- a/tests/unit/customizations/s3/test_tasks.py +++ b/tests/unit/customizations/s3/test_tasks.py @@ -18,7 +18,7 @@ from botocore.exceptions import IncompleteReadError -from awscli.customizations.s3 import constants +from awscli.customizations.s3 import transferconfig from awscli.customizations.s3.tasks import CreateLocalFileTask from awscli.customizations.s3.tasks import CompleteDownloadTask from awscli.customizations.s3.tasks import DownloadPartTask @@ -410,9 +410,10 @@ def test_retried_requests_dont_enqueue_writes_twice(self): success_read, ] self.filename.is_stream = True - task = DownloadPartTask(0, constants.CHUNKSIZE, self.result_queue, - self.service, self.filename, self.context, - self.io_queue) + task = DownloadPartTask( + 0, transferconfig.DEFAULTS['multipart_chunksize'], + self.result_queue, self.service, + self.filename, self.context, self.io_queue) task() call_args_list = self.io_queue.put.call_args_list self.assertEqual(len(call_args_list), 1) diff --git a/tests/unit/customizations/s3/test_transferconfig.py b/tests/unit/customizations/s3/test_transferconfig.py new file mode 100644 index 000000000000..cc9be90a8ce7 --- /dev/null +++ b/tests/unit/customizations/s3/test_transferconfig.py @@ -0,0 +1,61 @@ +# Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. 
See the License for the specific +# language governing permissions and limitations under the License. +from awscli.testutils import unittest + +from awscli.customizations.s3 import transferconfig + + +class TestTransferConfig(unittest.TestCase): + + def build_config_with(self, **config_from_user): + return transferconfig.RuntimeConfig().build_config(**config_from_user) + + def test_user_provides_no_config_uses_default(self): + # If the user does not provide any config overrides, + # we should just use the default values defined in + # the module. + config = transferconfig.RuntimeConfig() + runtime_config = config.build_config() + self.assertEqual(runtime_config, transferconfig.DEFAULTS) + + def test_user_provides_partial_overrides(self): + config_from_user = { + 'max_concurrent_requests': '20', + 'multipart_threshold': str(64 * (1024 ** 2)), + } + runtime_config = self.build_config_with(**config_from_user) + # Our overrides were accepted. + self.assertEqual(runtime_config['multipart_threshold'], + int(config_from_user['multipart_threshold'])) + self.assertEqual(runtime_config['max_concurrent_requests'], + int(config_from_user['max_concurrent_requests'])) + # And defaults were used for values not specified. + self.assertEqual(runtime_config['max_queue_size'], + int(transferconfig.DEFAULTS['max_queue_size'])) + + def test_validates_integer_types(self): + with self.assertRaises(transferconfig.InvalidConfigError): + self.build_config_with(max_concurrent_requests="not an int") + + def test_validates_positive_integers(self): + with self.assertRaises(transferconfig.InvalidConfigError): + self.build_config_with(max_concurrent_requests="-10") + + def test_min_value(self): + with self.assertRaises(transferconfig.InvalidConfigError): + self.build_config_with(max_concurrent_requests="0") + + def test_human_readable_sizes_converted_to_bytes(self): + runtime_config = self.build_config_with(multipart_threshold="10MB") + self.assertEqual(runtime_config['multipart_threshold'], + 10 * 1024 * 1024) diff --git a/tests/unit/customizations/s3/test_utils.py b/tests/unit/customizations/s3/test_utils.py index 2a2355f252da..cecb917568eb 100644 --- a/tests/unit/customizations/s3/test_utils.py +++ b/tests/unit/customizations/s3/test_utils.py @@ -22,7 +22,8 @@ from awscli.customizations.s3.utils import AppendFilter from awscli.customizations.s3.utils import create_warning from awscli.customizations.s3.utils import human_readable_size -from awscli.customizations.s3.constants import MAX_SINGLE_UPLOAD_SIZE +from awscli.customizations.s3.utils import human_readable_to_bytes +from awscli.customizations.s3.utils import MAX_SINGLE_UPLOAD_SIZE def test_human_readable_size(): @@ -46,6 +47,28 @@ def _test_human_size_matches(bytes_int, expected): assert_equal(human_readable_size(bytes_int), expected) +def test_convert_human_readable_to_bytes(): + yield _test_convert_human_readable_to_bytes, "1", 1 + yield _test_convert_human_readable_to_bytes, "1024", 1024 + yield _test_convert_human_readable_to_bytes, "1KB", 1024 + yield _test_convert_human_readable_to_bytes, "1kb", 1024 + yield _test_convert_human_readable_to_bytes, "1MB", 1024 ** 2 + yield _test_convert_human_readable_to_bytes, "1GB", 1024 ** 3 + yield _test_convert_human_readable_to_bytes, "1TB", 1024 ** 4 + + # Also because of the "ls" output for s3, we support + # the IEC "mebibyte" format (MiB). 
+    yield _test_convert_human_readable_to_bytes, "1KiB", 1024
+    yield _test_convert_human_readable_to_bytes, "1kib", 1024
+    yield _test_convert_human_readable_to_bytes, "1MiB", 1024 ** 2
+    yield _test_convert_human_readable_to_bytes, "1GiB", 1024 ** 3
+    yield _test_convert_human_readable_to_bytes, "1TiB", 1024 ** 4
+
+
+def _test_convert_human_readable_to_bytes(size_str, expected):
+    assert_equal(human_readable_to_bytes(size_str), expected)
+
+
 class AppendFilterTest(unittest.TestCase):
     def test_call(self):
         parser = argparse.ArgumentParser()
@@ -102,8 +125,10 @@ def test_large_chunk(self):
         size because the original ``chunksize`` is too small.
         """
         chunksize = 7 * (1024 ** 2)
-        size = 8 * (1024 ** 3)
-        self.assertEqual(find_chunksize(size, chunksize), chunksize * 2)
+        size = 5 * (1024 ** 4)
+        # If we try to upload a 5TB file, we'll need to use 896MB part
+        # sizes.
+        self.assertEqual(find_chunksize(size, chunksize), 896 * (1024 ** 2))
 
     def test_super_chunk(self):
         """
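
For reference, the 896MB expectation in test_large_chunk above follows from the part-count math: with MAX_PARTS now 10000, a 7 MiB starting part size has to grow before a 5 * 1024 ** 4 byte upload fits into at most 10,000 parts, and repeatedly doubling the part size lands on 896 MiB. A rough sketch of that doubling (an approximation of find_chunksize; the real helper in awscli/customizations/s3/utils.py also has to respect MAX_SINGLE_UPLOAD_SIZE):

    import math

    MAX_PARTS = 10000

    def doubled_chunksize(size, chunksize):
        # Double the part size until the upload fits within MAX_PARTS parts.
        while int(math.ceil(size / float(chunksize))) > MAX_PARTS:
            chunksize *= 2
        return chunksize

    # 7 MiB doubles seven times: 14, 28, 56, 112, 224, 448, 896 MiB;
    # ceil(5 * 1024 ** 4 / (896 * 1024 ** 2)) == 5852 parts, under the limit.
    assert doubled_chunksize(5 * 1024 ** 4, 7 * 1024 ** 2) == 896 * 1024 ** 2
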