1. Optional incremental backup parallelization 2. Updated Bintray API key from master #481

Merged 1 commit on Jun 2, 2016
8 changes: 4 additions & 4 deletions .travis.yml
@@ -9,7 +9,7 @@ cache:
- $HOME/.gradle
env:
global:
- secure: m++2Xo5uSazLjulL2PYCU6HWscvylUIvHuVksGzn5aaOBMXjxf9VSn7D+HMW4T9zieUGbAz+UXQGgeGua7HSH+DolEsoPizNjyIO0XldESR4GlxX3pI+o0qROm+Ccib2Q53pyAdKnmbDOWcswQmqK4gOF8XQ/ukxO40vjI7CReI=
- secure: SCExcCZZBb3LxM4gNFWFsK8eYKLfPGRx5KxwfNSR4n7sB91nFfa+JdCmwONvmTJBXILLA52h0AtLVvkPK76ApY9IECKmFzvqUxtPIC8L2bl1Ef8PJbKvxeDrvA/oD3KKHtIcrsxBqtN5RDLdOO+VmZq2aq5TppiRywx3Y0qzr7E=
- secure: CP/PRaI26DVqeL8t2QqCgLdWGEKDShwm4kzWcc7SHjxYLxML55TqS5nm3MeQQHB0B8/dIulJbV/zpRGqiLOguD2qSTF9G8H/rw4zWyGTiAJ5r8TJKEsZTaXBeTyjQg1BK/DKbMTys8PM8ZUrwxqi3NakgpfPW5lMDpV+ZlLdRzo=
- secure: wRkZ5zlzwQf6GbYCNFAOBPHqvFzJyLyVffWJf0Hgj/u77lLkWKpLBSxCoDCXEwlXl9IJGtrVv8jHsTulX4uWlfGnliR75wcEqNLqkJn/aC0/0pRMpNWl4aBBipE4CuI8GwCeV/QdWR9+2TTTriV063t6re59J5+Zhp0KneoRwKI=
- secure: p81PYYYYhjhp1uJHOoT9EfUia8qPMphdykCxQannWMVvIFKXF5ibi/Qd52OUIYuJT9Q5MN9GhlwCtolxmCZu44YMLi99LsoITAv6n08LAFEux1oAOXmGt3ZBllvBFtlW+jlQ5lKmbuuPue3RVbTaWzL3KCCaYQ4uKW/sP31bXic=
- secure: AYWrIC/1JqAVyLhzlDhL163LypPpq7AliDltfey9emxzS73vB5VrGTHG45Em7xdkHN0kc1vED+UFsYks7ym1olXZPsbW29R0Gfd+p1VxCE3Cbhj7x690xNC3wIlSgWGpbgiPs7vhVN80OuNgnpz9/+WRFpKAVrYrcisbvugsDqk=
- secure: Ai75crFDh3imkmxzNQDXIwK49appZuZTrsmdZzgH1Zl/nD33RoZPWUWeeHP0yJoicNqZlU6aEXJnk8JGDWtPIT3VNO7Rsey+yrw041rHNV6bFGpwGdQgKv5CUhRHK+gShgFgnME5THCPKvK51+li1feIoByoYVmEn0gmvcxL0yM=
- secure: ZqBsyaFMBz3cqM7UHq0IRotTPaoBNUHrC7buiAqqhZ5JNM48a2m7ORuu/SENPFfr6xOT5KylCrvT6eD1bkQi5S2jT9Qyoju/9H/hXaAUG89dhwKOxqEE11ZGKF8DlNO/jcmchVnIkgWfEjVKc7vCAww1Y8lgxhyj4gRtnln+F+o=
10 changes: 10 additions & 0 deletions priam/src/main/java/com/netflix/priam/IConfiguration.java
@@ -520,4 +520,14 @@ public interface IConfiguration
* @return
*/
Map<String, String> getExtraEnvParams();

/*
* True to upload incremental backups in parallel; false for the serial path.
*/
public Boolean isIncrBackupParallelEnabled();
/*
* The number of workers for parallel uploads.
*/
public int getIncrementalBkupMaxConsumers();
/*
* The max number of files queued to be uploaded.
*/
public int getUncrementalBkupQueueSize();
}
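
Not part of this diff: a concrete IConfiguration implementation has to back the three new accessors with tunable properties. A minimal standalone sketch, assuming hypothetical property keys and default values (the real Priam configuration class and key names are not shown on this page):

// Hypothetical sketch only; property keys, defaults, and the class name are
// assumptions, not the PR's actual configuration implementation.
public class ParallelBackupConfigSketch {

    public Boolean isIncrBackupParallelEnabled() {
        // Assumed key; defaults to the existing serial behavior.
        return Boolean.parseBoolean(System.getProperty("priam.incr.bkup.parallel", "false"));
    }

    public int getIncrementalBkupMaxConsumers() {
        // Number of upload workers; assumed default of 2.
        return Integer.parseInt(System.getProperty("priam.incr.bkup.max.consumers", "2"));
    }

    public int getUncrementalBkupQueueSize() {
        // Maximum number of files queued for upload; assumed default of 100000.
        return Integer.parseInt(System.getProperty("priam.incr.bkup.queue.size", "100000"));
    }
}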
13 changes: 11 additions & 2 deletions priam/src/main/java/com/netflix/priam/PriamServer.java
@@ -27,6 +27,7 @@
import com.netflix.priam.backup.IncrementalBackup;
import com.netflix.priam.backup.Restore;
import com.netflix.priam.backup.SnapshotBackup;
import com.netflix.priam.backup.parallel.IncrementalBackupProducer;
import com.netflix.priam.identity.InstanceIdentity;
import com.netflix.priam.restore.AwsCrossAccountCryptographyRestoreStrategy;
import com.netflix.priam.restore.EncryptedRestoreStrategy;
@@ -155,8 +156,16 @@ else if (UpdateSecuritySettings.firstTimeUpdated)
scheduler.addTask(SnapshotBackup.JOBNAME, SnapshotBackup.class, SnapshotBackup.getTimer(config));

// Start the Incremental backup schedule if enabled
if (config.isIncrBackup())
if (config.isIncrBackup()) {
if ( !config.isIncrBackupParallelEnabled() ) {
scheduler.addTask(IncrementalBackup.JOBNAME, IncrementalBackup.class, IncrementalBackup.getTimer());
logger.info("Added incremental synchronous bkup");
} else {
scheduler.addTask(IncrementalBackupProducer.JOBNAME, IncrementalBackupProducer.class, IncrementalBackupProducer.getTimer());
logger.info("Added incremental async-synchronous bkup, next fired time: " + IncrementalBackupProducer.getTimer().getTrigger().getNextFireTime());
}
}

}

if (config.isBackingUpCommitLogs())
@@ -183,4 +192,4 @@ public IConfiguration getConfiguration()
return config;
}

}
}
13 changes: 8 additions & 5 deletions priam/src/main/java/com/netflix/priam/backup/AbstractBackup.java
@@ -46,7 +46,8 @@ public abstract class AbstractBackup extends Task
protected final Map<String, List<String>> FILTER_COLUMN_FAMILY = ImmutableMap.of("system", Arrays.asList("local", "peers", "LocationInfo"));
protected final Provider<AbstractBackupPath> pathFactory;
protected IBackupFileSystem fs;



@Inject
public AbstractBackup(IConfiguration config, @Named("backup") IFileSystemContext backupFileSystemCtx,Provider<AbstractBackupPath> pathFactory)
{
@@ -55,12 +56,13 @@ public AbstractBackup(IConfiguration config, @Named("backup") IFileSystemContext
this.fs = backupFileSystemCtx.getFileStrategy(config);
}


/*
* A means to override the type of backup strategy chosen via BackupFileSystemContext
*/
protected void setFileSystem(IBackupFileSystem fs) {
this.fs = fs;
}
}

/*
* search for "1:* alphanumeric chars including special chars""literal period"" 1:* alphanumeric chars including special chars"
@@ -75,7 +77,7 @@ protected boolean isValidCFFilterFormat(String cfFilter) {

/**
* Upload files in the specified dir. Does not delete the file in case of
* error
* error. The files are uploaded serially.
*
* @param parent
* Parent dir
@@ -118,7 +120,7 @@ public AbstractBackupPath retriableCall() throws Exception
}
return bps;
}

/**
* Upload specified file (RandomAccessFile) with retries
*/
@@ -181,4 +183,5 @@ public boolean isValidBackupDir(File keyspaceDir, File columnFamilyDir, File bac
public enum DIRECTORYTYPE {
KEYSPACE, CF
}
}

}
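
For contrast with the parallel path introduced in this PR, the serial behavior documented above boils down to uploading the files of a backup directory one at a time, retrying each upload and leaving the file in place on failure. A small sketch of that shape, assuming a placeholder upload(File) call and a fixed retry budget (the real code wraps the upload in RetryableCallable and works with AbstractBackupPath objects):

// Hypothetical sketch of the serial upload loop described above; upload(File)
// and MAX_RETRIES are stand-ins, not the actual Priam API.
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class SerialUploadSketch {

    private static final int MAX_RETRIES = 3; // assumed retry budget

    public List<File> uploadSerially(File backupDir) throws Exception {
        List<File> uploaded = new ArrayList<File>();
        for (File f : backupDir.listFiles()) {      // one file at a time, in order
            int attempt = 0;
            while (true) {
                try {
                    upload(f);                      // placeholder for the real upload
                    uploaded.add(f);
                    break;
                } catch (Exception e) {
                    if (++attempt >= MAX_RETRIES) {
                        throw e;                    // give up; the file is not deleted
                    }
                }
            }
        }
        return uploaded;
    }

    private void upload(File f) throws Exception {
        // Stand-in for the real file system upload; intentionally empty here.
    }
}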
17 changes: 17 additions & 0 deletions priam/src/main/java/com/netflix/priam/backup/IIncrementalBackup.java
@@ -0,0 +1,17 @@
package com.netflix.priam.backup;

import com.netflix.priam.scheduler.TaskTimer;

public interface IIncrementalBackup {

public static long INCREMENTAL_INTERVAL_IN_MILLISECS = 10L * 1000;

/*
* @return the number of files pending to be uploaded. The semantic depends on whether the implementation
* is synchronous or asynchronous.
*/
public long getNumPendingFiles();

public String getJobName();

}
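
Not part of this diff: the asynchronous semantics mentioned in the comment above imply an implementation that queues candidate SSTable files and drains the queue with a pool of upload workers, bounded and sized by the two new configuration accessors. A minimal sketch under those assumptions (the PR's actual IncrementalBackupProducer and its task queue are not visible on this page):

package com.netflix.priam.backup;

import java.io.File;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of an asynchronous IIncrementalBackup: class name, job
// name, and the omitted upload loop are assumptions, not the PR's code.
public class ParallelIncrementalBackupSketch implements IIncrementalBackup {

    private final BlockingQueue<File> pendingFiles;
    private final ExecutorService consumers;

    public ParallelIncrementalBackupSketch(int queueSize, int maxConsumers) {
        // queueSize would come from getUncrementalBkupQueueSize(),
        // maxConsumers from getIncrementalBkupMaxConsumers().
        this.pendingFiles = new ArrayBlockingQueue<File>(queueSize);
        this.consumers = Executors.newFixedThreadPool(maxConsumers);
        // Starting the workers and the actual upload loop are omitted for brevity.
    }

    @Override
    public long getNumPendingFiles() {
        // Asynchronous semantics: files accepted for upload but not yet uploaded.
        return pendingFiles.size();
    }

    @Override
    public String getJobName() {
        return "INCR_PARALLEL_BACKUP_THREAD"; // assumed job name
    }
}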
205 changes: 89 additions & 116 deletions priam/src/main/java/com/netflix/priam/backup/IncrementalBackup.java
@@ -18,7 +18,6 @@
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
@@ -44,16 +43,16 @@
* Incremental/SSTable backup
*/
@Singleton
public class IncrementalBackup extends AbstractBackup
public class IncrementalBackup extends AbstractBackup implements IIncrementalBackup
{
public static final String JOBNAME = "INCR_BACKUP_THREAD";
private static final Logger logger = LoggerFactory.getLogger(IncrementalBackup.class);

private final List<String> incrementalRemotePaths = new ArrayList<String>();
private IncrementalMetaData metaData;

private final Map<String, List<String>> incrementalCFFilter = new HashMap<String, List<String>>(); //key: keyspace, value: a list of CFs within the keyspace
private final Map<String, Object> incrementalKeyspaceFilter = new HashMap<String, Object>(); //key: keyspace, value: null
private final Map<String, Object> incrementalCFFilter = new HashMap<String, Object>();
private final Map<String, Object> incrementalKeyspaceFilter = new HashMap<String, Object>();

static List<IMessageObserver> observers = new ArrayList<IMessageObserver>();

@@ -90,138 +89,103 @@ private void populateIncrementalFilters() {
String cfFilters = this.config.getIncrementalCFFilter();
if (cfFilters == null || cfFilters.isEmpty()) {

logger.info("No column family filter set for incremental.");
logger.info("No column family filter set for snapshot.");

} else {

String[] filters = cfFilters.split(",");
for (int i=0; i < filters.length; i++) { //process each filter
if (isValidCFFilterFormat(filters[i])) {

String[] filter = filters[i].split("\\.");
String ksName = filter[0];
String cfName = filter[1];
logger.info("Adding incremental CF filter, keyspaceName: " + ksName + ", cf: " + cfName);

if (this.incrementalCFFilter.containsKey(ksName)) {
//add cf to existing filter
List<String> cfs = this.incrementalCFFilter.get(ksName);
cfs.add(cfName);
this.incrementalCFFilter.put(ksName, cfs);

} else {

List<String> cfs = new ArrayList<String>();
cfs.add(cfName);
this.incrementalCFFilter.put(ksName, cfs);

}

} else {
throw new IllegalArgumentException("Column family filter format is not valid. Format needs to be \"keyspace.columnfamily\". Invalid input: " + filters[i]);
}
} //end processing each filter
String[] cf = cfFilters.split(",");
for (int i=0; i < cf.length; i++) {
if (isValidCFFilterFormat(cf[i])) {
logger.info("Adding incremental CF filter: " + cf[i]);
this.incrementalCFFilter.put(cf[i], null);
} else {
throw new IllegalArgumentException("Column family filter format is not valid. Format needs to be \"keyspace.columnfamily\". Invalid input: " + cf[i]);
}
}

}
}

@Override
public void execute() throws Exception
{
//Clearing remotePath List
incrementalRemotePaths.clear();
File dataDir = new File(config.getDataFileLocation());
if (!dataDir.exists())
{
throw new IllegalArgumentException("The configured 'data file location' does not exist: "
+ config.getDataFileLocation());
}
logger.debug("Scanning for backup in: {}", dataDir.getAbsolutePath());
for (File keyspaceDir : dataDir.listFiles())
{
if (keyspaceDir.isFile())
continue;

if ( isFiltered(DIRECTORYTYPE.KEYSPACE, keyspaceDir.getName()) ) { //keyspace filtered?
logger.info(keyspaceDir.getName() + " is part of keyspace filter, incremental not done.");
continue;
}

for (File columnFamilyDir : keyspaceDir.listFiles())
{

if ( isFiltered(DIRECTORYTYPE.CF, keyspaceDir.getName(), columnFamilyDir.getName()) ) { //CF filtered?
logger.info("keyspace: " + keyspaceDir.getName()
+ ", CF: " + columnFamilyDir.getName() + " is part of CF filter list, incrmental not done.");
continue;
}

File backupDir = new File(columnFamilyDir, "backups");
if (!isValidBackupDir(keyspaceDir, columnFamilyDir, backupDir)) {
continue;
}
//Clearing remotePath List
incrementalRemotePaths.clear();
File dataDir = new File(config.getDataFileLocation());
if (!dataDir.exists())
{
throw new IllegalArgumentException("The configured 'data file location' does not exist: "
+ config.getDataFileLocation());
}
logger.debug("Scanning for backup in: {}", dataDir.getAbsolutePath());
for (File keyspaceDir : dataDir.listFiles())
{
if (keyspaceDir.isFile())
continue;

List<AbstractBackupPath> uploadedFiles = upload(backupDir, BackupFileType.SST);
if ( isFiltered(DIRECTORYTYPE.KEYSPACE, keyspaceDir.getName()) ) { //keyspace filtered?
logger.info(keyspaceDir.getName() + " is part of keyspace filter, incremental not done.");
continue;
}

if ( ! uploadedFiles.isEmpty() ) {
String incrementalUploadTime = AbstractBackupPath.formatDate(uploadedFiles.get(0).getTime()); //format of yyyymmddhhmm (e.g. 201505060901)
String metaFileName = "meta_" + columnFamilyDir.getName() + "_" + incrementalUploadTime;
logger.info("Uploading meta file for incremental backup: " + metaFileName);
this.metaData.setMetaFileName(metaFileName);
this.metaData.set(uploadedFiles, incrementalUploadTime);
logger.info("Uploaded meta file for incremental backup: " + metaFileName);
}
for (File columnFamilyDir : keyspaceDir.listFiles())
{

if ( isFiltered(DIRECTORYTYPE.CF, keyspaceDir.getName(), columnFamilyDir.getName()) ) { //CF filtered?
logger.info("keyspace: " + keyspaceDir.getName()
+ ", CF: " + columnFamilyDir.getName() + " is part of CF filter list, incrmental not done.");
continue;
}

File backupDir = new File(columnFamilyDir, "backups");
if (!isValidBackupDir(keyspaceDir, columnFamilyDir, backupDir)) {
continue;
}

List<AbstractBackupPath> uploadedFiles = upload(backupDir, BackupFileType.SST);

if ( ! uploadedFiles.isEmpty() ) {
String incrementalUploadTime = AbstractBackupPath.formatDate(uploadedFiles.get(0).getTime()); //format of yyyymmddhhmm (e.g. 201505060901)
String metaFileName = "meta_" + columnFamilyDir.getName() + "_" + incrementalUploadTime;
logger.info("Uploading meta file for incremental backup: " + metaFileName);
this.metaData.setMetaFileName(metaFileName);
this.metaData.set(uploadedFiles, incrementalUploadTime);
logger.info("Uploaded meta file for incremental backup: " + metaFileName);
}

}
}

if(incrementalRemotePaths.size() > 0)
{
notifyObservers();
}
}
}
if(incrementalRemotePaths.size() > 0)
{
notifyObservers();
}

}

/*
* @return true if directory should be filtered from processing; otherwise, false.
*/
private boolean isFiltered(DIRECTORYTYPE directoryType, String...args) {
if (directoryType.equals(DIRECTORYTYPE.KEYSPACE)) { //start with filtering the parent (keyspace)
if (directoryType.equals(DIRECTORYTYPE.CF)) {
String keyspaceName = args[0];
//Apply each keyspace filter to input string
java.util.Set<String> ksFilters = this.incrementalKeyspaceFilter.keySet();
Iterator<String> it = ksFilters.iterator();
while (it.hasNext()) {
String ksFilter = it.next();
Pattern p = Pattern.compile(ksFilter);
Matcher m = p.matcher(keyspaceName);
if (m.find()) {
logger.info("Keyspace: " + keyspaceName + " matched filter: " + ksFilter);
return true;
}
}

}

if (directoryType.equals(DIRECTORYTYPE.CF)) { //parent (keyspace) is not filtered, now see if the child (CF) is filtered
String keyspaceName = args[0];
if ( !this.incrementalCFFilter.containsKey(keyspaceName) ) {
return false;
String cfName = args[1];
if (this.incrementalKeyspaceFilter.containsKey(keyspaceName)) { //account for keyspace which we want to filter
return true;
} else {
StringBuffer strBuf = new StringBuffer();
strBuf.append(keyspaceName);
strBuf.append('.');
strBuf.append(cfName);
return this.incrementalCFFilter.containsKey(strBuf.toString());
}

} else if (directoryType.equals(DIRECTORYTYPE.KEYSPACE)) {
return this.incrementalKeyspaceFilter.containsKey(args[0]);

String cfName = args[1];
List<String> cfsFilter = this.incrementalCFFilter.get(keyspaceName);
for (int i=0; i < cfsFilter.size(); i++) {
Pattern p = Pattern.compile(cfsFilter.get(i));
Matcher m = p.matcher(cfName);
if (m.find()) {
logger.info(keyspaceName + "." + cfName + " matched filter");
return true;
}
}
}

return false; //if here, the current input is not part of the keyspace and cf filters
} else {
throw new UnsupportedOperationException("Directory type not supported. Invalid input: " + directoryType.name());
}

}

@@ -230,7 +194,7 @@
*/
public static TaskTimer getTimer()
{
return new SimpleTimer(JOBNAME, 10L * 1000);
return new SimpleTimer(JOBNAME, INCREMENTAL_INTERVAL_IN_MILLISECS);
}

@Override
@@ -267,5 +231,14 @@ public void notifyObservers()
protected void addToRemotePath(String remotePath) {
incrementalRemotePaths.add(remotePath);
}

@Override
public long getNumPendingFiles() {
throw new UnsupportedOperationException();
}

}
@Override
public String getJobName() {
return JOBNAME;
}
}
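
To make the refactored filter bookkeeping above concrete: populateIncrementalFilters() splits the configured filter string on commas and stores each "keyspace.columnfamily" entry as a key of incrementalCFFilter, while keyspace-level filters populate incrementalKeyspaceFilter; isFiltered() then reduces to two map lookups instead of the old regex matching. A standalone illustration of those lookups (class and variable names are illustrative only):

import java.util.HashMap;
import java.util.Map;

// Illustrative only; mirrors the map-lookup behavior of the refactored filters.
public class FilterLookupSketch {
    public static void main(String[] args) {
        Map<String, Object> cfFilter = new HashMap<String, Object>();
        Map<String, Object> keyspaceFilter = new HashMap<String, Object>();

        // "keyspace.columnfamily" entries, as produced by populateIncrementalFilters()
        for (String entry : "ks1.cf1,ks1.cf2".split(",")) {
            cfFilter.put(entry, null);
        }
        keyspaceFilter.put("system", null);   // an entire keyspace to skip

        // Lookups in the order isFiltered() applies them: keyspace first, then keyspace.cf
        System.out.println(keyspaceFilter.containsKey("system"));   // true  -> keyspace skipped
        System.out.println(cfFilter.containsKey("ks1.cf1"));        // true  -> CF skipped
        System.out.println(cfFilter.containsKey("ks1.cf3"));        // false -> CF backed up
    }
}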