Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

renew lock on each transaction #23

Merged
merged 1 commit into from
May 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static com.playtika.janusgraph.aerospike.AerospikeStoreManager.JANUS_AEROSPIKE_THREAD_GROUP_NAME;
import static com.playtika.janusgraph.aerospike.util.AsyncUtil.shutdownAndAwaitTermination;
import static com.playtika.janusgraph.aerospike.wal.WriteAheadLogManager.getBytesFromUUID;
import static java.time.temporal.ChronoUnit.SECONDS;

/**
Expand All @@ -36,14 +38,15 @@ public class WriteAheadLogCompleter {
private static final Instant JAN_01_2010 = Instant.parse("2010-01-01T00:00:00.00Z");

private static final Value EXCLUSIVE_LOCK_KEY = Value.get((byte)0);
private static final Bin EXCLUSIVE_LOCK_BIN = new Bin("EL", true);

private final IAerospikeClient client;
private final WriteAheadLogManager writeAheadLogManager;
private final long periodInMs;
private final AerospikeStoreManager aerospikeStoreManager;
private final WritePolicy putLockPolicy;
private final Key exclusiveLockKey;
private final Bin exclusiveLockBin;
private int generation = 0;

private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
new NamedThreadFactory(JANUS_AEROSPIKE_THREAD_GROUP_NAME, "wal")
Expand All @@ -57,13 +60,9 @@ public WriteAheadLogCompleter(IAerospikeClient aerospikeClient, String walNamesp
this.writeAheadLogManager = writeAheadLogManager;
this.aerospikeStoreManager = aerospikeStoreManager;

this.putLockPolicy = new WritePolicy();
this.putLockPolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
this.putLockPolicy.expiration = (int)Duration.ofMillis(periodInMs).get(SECONDS);
if(this.putLockPolicy.expiration < 1){
throw new IllegalArgumentException("Wrong expiration for WAL lock: "+putLockPolicy.expiration);
}
this.putLockPolicy = buildPutLockPolicy(periodInMs);

this.exclusiveLockBin = new Bin("EL", getBytesFromUUID(UUID.randomUUID()));

//set period to by slightly longer then expiration
this.periodInMs = Duration.ofSeconds(putLockPolicy.expiration + 1).toMillis();
Expand Down Expand Up @@ -91,25 +90,27 @@ private void completeHangedTransactions() {
break;
}

logger.info("Trying to complete transaction id={}, timestamp={}",
transaction.transactionId, transaction.timestamp);
try {
aerospikeStoreManager.processAndDeleteTransaction(
transaction.transactionId, transaction.locks, transaction.mutations, true);
logger.info("Successfully complete transaction id={}", transaction.transactionId);
}
//this is expected behaviour that may have place in case of transaction was interrupted:
// - on 'release locks' stage then transaction will fail and just need to release hanged locks
// - on 'delete wal transaction' stage and just need to remove transaction
catch (PermanentLockingException be) {
logger.info("Failed to complete transaction id={} as it's already completed", transaction.transactionId, be);
aerospikeStoreManager.releaseLocksAndDeleteWalTransaction(transaction.locks, transaction.transactionId);
logger.info("released locks for transaction id={}", transaction.transactionId, be);
}
//even in case of error need to move to the next one
catch (Exception e){
logger.error("!!! Failed to complete transaction id={}, need to be investigated",
transaction.transactionId, e);
if(renewExclusiveLock()) {
logger.info("Trying to complete transaction id={}, timestamp={}",
transaction.transactionId, transaction.timestamp);
try {
aerospikeStoreManager.processAndDeleteTransaction(
transaction.transactionId, transaction.locks, transaction.mutations, true);
logger.info("Successfully complete transaction id={}", transaction.transactionId);
}
//this is expected behaviour that may have place in case of transaction was interrupted:
// - on 'release locks' stage then transaction will fail and just need to release hanged locks
// - on 'delete wal transaction' stage and just need to remove transaction
catch (PermanentLockingException be) {
logger.info("Failed to complete transaction id={} as it's already completed", transaction.transactionId, be);
aerospikeStoreManager.releaseLocksAndDeleteWalTransaction(transaction.locks, transaction.transactionId);
logger.info("released locks for transaction id={}", transaction.transactionId, be);
}
//even in case of error need to move to the next one
catch (Exception e) {
logger.error("!!! Failed to complete transaction id={}, need to be investigated",
transaction.transactionId, e);
}
}
}
}
Expand All @@ -126,7 +127,8 @@ private void completeHangedTransactions() {

private boolean acquireExclusiveLock(){
try {
client.add(putLockPolicy, exclusiveLockKey, EXCLUSIVE_LOCK_BIN);
client.add(putLockPolicy, exclusiveLockKey, exclusiveLockBin);
generation++;
logger.info("Successfully got exclusive lock, will check for hanged transactions");
return true;
} catch (AerospikeException e){
Expand All @@ -141,4 +143,36 @@ private boolean acquireExclusiveLock(){
}
}
}

private WritePolicy buildPutLockPolicy(long expirationInMs){
WritePolicy putLockPolicy = new WritePolicy();
putLockPolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
putLockPolicy.expiration = (int)Duration.ofMillis(expirationInMs).get(SECONDS);
if(putLockPolicy.expiration < 1){
throw new IllegalArgumentException("Wrong expiration for WAL lock: "+putLockPolicy.expiration);
}
return putLockPolicy;
}

private boolean renewExclusiveLock(){
try {
client.touch(buildTouchLockPolicy(putLockPolicy.expiration, generation++), exclusiveLockKey);
logger.info("Successfully renewed exclusive lock, will process transaction");
return true;
} catch (AerospikeException e){
logger.error("Failed while renew exclusive lock", e);
throw e;
}
}

private WritePolicy buildTouchLockPolicy(int expiration, int generation){
WritePolicy touchLockPolicy = new WritePolicy();
touchLockPolicy.recordExistsAction = RecordExistsAction.UPDATE_ONLY;
touchLockPolicy.generation = generation;
touchLockPolicy.expiration = expiration;
if(touchLockPolicy.expiration < 1){
throw new IllegalArgumentException("Wrong expiration for WAL lock: "+touchLockPolicy.expiration);
}
return touchLockPolicy;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.playtika.janusgraph.aerospike.wal;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Value;
import com.playtika.janusgraph.aerospike.AerospikeStoreManager;
import org.janusgraph.diskstorage.BackendException;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.playtika.janusgraph.aerospike.util.AsyncUtil.INITIAL_WAIT_TIMEOUT_IN_SECONDS;
import static com.playtika.janusgraph.aerospike.util.AsyncUtil.WAIT_TIMEOUT_IN_SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class WriteAheadLogCompleterShutdownTest {

IAerospikeClient client = mock(IAerospikeClient.class);
WriteAheadLogManager writeAheadLogManager = mock(WriteAheadLogManager.class);
AerospikeStoreManager storeManager = mock(AerospikeStoreManager.class);

WriteAheadLogCompleter writeAheadLogCompleter = new WriteAheadLogCompleter(
client, "walNamespace", "walSetname",
writeAheadLogManager, 10000, storeManager);

@Test
public void shouldShutdownCorrectly() throws BackendException, InterruptedException {
WriteAheadLogManager.WalTransaction walTransaction = new WriteAheadLogManager.WalTransaction(
Value.get("transId"), 1000, null, null
);

when(writeAheadLogManager.getStaleTransactions()).thenReturn(
IntStream.range(0, 100)
.mapToObj(i -> walTransaction)
.collect(Collectors.toList()));

AtomicInteger processProgress = new AtomicInteger();

int sleepTime = 100;

Mockito.doAnswer(e -> {
processProgress.incrementAndGet();

long start = System.currentTimeMillis();
while(System.currentTimeMillis() - start < 100L){}

return null;
}).when(storeManager).processAndDeleteTransaction(any(), any(), any(), anyBoolean());

writeAheadLogCompleter.start();

while (processProgress.get() < 1) {
Thread.sleep(sleepTime); }

writeAheadLogCompleter.shutdown();

assertThat(processProgress.get()).isEqualTo(INITIAL_WAIT_TIMEOUT_IN_SECONDS * 1000 / sleepTime + 1);
}

}
Original file line number Diff line number Diff line change
@@ -1,71 +1,83 @@
package com.playtika.janusgraph.aerospike.wal;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.AerospikeContainer;
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Value;
import com.playtika.janusgraph.aerospike.AerospikeStoreManager;
import org.janusgraph.diskstorage.BackendException;
import org.junit.ClassRule;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.playtika.janusgraph.aerospike.util.AsyncUtil.INITIAL_WAIT_TIMEOUT_IN_SECONDS;
import static com.playtika.janusgraph.aerospike.util.AsyncUtil.WAIT_TIMEOUT_IN_SECONDS;
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Clock;
import java.util.HashMap;
import java.util.Map;

import static com.playtika.janusgraph.aerospike.AerospikeTestUtils.getAerospikeContainer;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

public class WriteAheadLogCompleterTest {

IAerospikeClient client = mock(IAerospikeClient.class);
WriteAheadLogManager writeAheadLogManager = mock(WriteAheadLogManager.class);
AerospikeStoreManager storeManager = mock(AerospikeStoreManager.class);
@ClassRule
public static AerospikeContainer container = getAerospikeContainer();

WriteAheadLogCompleter writeAheadLogCompleter = new WriteAheadLogCompleter(
client, "walNamespace", "walSetname",
writeAheadLogManager, 10000, storeManager);
private AerospikeClient client = new AerospikeClient(null, container.getContainerIpAddress(), container.getPort());

@Test
public void shouldShutdownCorrectly() throws BackendException, InterruptedException {
WriteAheadLogManager.WalTransaction walTransaction = new WriteAheadLogManager.WalTransaction(
Value.get("transId"), 1000, null, null
);
static final String WAL_NAMESPACE = container.getNamespace();
static final String WAL_SET_NAME = "wal";

when(writeAheadLogManager.getStaleTransactions()).thenReturn(
IntStream.range(0, 100)
.mapToObj(i -> walTransaction)
.collect(Collectors.toList()));
private Clock clock = mock(Clock.class);

AtomicInteger processProgress = new AtomicInteger();
public static final long STALE_TRANSACTION_LIFETIME_THRESHOLD_IN_MS = 1000;
private WriteAheadLogManager walManager = new WriteAheadLogManager(client, WAL_NAMESPACE, WAL_SET_NAME, clock, STALE_TRANSACTION_LIFETIME_THRESHOLD_IN_MS);

int sleepTime = 100;
private AerospikeStoreManager storeManager = mock(AerospikeStoreManager.class);

Mockito.doAnswer(e -> {
processProgress.incrementAndGet();
WriteAheadLogCompleter writeAheadLogCompleter = new WriteAheadLogCompleter(
client, WAL_NAMESPACE, WAL_SET_NAME,
walManager, 10000, storeManager);

long start = System.currentTimeMillis();
while(System.currentTimeMillis() - start < 100L){}
@Test
public void shouldCompleteStaleTransactions() throws BackendException, InterruptedException {
writeTransaction(0);
writeTransaction(1);
writeTransaction(2);

return null;
}).when(storeManager).processAndDeleteTransaction(any(), any(), any(), anyBoolean());
when(clock.millis()).thenReturn(STALE_TRANSACTION_LIFETIME_THRESHOLD_IN_MS + 5);

writeAheadLogCompleter.start();

while (processProgress.get() < 1) {
Thread.sleep(sleepTime); }
Thread.sleep(100);

writeAheadLogCompleter.shutdown();

assertThat(processProgress.get()).isEqualTo(INITIAL_WAIT_TIMEOUT_IN_SECONDS * 1000 / sleepTime + 1);
verify(storeManager, times(3)).processAndDeleteTransaction(any(), any(), any(), anyBoolean());
}

private void writeTransaction(long timestamp){
Map<String, Map<Value, Map<Value, Value>>> locks = new HashMap<String, Map<Value, Map<Value, Value>>>(){{
put("storeName", new HashMap<Value, Map<Value, Value>>(){{
put(Value.get(new byte[]{1,2,3}),
new HashMap<Value, Value>(){{
put(Value.get(new byte[]{0}), Value.get(new byte[]{1}));
put(Value.get(new byte[]{1}), Value.NULL);
}});
}});
}};

Map<String, Map<Value, Map<Value, Value>>> mutations = new HashMap<String, Map<Value, Map<Value, Value>>>(){{
put("storeName", new HashMap<Value, Map<Value, Value>>(){{
put(Value.get(new byte[]{1,2,3}),
new HashMap<Value, Value>(){{
put(Value.get(new byte[]{0}), Value.NULL);
put(Value.get(new byte[]{1}), Value.get(new byte[]{1}));
}});
}});
}};

when(clock.millis()).thenReturn(timestamp);
walManager.writeTransaction(locks, mutations);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class WriteAheadLogManagerTest {
private WriteAheadLogManager walManager = new WriteAheadLogManager(client, WAL_NAMESPACE, WAL_SET_NAME, clock, 1000);

@Before
public void setUp() throws InterruptedException {
public void setUp() {
client.truncate(null, WAL_NAMESPACE, null, null);
}

Expand Down