Skip to content

Commit

Permalink
renew lock on each transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
skarpenko authored and kptfh committed May 22, 2019
1 parent 7d3bcc8 commit 055c32f
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static com.playtika.janusgraph.aerospike.AerospikeStoreManager.JANUS_AEROSPIKE_THREAD_GROUP_NAME;
import static com.playtika.janusgraph.aerospike.util.AsyncUtil.shutdownAndAwaitTermination;
import static com.playtika.janusgraph.aerospike.wal.WriteAheadLogManager.getBytesFromUUID;
import static java.time.temporal.ChronoUnit.SECONDS;

/**
Expand All @@ -36,14 +38,15 @@ public class WriteAheadLogCompleter {
private static final Instant JAN_01_2010 = Instant.parse("2010-01-01T00:00:00.00Z");

private static final Value EXCLUSIVE_LOCK_KEY = Value.get((byte)0);
private static final Bin EXCLUSIVE_LOCK_BIN = new Bin("EL", true);

private final IAerospikeClient client;
private final WriteAheadLogManager writeAheadLogManager;
private final long periodInMs;
private final AerospikeStoreManager aerospikeStoreManager;
private final WritePolicy putLockPolicy;
private final Key exclusiveLockKey;
private final Bin exclusiveLockBin;
private int generation = 0;

private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
new NamedThreadFactory(JANUS_AEROSPIKE_THREAD_GROUP_NAME, "wal")
Expand All @@ -57,13 +60,9 @@ public WriteAheadLogCompleter(IAerospikeClient aerospikeClient, String walNamesp
this.writeAheadLogManager = writeAheadLogManager;
this.aerospikeStoreManager = aerospikeStoreManager;

this.putLockPolicy = new WritePolicy();
this.putLockPolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
this.putLockPolicy.expiration = (int)Duration.ofMillis(periodInMs).get(SECONDS);
if(this.putLockPolicy.expiration < 1){
throw new IllegalArgumentException("Wrong expiration for WAL lock: "+putLockPolicy.expiration);
}
this.putLockPolicy = buildPutLockPolicy(periodInMs);

this.exclusiveLockBin = new Bin("EL", getBytesFromUUID(UUID.randomUUID()));

//set period to by slightly longer then expiration
this.periodInMs = Duration.ofSeconds(putLockPolicy.expiration + 1).toMillis();
Expand Down Expand Up @@ -91,25 +90,27 @@ private void completeHangedTransactions() {
break;
}

logger.info("Trying to complete transaction id={}, timestamp={}",
transaction.transactionId, transaction.timestamp);
try {
aerospikeStoreManager.processAndDeleteTransaction(
transaction.transactionId, transaction.locks, transaction.mutations, true);
logger.info("Successfully complete transaction id={}", transaction.transactionId);
}
//this is expected behaviour that may have place in case of transaction was interrupted:
// - on 'release locks' stage then transaction will fail and just need to release hanged locks
// - on 'delete wal transaction' stage and just need to remove transaction
catch (PermanentLockingException be) {
logger.info("Failed to complete transaction id={} as it's already completed", transaction.transactionId, be);
aerospikeStoreManager.releaseLocksAndDeleteWalTransaction(transaction.locks, transaction.transactionId);
logger.info("released locks for transaction id={}", transaction.transactionId, be);
}
//even in case of error need to move to the next one
catch (Exception e){
logger.error("!!! Failed to complete transaction id={}, need to be investigated",
transaction.transactionId, e);
if(renewExclusiveLock()) {
logger.info("Trying to complete transaction id={}, timestamp={}",
transaction.transactionId, transaction.timestamp);
try {
aerospikeStoreManager.processAndDeleteTransaction(
transaction.transactionId, transaction.locks, transaction.mutations, true);
logger.info("Successfully complete transaction id={}", transaction.transactionId);
}
//this is expected behaviour that may have place in case of transaction was interrupted:
// - on 'release locks' stage then transaction will fail and just need to release hanged locks
// - on 'delete wal transaction' stage and just need to remove transaction
catch (PermanentLockingException be) {
logger.info("Failed to complete transaction id={} as it's already completed", transaction.transactionId, be);
aerospikeStoreManager.releaseLocksAndDeleteWalTransaction(transaction.locks, transaction.transactionId);
logger.info("released locks for transaction id={}", transaction.transactionId, be);
}
//even in case of error need to move to the next one
catch (Exception e) {
logger.error("!!! Failed to complete transaction id={}, need to be investigated",
transaction.transactionId, e);
}
}
}
}
Expand All @@ -126,7 +127,8 @@ private void completeHangedTransactions() {

private boolean acquireExclusiveLock(){
try {
client.add(putLockPolicy, exclusiveLockKey, EXCLUSIVE_LOCK_BIN);
client.add(putLockPolicy, exclusiveLockKey, exclusiveLockBin);
generation++;
logger.info("Successfully got exclusive lock, will check for hanged transactions");
return true;
} catch (AerospikeException e){
Expand All @@ -141,4 +143,36 @@ private boolean acquireExclusiveLock(){
}
}
}

private WritePolicy buildPutLockPolicy(long expirationInMs){
WritePolicy putLockPolicy = new WritePolicy();
putLockPolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
putLockPolicy.expiration = (int)Duration.ofMillis(expirationInMs).get(SECONDS);
if(putLockPolicy.expiration < 1){
throw new IllegalArgumentException("Wrong expiration for WAL lock: "+putLockPolicy.expiration);
}
return putLockPolicy;
}

private boolean renewExclusiveLock(){
try {
client.touch(buildTouchLockPolicy(putLockPolicy.expiration, generation++), exclusiveLockKey);
logger.info("Successfully renewed exclusive lock, will process transaction");
return true;
} catch (AerospikeException e){
logger.error("Failed while renew exclusive lock", e);
throw e;
}
}

private WritePolicy buildTouchLockPolicy(int expiration, int generation){
WritePolicy touchLockPolicy = new WritePolicy();
touchLockPolicy.recordExistsAction = RecordExistsAction.UPDATE_ONLY;
touchLockPolicy.generation = generation;
touchLockPolicy.expiration = expiration;
if(touchLockPolicy.expiration < 1){
throw new IllegalArgumentException("Wrong expiration for WAL lock: "+touchLockPolicy.expiration);
}
return touchLockPolicy;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.playtika.janusgraph.aerospike.wal;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Value;
import com.playtika.janusgraph.aerospike.AerospikeStoreManager;
import org.janusgraph.diskstorage.BackendException;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.playtika.janusgraph.aerospike.util.AsyncUtil.INITIAL_WAIT_TIMEOUT_IN_SECONDS;
import static com.playtika.janusgraph.aerospike.util.AsyncUtil.WAIT_TIMEOUT_IN_SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class WriteAheadLogCompleterShutdownTest {

IAerospikeClient client = mock(IAerospikeClient.class);
WriteAheadLogManager writeAheadLogManager = mock(WriteAheadLogManager.class);
AerospikeStoreManager storeManager = mock(AerospikeStoreManager.class);

WriteAheadLogCompleter writeAheadLogCompleter = new WriteAheadLogCompleter(
client, "walNamespace", "walSetname",
writeAheadLogManager, 10000, storeManager);

@Test
public void shouldShutdownCorrectly() throws BackendException, InterruptedException {
WriteAheadLogManager.WalTransaction walTransaction = new WriteAheadLogManager.WalTransaction(
Value.get("transId"), 1000, null, null
);

when(writeAheadLogManager.getStaleTransactions()).thenReturn(
IntStream.range(0, 100)
.mapToObj(i -> walTransaction)
.collect(Collectors.toList()));

AtomicInteger processProgress = new AtomicInteger();

int sleepTime = 100;

Mockito.doAnswer(e -> {
processProgress.incrementAndGet();

long start = System.currentTimeMillis();
while(System.currentTimeMillis() - start < 100L){}

return null;
}).when(storeManager).processAndDeleteTransaction(any(), any(), any(), anyBoolean());

writeAheadLogCompleter.start();

while (processProgress.get() < 1) {
Thread.sleep(sleepTime); }

writeAheadLogCompleter.shutdown();

assertThat(processProgress.get()).isEqualTo(INITIAL_WAIT_TIMEOUT_IN_SECONDS * 1000 / sleepTime + 1);
}

}
Original file line number Diff line number Diff line change
@@ -1,71 +1,83 @@
package com.playtika.janusgraph.aerospike.wal;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.AerospikeContainer;
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Value;
import com.playtika.janusgraph.aerospike.AerospikeStoreManager;
import org.janusgraph.diskstorage.BackendException;
import org.junit.ClassRule;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.playtika.janusgraph.aerospike.util.AsyncUtil.INITIAL_WAIT_TIMEOUT_IN_SECONDS;
import static com.playtika.janusgraph.aerospike.util.AsyncUtil.WAIT_TIMEOUT_IN_SECONDS;
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Clock;
import java.util.HashMap;
import java.util.Map;

import static com.playtika.janusgraph.aerospike.AerospikeTestUtils.getAerospikeContainer;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

public class WriteAheadLogCompleterTest {

IAerospikeClient client = mock(IAerospikeClient.class);
WriteAheadLogManager writeAheadLogManager = mock(WriteAheadLogManager.class);
AerospikeStoreManager storeManager = mock(AerospikeStoreManager.class);
@ClassRule
public static AerospikeContainer container = getAerospikeContainer();

WriteAheadLogCompleter writeAheadLogCompleter = new WriteAheadLogCompleter(
client, "walNamespace", "walSetname",
writeAheadLogManager, 10000, storeManager);
private AerospikeClient client = new AerospikeClient(null, container.getContainerIpAddress(), container.getPort());

@Test
public void shouldShutdownCorrectly() throws BackendException, InterruptedException {
WriteAheadLogManager.WalTransaction walTransaction = new WriteAheadLogManager.WalTransaction(
Value.get("transId"), 1000, null, null
);
static final String WAL_NAMESPACE = container.getNamespace();
static final String WAL_SET_NAME = "wal";

when(writeAheadLogManager.getStaleTransactions()).thenReturn(
IntStream.range(0, 100)
.mapToObj(i -> walTransaction)
.collect(Collectors.toList()));
private Clock clock = mock(Clock.class);

AtomicInteger processProgress = new AtomicInteger();
public static final long STALE_TRANSACTION_LIFETIME_THRESHOLD_IN_MS = 1000;
private WriteAheadLogManager walManager = new WriteAheadLogManager(client, WAL_NAMESPACE, WAL_SET_NAME, clock, STALE_TRANSACTION_LIFETIME_THRESHOLD_IN_MS);

int sleepTime = 100;
private AerospikeStoreManager storeManager = mock(AerospikeStoreManager.class);

Mockito.doAnswer(e -> {
processProgress.incrementAndGet();
WriteAheadLogCompleter writeAheadLogCompleter = new WriteAheadLogCompleter(
client, WAL_NAMESPACE, WAL_SET_NAME,
walManager, 10000, storeManager);

long start = System.currentTimeMillis();
while(System.currentTimeMillis() - start < 100L){}
@Test
public void shouldCompleteStaleTransactions() throws BackendException, InterruptedException {
writeTransaction(0);
writeTransaction(1);
writeTransaction(2);

return null;
}).when(storeManager).processAndDeleteTransaction(any(), any(), any(), anyBoolean());
when(clock.millis()).thenReturn(STALE_TRANSACTION_LIFETIME_THRESHOLD_IN_MS + 5);

writeAheadLogCompleter.start();

while (processProgress.get() < 1) {
Thread.sleep(sleepTime); }
Thread.sleep(100);

writeAheadLogCompleter.shutdown();

assertThat(processProgress.get()).isEqualTo(INITIAL_WAIT_TIMEOUT_IN_SECONDS * 1000 / sleepTime + 1);
verify(storeManager, times(3)).processAndDeleteTransaction(any(), any(), any(), anyBoolean());
}

private void writeTransaction(long timestamp){
Map<String, Map<Value, Map<Value, Value>>> locks = new HashMap<String, Map<Value, Map<Value, Value>>>(){{
put("storeName", new HashMap<Value, Map<Value, Value>>(){{
put(Value.get(new byte[]{1,2,3}),
new HashMap<Value, Value>(){{
put(Value.get(new byte[]{0}), Value.get(new byte[]{1}));
put(Value.get(new byte[]{1}), Value.NULL);
}});
}});
}};

Map<String, Map<Value, Map<Value, Value>>> mutations = new HashMap<String, Map<Value, Map<Value, Value>>>(){{
put("storeName", new HashMap<Value, Map<Value, Value>>(){{
put(Value.get(new byte[]{1,2,3}),
new HashMap<Value, Value>(){{
put(Value.get(new byte[]{0}), Value.NULL);
put(Value.get(new byte[]{1}), Value.get(new byte[]{1}));
}});
}});
}};

when(clock.millis()).thenReturn(timestamp);
walManager.writeTransaction(locks, mutations);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class WriteAheadLogManagerTest {
private WriteAheadLogManager walManager = new WriteAheadLogManager(client, WAL_NAMESPACE, WAL_SET_NAME, clock, 1000);

@Before
public void setUp() throws InterruptedException {
public void setUp() {
client.truncate(null, WAL_NAMESPACE, null, null);
}

Expand Down

0 comments on commit 055c32f

Please sign in to comment.