Add config variables for changing S3 runtime values #1122

Closed
wants to merge 12 commits
20 changes: 0 additions & 20 deletions awscli/customizations/s3/constants.py

This file was deleted.

3 changes: 3 additions & 0 deletions awscli/customizations/s3/executor.py
@@ -44,6 +44,8 @@ class Executor(object):
def __init__(self, num_threads, result_queue, quiet,
only_show_errors, max_queue_size, write_queue):
self._max_queue_size = max_queue_size
LOGGER.debug("Using max queue size for s3 tasks of: %s",
self._max_queue_size)
self.queue = StablePriorityQueue(maxsize=self._max_queue_size,
max_priority=20)
self.num_threads = num_threads
@@ -78,6 +80,7 @@ def start(self):
# explicit about it rather than relying on the threads_list order.
# See .join() for more info.
self.print_thread.start()
LOGGER.debug("Using a threadpool size of: %s", self.num_threads)
for i in range(self.num_threads):
worker = Worker(queue=self.queue)
worker.setDaemon(True)
42 changes: 29 additions & 13 deletions awscli/customizations/s3/s3handler.py
@@ -16,20 +16,22 @@
import os
import sys

from awscli.customizations.s3.constants import MULTI_THRESHOLD, CHUNKSIZE, \
NUM_THREADS, MAX_UPLOAD_SIZE, MAX_QUEUE_SIZE
from awscli.customizations.s3.utils import find_chunksize, \
operate, find_bucket_key, relative_path, PrintTask, create_warning
from awscli.customizations.s3.executor import Executor
from awscli.customizations.s3 import tasks
from awscli.customizations.s3.transferconfig import RuntimeConfig
from awscli.compat import six
from awscli.compat import queue


LOGGER = logging.getLogger(__name__)
# Maximum object size allowed in S3.
# See: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
MAX_UPLOAD_SIZE = 5 * (1024 ** 4)

CommandResult = namedtuple('CommandResult',
['num_tasks_failed', 'num_tasks_warned'])
['num_tasks_failed', 'num_tasks_warned'])


class S3Handler(object):
@@ -39,12 +41,13 @@ class S3Handler(object):
class pull tasks from to complete.
"""
MAX_IO_QUEUE_SIZE = 20
MAX_EXECUTOR_QUEUE_SIZE = MAX_QUEUE_SIZE
EXECUTOR_NUM_THREADS = NUM_THREADS

def __init__(self, session, params, result_queue=None,
multi_threshold=MULTI_THRESHOLD, chunksize=CHUNKSIZE):
runtime_config=None):
self.session = session
if runtime_config is None:
runtime_config = RuntimeConfig.defaults()
self._runtime_config = runtime_config
# The write_queue has potential for optimizations, so the constant
# for maxsize is scoped to this class (as opposed to constants.py)
# so we have the ability to change this value later.
@@ -65,14 +68,16 @@ def __init__(self, session, params, result_queue=None,
for key in self.params.keys():
if key in params:
self.params[key] = params[key]
self.multi_threshold = multi_threshold
self.chunksize = chunksize
self.multi_threshold = self._runtime_config['multipart_threshold']
self.chunksize = self._runtime_config['multipart_chunksize']
LOGGER.debug("Using a multipart threshold of %s and a part size of %s",
self.multi_threshold, self.chunksize)
self.executor = Executor(
num_threads=self.EXECUTOR_NUM_THREADS,
num_threads=self._runtime_config['max_concurrent_requests'],
Contributor

Shouldn't this be

num_threads=self._runtime_config['max_concurrent_requests']-1

Because the main thread is acting as a producer by calling the ListObjects operation.

Member Author

After initially implementing that, I ended up making this just match the number of threads. The main reason is that if we do add max - 1, then the minimum value needs to be 2, which I think would confuse customers. It's also not intuitive that you can't set this value to 1. If a user wants to "turn off" parallelism, you'd think you'd set max_concurrent_requests to 1, but they would need to set max_concurrent_requests to 2. I could see us getting questions about that.

Another option is to rename this to be more explicit. If we called it max_transfer_requests, maybe that would clear up what this is doing, although I think max_concurrent_requests is clearer than max_transfer_requests. What do you think?

Or another option would be to be clear in our documentation about what this means: it's the number of concurrent requests uploading/downloading/copying to S3, and it does not include any ListObjects calls.

Contributor

Yeah, I like that last option the best. If I had no knowledge of the internals of the s3 commands, I would also find it confusing that the lowest you can make it is 2. I do not like max_transfer_requests because it is ambiguous (it seems to imply that we will do at most that many transfers for the one command).
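
As a sketch of the intended usage (not part of this diff), a caller could override the new runtime values through RuntimeConfig; the build_config() keyword arguments below are assumed to mirror the keys S3Handler reads (multipart_threshold, multipart_chunksize, max_concurrent_requests, max_queue_size):

from awscli.customizations.s3.transferconfig import RuntimeConfig

# Hypothetical override values, shown only to illustrate the new knobs.
runtime_config = RuntimeConfig().build_config(
    max_concurrent_requests=1,          # one worker thread for transfers; ListObjects stays on the main thread
    max_queue_size=1000,                # bound on tasks waiting in the executor queue
    multipart_threshold=8 * 1024 ** 2,  # objects of 8 MB or more use multipart transfers
    multipart_chunksize=8 * 1024 ** 2)  # 8 MB parts

# S3Handler falls back to RuntimeConfig.defaults() when runtime_config is
# None, so passing an explicit config like this is optional:
# handler = S3Handler(session, params, runtime_config=runtime_config)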

result_queue=self.result_queue,
quiet=self.params['quiet'],
only_show_errors=self.params['only_show_errors'],
max_queue_size=self.MAX_EXECUTOR_QUEUE_SIZE,
max_queue_size=self._runtime_config['max_queue_size'],
write_queue=self.write_queue
)
self._multipart_uploads = []
@@ -111,7 +116,7 @@ def call(self, files):
priority=self.executor.IMMEDIATE_PRIORITY)
self._shutdown()
self.executor.wait_until_shutdown()

return CommandResult(self.executor.num_tasks_failed,
self.executor.num_tasks_warned)

@@ -350,12 +355,23 @@ class S3StreamHandler(S3Handler):
involves a stream since the logic is different when uploading and
downloading streams.
"""

# This ensures that the number of multipart chunks waiting in the
# executor queue and in the threads is limited.
MAX_EXECUTOR_QUEUE_SIZE = 2
EXECUTOR_NUM_THREADS = 6

def __init__(self, session, params, result_queue=None,
runtime_config=None):
if runtime_config is None:
# Rather than using the .defaults(), streaming
# has different default values so that it does not
# consume large amounts of memory.
runtime_config = RuntimeConfig().build_config(
max_queue_size=self.MAX_EXECUTOR_QUEUE_SIZE,
max_concurrent_requests=self.EXECUTOR_NUM_THREADS)
super(S3StreamHandler, self).__init__(session, params, result_queue,
runtime_config)

def _enqueue_tasks(self, files):
total_files = 0
total_parts = 0
@@ -490,7 +506,7 @@ def _enqueue_upload_tasks(self, num_uploads, chunksize, upload_context,
task_class=task_class,
payload=payload
)
num_uploads += 1
num_uploads += 1
if not is_remaining:
break
# Once there is no more data left, announce to the context how