From 9ad52d100541d38ba3ccb8bc42e546c64b2d08d7 Mon Sep 17 00:00:00 2001 From: vinhn Date: Sun, 27 Nov 2016 21:23:03 -0800 Subject: [PATCH] 1. Added precondition for each file upload 2. Fix bug when incrementing AWS slow down counter --- .../netflix/priam/aws/S3EncryptedFileSystem.java | 6 +++--- .../java/com/netflix/priam/aws/S3FileSystem.java | 6 +++--- .../com/netflix/priam/aws/S3FileSystemBase.java | 2 ++ .../com/netflix/priam/backup/AbstractBackup.java | 2 ++ .../netflix/priam/backup/AbstractBackupPath.java | 9 ++++++--- .../java/com/netflix/priam/backup/MetaData.java | 15 ++++++++++++--- .../com/netflix/priam/backup/SnapshotBackup.java | 11 +++++++++-- .../backup/parallel/IncrementalConsumer.java | 8 +++++++- .../merics/AWSSlowDownExceptionMeasurement.java | 6 +++++- .../priam/notification/BackupNotificationMgr.java | 2 +- 10 files changed, 50 insertions(+), 17 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/aws/S3EncryptedFileSystem.java b/priam/src/main/java/com/netflix/priam/aws/S3EncryptedFileSystem.java index 51e42c41e..890d5f5e8 100755 --- a/priam/src/main/java/com/netflix/priam/aws/S3EncryptedFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/aws/S3EncryptedFileSystem.java @@ -65,7 +65,6 @@ public class S3EncryptedFileSystem extends S3FileSystemBase implements IBackupFi private RateLimiter rateLimiter; //a throttling mechanism, we can limit the amount of bytes uploaded to endpoint per second. private AtomicInteger uploadCount = new AtomicInteger(); private IFileCryptography encryptor; - private int awsSlowDownExceptionCounter; @Inject public S3EncryptedFileSystem(Provider pathProvider, ICompression compress, final IConfiguration config, ICredential cred @@ -126,7 +125,7 @@ public long getBytesUploaded() { @Override public int getAWSSlowDownExceptionCounter() { - return this.awsSlowDownExceptionCounter; + return super.awsSlowDownExceptionCounter; } @@ -277,7 +276,8 @@ public void upload(AbstractBackupPath path, InputStream in) throws BackupRestore } catch(AmazonS3Exception e) { String amazoneErrorCode = e.getErrorCode(); if (amazoneErrorCode.equalsIgnoreCase("slowdown")) { - this.awsSlowDownExceptionCounter += 1; + super.awsSlowDownExceptionCounter += 1; + logger.warn("Received slow down from AWS when uploading file: " + path.getFileName()); } //No need to throw exception as this is not fatal (i.e. this exception does not mean AWS will throttle or fail the upload } catch(Exception e ) { diff --git a/priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java b/priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java index 6e7261e91..2d3f5f449 100644 --- a/priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java +++ b/priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java @@ -80,7 +80,6 @@ public class S3FileSystem extends S3FileSystemBase implements IBackupFileSystem, private final IConfiguration config; private BlockingSubmitThreadPoolExecutor executor; private RateLimiter rateLimiter; - private int awsSlowDownExceptionCounter; @Inject public S3FileSystem(Provider pathProvider, ICompression compress, final IConfiguration config @@ -189,7 +188,8 @@ public void upload(AbstractBackupPath path, InputStream in) throws BackupRestore } catch(AmazonS3Exception e) { String amazoneErrorCode = e.getErrorCode(); if (amazoneErrorCode.equalsIgnoreCase("slowdown")) { - this.awsSlowDownExceptionCounter += 1; + super.awsSlowDownExceptionCounter += 1; + logger.warn("Received slow down from AWS when uploading file: " + path.getFileName()); } //No need to throw exception as this is not fatal (i.e. this exception does not mean AWS will throttle or fail the upload } catch (Exception e) @@ -277,7 +277,7 @@ public long getBytesUploaded() { @Override public int getAWSSlowDownExceptionCounter() { - return this.awsSlowDownExceptionCounter; + return super.awsSlowDownExceptionCounter; } @Override diff --git a/priam/src/main/java/com/netflix/priam/aws/S3FileSystemBase.java b/priam/src/main/java/com/netflix/priam/aws/S3FileSystemBase.java index f3e0735ba..b83be9829 100755 --- a/priam/src/main/java/com/netflix/priam/aws/S3FileSystemBase.java +++ b/priam/src/main/java/com/netflix/priam/aws/S3FileSystemBase.java @@ -34,6 +34,7 @@ public class S3FileSystemBase { protected AmazonS3Client s3Client; protected IMetricPublisher metricPublisher; protected IMeasurement awsSlowDownMeasurement; + protected int awsSlowDownExceptionCounter = 0; public S3FileSystemBase (IMetricPublisher metricPublisher) { this.metricPublisher = metricPublisher; @@ -193,5 +194,6 @@ This measurement is different than most others. Other measurements are applicab */ protected void reinitialize() { bytesUploaded = new AtomicLong(0); //initi + this.awsSlowDownExceptionCounter = 0; } } \ No newline at end of file diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java b/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java index ea28b0757..30cd36028 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java @@ -102,6 +102,8 @@ protected List upload(File parent, final BackupFileType type try { logger.info(String.format("Uploading file %s within CF %s for backup", file.getCanonicalFile(), parent.getAbsolutePath())); + backupNotificationMgr.notify(bp, BackupNotificationMgr.STARTED); //pre condition + AbstractBackupPath abp = new RetryableCallable(3, RetryableCallable.DEFAULT_WAIT_TIME) { public AbstractBackupPath retriableCall() throws Exception diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java b/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java index 48f3ccbee..b0e45287a 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractBackupPath.java @@ -34,10 +34,13 @@ import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @ImplementedBy(S3BackupPath.class) public abstract class AbstractBackupPath implements Comparable { + private static final Logger logger = LoggerFactory.getLogger(AbstractBackupPath.class); private static final String FMT = "yyyyMMddHHmm"; private static final DateTimeFormatter DATE_FORMAT = DateTimeFormat.forPattern(FMT); public static final char PATH_SEP = File.separatorChar; @@ -65,7 +68,7 @@ public static enum BackupFileType protected final IConfiguration config; protected File backupFile; protected Date uploadedTs; - protected int awsSlowDownExceptionCounter; + protected int awsSlowDownExceptionCounter = 0; public AbstractBackupPath(IConfiguration config, InstanceIdentity factory) { @@ -339,7 +342,7 @@ public int getAWSSlowDownExceptionCounter() { return this.awsSlowDownExceptionCounter; } - public int setAWSSlowDownExceptionCounter(int val) { - return this.awsSlowDownExceptionCounter = val; + public void setAWSSlowDownExceptionCounter(int val) { + this.awsSlowDownExceptionCounter = val; } } diff --git a/priam/src/main/java/com/netflix/priam/backup/MetaData.java b/priam/src/main/java/com/netflix/priam/backup/MetaData.java index 584d45202..9a5cdf575 100644 --- a/priam/src/main/java/com/netflix/priam/backup/MetaData.java +++ b/priam/src/main/java/com/netflix/priam/backup/MetaData.java @@ -20,6 +20,7 @@ import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; +import java.text.ParseException; import java.util.ArrayList; import java.util.List; @@ -75,9 +76,7 @@ public AbstractBackupPath set(List bps, String snapshotName) { IOUtils.closeQuietly(fr); } - AbstractBackupPath backupfile = pathFactory.get(); - backupfile.parseLocal(metafile, BackupFileType.META); - backupfile.time = backupfile.parseDate(snapshotName); + AbstractBackupPath backupfile = decorateMetaJson(metafile, snapshotName); try { upload(backupfile); @@ -95,6 +94,16 @@ public AbstractBackupPath set(List bps, String snapshotName) return backupfile; } + /* + From the meta.json to be created, populate its meta data for the backup file. + */ + public AbstractBackupPath decorateMetaJson(File metafile, String snapshotName) throws ParseException { + AbstractBackupPath backupfile = pathFactory.get(); + backupfile.parseLocal(metafile, BackupFileType.META); + backupfile.time = backupfile.parseDate(snapshotName); + return backupfile; + } + /* * A list of data files within a meta backup file. The meta backup file can be * daily snapshot (meta.json) or incrementals (meta_keyspace_cf_date.json) diff --git a/priam/src/main/java/com/netflix/priam/backup/SnapshotBackup.java b/priam/src/main/java/com/netflix/priam/backup/SnapshotBackup.java index 2766fa48e..6af4f9627 100644 --- a/priam/src/main/java/com/netflix/priam/backup/SnapshotBackup.java +++ b/priam/src/main/java/com/netflix/priam/backup/SnapshotBackup.java @@ -202,11 +202,18 @@ public void execute() throws Exception } } + + //pre condition notifiy of meta.json upload + File tmpMetaFile = metaData.createTmpMetaFile(); //Note: no need to remove this temp as it is done within createTmpMetaFile() + AbstractBackupPath metaJsonAbp = metaData.decorateMetaJson(tmpMetaFile, snapshotName); + metaJsonAbp.setCompressedFileSize(0); + backupNotificationMgr.notify(metaJsonAbp, BackupNotificationMgr.STARTED); + // Upload meta file AbstractBackupPath metaJson = metaData.set(bps, snapshotName); logger.info("Snapshot upload complete for " + snapshotName); Calendar completed = Calendar.getInstance(TimeZone.getTimeZone("GMT")); - this.postProcesing(snapshotName, startTime, completed.getTime(), metaJson); + postProcesing(snapshotName, startTime, completed.getTime(), metaJson); if(snapshotRemotePaths.size() > 0) { @@ -226,7 +233,7 @@ public void execute() throws Exception } } } - + /* * Performs any post processing (e.g. log success of backup). * diff --git a/priam/src/main/java/com/netflix/priam/backup/parallel/IncrementalConsumer.java b/priam/src/main/java/com/netflix/priam/backup/parallel/IncrementalConsumer.java index 4d53e704d..71d02be44 100644 --- a/priam/src/main/java/com/netflix/priam/backup/parallel/IncrementalConsumer.java +++ b/priam/src/main/java/com/netflix/priam/backup/parallel/IncrementalConsumer.java @@ -46,7 +46,13 @@ public IncrementalConsumer(AbstractBackupPath bp, IBackupFileSystem fs public void run() { logger.info("Consumer - about to upload file: " + this.bp.getFileName()); - + try { + this.backupNotificationMgr.notify(bp, BackupNotificationMgr.STARTED); + } catch (JSONException e) { + logger.error(String.format("JSon exception during precondition notifcation file upload. Local file %s. Ignoring to continue with rest of backup. Msg: %s" + , this.bp.getFileName(), e.getLocalizedMessage())); + } + try { new RetryableCallable() diff --git a/priam/src/main/java/com/netflix/priam/merics/AWSSlowDownExceptionMeasurement.java b/priam/src/main/java/com/netflix/priam/merics/AWSSlowDownExceptionMeasurement.java index 99584ab3a..2701d4cb7 100644 --- a/priam/src/main/java/com/netflix/priam/merics/AWSSlowDownExceptionMeasurement.java +++ b/priam/src/main/java/com/netflix/priam/merics/AWSSlowDownExceptionMeasurement.java @@ -1,9 +1,13 @@ package com.netflix.priam.merics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Created by vinhn on 11/12/16. */ public class AWSSlowDownExceptionMeasurement implements IMeasurement { + private static final Logger logger = LoggerFactory.getLogger(AWSSlowDownExceptionMeasurement.class); private int awsSlowDownExceptionCounter = 0; @Override @@ -13,7 +17,7 @@ public MMEASUREMENT_TYPE getType() { @Override public void incrementFailureCnt(int i) { - this.awsSlowDownExceptionCounter += 1; + this.awsSlowDownExceptionCounter += i; } @Override diff --git a/priam/src/main/java/com/netflix/priam/notification/BackupNotificationMgr.java b/priam/src/main/java/com/netflix/priam/notification/BackupNotificationMgr.java index 5c985a0ee..2802841cc 100644 --- a/priam/src/main/java/com/netflix/priam/notification/BackupNotificationMgr.java +++ b/priam/src/main/java/com/netflix/priam/notification/BackupNotificationMgr.java @@ -14,7 +14,7 @@ */ public class BackupNotificationMgr { - public static final String SUCCESS_VAL = "success", FAILED_VAL = "failed"; + public static final String SUCCESS_VAL = "success", FAILED_VAL = "failed", STARTED = "started"; private final IConfiguration config; private INotificationService notificationService;