Skip to content

Commit

Permalink
Cut down memory usage to multipart upload a stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
kyleknap committed Sep 15, 2014
1 parent 676e6be commit 19ea686
Show file tree
Hide file tree
Showing 14 changed files with 323 additions and 233 deletions.
9 changes: 4 additions & 5 deletions awscli/customizations/s3/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,10 @@ def run(self):
elif isinstance(task, IOCloseRequest):
LOGGER.debug("IOCloseRequest received for %s, closing file.",
task.filename)
if not task.is_stream:
fileobj = self.fd_descriptor_cache.get(task.filename)
if fileobj is not None:
fileobj.close()
del self.fd_descriptor_cache[task.filename]
fileobj = self.fd_descriptor_cache.get(task.filename)
if fileobj is not None:
fileobj.close()
del self.fd_descriptor_cache[task.filename]

def _cleanup(self):
for fileobj in self.fd_descriptor_cache.values():
Expand Down
47 changes: 10 additions & 37 deletions awscli/customizations/s3/filegenerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from dateutil.tz import tzlocal

from awscli.customizations.s3.utils import find_bucket_key, get_file_stat
from awscli.customizations.s3.utils import BucketLister, create_warning
from awscli.customizations.s3.utils import BucketLister, create_warning, \
find_dest_path_comp_key
from awscli.errorhandler import ClientError


Expand Down Expand Up @@ -95,7 +96,7 @@ def __init__(self, directory, filename):
class FileStat(object):
def __init__(self, src, dest=None, compare_key=None, size=None,
last_update=None, src_type=None, dest_type=None,
operation_name=None, is_stream=False):
operation_name=None):
self.src = src
self.dest = dest
self.compare_key = compare_key
Expand All @@ -104,7 +105,6 @@ def __init__(self, src, dest=None, compare_key=None, size=None,
self.src_type = src_type
self.dest_type = dest_type
self.operation_name = operation_name
self.is_stream = is_stream


class FileGenerator(object):
Expand All @@ -116,8 +116,7 @@ class FileGenerator(object):
``FileInfo`` objects to send to a ``Comparator`` or ``S3Handler``.
"""
def __init__(self, service, endpoint, operation_name,
follow_symlinks=True, page_size=None, result_queue=None,
is_stream=False):
follow_symlinks=True, page_size=None, result_queue=None):
self._service = service
self._endpoint = endpoint
self.operation_name = operation_name
Expand All @@ -126,51 +125,25 @@ def __init__(self, service, endpoint, operation_name,
self.result_queue = result_queue
if not result_queue:
self.result_queue = queue.Queue()
self.is_stream = is_stream

def call(self, files):
"""
This is the generalized function to yield the ``FileInfo`` objects.
``dir_op`` and ``use_src_name`` flags affect which files are used and
ensure the proper destination paths and compare keys are formed.
"""
src = files['src']
dest = files['dest']
src_type = src['type']
dest_type = dest['type']
function_table = {'s3': self.list_objects}
if self.is_stream:
function_table['local'] = self.list_local_file_stream
else:
function_table['local'] = self.list_files
sep_table = {'s3': '/', 'local': os.sep}
source = src['path']
function_table = {'s3': self.list_objects, 'local': self.list_files}
source = files['src']['path']
src_type = files['src']['type']
dest_type = files['dest']['type']
file_list = function_table[src_type](source, files['dir_op'])
for src_path, size, last_update in file_list:
if files['dir_op']:
rel_path = src_path[len(src['path']):]
else:
rel_path = src_path.split(sep_table[src_type])[-1]
compare_key = rel_path.replace(sep_table[src_type], '/')
if files['use_src_name']:
dest_path = dest['path']
dest_path += rel_path.replace(sep_table[src_type],
sep_table[dest_type])
else:
dest_path = dest['path']
dest_path, compare_key = find_dest_path_comp_key(files, src_path)
yield FileStat(src=src_path, dest=dest_path,
compare_key=compare_key, size=size,
last_update=last_update, src_type=src_type,
dest_type=dest_type,
operation_name=self.operation_name,
is_stream=self.is_stream)

def list_local_file_stream(self, path, dir_op):
"""
Yield some dummy values for a local file stream since it does not
actually have a file.
"""
yield '-', 0, None
operation_name=self.operation_name)

def list_files(self, path, dir_op):
"""
Expand Down
11 changes: 11 additions & 0 deletions awscli/customizations/s3/fileinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,17 @@ def __init__(self, src, dest=None, compare_key=None, size=None,
self.source_endpoint = source_endpoint
self.is_stream = is_stream

def set_size_from_s3(self):
    """
    Look up the size of the s3 source object and store it on ``self.size``.

    A ``HeadObject`` request is issued for the bucket/key parsed out of
    ``self.src``, and the ``ContentLength`` value of the response is
    converted to an ``int``.
    """
    bucket_name, object_key = find_bucket_key(self.src)
    head_params = {
        'endpoint': self.endpoint,
        'bucket': bucket_name,
        'key': object_key,
    }
    # ``operate`` returns a two-item result; only the parsed response
    # data is needed here, the second item is ignored.
    head_response, _http = operate(self.service, 'HeadObject', head_params)
    content_length = head_response['ContentLength']
    self.size = int(content_length)

def _permission_to_param(self, permission):
if permission == 'read':
return 'grant_read'
Expand Down
7 changes: 4 additions & 3 deletions awscli/customizations/s3/fileinfobuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ class FileInfoBuilder(object):
a ``FileInfo`` object so that the operation can be performed.
"""
def __init__(self, service, endpoint, source_endpoint=None,
parameters = None):
parameters = None, is_stream=False):
self._service = service
self._endpoint = endpoint
self._source_endpoint = endpoint
if source_endpoint:
self._source_endpoint = source_endpoint
self._parameters = parameters
self._parameters = parameters
self._is_stream = is_stream

def call(self, files):
for file_base in files:
Expand All @@ -42,9 +43,9 @@ def _inject_info(self, file_base):
file_info_attr['src_type'] = file_base.src_type
file_info_attr['dest_type'] = file_base.dest_type
file_info_attr['operation_name'] = file_base.operation_name
file_info_attr['is_stream'] = file_base.is_stream
file_info_attr['service'] = self._service
file_info_attr['endpoint'] = self._endpoint
file_info_attr['source_endpoint'] = self._source_endpoint
file_info_attr['parameters'] = self._parameters
file_info_attr['is_stream'] = self._is_stream
return FileInfo(**file_info_attr)
Loading

0 comments on commit 19ea686

Please sign in to comment.