Skip to content

Commit

Permalink
Merge pull request #541 from tulumvinh/3.x
Browse files Browse the repository at this point in the history
1. Added precondition for each file upload  2. Fix bug when increment ing AWS slow down counter
  • Loading branch information
tulumvinh authored Nov 28, 2016
2 parents 8c2a64d + 9ad52d1 commit 2e3736a
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public class S3EncryptedFileSystem extends S3FileSystemBase implements IBackupFi
private RateLimiter rateLimiter; //a throttling mechanism, we can limit the amount of bytes uploaded to endpoint per second.
private AtomicInteger uploadCount = new AtomicInteger();
private IFileCryptography encryptor;
private int awsSlowDownExceptionCounter;

@Inject
public S3EncryptedFileSystem(Provider<AbstractBackupPath> pathProvider, ICompression compress, final IConfiguration config, ICredential cred
Expand Down Expand Up @@ -126,7 +125,7 @@ public long getBytesUploaded() {

@Override
public int getAWSSlowDownExceptionCounter() {
return this.awsSlowDownExceptionCounter;
return super.awsSlowDownExceptionCounter;
}


Expand Down Expand Up @@ -277,7 +276,8 @@ public void upload(AbstractBackupPath path, InputStream in) throws BackupRestore
} catch(AmazonS3Exception e) {
String amazoneErrorCode = e.getErrorCode();
if (amazoneErrorCode.equalsIgnoreCase("slowdown")) {
this.awsSlowDownExceptionCounter += 1;
super.awsSlowDownExceptionCounter += 1;
logger.warn("Received slow down from AWS when uploading file: " + path.getFileName());
}
//No need to throw exception as this is not fatal (i.e. this exception does not mean AWS will throttle or fail the upload
} catch(Exception e ) {
Expand Down
6 changes: 3 additions & 3 deletions priam/src/main/java/com/netflix/priam/aws/S3FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public class S3FileSystem extends S3FileSystemBase implements IBackupFileSystem,
private final IConfiguration config;
private BlockingSubmitThreadPoolExecutor executor;
private RateLimiter rateLimiter;
private int awsSlowDownExceptionCounter;

@Inject
public S3FileSystem(Provider<AbstractBackupPath> pathProvider, ICompression compress, final IConfiguration config
Expand Down Expand Up @@ -189,7 +188,8 @@ public void upload(AbstractBackupPath path, InputStream in) throws BackupRestore
} catch(AmazonS3Exception e) {
String amazoneErrorCode = e.getErrorCode();
if (amazoneErrorCode.equalsIgnoreCase("slowdown")) {
this.awsSlowDownExceptionCounter += 1;
super.awsSlowDownExceptionCounter += 1;
logger.warn("Received slow down from AWS when uploading file: " + path.getFileName());
}
//No need to throw exception as this is not fatal (i.e. this exception does not mean AWS will throttle or fail the upload
} catch (Exception e)
Expand Down Expand Up @@ -277,7 +277,7 @@ public long getBytesUploaded() {

@Override
public int getAWSSlowDownExceptionCounter() {
return this.awsSlowDownExceptionCounter;
return super.awsSlowDownExceptionCounter;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class S3FileSystemBase {
protected AmazonS3Client s3Client;
protected IMetricPublisher metricPublisher;
protected IMeasurement awsSlowDownMeasurement;
protected int awsSlowDownExceptionCounter = 0;

public S3FileSystemBase (IMetricPublisher metricPublisher) {
this.metricPublisher = metricPublisher;
Expand Down Expand Up @@ -193,5 +194,6 @@ This measurement is different than most others. Other measurements are applicab
*/
protected void reinitialize() {
bytesUploaded = new AtomicLong(0); //initi
this.awsSlowDownExceptionCounter = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ protected List<AbstractBackupPath> upload(File parent, final BackupFileType type
try
{
logger.info(String.format("Uploading file %s within CF %s for backup", file.getCanonicalFile(), parent.getAbsolutePath()));
backupNotificationMgr.notify(bp, BackupNotificationMgr.STARTED); //pre condition

AbstractBackupPath abp = new RetryableCallable<AbstractBackupPath>(3, RetryableCallable.DEFAULT_WAIT_TIME)
{
public AbstractBackupPath retriableCall() throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ImplementedBy(S3BackupPath.class)
public abstract class AbstractBackupPath implements Comparable<AbstractBackupPath>
{
private static final Logger logger = LoggerFactory.getLogger(AbstractBackupPath.class);
private static final String FMT = "yyyyMMddHHmm";
private static final DateTimeFormatter DATE_FORMAT = DateTimeFormat.forPattern(FMT);
public static final char PATH_SEP = File.separatorChar;
Expand Down Expand Up @@ -65,7 +68,7 @@ public static enum BackupFileType
protected final IConfiguration config;
protected File backupFile;
protected Date uploadedTs;
protected int awsSlowDownExceptionCounter;
protected int awsSlowDownExceptionCounter = 0;

public AbstractBackupPath(IConfiguration config, InstanceIdentity factory)
{
Expand Down Expand Up @@ -339,7 +342,7 @@ public int getAWSSlowDownExceptionCounter() {
return this.awsSlowDownExceptionCounter;
}

public int setAWSSlowDownExceptionCounter(int val) {
return this.awsSlowDownExceptionCounter = val;
public void setAWSSlowDownExceptionCounter(int val) {
this.awsSlowDownExceptionCounter = val;
}
}
15 changes: 12 additions & 3 deletions priam/src/main/java/com/netflix/priam/backup/MetaData.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -75,9 +76,7 @@ public AbstractBackupPath set(List<AbstractBackupPath> bps, String snapshotName)
{
IOUtils.closeQuietly(fr);
}
AbstractBackupPath backupfile = pathFactory.get();
backupfile.parseLocal(metafile, BackupFileType.META);
backupfile.time = backupfile.parseDate(snapshotName);
AbstractBackupPath backupfile = decorateMetaJson(metafile, snapshotName);
try
{
upload(backupfile);
Expand All @@ -95,6 +94,16 @@ public AbstractBackupPath set(List<AbstractBackupPath> bps, String snapshotName)
return backupfile;
}

/*
From the meta.json to be created, populate its meta data for the backup file.
*/
public AbstractBackupPath decorateMetaJson(File metafile, String snapshotName) throws ParseException {
AbstractBackupPath backupfile = pathFactory.get();
backupfile.parseLocal(metafile, BackupFileType.META);
backupfile.time = backupfile.parseDate(snapshotName);
return backupfile;
}

/*
* A list of data files within a meta backup file. The meta backup file can be
* daily snapshot (meta.json) or incrementals (meta_keyspace_cf_date.json)
Expand Down
11 changes: 9 additions & 2 deletions priam/src/main/java/com/netflix/priam/backup/SnapshotBackup.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,18 @@ public void execute() throws Exception
}

}

//pre condition notifiy of meta.json upload
File tmpMetaFile = metaData.createTmpMetaFile(); //Note: no need to remove this temp as it is done within createTmpMetaFile()
AbstractBackupPath metaJsonAbp = metaData.decorateMetaJson(tmpMetaFile, snapshotName);
metaJsonAbp.setCompressedFileSize(0);
backupNotificationMgr.notify(metaJsonAbp, BackupNotificationMgr.STARTED);

// Upload meta file
AbstractBackupPath metaJson = metaData.set(bps, snapshotName);
logger.info("Snapshot upload complete for " + snapshotName);
Calendar completed = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
this.postProcesing(snapshotName, startTime, completed.getTime(), metaJson);
postProcesing(snapshotName, startTime, completed.getTime(), metaJson);

if(snapshotRemotePaths.size() > 0)
{
Expand All @@ -226,7 +233,7 @@ public void execute() throws Exception
}
}
}

/*
* Performs any post processing (e.g. log success of backup).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ public IncrementalConsumer(AbstractBackupPath bp, IBackupFileSystem fs
public void run() {

logger.info("Consumer - about to upload file: " + this.bp.getFileName());

try {
this.backupNotificationMgr.notify(bp, BackupNotificationMgr.STARTED);
} catch (JSONException e) {
logger.error(String.format("JSon exception during precondition notifcation file upload. Local file %s. Ignoring to continue with rest of backup. Msg: %s"
, this.bp.getFileName(), e.getLocalizedMessage()));
}

try {

new RetryableCallable<Void>()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package com.netflix.priam.merics;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by vinhn on 11/12/16.
*/
public class AWSSlowDownExceptionMeasurement implements IMeasurement<Object> {
private static final Logger logger = LoggerFactory.getLogger(AWSSlowDownExceptionMeasurement.class);
private int awsSlowDownExceptionCounter = 0;

@Override
Expand All @@ -13,7 +17,7 @@ public MMEASUREMENT_TYPE getType() {

@Override
public void incrementFailureCnt(int i) {
this.awsSlowDownExceptionCounter += 1;
this.awsSlowDownExceptionCounter += i;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
public class BackupNotificationMgr {

public static final String SUCCESS_VAL = "success", FAILED_VAL = "failed";
public static final String SUCCESS_VAL = "success", FAILED_VAL = "failed", STARTED = "started";

private final IConfiguration config;
private INotificationService notificationService;
Expand Down

0 comments on commit 2e3736a

Please sign in to comment.