Skip to content

Commit

Permalink
Merge pull request #481 from tulumvinh/3.x
Browse files Browse the repository at this point in the history
1. Optional incremental backup parallization 2. Updated Bintray API key from master
  • Loading branch information
tulumvinh committed Jun 2, 2016
2 parents ef3f117 + 6964369 commit 80a0674
Show file tree
Hide file tree
Showing 19 changed files with 900 additions and 207 deletions.
8 changes: 4 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ cache:
- $HOME/.gradle
env:
global:
- secure: m++2Xo5uSazLjulL2PYCU6HWscvylUIvHuVksGzn5aaOBMXjxf9VSn7D+HMW4T9zieUGbAz+UXQGgeGua7HSH+DolEsoPizNjyIO0XldESR4GlxX3pI+o0qROm+Ccib2Q53pyAdKnmbDOWcswQmqK4gOF8XQ/ukxO40vjI7CReI=
- secure: SCExcCZZBb3LxM4gNFWFsK8eYKLfPGRx5KxwfNSR4n7sB91nFfa+JdCmwONvmTJBXILLA52h0AtLVvkPK76ApY9IECKmFzvqUxtPIC8L2bl1Ef8PJbKvxeDrvA/oD3KKHtIcrsxBqtN5RDLdOO+VmZq2aq5TppiRywx3Y0qzr7E=
- secure: CP/PRaI26DVqeL8t2QqCgLdWGEKDShwm4kzWcc7SHjxYLxML55TqS5nm3MeQQHB0B8/dIulJbV/zpRGqiLOguD2qSTF9G8H/rw4zWyGTiAJ5r8TJKEsZTaXBeTyjQg1BK/DKbMTys8PM8ZUrwxqi3NakgpfPW5lMDpV+ZlLdRzo=
- secure: wRkZ5zlzwQf6GbYCNFAOBPHqvFzJyLyVffWJf0Hgj/u77lLkWKpLBSxCoDCXEwlXl9IJGtrVv8jHsTulX4uWlfGnliR75wcEqNLqkJn/aC0/0pRMpNWl4aBBipE4CuI8GwCeV/QdWR9+2TTTriV063t6re59J5+Zhp0KneoRwKI=
- secure: p81PYYYYhjhp1uJHOoT9EfUia8qPMphdykCxQannWMVvIFKXF5ibi/Qd52OUIYuJT9Q5MN9GhlwCtolxmCZu44YMLi99LsoITAv6n08LAFEux1oAOXmGt3ZBllvBFtlW+jlQ5lKmbuuPue3RVbTaWzL3KCCaYQ4uKW/sP31bXic=
- secure: AYWrIC/1JqAVyLhzlDhL163LypPpq7AliDltfey9emxzS73vB5VrGTHG45Em7xdkHN0kc1vED+UFsYks7ym1olXZPsbW29R0Gfd+p1VxCE3Cbhj7x690xNC3wIlSgWGpbgiPs7vhVN80OuNgnpz9/+WRFpKAVrYrcisbvugsDqk=
- secure: Ai75crFDh3imkmxzNQDXIwK49appZuZTrsmdZzgH1Zl/nD33RoZPWUWeeHP0yJoicNqZlU6aEXJnk8JGDWtPIT3VNO7Rsey+yrw041rHNV6bFGpwGdQgKv5CUhRHK+gShgFgnME5THCPKvK51+li1feIoByoYVmEn0gmvcxL0yM=
- secure: ZqBsyaFMBz3cqM7UHq0IRotTPaoBNUHrC7buiAqqhZ5JNM48a2m7ORuu/SENPFfr6xOT5KylCrvT6eD1bkQi5S2jT9Qyoju/9H/hXaAUG89dhwKOxqEE11ZGKF8DlNO/jcmchVnIkgWfEjVKc7vCAww1Y8lgxhyj4gRtnln+F+o=
10 changes: 10 additions & 0 deletions priam/src/main/java/com/netflix/priam/IConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -520,4 +520,14 @@ public interface IConfiguration
* @return
*/
Map<String, String> getExtraEnvParams();

public Boolean isIncrBackupParallelEnabled();
/*
* The number of workers for parallel uploads.
*/
public int getIncrementalBkupMaxConsumers();
/*
* The max number of files queued to be uploaded.
*/
public int getUncrementalBkupQueueSize();
}
13 changes: 11 additions & 2 deletions priam/src/main/java/com/netflix/priam/PriamServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.netflix.priam.backup.IncrementalBackup;
import com.netflix.priam.backup.Restore;
import com.netflix.priam.backup.SnapshotBackup;
import com.netflix.priam.backup.parallel.IncrementalBackupProducer;
import com.netflix.priam.identity.InstanceIdentity;
import com.netflix.priam.restore.AwsCrossAccountCryptographyRestoreStrategy;
import com.netflix.priam.restore.EncryptedRestoreStrategy;
Expand Down Expand Up @@ -155,8 +156,16 @@ else if (UpdateSecuritySettings.firstTimeUpdated)
scheduler.addTask(SnapshotBackup.JOBNAME, SnapshotBackup.class, SnapshotBackup.getTimer(config));

// Start the Incremental backup schedule if enabled
if (config.isIncrBackup())
if (config.isIncrBackup()) {
if ( !config.isIncrBackupParallelEnabled() ) {
scheduler.addTask(IncrementalBackup.JOBNAME, IncrementalBackup.class, IncrementalBackup.getTimer());
logger.info("Added incremental synchronous bkup");
} else {
scheduler.addTask(IncrementalBackupProducer.JOBNAME, IncrementalBackupProducer.class, IncrementalBackupProducer.getTimer());
logger.info("Added incremental async-synchronous bkup, next fired time: " + IncrementalBackupProducer.getTimer().getTrigger().getNextFireTime());
}
}

}

if (config.isBackingUpCommitLogs())
Expand All @@ -183,4 +192,4 @@ public IConfiguration getConfiguration()
return config;
}

}
}
13 changes: 8 additions & 5 deletions priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public abstract class AbstractBackup extends Task
protected final Map<String, List<String>> FILTER_COLUMN_FAMILY = ImmutableMap.of("system", Arrays.asList("local", "peers", "LocationInfo"));
protected final Provider<AbstractBackupPath> pathFactory;
protected IBackupFileSystem fs;



@Inject
public AbstractBackup(IConfiguration config, @Named("backup") IFileSystemContext backupFileSystemCtx,Provider<AbstractBackupPath> pathFactory)
{
Expand All @@ -55,12 +56,13 @@ public AbstractBackup(IConfiguration config, @Named("backup") IFileSystemContext
this.fs = backupFileSystemCtx.getFileStrategy(config);
}


/*
* A means to override the type of backup strategy chosen via BackupFileSystemContext
*/
protected void setFileSystem(IBackupFileSystem fs) {
this.fs = fs;
}
}

/*
* search for "1:* alphanumeric chars including special chars""literal period"" 1:* alphanumeric chars including special chars"
Expand All @@ -75,7 +77,7 @@ protected boolean isValidCFFilterFormat(String cfFilter) {

/**
* Upload files in the specified dir. Does not delete the file in case of
* error
* error. The files are uploaded serially.
*
* @param parent
* Parent dir
Expand Down Expand Up @@ -118,7 +120,7 @@ public AbstractBackupPath retriableCall() throws Exception
}
return bps;
}

/**
* Upload specified file (RandomAccessFile) with retries
*/
Expand Down Expand Up @@ -181,4 +183,5 @@ public boolean isValidBackupDir(File keyspaceDir, File columnFamilyDir, File bac
public enum DIRECTORYTYPE {
KEYSPACE, CF
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.netflix.priam.backup;

import com.netflix.priam.scheduler.TaskTimer;

public interface IIncrementalBackup {

public static long INCREMENTAL_INTERVAL_IN_MILLISECS = 10L * 1000;

/*
* @return the number of files pending to be uploaded. The semantic depends on whether the implementation
* is synchronous or asynchronous.
*/
public long getNumPendingFiles();

public String getJobName();

}
205 changes: 89 additions & 116 deletions priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
Expand All @@ -44,16 +43,16 @@
* Incremental/SSTable backup
*/
@Singleton
public class IncrementalBackup extends AbstractBackup
public class IncrementalBackup extends AbstractBackup implements IIncrementalBackup
{
public static final String JOBNAME = "INCR_BACKUP_THREAD";
private static final Logger logger = LoggerFactory.getLogger(IncrementalBackup.class);

private final List<String> incrementalRemotePaths = new ArrayList<String>();
private IncrementalMetaData metaData;

private final Map<String, List<String>> incrementalCFFilter = new HashMap<String, List<String>>(); //key: keyspace, value: a list of CFs within the keyspace
private final Map<String, Object> incrementalKeyspaceFilter = new HashMap<String, Object>(); //key: keyspace, value: null
private final Map<String, Object> incrementalCFFilter = new HashMap<String, Object>();
private final Map<String, Object> incrementalKeyspaceFilter = new HashMap<String, Object>();

static List<IMessageObserver> observers = new ArrayList<IMessageObserver>();

Expand Down Expand Up @@ -90,138 +89,103 @@ private void populateIncrementalFilters() {
String cfFilters = this.config.getIncrementalCFFilter();
if (cfFilters == null || cfFilters.isEmpty()) {

logger.info("No column family filter set for incremental.");
logger.info("No column family filter set for snapshot.");

} else {

String[] filters = cfFilters.split(",");
for (int i=0; i < filters.length; i++) { //process each filter
if (isValidCFFilterFormat(filters[i])) {

String[] filter = filters[i].split("\\.");
String ksName = filter[0];
String cfName = filter[1];
logger.info("Adding incremental CF filter, keyspaceName: " + ksName + ", cf: " + cfName);

if (this.incrementalCFFilter.containsKey(ksName)) {
//add cf to existing filter
List<String> cfs = this.incrementalCFFilter.get(ksName);
cfs.add(cfName);
this.incrementalCFFilter.put(ksName, cfs);

} else {

List<String> cfs = new ArrayList<String>();
cfs.add(cfName);
this.incrementalCFFilter.put(ksName, cfs);

}

} else {
throw new IllegalArgumentException("Column family filter format is not valid. Format needs to be \"keyspace.columnfamily\". Invalid input: " + filters[i]);
}
} //end processing each filter
String[] cf = cfFilters.split(",");
for (int i=0; i < cf.length; i++) {
if (isValidCFFilterFormat(cf[i])) {
logger.info("Adding incremental CF filter: " + cf[i]);
this.incrementalCFFilter.put(cf[i], null);
} else {
throw new IllegalArgumentException("Column family filter format is not valid. Format needs to be \"keyspace.columnfamily\". Invalid input: " + cf[i]);
}
}

}
}

@Override
public void execute() throws Exception
{
//Clearing remotePath List
incrementalRemotePaths.clear();
File dataDir = new File(config.getDataFileLocation());
if (!dataDir.exists())
{
throw new IllegalArgumentException("The configured 'data file location' does not exist: "
+ config.getDataFileLocation());
}
logger.debug("Scanning for backup in: {}", dataDir.getAbsolutePath());
for (File keyspaceDir : dataDir.listFiles())
{
if (keyspaceDir.isFile())
continue;

if ( isFiltered(DIRECTORYTYPE.KEYSPACE, keyspaceDir.getName()) ) { //keyspace filtered?
logger.info(keyspaceDir.getName() + " is part of keyspace filter, incremental not done.");
continue;
}

for (File columnFamilyDir : keyspaceDir.listFiles())
{

if ( isFiltered(DIRECTORYTYPE.CF, keyspaceDir.getName(), columnFamilyDir.getName()) ) { //CF filtered?
logger.info("keyspace: " + keyspaceDir.getName()
+ ", CF: " + columnFamilyDir.getName() + " is part of CF filter list, incrmental not done.");
continue;
}

File backupDir = new File(columnFamilyDir, "backups");
if (!isValidBackupDir(keyspaceDir, columnFamilyDir, backupDir)) {
continue;
}
//Clearing remotePath List
incrementalRemotePaths.clear();
File dataDir = new File(config.getDataFileLocation());
if (!dataDir.exists())
{
throw new IllegalArgumentException("The configured 'data file location' does not exist: "
+ config.getDataFileLocation());
}
logger.debug("Scanning for backup in: {}", dataDir.getAbsolutePath());
for (File keyspaceDir : dataDir.listFiles())
{
if (keyspaceDir.isFile())
continue;

List<AbstractBackupPath> uploadedFiles = upload(backupDir, BackupFileType.SST);
if ( isFiltered(DIRECTORYTYPE.KEYSPACE, keyspaceDir.getName()) ) { //keyspace filtered?
logger.info(keyspaceDir.getName() + " is part of keyspace filter, incremental not done.");
continue;
}

if ( ! uploadedFiles.isEmpty() ) {
String incrementalUploadTime = AbstractBackupPath.formatDate(uploadedFiles.get(0).getTime()); //format of yyyymmddhhmm (e.g. 201505060901)
String metaFileName = "meta_" + columnFamilyDir.getName() + "_" + incrementalUploadTime;
logger.info("Uploading meta file for incremental backup: " + metaFileName);
this.metaData.setMetaFileName(metaFileName);
this.metaData.set(uploadedFiles, incrementalUploadTime);
logger.info("Uploaded meta file for incremental backup: " + metaFileName);
}
for (File columnFamilyDir : keyspaceDir.listFiles())
{

if ( isFiltered(DIRECTORYTYPE.CF, keyspaceDir.getName(), columnFamilyDir.getName()) ) { //CF filtered?
logger.info("keyspace: " + keyspaceDir.getName()
+ ", CF: " + columnFamilyDir.getName() + " is part of CF filter list, incrmental not done.");
continue;
}

File backupDir = new File(columnFamilyDir, "backups");
if (!isValidBackupDir(keyspaceDir, columnFamilyDir, backupDir)) {
continue;
}

List<AbstractBackupPath> uploadedFiles = upload(backupDir, BackupFileType.SST);

if ( ! uploadedFiles.isEmpty() ) {
String incrementalUploadTime = AbstractBackupPath.formatDate(uploadedFiles.get(0).getTime()); //format of yyyymmddhhmm (e.g. 201505060901)
String metaFileName = "meta_" + columnFamilyDir.getName() + "_" + incrementalUploadTime;
logger.info("Uploading meta file for incremental backup: " + metaFileName);
this.metaData.setMetaFileName(metaFileName);
this.metaData.set(uploadedFiles, incrementalUploadTime);
logger.info("Uploaded meta file for incremental backup: " + metaFileName);
}

}
}

if(incrementalRemotePaths.size() > 0)
{
notifyObservers();
}
}
}
if(incrementalRemotePaths.size() > 0)
{
notifyObservers();
}

}

/*
* @return true if directory should be filter from processing; otherwise, false.
*/
private boolean isFiltered(DIRECTORYTYPE directoryType, String...args) {
if (directoryType.equals(DIRECTORYTYPE.KEYSPACE)) { //start with filtering the parent (keyspace)
if (directoryType.equals(DIRECTORYTYPE.CF)) {
String keyspaceName = args[0];
//Apply each keyspace filter to input string
java.util.Set<String> ksFilters = this.incrementalKeyspaceFilter.keySet();
Iterator<String> it = ksFilters.iterator();
while (it.hasNext()) {
String ksFilter = it.next();
Pattern p = Pattern.compile(ksFilter);
Matcher m = p.matcher(keyspaceName);
if (m.find()) {
logger.info("Keyspace: " + keyspaceName + " matched filter: " + ksFilter);
return true;
}
}

}

if (directoryType.equals(DIRECTORYTYPE.CF)) { //parent (keyspace) is not filtered, now see if the child (CF) is filtered
String keyspaceName = args[0];
if ( !this.incrementalCFFilter.containsKey(keyspaceName) ) {
return false;
String cfName = args[1];
if (this.incrementalKeyspaceFilter.containsKey(keyspaceName)) { //account for keyspace which we want to filter
return true;
} else {
StringBuffer strBuf = new StringBuffer();
strBuf.append(keyspaceName);
strBuf.append('.');
strBuf.append(cfName);
return this.incrementalCFFilter.containsKey(strBuf.toString());
}

} else if (directoryType.equals(DIRECTORYTYPE.KEYSPACE)) {
return this.incrementalKeyspaceFilter.containsKey(args[0]);

String cfName = args[1];
List<String> cfsFilter = this.incrementalCFFilter.get(keyspaceName);
for (int i=0; i < cfsFilter.size(); i++) {
Pattern p = Pattern.compile(cfsFilter.get(i));
Matcher m = p.matcher(cfName);
if (m.find()) {
logger.info(keyspaceName + "." + cfName + " matched filter");
return true;
}
}
}

return false; //if here, current input are not part of keyspae and cf filters
} else {
throw new UnsupportedOperationException("Directory type not supported. Invalid input: " + directoryType.name());
}

}

Expand All @@ -230,7 +194,7 @@ private boolean isFiltered(DIRECTORYTYPE directoryType, String...args) {
*/
public static TaskTimer getTimer()
{
return new SimpleTimer(JOBNAME, 10L * 1000);
return new SimpleTimer(JOBNAME, INCREMENTAL_INTERVAL_IN_MILLISECS);
}

@Override
Expand Down Expand Up @@ -267,5 +231,14 @@ public void notifyObservers()
protected void addToRemotePath(String remotePath) {
incrementalRemotePaths.add(remotePath);
}

@Override
public long getNumPendingFiles() {
throw new UnsupportedOperationException();
}

}
@Override
public String getJobName() {
return JOBNAME;
}
}
Loading

0 comments on commit 80a0674

Please sign in to comment.