Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PostRestoreHookChanges #681

Merged
merged 4 commits into from
Jun 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions priam/src/main/java/com/netflix/priam/IConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -720,4 +720,36 @@ public interface IConfiguration {
* @return SNS Topic ARN to be used to send notification.
*/
public String getBackupNotificationTopicArn();

/**
* Post restore hook enabled state. If enabled, jar represented by getPostRepairHook is called once download of files is complete, before starting Cassandra.
* @return if post restore hook is enabled
*/
public boolean isPostRestoreHookEnabled();

/**
* Post restore hook to be executed
* @return post restore hook to be executed once restore is complete
*/
public String getPostRestoreHook();


/**
* HeartBeat file of post restore hook
* @return file that indicates heartbeat of post restore hook
*/
public String getPostRestoreHookHeartbeatFileName();


/**
* Done file for post restore hook
* @return file that indicates completion of post restore hook
*/
public String getPostRestoreHookDoneFileName();

/**
* Maximum time Priam has to wait for post restore hook sub-process to complete successfully
* @return time out for post restore hook in days
*/
public int getPostRestoreHookTimeOutInDays();
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -182,6 +183,12 @@ public class PriamConfiguration implements IConfiguration {
private static final String CONFIG_VPC_ROLE_ASSUMPTION_ARN = PRIAM_PRE + ".vpc.roleassumption.arn";
private static final String CONFIG_DUAL_ACCOUNT = PRIAM_PRE + ".roleassumption.dualaccount";

//Post Restore Hook
private static final String CONFIG_POST_RESTORE_HOOK_ENABLED = PRIAM_PRE + ".postrestorehook.enabled";
private static final String CONFIG_POST_RESTORE_HOOK = PRIAM_PRE + ".postrestorehook";
private static final String CONFIG_POST_RESTORE_HOOK_HEARTBEAT_FILENAME = PRIAM_PRE + ".postrestorehook.heartbeat.filename";
private static final String CONFIG_POST_RESTORE_HOOK_DONE_FILENAME = PRIAM_PRE + ".postrestorehook.done.filename";
private static final String CONFIG_POST_RESTORE_HOOK_TIMEOUT_IN_DAYS = PRIAM_PRE + ".postrestorehook.timeout.in.days";

//Running instance meta data
private String RAC;
Expand Down Expand Up @@ -1101,4 +1108,28 @@ public String getBackupNotificationTopicArn() {
return config.get(PRIAM_PRE + ".backup.notification.topic.arn", "");
}

@Override
public boolean isPostRestoreHookEnabled() {
return config.get(CONFIG_POST_RESTORE_HOOK_ENABLED, false);
}

@Override
public String getPostRestoreHook() {
return config.get(CONFIG_POST_RESTORE_HOOK);
}

@Override
public String getPostRestoreHookHeartbeatFileName() {
return config.get(CONFIG_POST_RESTORE_HOOK_HEARTBEAT_FILENAME, getDataFileLocation() + File.separator + "postrestorehook_heartbeat");
}

@Override
public String getPostRestoreHookDoneFileName() {
return config.get(CONFIG_POST_RESTORE_HOOK_DONE_FILENAME, getDataFileLocation() + File.separator + "postrestorehook_done");
}

@Override
public int getPostRestoreHookTimeOutInDays() {
return config.get(CONFIG_POST_RESTORE_HOOK_TIMEOUT_IN_DAYS, 2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.netflix.priam.backup.AbstractBackupPath.BackupFileType;
import com.netflix.priam.health.InstanceState;
import com.netflix.priam.identity.InstanceIdentity;
import com.netflix.priam.scheduler.NamedThreadPoolExecutor;
import com.netflix.priam.scheduler.Task;
import com.netflix.priam.utils.*;
import org.apache.commons.collections4.CollectionUtils;
Expand All @@ -35,8 +36,12 @@
import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.StandardOpenOption;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;

/**
* A means to perform a restore. This class contains the following characteristics:
Expand All @@ -61,11 +66,12 @@ public abstract class AbstractRestore extends Task implements IRestoreStrategy{
private ICassandraProcess cassProcess;
private InstanceState instanceState;
private MetaData metaData;
private IPostRestoreHook postRestoreHook;

public AbstractRestore(IConfiguration config, IBackupFileSystem fs, String name, Sleeper sleeper,
Provider<AbstractBackupPath> pathProvider,
InstanceIdentity instanceIdentity, RestoreTokenSelector tokenSelector,
ICassandraProcess cassProcess, MetaData metaData, InstanceState instanceState) {
ICassandraProcess cassProcess, MetaData metaData, InstanceState instanceState, IPostRestoreHook postRestoreHook) {
super(config);
this.fs = fs;
this.sleeper = sleeper;
Expand All @@ -76,6 +82,7 @@ public AbstractRestore(IConfiguration config, IBackupFileSystem fs, String name,
this.metaData = metaData;
this.instanceState = instanceState;
backupRestoreUtil = new BackupRestoreUtil(config.getRestoreKeyspaceFilter(), config.getRestoreCFFilter());
this.postRestoreHook = postRestoreHook;
}

public static final boolean isRestoreEnabled(IConfiguration conf) {
Expand Down Expand Up @@ -199,6 +206,11 @@ public Void retriableCall() throws Exception {
}

public void restore(Date startTime, Date endTime) throws Exception {
//fail early if post restore hook has invalid parameters
if(!postRestoreHook.hasValidParameters()) {
throw new PostRestoreHookException("Invalid PostRestoreHook parameters");
}

//Set the restore status.
instanceState.getRestoreStatus().resetStatus();
instanceState.getRestoreStatus().setStartDateRange(DateUtil.convert(startTime));
Expand Down Expand Up @@ -270,6 +282,11 @@ public void restore(Date startTime, Date endTime) throws Exception {
instanceState.getRestoreStatus().setExecutionEndTime(LocalDateTime.now());
instanceState.setRestoreStatus(Status.FINISHED);

//Given that files are restored now, kick off post restore hook
logger.info("Starting post restore hook");
postRestoreHook.execute();
logger.info("Completed executing post restore hook");

//Start cassandra if restore is successful.
cassProcess.start(true);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ public AwsCrossAccountCryptographyRestoreStrategy(final IConfiguration config, I
, @Named("filecryptoalgorithm") IFileCryptography fileCryptography
, @Named("pgpcredential") ICredentialGeneric credential
, ICompression compress, Provider<AbstractBackupPath> pathProvider,
InstanceIdentity id, RestoreTokenSelector tokenSelector, MetaData metaData, InstanceState instanceState) {
InstanceIdentity id, RestoreTokenSelector tokenSelector, MetaData metaData, InstanceState instanceState, IPostRestoreHook postRestoreHook) {

super(config, crossAcctfs.getBackupFileSystem(), JOBNAME, sleeper, cassProcess, pathProvider, id, tokenSelector, credential, fileCryptography, compress, metaData, instanceState);
super(config, crossAcctfs.getBackupFileSystem(), JOBNAME, sleeper, cassProcess, pathProvider, id, tokenSelector, credential, fileCryptography, compress, metaData, instanceState, postRestoreHook);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ public abstract class EncryptedRestoreBase extends AbstractRestore{
protected EncryptedRestoreBase(IConfiguration config, IBackupFileSystem fs, String jobName, Sleeper sleeper,
ICassandraProcess cassProcess, Provider<AbstractBackupPath> pathProvider,
InstanceIdentity instanceIdentity, RestoreTokenSelector tokenSelector, ICredentialGeneric pgpCredential,
IFileCryptography fileCryptography, ICompression compress, MetaData metaData, InstanceState instanceState) {
super(config, fs, jobName, sleeper, pathProvider, instanceIdentity, tokenSelector, cassProcess, metaData, instanceState);
IFileCryptography fileCryptography, ICompression compress, MetaData metaData, InstanceState instanceState, IPostRestoreHook postRestoreHook) {
super(config, fs, jobName, sleeper, pathProvider, instanceIdentity, tokenSelector, cassProcess, metaData, instanceState, postRestoreHook);

this.jobName = jobName;
this.pgpCredential = pgpCredential;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ public EncryptedRestoreStrategy(final IConfiguration config, ICassandraProcess c
, @Named("filecryptoalgorithm") IFileCryptography fileCryptography
, @Named("pgpcredential") ICredentialGeneric credential
, ICompression compress, Provider<AbstractBackupPath> pathProvider,
InstanceIdentity id, RestoreTokenSelector tokenSelector, MetaData metaData, InstanceState instanceState
InstanceIdentity id, RestoreTokenSelector tokenSelector, MetaData metaData, InstanceState instanceState, IPostRestoreHook postRestoreHook
) {

super(config, fs, JOBNAME, sleeper, cassProcess, pathProvider, id, tokenSelector, credential, fileCryptography, compress, metaData, instanceState);
super(config, fs, JOBNAME, sleeper, cassProcess, pathProvider, id, tokenSelector, credential, fileCryptography, compress, metaData, instanceState, postRestoreHook);
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public GoogleCryptographyRestoreStrategy(final IConfiguration config, ICassandra
, @Named("filecryptoalgorithm") IFileCryptography fileCryptography
, @Named("pgpcredential") ICredentialGeneric credential
, ICompression compress, Provider<AbstractBackupPath> pathProvider,
InstanceIdentity id, RestoreTokenSelector tokenSelector, MetaData metaData, InstanceState instanceState
InstanceIdentity id, RestoreTokenSelector tokenSelector, MetaData metaData, InstanceState instanceState, IPostRestoreHook postRestoreHook
) {
super(config, fs, JOBNAME, sleeper, cassProcess, pathProvider, id, tokenSelector, credential, fileCryptography, compress, metaData, instanceState);
super(config, fs, JOBNAME, sleeper, cassProcess, pathProvider, id, tokenSelector, credential, fileCryptography, compress, metaData, instanceState, postRestoreHook);
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.netflix.priam.restore;

import com.google.inject.ImplementedBy;

/**
* Interface for post restore hook
*/
@ImplementedBy(PostRestoreHook.class)
public interface IPostRestoreHook {
boolean hasValidParameters();
void execute() throws Exception;
}
157 changes: 157 additions & 0 deletions priam/src/main/java/com/netflix/priam/restore/PostRestoreHook.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package com.netflix.priam.restore;

import com.netflix.priam.IConfiguration;
import com.netflix.priam.scheduler.NamedThreadPoolExecutor;
import com.netflix.priam.utils.RetryableCallable;
import com.netflix.priam.utils.Sleeper;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* An implementation of IPostRestoreHook. Kicks off a child process for post restore hook using ProcessBuilder; uses heart beat monitor to monitor progress of the sub process
* and uses a file lock to pass the active state to the sub process
*/
public class PostRestoreHook implements IPostRestoreHook {
private static final Logger logger = LoggerFactory.getLogger(PostRestoreHook.class);
private final IConfiguration config;
private final Sleeper sleeper;
private final int ThreadWaitTimeInMs = 60000;
private final int HeartBeatTimeOutInMs = 60000;
private static String PostRestoreHookCommandDelimiter = " ";
private static String PriamPostRestoreHookFilePrefix = "PriamFileForPostRestoreHook";
private static String PriamPostRestoreHookFileSuffix = ".tmp";
private static String PriamPostRestoreHookFileOptionName = "--parentHookFilePath=";

@Inject
public PostRestoreHook(IConfiguration config, Sleeper sleeper) {
this.config = config;
this.sleeper = sleeper;
}

/**
* Checks parameters to make sure none are blank
* @return if all parameters are valid
*/
public boolean hasValidParameters() {
if(config.isPostRestoreHookEnabled()) {
if(StringUtils.isBlank(config.getPostRestoreHook())
|| StringUtils.isBlank(config.getPostRestoreHookHeartbeatFileName())
|| StringUtils.isBlank(config.getPostRestoreHookDoneFileName())) {
return false;
}
}
return true;
}

/**
* Executes a sub process as part of post restore hook, and waits for the completion of the process. In case of lack of heart beat from the sub process, existing sub process is terminated
* and new sub process is kicked off
* @throws Exception
*/
public void execute() throws Exception {
if(config.isPostRestoreHookEnabled()) {
logger.info("Started PostRestoreHook execution");

//create a temp file to be used to indicate state of the current process, to the sub-process
File tempLockFile = File.createTempFile(PriamPostRestoreHookFilePrefix, PriamPostRestoreHookFileSuffix);
RandomAccessFile raf = new RandomAccessFile(tempLockFile.getPath(), "rw");
FileChannel fileChannel = raf.getChannel();
FileLock lock = fileChannel.lock();

try {
if (lock.isValid()) {
logger.info("Lock on RestoreHookFile acquired");
int countOfProcessStarts = 0;
while (true) {
if (doneFileExists()) {
logger.info("Not starting PostRestoreHook since DONE file already exists.");
break;
}

String postRestoreHook = config.getPostRestoreHook();
//add temp file path as parameter to the jar file
postRestoreHook = postRestoreHook + PostRestoreHookCommandDelimiter + PriamPostRestoreHookFileOptionName + tempLockFile.getAbsolutePath();
String[] processCommandArguments = postRestoreHook.split(PostRestoreHookCommandDelimiter);
ProcessBuilder processBuilder = new ProcessBuilder(processCommandArguments);

//start sub-process
Process process = processBuilder.inheritIO().start();
logger.info("Started PostRestoreHook - Attempt#{}", ++countOfProcessStarts);

//monitor progress of sub-process
monitorPostRestoreHookHeartBeat(process);

//block until sub-process completes or until the timeout
if (!process.waitFor(config.getPostRestoreHookTimeOutInDays(), TimeUnit.DAYS)) {
logger.info("PostRestoreHook process did not complete within {} days. Forcefully terminating the process.", config.getPostRestoreHookTimeOutInDays());
process.destroyForcibly();
}

if (process.exitValue() == 0) {
logger.info("PostRestoreHook process completed successfully");
break;
}
logger.info("PostRestoreHook process exited unsuccessfully");
}
logger.info("Completed PostRestoreHook execution");
} else {
throw new PostRestoreHookException(String.format("Could not acquire lock on a temp file necessary for PostRestoreHook to execute. Path to temp file: %s", tempLockFile.getAbsolutePath()));
}
} finally {
//close and delete temp file
lock.release();
fileChannel.close();
raf.close();
tempLockFile.delete();
}
}
}


/**
* Monitors heart beat of the process
* @param process Process to be monitored
* @throws InterruptedException
* @throws IOException
*/
private void monitorPostRestoreHookHeartBeat(Process process) throws InterruptedException, IOException {
File heartBeatFile = new File(config.getPostRestoreHookHeartbeatFileName());
ThreadPoolExecutor heartBeatPoolExecutor = new NamedThreadPoolExecutor(1, "PostRestoreHook_HeartBeatThreadPool");
heartBeatPoolExecutor.allowCoreThreadTimeOut(true);
heartBeatPoolExecutor.submit(new RetryableCallable<Integer>() {
@Override
public Integer retriableCall() throws Exception {
while (true) {
sleeper.sleep(ThreadWaitTimeInMs);
if(System.currentTimeMillis() - heartBeatFile.lastModified() > HeartBeatTimeOutInMs) {
//kick off post restore hook process, since there is no heartbeat
logger.info("No heartbeat for the last {} ms, killing the existing process.", HeartBeatTimeOutInMs);
if(process.isAlive()) {
process.destroyForcibly();
}
return 0;
}
}
}
});
}

/**
* Checks for presence of DONE file
* @return if done file exists
*/
private boolean doneFileExists() {
File doneFile = new File(config.getPostRestoreHookDoneFileName());
return doneFile.exists();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.netflix.priam.restore;

/**
* Exception raised by PostRestoreHook
*/
public class PostRestoreHookException extends Exception {

public PostRestoreHookException(String message) {
super(message);
}

public PostRestoreHookException(String message, Exception e) {
super(message, e);
}

}
Loading