Skip to content

Commit

Permalink
make an option to start WAL Completer externally and suspend and resu…
Browse files Browse the repository at this point in the history
…me it manually
  • Loading branch information
skarpenko authored and kptfh committed Aug 9, 2019
1 parent ab360fd commit d3fda50
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 33 deletions.
6 changes: 6 additions & 0 deletions aerospike-storage-backend/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;

import static com.playtika.janusgraph.aerospike.AerospikeKeyColumnValueStore.mutationToMap;
import static com.playtika.janusgraph.aerospike.ConfigOptions.START_WAL_COMPLETER;
import static com.playtika.janusgraph.aerospike.operations.AerospikeOperations.getValue;
import static com.playtika.janusgraph.aerospike.util.AerospikeUtils.isEmptyNamespace;
import static com.playtika.janusgraph.aerospike.util.AerospikeUtils.truncateNamespace;
Expand All @@ -53,7 +54,9 @@ public AerospikeStoreManager(Configuration configuration) {

operations = initOperations(configuration);

operations.getWriteAheadLogCompleter().start();
if(configuration.get(START_WAL_COMPLETER)) {
operations.getWriteAheadLogCompleter().start();
}
}

protected BasicOperations initOperations(Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,10 @@ public class ConfigOptions {
"test-environment", "Weather this production or test environment",
ConfigOption.Type.LOCAL, false);

public static final ConfigOption<Boolean> START_WAL_COMPLETER = new ConfigOption<>(STORAGE_NS,
"start-wal-completer", "Whether WAL Completer should be started inside store manager. " +
"You should not start WAL Completer in passive data center. " +
"You may consider to start WAL Completer externally than you will be able to suspend and resume it manually",
ConfigOption.Type.LOCAL, true);

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination;
import static com.playtika.janusgraph.aerospike.util.AsyncUtil.WAIT_TIMEOUT_IN_SECONDS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.*;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.AUTH_PASSWORD;

public class AerospikeOperations {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.playtika.janusgraph.aerospike.ConfigOptions.*;
import static com.playtika.janusgraph.aerospike.operations.AerospikeOperations.buildAerospikeClient;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.GRAPH_NAME;

public class BasicOperations implements Operations {

Expand All @@ -41,19 +42,19 @@ public class BasicOperations implements Operations {

public BasicOperations(Configuration configuration) {
this.configuration = configuration;
this.aerospikePolicyProvider = initPolicyProvider(configuration);
this.aerospikeOperations = initAerospikeOperations(configuration, aerospikePolicyProvider);
this.walOperations = initWalOperations(configuration, aerospikeOperations);
this.writeAheadLogManager = initWriteAheadLogManager(walOperations, getClock());
this.lockOperations = initLockOperations(aerospikeOperations);
this.mutateOperations = initMutateOperations(aerospikeOperations);
this.transactionalOperations = initTransactionalOperations(
this.aerospikePolicyProvider = buildPolicyProvider(configuration);
this.aerospikeOperations = buildAerospikeOperations(configuration, aerospikePolicyProvider);
this.walOperations = buildWalOperations(configuration, aerospikeOperations);
this.writeAheadLogManager = buildWriteAheadLogManager(walOperations, getClock());
this.lockOperations = buildLockOperations(aerospikeOperations);
this.mutateOperations = buildMutateOperations(aerospikeOperations);
this.transactionalOperations = buildTransactionalOperations(
() -> writeAheadLogManager, () -> lockOperations, () -> mutateOperations);
this.writeAheadLogCompleter = buildWriteAheadLogCompleter(walOperations,
() -> writeAheadLogManager, () -> lockOperations, () -> mutateOperations);

this.readOperations = initReadOperations(aerospikeOperations);
this.scanOperations = initScanOperations(configuration, aerospikeOperations);
this.readOperations = buildReadOperations(aerospikeOperations);
this.scanOperations = buildScanOperations(configuration, aerospikeOperations);
}

@Override
Expand Down Expand Up @@ -81,11 +82,11 @@ public ScanOperations getScanOperations() {
return scanOperations;
}

protected AerospikePolicyProvider initPolicyProvider(Configuration configuration){
protected AerospikePolicyProvider buildPolicyProvider(Configuration configuration){
return configuration.get(TEST_ENVIRONMENT) ? new TestAerospikePolicyProvider() : new AerospikePolicyProvider();
}

protected AerospikeOperations initAerospikeOperations(Configuration configuration, AerospikePolicyProvider aerospikePolicyProvider) {
protected AerospikeOperations buildAerospikeOperations(Configuration configuration, AerospikePolicyProvider aerospikePolicyProvider) {
ExecutorService aerospikeExecutor = new ThreadPoolExecutor(4, configuration.get(AEROSPIKE_PARALLELISM),
1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(),
new NamedThreadFactory(JANUS_AEROSPIKE_THREAD_GROUP_NAME, "main"));
Expand All @@ -99,30 +100,30 @@ protected AerospikeOperations initAerospikeOperations(Configuration configuratio
aerospikePolicyProvider, aerospikeExecutor);
}

protected WalOperations initWalOperations(Configuration configuration, AerospikeOperations aerospikeOperations){
protected WalOperations buildWalOperations(Configuration configuration, AerospikeOperations aerospikeOperations){
return new WalOperations(configuration, aerospikeOperations);
}

protected Clock getClock() {
return Clock.systemUTC();
}

protected TransactionalOperations initTransactionalOperations(
protected TransactionalOperations buildTransactionalOperations(
Supplier<WriteAheadLogManager> writeAheadLogManager,
Supplier<LockOperations> lockOperations,
Supplier<MutateOperations> mutateOperations){
return new TransactionalOperations(writeAheadLogManager.get(), lockOperations.get(), mutateOperations.get());
}

protected MutateOperations initMutateOperations(AerospikeOperations aerospikeOperations) {
protected MutateOperations buildMutateOperations(AerospikeOperations aerospikeOperations) {
return new BasicMutateOperations(aerospikeOperations);
}

protected LockOperations initLockOperations(AerospikeOperations aerospikeOperations) {
protected LockOperations buildLockOperations(AerospikeOperations aerospikeOperations) {
return new BasicLockOperations(aerospikeOperations);
}

protected WriteAheadLogManager initWriteAheadLogManager(WalOperations walOperations, Clock clock) {
protected WriteAheadLogManager buildWriteAheadLogManager(WalOperations walOperations, Clock clock) {
return new WriteAheadLogManagerBasic(walOperations, clock);
}

Expand All @@ -133,21 +134,21 @@ protected WriteAheadLogCompleter buildWriteAheadLogCompleter(
Supplier<MutateOperations> mutateOperations){
return new WriteAheadLogCompleter(
walOperations,
initWalCompleterTransactionalOperations(writeAheadLogManager, lockOperations, mutateOperations));
buildWalCompleterTransactionalOperations(writeAheadLogManager, lockOperations, mutateOperations));
}

protected TransactionalOperations initWalCompleterTransactionalOperations(
protected TransactionalOperations buildWalCompleterTransactionalOperations(
Supplier<WriteAheadLogManager> writeAheadLogManager,
Supplier<LockOperations> lockOperations,
Supplier<MutateOperations> mutateOperations){
return new TransactionalOperations(writeAheadLogManager.get(), lockOperations.get(), mutateOperations.get());
}

protected ReadOperations initReadOperations(AerospikeOperations aerospikeOperations) {
protected ReadOperations buildReadOperations(AerospikeOperations aerospikeOperations) {
return new ReadOperations(aerospikeOperations);
}

protected ScanOperations initScanOperations(Configuration configuration, AerospikeOperations aerospikeOperations){
protected ScanOperations buildScanOperations(Configuration configuration, AerospikeOperations aerospikeOperations){
ExecutorService scanExecutor = new ThreadPoolExecutor(0, configuration.get(SCAN_PARALLELISM),
1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(),
new NamedThreadFactory(JANUS_AEROSPIKE_THREAD_GROUP_NAME, "scan"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.google.common.util.concurrent.MoreExecutors.shutdownAndAwaitTermination;
import static com.playtika.janusgraph.aerospike.operations.BasicOperations.JANUS_AEROSPIKE_THREAD_GROUP_NAME;
Expand All @@ -47,6 +48,7 @@ public class WriteAheadLogCompleter {
private final Key exclusiveLockKey;
private final Bin exclusiveLockBin;
private int generation = 0;
private AtomicBoolean suspended = new AtomicBoolean(false);

private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
new NamedThreadFactory(JANUS_AEROSPIKE_THREAD_GROUP_NAME, "wal")
Expand Down Expand Up @@ -76,12 +78,36 @@ public void shutdown(){
shutdownAndAwaitTermination(scheduledExecutorService, WAIT_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
}

/**
* You should call it when the data center had been switched into the passive mode
*/
public void suspend(){
this.suspended.set(true);
}

/**
* You should call it when the data center had been switched into the active mode
*/
public void resume(){
this.suspended.set(false);
}

private void completeHangedTransactions() {

if(suspended.get()){
logger.info("WAL execution was suspended");
return;
}

try {
if(acquireExclusiveLock()){
List<WriteAheadLogManagerBasic.WalTransaction> staleTransactions = writeAheadLogManager.getStaleTransactions();
logger.info("Got {} stale transactions", staleTransactions.size());
for(WriteAheadLogManagerBasic.WalTransaction transaction : staleTransactions){
if(suspended.get()){
logger.info("WAL execution was suspended");
break;
}
if(Thread.currentThread().isInterrupted()){
logger.info("WAL execution was interrupted");
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ public FlakingOperations(Configuration configuration) {
super(configuration);
}

protected TransactionalOperations initTransactionalOperations(
protected TransactionalOperations buildTransactionalOperations(
Supplier<WriteAheadLogManager> writeAheadLogManager,
Supplier<LockOperations> lockOperations,
Supplier<MutateOperations> mutateOperations){
return super.initTransactionalOperations(
return super.buildTransactionalOperations(
() -> new FlakingWriteAheadLogManager(writeAheadLogManager.get(), failsDeleteTransaction),
() -> new FlakingLockOperations(lockOperations.get(), failsUnlock, failsAcquire),
() -> new FlakingMutateOperations(mutateOperations.get(), failsMutate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.playtika.janusgraph.aerospike.operations.LockOperations;
import com.playtika.janusgraph.aerospike.operations.MutateOperations;
import com.playtika.janusgraph.aerospike.operations.Operations;
import org.awaitility.Duration;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.configuration.Configuration;
import org.janusgraph.diskstorage.configuration.ModifiableConfiguration;
Expand All @@ -19,7 +20,7 @@
import static com.playtika.janusgraph.aerospike.AerospikeTestUtils.getAerospikeConfiguration;
import static com.playtika.janusgraph.aerospike.AerospikeTestUtils.getAerospikeContainer;
import static com.playtika.janusgraph.aerospike.ConfigOptions.WAL_STALE_TRANSACTION_LIFETIME_THRESHOLD;
import static com.playtika.janusgraph.aerospike.operations.AerospikeOperations.getValue;
import static org.awaitility.Awaitility.waitAtMost;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.*;
Expand Down Expand Up @@ -48,13 +49,31 @@ public void shouldCompleteStaleTransactions() throws InterruptedException, Backe
SpyOperations spyOperations = new SpyOperations(configuration);
spyOperations.getWriteAheadLogCompleter().start();
WriteAheadLogManager walManager = spyOperations.getTransactionalOperations().getWriteAheadLogManager();
while(!walManager.getStaleTransactions().isEmpty()){
Thread.sleep(100);
}
waitAtMost(Duration.FIVE_SECONDS)
.until(() -> walManager.getStaleTransactions().isEmpty());
spyOperations.close();

verify(spyOperations.transactionalOperationsSpy, times(4)).processAndDeleteTransaction(any(), any(), any(), anyBoolean());
verify(spyOperations.transactionalOperationsSpy).releaseLocksAndDeleteWalTransactionOnError(any(), any());
}

@Test
public void shouldNotCompleteStaleTransactionsIfSuspended() throws InterruptedException, BackendException {
ModifiableConfiguration configuration = getAerospikeConfiguration(container)
.set(WAL_STALE_TRANSACTION_LIFETIME_THRESHOLD, STALE_THRESHOLD);
Operations operations = new BasicOperations(configuration);
writeTransactions(operations.getTransactionalOperations().getWriteAheadLogManager());
operations.close();
Thread.sleep(STALE_THRESHOLD * 3);

SpyOperations spyOperations = new SpyOperations(configuration);
spyOperations.getWriteAheadLogCompleter().suspend();
spyOperations.getWriteAheadLogCompleter().start();

Thread.sleep(500);
spyOperations.close();

verify(spyOperations.spy, times(4)).processAndDeleteTransaction(any(), any(), any(), anyBoolean());
verify(spyOperations.spy).releaseLocksAndDeleteWalTransactionOnError(any(), any());
verify(spyOperations.transactionalOperationsSpy, never()).processAndDeleteTransaction(any(), any(), any(), anyBoolean());;
}

private void writeTransactions(WriteAheadLogManager walManager){
Expand Down Expand Up @@ -106,17 +125,19 @@ private void writeTransaction(WriteAheadLogManager walManager,

private static class SpyOperations extends BasicOperations{

TransactionalOperations spy;
TransactionalOperations transactionalOperationsSpy;

public SpyOperations(Configuration configuration) {
super(configuration);
}

protected TransactionalOperations initWalCompleterTransactionalOperations(
@Override
protected TransactionalOperations buildWalCompleterTransactionalOperations(
Supplier<WriteAheadLogManager> writeAheadLogManager,
Supplier<LockOperations> lockOperations,
Supplier<MutateOperations> mutateOperations){
return spy = spy(new TransactionalOperations(writeAheadLogManager.get(), lockOperations.get(), mutateOperations.get()));
return transactionalOperationsSpy = spy(
super.buildWalCompleterTransactionalOperations(writeAheadLogManager, lockOperations, mutateOperations));
}
}

Expand Down
9 changes: 9 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
<nexus-staging-maven-plugin.version>1.6.8</nexus-staging-maven-plugin.version>
<embedded-aerospike.version>1.19</embedded-aerospike.version>
<awaitility.version>3.1.2</awaitility.version>
</properties>

<profiles>
Expand Down Expand Up @@ -254,6 +255,14 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>


</dependencies>
</dependencyManagement>

Expand Down

0 comments on commit d3fda50

Please sign in to comment.