From ea2e4e978a4a151863ebf27758dbfb05fab1bf92 Mon Sep 17 00:00:00 2001 From: benjamineac Date: Wed, 8 Dec 2021 11:01:20 +1300 Subject: [PATCH 1/4] implemented backoff for upload --- requirements.txt | 1 + runzi/util/aws/s3_folder_upload.py | 14 ++++++++++++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index cccfe96d..613791fa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,3 +10,4 @@ inquirer==2.7.0 Pygments==2.10.0 pyfiglet==0.8.post1 boto3 +backoff==1.11.1 diff --git a/runzi/util/aws/s3_folder_upload.py b/runzi/util/aws/s3_folder_upload.py index 650df90d..4be25255 100644 --- a/runzi/util/aws/s3_folder_upload.py +++ b/runzi/util/aws/s3_folder_upload.py @@ -1,6 +1,7 @@ from botocore.errorfactory import ClientError import boto3 import boto3.session +from botocore.exceptions import ClientError import os from multiprocessing.pool import ThreadPool import datetime as dt @@ -8,6 +9,8 @@ import mimetypes import logging from logging import info, error +import backoff +import botocore.exceptions from runzi.automation.scaling.local_config import WORK_PATH, S3_UPLOAD_WORKERS @@ -15,6 +18,12 @@ logging.basicConfig(level="INFO") S3_REPORT_BUCKET_ROOT = 'opensha/DATA' +def check_if_slowdown(e): + try: + return e.response['Error']['Code'] == 'SlowDown' + except AttributeError: + return False + def upload_to_bucket(id, bucket, root_path=S3_REPORT_BUCKET_ROOT): info(f"Beginning bucket upload... to {bucket}/{root_path}/{id}") t0 = dt.datetime.utcnow() @@ -28,8 +37,9 @@ def upload_to_bucket(id, bucket, root_path=S3_REPORT_BUCKET_ROOT): local_path = os.path.join(root, filename) relative_path = os.path.relpath(local_path, local_directory) s3_path = os.path.join(root_path, id, relative_path) - file_list.append((local_path, bucket, s3_path)) + + @backoff.on_predicate(backoff.expo, check_if_slowdown) def upload(args): """Map function for pool, uploads to S3 Bucket if it doesn't exist already""" local_path, bucket, s3_path = args[0], args[1], args[2] @@ -45,7 +55,7 @@ def upload(args): 'ContentType': mimetype(local_path) }) info("Uploading %s..." % s3_path) - except Exception as e: + except ClientError as e: error(f"exception raised uploading {local_path} => {bucket}/{s3_path}") error(e) From fea704f13b83c32f85ae39d99cf27ca3b38bc827 Mon Sep 17 00:00:00 2001 From: benjamineac Date: Wed, 8 Dec 2021 11:34:15 +1300 Subject: [PATCH 2/4] logger defined as instance; pass logger to backoff --- runzi/util/aws/s3_folder_upload.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/runzi/util/aws/s3_folder_upload.py b/runzi/util/aws/s3_folder_upload.py index 4be25255..7cdf4c6f 100644 --- a/runzi/util/aws/s3_folder_upload.py +++ b/runzi/util/aws/s3_folder_upload.py @@ -15,7 +15,9 @@ from runzi.automation.scaling.local_config import WORK_PATH, S3_UPLOAD_WORKERS -logging.basicConfig(level="INFO") +logger = logging.getLogger(__name__) +logger.setLevel('INFO') + S3_REPORT_BUCKET_ROOT = 'opensha/DATA' def check_if_slowdown(e): @@ -25,7 +27,7 @@ def check_if_slowdown(e): return False def upload_to_bucket(id, bucket, root_path=S3_REPORT_BUCKET_ROOT): - info(f"Beginning bucket upload... to {bucket}/{root_path}/{id}") + logger.info(f"Beginning bucket upload... to {bucket}/{root_path}/{id}") t0 = dt.datetime.utcnow() local_directory = WORK_PATH + '/' + id session = boto3.session.Session() @@ -39,13 +41,13 @@ def upload_to_bucket(id, bucket, root_path=S3_REPORT_BUCKET_ROOT): s3_path = os.path.join(root_path, id, relative_path) file_list.append((local_path, bucket, s3_path)) - @backoff.on_predicate(backoff.expo, check_if_slowdown) + @backoff.on_predicate(backoff.expo, check_if_slowdown, logger=logger) def upload(args): """Map function for pool, uploads to S3 Bucket if it doesn't exist already""" local_path, bucket, s3_path = args[0], args[1], args[2] if path_exists(s3_path, bucket): - info("Path found on S3! Skipping %s to %s" % (s3_path, bucket)) + logger.info("Path found on S3! Skipping %s to %s" % (s3_path, bucket)) else: try: @@ -54,10 +56,10 @@ def upload(args): 'ACL':'public-read', 'ContentType': mimetype(local_path) }) - info("Uploading %s..." % s3_path) + logger.info("Uploading %s..." % s3_path) except ClientError as e: - error(f"exception raised uploading {local_path} => {bucket}/{s3_path}") - error(e) + logger.error(f"exception raised uploading {local_path} => {bucket}/{s3_path}") + logger.error(e) def path_exists(path, bucket_name): """Check to see if an object exists on S3""" @@ -71,7 +73,7 @@ def path_exists(path, bucket_name): if path == obj['Key']: return True except ClientError as e: - error(f"exception raised on {bucket_name}/{path}") + logger.error(f"exception raised on {bucket_name}/{path}") raise e @@ -80,15 +82,15 @@ def path_exists(path, bucket_name): pool.close() pool.join() - info("Done! uploaded %s in %s secs" % (len(file_list), (dt.datetime.utcnow() - t0).total_seconds())) + logger.info("Done! uploaded %s in %s secs" % (len(file_list), (dt.datetime.utcnow() - t0).total_seconds())) cleanup(local_directory) def cleanup(directory): try: shutil.rmtree(directory) - info('Cleaned up %s' % directory) + logger.info('Cleaned up %s' % directory) except Exception as e: - error(e) + logger.error(e) def mimetype(local_path): mimetypes.add_type('text/markdown', '.md') @@ -97,3 +99,5 @@ def mimetype(local_path): if mimetype is None: raise Exception("Failed to guess mimetype") return mimetype + +upload_to_bucket('SW52ZXJzaW9uU29sdXRpb246MjMwNi4wU2lHM1E=', 'nzshm-static-reports-test') \ No newline at end of file From 809ac2eeee63c504dda2359a15e96da991f14082 Mon Sep 17 00:00:00 2001 From: benjamineac Date: Wed, 8 Dec 2021 11:36:11 +1300 Subject: [PATCH 3/4] os.path.join instead of string concat for local_directory --- runzi/util/aws/s3_folder_upload.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runzi/util/aws/s3_folder_upload.py b/runzi/util/aws/s3_folder_upload.py index 7cdf4c6f..4396fd95 100644 --- a/runzi/util/aws/s3_folder_upload.py +++ b/runzi/util/aws/s3_folder_upload.py @@ -29,7 +29,7 @@ def check_if_slowdown(e): def upload_to_bucket(id, bucket, root_path=S3_REPORT_BUCKET_ROOT): logger.info(f"Beginning bucket upload... to {bucket}/{root_path}/{id}") t0 = dt.datetime.utcnow() - local_directory = WORK_PATH + '/' + id + local_directory = os.path.join(WORK_PATH, id) session = boto3.session.Session() client = session.client('s3') file_list = [] From 68a1cd0e22cfc1d9dc37f13f375701aba79501d5 Mon Sep 17 00:00:00 2001 From: benjamineac Date: Wed, 8 Dec 2021 11:36:42 +1300 Subject: [PATCH 4/4] remove function call --- runzi/util/aws/s3_folder_upload.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/runzi/util/aws/s3_folder_upload.py b/runzi/util/aws/s3_folder_upload.py index 4396fd95..1434b82a 100644 --- a/runzi/util/aws/s3_folder_upload.py +++ b/runzi/util/aws/s3_folder_upload.py @@ -99,5 +99,3 @@ def mimetype(local_path): if mimetype is None: raise Exception("Failed to guess mimetype") return mimetype - -upload_to_bucket('SW52ZXJzaW9uU29sdXRpb246MjMwNi4wU2lHM1E=', 'nzshm-static-reports-test') \ No newline at end of file