Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for warning about and skipping files. #881

Merged
merged 3 commits into from
Aug 19, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ CHANGELOG
Next Release (TBD)
==================

* bugfix:``aws s3``: Added support for ignoring and warning
about files that do not exist, that the user does not have read
permissions for, or that are special files (i.e. sockets, FIFOs,
character special devices, and block special devices)
(`issue 881 <https://github.com/aws/aws-cli/pull/881>`__)
* feature:Parameter Shorthand: Added support for
``structure(list-scalar, scalar)`` parameter shorthand.
(`issue 882 <https://github.com/aws/aws-cli/pull/882>`__)
Expand Down
29 changes: 24 additions & 5 deletions awscli/customizations/s3/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ def num_tasks_failed(self):
tasks_failed = self.print_thread.num_errors_seen
return tasks_failed

@property
def num_tasks_warned(self):
    """Number of warnings recorded by the print thread.

    Returns 0 when the print thread was never started.
    """
    if self.print_thread is None:
        return 0
    return self.print_thread.num_warnings_seen

def start(self):
self.io_thread.start()
# Note that we're *not* adding the IO thread to the threads_list.
Expand Down Expand Up @@ -206,14 +213,17 @@ class PrintThread(threading.Thread):
Result Queue
------------

Result queue items are dictionaries that have the following keys:
Result queue items are PrintTask objects that have the following
attributes:

* message: An arbitrary string associated with the entry. This
can be used to communicate the result of the task.
* error: Boolean indicating whether or not the task completed
successfully.
* total_parts: The total number of parts for multipart transfers (
deprecated, will be removed in the future).
* warning: Boolean indicating whether or not a file generated a
warning.

"""
def __init__(self, result_queue, quiet):
Expand All @@ -233,6 +243,7 @@ def __init__(self, result_queue, quiet):
# This is a public attribute that clients can inspect to determine
# whether or not we saw any results indicating that an error occurred.
self.num_errors_seen = 0
self.num_warnings_seen = 0

def set_total_parts(self, total_parts):
with self._lock:
Expand Down Expand Up @@ -262,16 +273,24 @@ def run(self):
pass

def _process_print_task(self, print_task):
print_str = print_task['message']
if print_task['error']:
print_str = print_task.message
if print_task.error:
self.num_errors_seen += 1
warning = False
if print_task.warning:
if print_task.warning:
warning = True
self.num_warnings_seen += 1
final_str = ''
if 'total_parts' in print_task:
if warning:
final_str += print_str.ljust(self._progress_length, ' ')
final_str += '\n'
elif print_task.total_parts:
# Normalize keys so failures and sucess
# look the same.
op_list = print_str.split(':')
print_str = ':'.join(op_list[1:])
total_part = print_task['total_parts']
total_part = print_task.total_parts
self._num_parts += 1
if print_str in self._progress_dict:
self._progress_dict[print_str]['parts'] += 1
Expand Down
83 changes: 80 additions & 3 deletions awscli/customizations/s3/filegenerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,60 @@
# language governing permissions and limitations under the License.
import os
import sys
import stat

import six
from six.moves import queue
from dateutil.parser import parse
from dateutil.tz import tzlocal

from awscli.customizations.s3.utils import find_bucket_key, get_file_stat
from awscli.customizations.s3.utils import BucketLister
from awscli.customizations.s3.utils import BucketLister, create_warning
from awscli.errorhandler import ClientError


def is_special_file(path):
    """Return True if *path* refers to a special file.

    A special file is a character special device, a block special
    device, a FIFO, or a socket, as reported by ``os.stat``.
    """
    mode = os.stat(path).st_mode
    special_checks = (stat.S_ISCHR, stat.S_ISBLK, stat.S_ISFIFO, stat.S_ISSOCK)
    return any(check(mode) for check in special_checks)


def is_readable(path):
    """Return True if the file or directory at *path* can be read.

    Readability is probed by actually performing an operation that
    requires read access: listing for a directory, opening for a file.
    """
    try:
        if os.path.isdir(path):
            os.listdir(path)
        else:
            with open(path, 'r'):
                pass
    except (OSError, IOError):
        return False
    return True


# This class is provided primarily to provide a detailed error message.

class FileDecodingError(Exception):
Expand Down Expand Up @@ -68,11 +112,14 @@ class FileGenerator(object):
``FileInfo`` objects to send to a ``Comparator`` or ``S3Handler``.
"""
def __init__(self, service, endpoint, operation_name,
             follow_symlinks=True, result_queue=None):
    """
    :param service: the service object used for API calls (presumably
        a botocore s3 service object -- confirm against callers).
    :param endpoint: the endpoint the service operates against.
    :param operation_name: name of the s3 operation being performed.
    :param follow_symlinks: whether symlinked files and directories
        should be followed during file generation.
    :param result_queue: queue used to report warnings about skipped
        files; a fresh ``queue.Queue()`` is created when not supplied.
    """
    self._service = service
    self._endpoint = endpoint
    self.operation_name = operation_name
    self.follow_symlinks = follow_symlinks
    # Use an explicit ``is None`` sentinel check instead of truthiness
    # so an unusual-but-valid falsy queue object would not be discarded.
    if result_queue is None:
        result_queue = queue.Queue()
    self.result_queue = result_queue

def call(self, files):
"""
def should_ignore_file(self, path):
    """
    Return True when *path* should be skipped during file generation.

    A path is skipped when it is a symlink while symlink following is
    disabled, or when it triggers a warning (see ``triggers_warning``).
    """
    if not self.follow_symlinks:
        if os.path.isdir(path) and path.endswith(os.sep):
            # Strip the trailing separator first; os.path.islink() never
            # reports True for a path that ends in a separator.
            path = path[:-1]
        if os.path.islink(path):
            return True
    return bool(self.triggers_warning(path))

def triggers_warning(self, path):
    """
    Check *path* for conditions that should produce a warning.

    If the file does not exist, is a special file, or cannot be read,
    a warning is placed on the result queue and True is returned so
    the caller can skip the file; otherwise False is returned.
    """
    warning_message = None
    if not os.path.exists(path):
        warning_message = "File does not exist."
    elif is_special_file(path):
        warning_message = ("File is character special device, "
                           "block special device, FIFO, or "
                           "socket.")
    elif not is_readable(path):
        warning_message = "File/Directory is not readable."
    if warning_message is None:
        return False
    self.result_queue.put(create_warning(path, warning_message))
    return True

def list_objects(self, s3_path, dir_op):
Expand Down
33 changes: 22 additions & 11 deletions awscli/customizations/s3/s3handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from collections import namedtuple
import logging
import math
import os
Expand All @@ -18,12 +19,15 @@
from awscli.customizations.s3.constants import MULTI_THRESHOLD, CHUNKSIZE, \
NUM_THREADS, MAX_UPLOAD_SIZE, MAX_QUEUE_SIZE
from awscli.customizations.s3.utils import find_chunksize, \
operate, find_bucket_key, relative_path
operate, find_bucket_key, relative_path, PrintTask, create_warning
from awscli.customizations.s3.executor import Executor
from awscli.customizations.s3 import tasks

LOGGER = logging.getLogger(__name__)

# Summary returned by S3Handler.call(): counts of tasks that failed and
# tasks that only produced warnings; callers map these counts onto the
# CLI's return code.
CommandResult = namedtuple('CommandResult',
                           ['num_tasks_failed', 'num_tasks_warned'])


class S3Handler(object):
"""
Expand All @@ -33,14 +37,16 @@ class pull tasks from to complete.
"""
MAX_IO_QUEUE_SIZE = 20

def __init__(self, session, params, multi_threshold=MULTI_THRESHOLD,
chunksize=CHUNKSIZE):
def __init__(self, session, params, result_queue=None,
multi_threshold=MULTI_THRESHOLD, chunksize=CHUNKSIZE):
self.session = session
self.result_queue = queue.Queue()
# The write_queue has potential for optimizations, so the constant
# for maxsize is scoped to this class (as opposed to constants.py)
# so we have the ability to change this value later.
self.write_queue = queue.Queue(maxsize=self.MAX_IO_QUEUE_SIZE)
self.result_queue = result_queue
if not self.result_queue:
self.result_queue = queue.Queue()
self.params = {'dryrun': False, 'quiet': False, 'acl': None,
'guess_mime_type': True, 'sse': False,
'storage_class': None, 'website_redirect': None,
Expand Down Expand Up @@ -82,19 +88,22 @@ def call(self, files):
except Exception as e:
LOGGER.debug('Exception caught during task execution: %s',
str(e), exc_info=True)
self.result_queue.put({'message': str(e), 'error': True})
self.result_queue.put(PrintTask(message=str(e), error=True))
self.executor.initiate_shutdown(
priority=self.executor.IMMEDIATE_PRIORITY)
self._shutdown()
self.executor.wait_until_shutdown()
except KeyboardInterrupt:
self.result_queue.put({'message': "Cleaning up. Please wait...",
'error': True})
self.result_queue.put(PrintTask(message=("Cleaning up. "
"Please wait..."),
error=True))
self.executor.initiate_shutdown(
priority=self.executor.IMMEDIATE_PRIORITY)
self._shutdown()
self.executor.wait_until_shutdown()
return self.executor.num_tasks_failed

return CommandResult(self.executor.num_tasks_failed,
self.executor.num_tasks_warned)

def _shutdown(self):
# And finally we need to make a pass through all the existing
Expand Down Expand Up @@ -158,9 +167,10 @@ def _enqueue_tasks(self, files):
if hasattr(filename, 'size'):
too_large = filename.size > MAX_UPLOAD_SIZE
if too_large and filename.operation_name == 'upload':
warning = "Warning %s exceeds 5 TB and upload is " \
"being skipped" % relative_path(filename.src)
self.result_queue.put({'message': warning, 'error': True})
warning_message = "File exceeds s3 upload limit of 5 TB."
warning = create_warning(relative_path(filename.src),
message=warning_message)
self.result_queue.put(warning)
elif is_multipart_task and not self.params['dryrun']:
# If we're in dryrun mode, then we don't need the
# real multipart tasks. We can just use a BasicTask
Expand Down Expand Up @@ -300,3 +310,4 @@ def _enqueue_upload_end_task(self, filename, upload_context):
result_queue=self.result_queue, upload_context=upload_context)
self.executor.submit(complete_multipart_upload_task)
self._multipart_uploads.append((upload_context, filename))

25 changes: 17 additions & 8 deletions awscli/customizations/s3/subcommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# language governing permissions and limitations under the License.
import os
import six
from six.moves import queue
import sys

from dateutil.parser import parse
Expand Down Expand Up @@ -554,21 +555,25 @@ def run(self):
'mb': 'make_bucket',
'rb': 'remove_bucket'
}
result_queue = queue.Queue()
operation_name = cmd_translation[paths_type][self.cmd]
file_generator = FileGenerator(self._service,
self._source_endpoint,
operation_name,
self.parameters['follow_symlinks'])
self.parameters['follow_symlinks'],
result_queue=result_queue)
rev_generator = FileGenerator(self._service, self._endpoint, '',
self.parameters['follow_symlinks'])
self.parameters['follow_symlinks'],
result_queue=result_queue)
taskinfo = [TaskInfo(src=files['src']['path'],
src_type='s3',
operation_name=operation_name,
service=self._service,
endpoint=self._endpoint)]
file_info_builder = FileInfoBuilder(self._service, self._endpoint,
self._source_endpoint, self.parameters)
s3handler = S3Handler(self.session, self.parameters)
s3handler = S3Handler(self.session, self.parameters,
result_queue=result_queue)

command_dict = {}
if self.cmd == 'sync':
Expand Down Expand Up @@ -620,13 +625,17 @@ def run(self):
# will replaces the files attr with the return value of the
# file_list. The very last call is a single list of
# [s3_handler], and the s3_handler returns the number of
# tasks failed. This means that files[0] now contains
# the number of failed tasks. In terms of the RC, we're
# keeping it simple and saying that > 0 failed tasks
# will give a 1 RC.
# tasks failed and the number of tasks warned.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment above this is out of date. We should get this comment updated.

# This means that files[0] now contains a namedtuple with
# the number of failed tasks and the number of warned tasks.
# In terms of the RC, we're keeping it simple and saying
# that > 0 failed tasks will give a 1 RC and > 0 warned
# tasks will give a 2 RC. Otherwise a RC of zero is returned.
rc = 0
if files[0] > 0:
if files[0].num_tasks_failed > 0:
rc = 1
if files[0].num_tasks_warned > 0:
rc = 2
return rc


Expand Down
Loading