Plumb through S3 runtime config
This plumbs the RuntimeConfig into the
S3SubCommand, CommandArchitecture, and S3Handler
classes.

The S3Handler __init__ method was changed by removing
the multi_threshold and chunksize params, as these are
now specified via the newly added runtime_config param.
Given the interface change, several unit tests
needed updating.
jamesls committed Feb 3, 2015
1 parent 697662b commit db24830
Showing 8 changed files with 105 additions and 55 deletions.
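
For orientation, the interface change looks roughly like this (a sketch based on the
diff below; constructing S3Handler with session=None mirrors the new unit tests and is
only meant to show the argument shape, not a real transfer):

    from awscli.customizations.s3.s3handler import S3Handler
    from awscli.customizations.s3.transferconfig import RuntimeConfig

    # Previously the tuning values were separate keyword arguments:
    #   S3Handler(session, params, multi_threshold=..., chunksize=...)
    # Now they travel together in a runtime config mapping:
    config = RuntimeConfig().build_config(multipart_threshold=8 * 1024 ** 2,
                                          multipart_chunksize=7 * 1024 ** 2)
    handler = S3Handler(session=None, params={'region': 'us-east-1'},
                        runtime_config=config)
    # Omitting runtime_config makes S3Handler fall back to RuntimeConfig.defaults().
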
3 changes: 3 additions & 0 deletions awscli/customizations/s3/executor.py
@@ -44,6 +44,8 @@ class Executor(object):
def __init__(self, num_threads, result_queue, quiet,
only_show_errors, max_queue_size, write_queue):
self._max_queue_size = max_queue_size
LOGGER.debug("Using max queue size for s3 tasks of: %s",
self._max_queue_size)
self.queue = StablePriorityQueue(maxsize=self._max_queue_size,
max_priority=20)
self.num_threads = num_threads
@@ -78,6 +80,7 @@ def start(self):
# explicit about it rather than relying on the threads_list order.
# See .join() for more info.
self.print_thread.start()
LOGGER.debug("Using a threadpool size of: %s", self.num_threads)
for i in range(self.num_threads):
worker = Worker(queue=self.queue)
worker.setDaemon(True)
20 changes: 11 additions & 9 deletions awscli/customizations/s3/s3handler.py
@@ -16,12 +16,11 @@
import os
import sys

from awscli.customizations.s3.transferconfig import MULTI_THRESHOLD, \
CHUNKSIZE, NUM_THREADS, MAX_QUEUE_SIZE
from awscli.customizations.s3.utils import find_chunksize, \
operate, find_bucket_key, relative_path, PrintTask, create_warning
from awscli.customizations.s3.executor import Executor
from awscli.customizations.s3 import tasks
from awscli.customizations.s3.transferconfig import RuntimeConfig
from awscli.compat import six
from awscli.compat import queue

@@ -42,12 +41,13 @@ class S3Handler(object):
class pull tasks from to complete.
"""
MAX_IO_QUEUE_SIZE = 20
MAX_EXECUTOR_QUEUE_SIZE = MAX_QUEUE_SIZE
EXECUTOR_NUM_THREADS = NUM_THREADS

def __init__(self, session, params, result_queue=None,
multi_threshold=MULTI_THRESHOLD, chunksize=CHUNKSIZE):
runtime_config=None):
self.session = session
if runtime_config is None:
runtime_config = RuntimeConfig.defaults()
self._runtime_config = runtime_config
# The write_queue has potential for optimizations, so the constant
# for maxsize is scoped to this class (as opposed to constants.py)
# so we have the ability to change this value later.
@@ -68,14 +68,16 @@ def __init__(self, session, params, result_queue=None,
for key in self.params.keys():
if key in params:
self.params[key] = params[key]
self.multi_threshold = multi_threshold
self.chunksize = chunksize
self.multi_threshold = self._runtime_config['multipart_threshold']
self.chunksize = self._runtime_config['multipart_chunksize']
LOGGER.debug("Using a multipart threshold of %s and a part size of %s",
self.multi_threshold, self.chunksize)
self.executor = Executor(
num_threads=self.EXECUTOR_NUM_THREADS,
num_threads=self._runtime_config['max_concurrent_requests'],
result_queue=self.result_queue,
quiet=self.params['quiet'],
only_show_errors=self.params['only_show_errors'],
max_queue_size=self.MAX_EXECUTOR_QUEUE_SIZE,
max_queue_size=self._runtime_config['max_queue_size'],
write_queue=self.write_queue
)
self._multipart_uploads = []
10 changes: 8 additions & 2 deletions awscli/customizations/s3/subcommands.py
@@ -30,6 +30,7 @@
AppendFilter, find_dest_path_comp_key, human_readable_size
from awscli.customizations.s3.syncstrategy.base import MissingFileSync, \
SizeAndLastModifiedSync, NeverSync
from awscli.customizations.s3 import transferconfig


RECURSIVE = {'name': 'recursive', 'action': 'store_true', 'dest': 'dir_op',
@@ -468,8 +469,11 @@ def _run_main(self, parsed_args, parsed_globals):
cmd_params.add_page_size(parsed_args)
cmd_params.add_paths(parsed_args.paths)
self._handle_rm_force(parsed_globals, cmd_params.parameters)
runtime_config = transferconfig.RuntimeConfig().build_config(
**self._session.get_scoped_config().get('s3', {}))
cmd = CommandArchitecture(self._session, self.NAME,
cmd_params.parameters)
cmd_params.parameters,
runtime_config)
cmd.set_endpoints()
cmd.create_instructions()
return cmd.run()
@@ -585,11 +589,12 @@ class CommandArchitecture(object):
lsit of instructions to wire together an assortment of generators to
perform the command.
"""
def __init__(self, session, cmd, parameters):
def __init__(self, session, cmd, parameters, runtime_config=None):
self.session = session
self.cmd = cmd
self.parameters = parameters
self.instructions = []
self._runtime_config = runtime_config
self._service = self.session.get_service('s3')
self._endpoint = None
self._source_endpoint = None
@@ -731,6 +736,7 @@ def run(self):
self._service, self._endpoint,
self._source_endpoint, self.parameters)
s3handler = S3Handler(self.session, self.parameters,
runtime_config=self._runtime_config,
result_queue=result_queue)
s3_stream_handler = S3StreamHandler(self.session, self.parameters,
result_queue=result_queue)
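
The net effect of the subcommands.py change is that whatever the user places under the
s3 key of the scoped config is forwarded verbatim as keyword arguments to build_config,
and the resulting mapping rides through CommandArchitecture into S3Handler. A minimal
sketch, with a hypothetical scoped-config result (the real values come from the
session's config file, which this diff does not parse):

    from awscli.customizations.s3 import transferconfig

    # Hypothetical stand-in for self._session.get_scoped_config().get('s3', {}):
    scoped_s3 = {'max_concurrent_requests': 20, 'max_queue_size': 10000}
    runtime_config = transferconfig.RuntimeConfig().build_config(**scoped_s3)
    # Keys not present in scoped_s3 presumably keep their transferconfig.DEFAULTS values.
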
9 changes: 0 additions & 9 deletions awscli/customizations/s3/transferconfig.py
@@ -19,16 +19,7 @@
'multipart_chunksize': 8 * (1024 ** 2),
'max_concurrent_requests': 10,
'max_queue_size': 1000,
# TODO: do we need this now that we have sentinels? Need to double check.
# I think this was exposed before because you'd have a 0.2s delay when
# running some of the S3 tests.
'queue_timeout_wait': 0.2,
}
MULTI_THRESHOLD = 8 * (1024 ** 2)
CHUNKSIZE = 7 * (1024 ** 2)
NUM_THREADS = 10
QUEUE_TIMEOUT_WAIT = 0.2
MAX_QUEUE_SIZE = 1000


class InvalidConfigError(Exception):
8 changes: 5 additions & 3 deletions tests/integration/customizations/s3/test_plugin.py
@@ -33,7 +33,7 @@
from awscli.testutils import unittest, FileCreator, get_stdout_encoding
from awscli.testutils import aws as _aws
from tests.unit.customizations.s3 import create_bucket as _create_bucket
from awscli.customizations.s3 import constants
from awscli.customizations.s3.transferconfig import DEFAULTS


@contextlib.contextmanager
@@ -1177,7 +1177,9 @@ def test_dryrun_download_large_file(self):
class TestMemoryUtilization(BaseS3CLICommand):
# These tests verify the memory utilization and growth are what we expect.
def extra_setup(self):
expected_memory_usage = constants.NUM_THREADS * constants.CHUNKSIZE
self.num_threads = DEFAULTS['max_concurrent_requests']
self.chunk_size = DEFAULTS['multipart_chunksize']
expected_memory_usage = self.num_threads * self.chunk_size
# margin for things like python VM overhead, botocore service
# objects, etc. 1.5 is really generous, perhaps over time this can be
# lowered.
@@ -1234,7 +1236,7 @@ def test_stream_large_file(self):
# is increased by two chunksizes because that is the maximum
# amount of chunks that will be queued while not being operated on
# by a thread when performing a streaming multipart upload.
max_mem_allowed = self.max_mem_allowed + 2 * constants.CHUNKSIZE
max_mem_allowed = self.max_mem_allowed + 2 * self.chunk_size

full_command = 's3 cp - s3://%s/foo.txt' % bucket_name
with open(foo_txt, 'rb') as f:
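
As a back-of-the-envelope check of the memory numbers in the test above, using the
DEFAULTS from transferconfig.py (and assuming max_mem_allowed is simply the 1.5 margin
applied to the baseline, which the truncated hunk does not show):

    chunk_size = 8 * 1024 ** 2    # DEFAULTS['multipart_chunksize']
    num_threads = 10              # DEFAULTS['max_concurrent_requests']
    baseline = num_threads * chunk_size                     # 80 MiB expected working set
    max_mem_allowed = 1.5 * baseline                        # ~120 MiB (assumed 1.5x margin)
    streaming_allowance = max_mem_allowed + 2 * chunk_size  # ~136 MiB for the streaming test
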
13 changes: 1 addition & 12 deletions tests/unit/customizations/s3/__init__.py
@@ -20,18 +20,7 @@


class S3HandlerBaseTest(unittest.TestCase):
"""
This class is used to patch the wait() calls used by the queues.
This makes the tests much faster because the wait is a significantly
shorter amount of time.
"""
def setUp(self):
wait = 'awscli.customizations.s3.transferconfig.QUEUE_TIMEOUT_WAIT'
self.wait_timeout_patch = patch(wait, 0.01)
self.mock_wait = self.wait_timeout_patch.start()

def tearDown(self):
self.wait_timeout_patch.stop()
pass


def make_loc_files():
90 changes: 73 additions & 17 deletions tests/unit/customizations/s3/test_s3handler.py
@@ -24,12 +24,17 @@
from awscli.customizations.s3.tasks import CreateMultipartUploadTask, \
UploadPartTask, CreateLocalFileTask
from awscli.customizations.s3.utils import MAX_PARTS
from awscli.customizations.s3.transferconfig import RuntimeConfig
from tests.unit.customizations.s3.fake_session import FakeSession
from tests.unit.customizations.s3 import make_loc_files, clean_loc_files, \
make_s3_files, s3_cleanup, create_bucket, list_contents, list_buckets, \
S3HandlerBaseTest, MockStdIn


def runtime_config(**kwargs):
return RuntimeConfig().build_config(**kwargs)


class S3HandlerTestDeleteList(S3HandlerBaseTest):
"""
This tests the ability to delete both files locally and in s3.
@@ -147,9 +152,10 @@ def setUp(self):
self.endpoint = self.service.get_endpoint('us-east-1')
params = {'region': 'us-east-1', 'acl': ['private'], 'quiet': True}
self.s3_handler = S3Handler(self.session, params)
self.s3_handler_multi = S3Handler(self.session, multi_threshold=10,
chunksize=2,
params=params)
self.s3_handler_multi = S3Handler(
self.session, params=params,
runtime_config=runtime_config(
multipart_threshold=10, multipart_chunksize=2))
self.bucket = create_bucket(self.session)
self.loc_files = make_loc_files()
self.s3_files = [self.bucket + '/text1.txt',
@@ -281,8 +287,10 @@ def setUp(self):
self.service = self.session.get_service('s3')
self.endpoint = self.service.get_endpoint('us-east-1')
params = {'region': 'us-east-1', 'quiet': True}
self.s3_handler_multi = S3Handler(self.session, params,
multi_threshold=10, chunksize=2)
self.s3_handler_multi = S3Handler(
self.session, params,
runtime_config=runtime_config(
multipart_threshold=10, multipart_chunksize=2))
self.bucket = create_bucket(self.session)
self.loc_files = make_loc_files()
self.s3_files = [self.bucket + '/text1.txt',
@@ -472,8 +480,10 @@ def setUp(self):
self.endpoint = self.service.get_endpoint('us-east-1')
params = {'region': 'us-east-1'}
self.s3_handler = S3Handler(self.session, params)
self.s3_handler_multi = S3Handler(self.session, params,
multi_threshold=10, chunksize=2)
self.s3_handler_multi = S3Handler(
self.session, params,
runtime_config=runtime_config(multipart_threshold=10,
multipart_chunksize=2))
self.bucket = make_s3_files(self.session)
self.s3_files = [self.bucket + '/text1.txt',
self.bucket + '/another_directory/text2.txt']
@@ -485,9 +495,11 @@ def setUp(self):

self.fail_session = FakeSession(connection_error=True)
self.fail_session.s3 = self.session.s3
self.s3_handler_multi_except = S3Handler(self.fail_session, params,
multi_threshold=10,
chunksize=2)
self.s3_handler_multi_except = S3Handler(
self.fail_session, params,
runtime_config=runtime_config(
multipart_threshold=10,
multipart_chunksize=2))

def tearDown(self):
super(S3HandlerTestDownload, self).tearDown()
@@ -626,7 +638,9 @@ def setUp(self):
self.params = {'is_stream': True, 'region': 'us-east-1'}

def test_pull_from_stream(self):
s3handler = S3StreamHandler(self.session, self.params, chunksize=2)
s3handler = S3StreamHandler(
self.session, self.params,
runtime_config=runtime_config(multipart_chunksize=2))
input_to_stdin = b'This is a test'
size = len(input_to_stdin)
# Retrieve the entire string.
@@ -667,8 +681,9 @@ def test_upload_stream_not_multipart_task(self):
b'bar')

def test_upload_stream_is_multipart_task(self):
s3handler = S3StreamHandler(self.session, self.params,
multi_threshold=1)
s3handler = S3StreamHandler(
self.session, self.params,
runtime_config=runtime_config(multipart_threshold=1))
s3handler.executor = mock.Mock()
fileinfos = [FileInfo('filename', operation_name='upload',
is_stream=True, size=0)]
@@ -687,7 +702,9 @@ def test_upload_stream_with_expected_size(self):
self.params['expected_size'] = 100000
# With this large of expected size, the chunksize of 2 will have
# to change.
s3handler = S3StreamHandler(self.session, self.params, chunksize=2)
s3handler = S3StreamHandler(
self.session, self.params,
runtime_config=runtime_config(multipart_chunksize=2))
s3handler.executor = mock.Mock()
fileinfo = FileInfo('filename', operation_name='upload',
is_stream=True)
@@ -745,8 +762,9 @@ def test_enqueue_multipart_download_stream(self):
This test ensures the right calls are made in ``_enqueue_tasks()``
if the file should be a multipart download.
"""
s3handler = S3StreamHandler(self.session, self.params,
multi_threshold=5)
s3handler = S3StreamHandler(
self.session, self.params,
runtime_config=runtime_config(multipart_threshold=5))
s3handler.executor = mock.Mock()
fileinfo = FileInfo('filename', operation_name='download',
is_stream=True)
Expand All @@ -768,7 +786,9 @@ def test_enqueue_multipart_download_stream(self):
self.assertTrue(mock_enqueue_range_tasks.called)

def test_enqueue_range_download_tasks_stream(self):
s3handler = S3StreamHandler(self.session, self.params, chunksize=100)
s3handler = S3StreamHandler(
self.session, self.params,
runtime_config=runtime_config(multipart_chunksize=100))
s3handler.executor = mock.Mock()
fileinfo = FileInfo('filename', operation_name='download',
is_stream=True, size=100)
@@ -779,5 +799,41 @@ def test_enqueue_range_download_tasks_stream(self):
CreateLocalFileTask)


class TestS3HandlerInitialization(unittest.TestCase):
def setUp(self):
self.arbitrary_params = {'region': 'us-west-2'}

def test_num_threads_is_plumbed_through(self):
num_threads_override = 20

config = runtime_config(max_concurrent_requests=num_threads_override)
handler = S3Handler(session=None, params=self.arbitrary_params,
runtime_config=config)

self.assertEqual(handler.executor.num_threads, num_threads_override)

def test_queue_size_is_plumbed_through(self):
max_queue_size_override = 10000

config = runtime_config(max_queue_size=max_queue_size_override)
handler = S3Handler(session=None, params=self.arbitrary_params,
runtime_config=config)

self.assertEqual(handler.executor.queue.maxsize,
max_queue_size_override)

def test_runtime_config_from_attrs(self):
# These are attrs that are set directly on S3Handler,
# not on some dependent object
config = runtime_config(
multipart_chunksize=1000,
multipart_threshold=10000)
handler = S3Handler(session=None, params=self.arbitrary_params,
runtime_config=config)

self.assertEqual(handler.chunksize, 1000)
self.assertEqual(handler.multi_threshold, 10000)


if __name__ == "__main__":
unittest.main()
7 changes: 4 additions & 3 deletions tests/unit/customizations/s3/test_tasks.py
@@ -410,9 +410,10 @@ def test_retried_requests_dont_enqueue_writes_twice(self):
success_read,
]
self.filename.is_stream = True
task = DownloadPartTask(0, transferconfig.CHUNKSIZE, self.result_queue,
self.service, self.filename, self.context,
self.io_queue)
task = DownloadPartTask(
0, transferconfig.DEFAULTS['multipart_chunksize'],
self.result_queue, self.service,
self.filename, self.context, self.io_queue)
task()
call_args_list = self.io_queue.put.call_args_list
self.assertEqual(len(call_args_list), 1)
