diff --git a/requirements.txt b/requirements.txt
index cccfe96d..613791fa 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,3 +10,4 @@ inquirer==2.7.0
 Pygments==2.10.0
 pyfiglet==0.8.post1
 boto3
+backoff==1.11.1
diff --git a/runzi/util/aws/s3_folder_upload.py b/runzi/util/aws/s3_folder_upload.py
index 650df90d..1434b82a 100644
--- a/runzi/util/aws/s3_folder_upload.py
+++ b/runzi/util/aws/s3_folder_upload.py
@@ -1,6 +1,6 @@
-from botocore.errorfactory import ClientError
+from botocore.exceptions import ClientError
 import boto3
 import boto3.session
 import os
 from multiprocessing.pool import ThreadPool
 import datetime as dt
@@ -8,16 +8,25 @@
 import mimetypes
 import logging
-from logging import info, error
+import backoff
 
 from runzi.automation.scaling.local_config import WORK_PATH, S3_UPLOAD_WORKERS
 
-logging.basicConfig(level="INFO")
+logger = logging.getLogger(__name__)
+logger.setLevel('INFO')
+
 S3_REPORT_BUCKET_ROOT = 'opensha/DATA'
 
+def check_if_slowdown(e):
+    """Return True if the ClientError is an S3 'SlowDown' throttling response."""
+    try:
+        return e.response['Error']['Code'] == 'SlowDown'
+    except AttributeError:
+        return False
+
 def upload_to_bucket(id, bucket, root_path=S3_REPORT_BUCKET_ROOT):
-    info(f"Beginning bucket upload... to {bucket}/{root_path}/{id}")
+    logger.info(f"Beginning bucket upload... to {bucket}/{root_path}/{id}")
     t0 = dt.datetime.utcnow()
-    local_directory = WORK_PATH + '/' + id
+    local_directory = os.path.join(WORK_PATH, id)
     session = boto3.session.Session()
     client = session.client('s3')
     file_list = []
@@ -28,13 +37,15 @@ def upload_to_bucket(id, bucket, root_path=S3_REPORT_BUCKET_ROOT):
             local_path = os.path.join(root, filename)
             relative_path = os.path.relpath(local_path, local_directory)
             s3_path = os.path.join(root_path, id, relative_path)
             file_list.append((local_path, bucket, s3_path))
 
+    # Retry with exponential backoff while S3 is throttling; give up on any other ClientError.
+    @backoff.on_exception(backoff.expo, ClientError, giveup=lambda e: not check_if_slowdown(e), logger=logger)
     def upload(args):
         """Map function for pool, uploads to S3 Bucket if it doesn't exist already"""
         local_path, bucket, s3_path = args[0], args[1], args[2]
 
         if path_exists(s3_path, bucket):
-            info("Path found on S3! Skipping %s to %s" % (s3_path, bucket))
+            logger.info("Path found on S3! Skipping %s to %s" % (s3_path, bucket))
         else:
             try:
@@ -44,10 +55,12 @@
                     'ACL':'public-read',
                     'ContentType': mimetype(local_path)
                 })
-                info("Uploading %s..." % s3_path)
-            except Exception as e:
-                error(f"exception raised uploading {local_path} => {bucket}/{s3_path}")
-                error(e)
+                logger.info("Uploading %s..." % s3_path)
+            except ClientError as e:
+                if check_if_slowdown(e):
+                    raise  # surface throttling so the backoff decorator can retry
+                logger.error(f"exception raised uploading {local_path} => {bucket}/{s3_path}")
+                logger.error(e)
 
 def path_exists(path, bucket_name):
     """Check to see if an object exists on S3"""
@@ -61,5 +74,5 @@
             if path == obj['Key']:
                 return True
     except ClientError as e:
-        error(f"exception raised on {bucket_name}/{path}")
+        logger.error(f"exception raised on {bucket_name}/{path}")
         raise e
@@ -70,15 +83,15 @@
     pool.close()
     pool.join()
 
-    info("Done! uploaded %s in %s secs" % (len(file_list), (dt.datetime.utcnow() - t0).total_seconds()))
+    logger.info("Done! uploaded %s in %s secs" % (len(file_list), (dt.datetime.utcnow() - t0).total_seconds()))
     cleanup(local_directory)
 
 def cleanup(directory):
     try:
         shutil.rmtree(directory)
-        info('Cleaned up %s' % directory)
+        logger.info('Cleaned up %s' % directory)
     except Exception as e:
-        error(e)
+        logger.error(e)
 
 def mimetype(local_path):
     mimetypes.add_type('text/markdown', '.md')
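
Note: the retry behaviour introduced above can be exercised in isolation before wiring it into the thread pool. The snippet below is a minimal sketch of the same backoff pattern, assuming backoff 1.11.1 as pinned in requirements.txt; put_object, is_not_slowdown, and the bucket/key names are placeholders for illustration, not identifiers from this repo.

    import logging

    import backoff
    import boto3
    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)

    def is_not_slowdown(e):
        """giveup predicate: stop retrying unless S3 reported 'SlowDown'."""
        return e.response['Error']['Code'] != 'SlowDown'

    # Exponential backoff (1s, 2s, 4s, ...) on throttling only; any other
    # ClientError is raised immediately because giveup returns True for it.
    @backoff.on_exception(backoff.expo, ClientError,
                          giveup=is_not_slowdown, max_time=60, logger=logger)
    def put_object(client, bucket, key, body):
        client.put_object(Bucket=bucket, Key=key, Body=body)

    if __name__ == '__main__':
        logging.basicConfig(level=logging.INFO)
        # 'some-writable-bucket' is a placeholder bucket name.
        put_object(boto3.client('s3'), 'some-writable-bucket', 'hello.txt', b'hello')

Because the decorator only retries while the predicate reports throttling, transient 'SlowDown' responses are absorbed with growing delays, while genuine failures (access denied, missing bucket) still fail fast and get logged.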