diff --git a/aerospike-storage-backend/src/main/java/com/playtika/janusgraph/aerospike/wal/WriteAheadLogCompleter.java b/aerospike-storage-backend/src/main/java/com/playtika/janusgraph/aerospike/wal/WriteAheadLogCompleter.java index e244a768..2c263247 100644 --- a/aerospike-storage-backend/src/main/java/com/playtika/janusgraph/aerospike/wal/WriteAheadLogCompleter.java +++ b/aerospike-storage-backend/src/main/java/com/playtika/janusgraph/aerospike/wal/WriteAheadLogCompleter.java @@ -18,12 +18,14 @@ import java.time.Duration; import java.time.Instant; import java.util.List; +import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static com.playtika.janusgraph.aerospike.AerospikeStoreManager.JANUS_AEROSPIKE_THREAD_GROUP_NAME; import static com.playtika.janusgraph.aerospike.util.AsyncUtil.shutdownAndAwaitTermination; +import static com.playtika.janusgraph.aerospike.wal.WriteAheadLogManager.getBytesFromUUID; import static java.time.temporal.ChronoUnit.SECONDS; /** @@ -36,7 +38,6 @@ public class WriteAheadLogCompleter { private static final Instant JAN_01_2010 = Instant.parse("2010-01-01T00:00:00.00Z"); private static final Value EXCLUSIVE_LOCK_KEY = Value.get((byte)0); - private static final Bin EXCLUSIVE_LOCK_BIN = new Bin("EL", true); private final IAerospikeClient client; private final WriteAheadLogManager writeAheadLogManager; @@ -44,6 +45,8 @@ public class WriteAheadLogCompleter { private final AerospikeStoreManager aerospikeStoreManager; private final WritePolicy putLockPolicy; private final Key exclusiveLockKey; + private final Bin exclusiveLockBin; + private int generation = 0; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( new NamedThreadFactory(JANUS_AEROSPIKE_THREAD_GROUP_NAME, "wal") @@ -57,13 +60,9 @@ public WriteAheadLogCompleter(IAerospikeClient aerospikeClient, String walNamesp 
this.writeAheadLogManager = writeAheadLogManager; this.aerospikeStoreManager = aerospikeStoreManager; - this.putLockPolicy = new WritePolicy(); - this.putLockPolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY; - this.putLockPolicy.expiration = (int)Duration.ofMillis(periodInMs).get(SECONDS); - if(this.putLockPolicy.expiration < 1){ - throw new IllegalArgumentException("Wrong expiration for WAL lock: "+putLockPolicy.expiration); - } + this.putLockPolicy = buildPutLockPolicy(periodInMs); + this.exclusiveLockBin = new Bin("EL", getBytesFromUUID(UUID.randomUUID())); //set period to by slightly longer then expiration this.periodInMs = Duration.ofSeconds(putLockPolicy.expiration + 1).toMillis(); @@ -91,25 +90,27 @@ private void completeHangedTransactions() { break; } - logger.info("Trying to complete transaction id={}, timestamp={}", - transaction.transactionId, transaction.timestamp); - try { - aerospikeStoreManager.processAndDeleteTransaction( - transaction.transactionId, transaction.locks, transaction.mutations, true); - logger.info("Successfully complete transaction id={}", transaction.transactionId); - } - //this is expected behaviour that may have place in case of transaction was interrupted: - // - on 'release locks' stage then transaction will fail and just need to release hanged locks - // - on 'delete wal transaction' stage and just need to remove transaction - catch (PermanentLockingException be) { - logger.info("Failed to complete transaction id={} as it's already completed", transaction.transactionId, be); - aerospikeStoreManager.releaseLocksAndDeleteWalTransaction(transaction.locks, transaction.transactionId); - logger.info("released locks for transaction id={}", transaction.transactionId, be); - } - //even in case of error need to move to the next one - catch (Exception e){ - logger.error("!!! 
Failed to complete transaction id={}, need to be investigated", - transaction.transactionId, e); + if(renewExclusiveLock()) { + logger.info("Trying to complete transaction id={}, timestamp={}", + transaction.transactionId, transaction.timestamp); + try { + aerospikeStoreManager.processAndDeleteTransaction( + transaction.transactionId, transaction.locks, transaction.mutations, true); + logger.info("Successfully complete transaction id={}", transaction.transactionId); + } + //this is expected behaviour that may have place in case of transaction was interrupted: + // - on 'release locks' stage then transaction will fail and just need to release hanged locks + // - on 'delete wal transaction' stage and just need to remove transaction + catch (PermanentLockingException be) { + logger.info("Failed to complete transaction id={} as it's already completed", transaction.transactionId, be); + aerospikeStoreManager.releaseLocksAndDeleteWalTransaction(transaction.locks, transaction.transactionId); + logger.info("released locks for transaction id={}", transaction.transactionId, be); + } + //even in case of error need to move to the next one + catch (Exception e) { + logger.error("!!! 
Failed to complete transaction id={}, need to be investigated", + transaction.transactionId, e); + } } } } @@ -126,7 +127,8 @@ private void completeHangedTransactions() { private boolean acquireExclusiveLock(){ try { - client.add(putLockPolicy, exclusiveLockKey, EXCLUSIVE_LOCK_BIN); + client.add(putLockPolicy, exclusiveLockKey, exclusiveLockBin); + generation++; logger.info("Successfully got exclusive lock, will check for hanged transactions"); return true; } catch (AerospikeException e){ @@ -141,4 +143,36 @@ private boolean acquireExclusiveLock(){ } } } + + private WritePolicy buildPutLockPolicy(long expirationInMs){ + WritePolicy putLockPolicy = new WritePolicy(); + putLockPolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY; + putLockPolicy.expiration = (int)Duration.ofMillis(expirationInMs).get(SECONDS); + if(putLockPolicy.expiration < 1){ + throw new IllegalArgumentException("Wrong expiration for WAL lock: "+putLockPolicy.expiration); + } + return putLockPolicy; + } + + private boolean renewExclusiveLock(){ + try { + client.touch(buildTouchLockPolicy(putLockPolicy.expiration, generation++), exclusiveLockKey); + logger.info("Successfully renewed exclusive lock, will process transaction"); + return true; + } catch (AerospikeException e){ + logger.error("Failed while renew exclusive lock", e); + throw e; + } + } + + private WritePolicy buildTouchLockPolicy(int expiration, int generation){ + WritePolicy touchLockPolicy = new WritePolicy(); + touchLockPolicy.recordExistsAction = RecordExistsAction.UPDATE_ONLY; + touchLockPolicy.generation = generation; + touchLockPolicy.expiration = expiration; + if(touchLockPolicy.expiration < 1){ + throw new IllegalArgumentException("Wrong expiration for WAL lock: "+touchLockPolicy.expiration); + } + return touchLockPolicy; + } } diff --git a/aerospike-storage-backend/src/test/java/com/playtika/janusgraph/aerospike/wal/WriteAheadLogCompleterShutdownTest.java 
b/aerospike-storage-backend/src/test/java/com/playtika/janusgraph/aerospike/wal/WriteAheadLogCompleterShutdownTest.java new file mode 100644 index 00000000..ea85d6c5 --- /dev/null +++ b/aerospike-storage-backend/src/test/java/com/playtika/janusgraph/aerospike/wal/WriteAheadLogCompleterShutdownTest.java @@ -0,0 +1,71 @@ +package com.playtika.janusgraph.aerospike.wal; + +import com.aerospike.client.IAerospikeClient; +import com.aerospike.client.Value; +import com.playtika.janusgraph.aerospike.AerospikeStoreManager; +import org.janusgraph.diskstorage.BackendException; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static com.playtika.janusgraph.aerospike.util.AsyncUtil.INITIAL_WAIT_TIMEOUT_IN_SECONDS; +import static com.playtika.janusgraph.aerospike.util.AsyncUtil.WAIT_TIMEOUT_IN_SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class WriteAheadLogCompleterShutdownTest { + + IAerospikeClient client = mock(IAerospikeClient.class); + WriteAheadLogManager writeAheadLogManager = mock(WriteAheadLogManager.class); + AerospikeStoreManager storeManager = mock(AerospikeStoreManager.class); + + WriteAheadLogCompleter writeAheadLogCompleter = new WriteAheadLogCompleter( + client, "walNamespace", "walSetname", + writeAheadLogManager, 10000, storeManager); + + @Test + public void shouldShutdownCorrectly() throws BackendException, InterruptedException { + WriteAheadLogManager.WalTransaction walTransaction = new WriteAheadLogManager.WalTransaction( + Value.get("transId"), 1000, null, 
null + ); + + when(writeAheadLogManager.getStaleTransactions()).thenReturn( + IntStream.range(0, 100) + .mapToObj(i -> walTransaction) + .collect(Collectors.toList())); + + AtomicInteger processProgress = new AtomicInteger(); + + int sleepTime = 100; + + Mockito.doAnswer(e -> { + processProgress.incrementAndGet(); + + long start = System.currentTimeMillis(); + while(System.currentTimeMillis() - start < 100L){} + + return null; + }).when(storeManager).processAndDeleteTransaction(any(), any(), any(), anyBoolean()); + + writeAheadLogCompleter.start(); + + while (processProgress.get() < 1) { + Thread.sleep(sleepTime); } + + writeAheadLogCompleter.shutdown(); + + assertThat(processProgress.get()).isEqualTo(INITIAL_WAIT_TIMEOUT_IN_SECONDS * 1000 / sleepTime + 1); + } + +} diff --git a/aerospike-storage-backend/src/test/java/com/playtika/janusgraph/aerospike/wal/WriteAheadLogCompleterTest.java b/aerospike-storage-backend/src/test/java/com/playtika/janusgraph/aerospike/wal/WriteAheadLogCompleterTest.java index bdb45548..a0e69ad6 100644 --- a/aerospike-storage-backend/src/test/java/com/playtika/janusgraph/aerospike/wal/WriteAheadLogCompleterTest.java +++ b/aerospike-storage-backend/src/test/java/com/playtika/janusgraph/aerospike/wal/WriteAheadLogCompleterTest.java @@ -1,71 +1,83 @@ package com.playtika.janusgraph.aerospike.wal; -import com.aerospike.client.IAerospikeClient; +import com.aerospike.AerospikeContainer; +import com.aerospike.client.AerospikeClient; import com.aerospike.client.Value; import com.playtika.janusgraph.aerospike.AerospikeStoreManager; import org.janusgraph.diskstorage.BackendException; +import org.junit.ClassRule; import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import java.util.stream.IntStream; 
- -import static com.playtika.janusgraph.aerospike.util.AsyncUtil.INITIAL_WAIT_TIMEOUT_IN_SECONDS; -import static com.playtika.janusgraph.aerospike.util.AsyncUtil.WAIT_TIMEOUT_IN_SECONDS; -import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Clock; +import java.util.HashMap; +import java.util.Map; + +import static com.playtika.janusgraph.aerospike.AerospikeTestUtils.getAerospikeContainer; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class WriteAheadLogCompleterTest { - IAerospikeClient client = mock(IAerospikeClient.class); - WriteAheadLogManager writeAheadLogManager = mock(WriteAheadLogManager.class); - AerospikeStoreManager storeManager = mock(AerospikeStoreManager.class); + @ClassRule + public static AerospikeContainer container = getAerospikeContainer(); - WriteAheadLogCompleter writeAheadLogCompleter = new WriteAheadLogCompleter( - client, "walNamespace", "walSetname", - writeAheadLogManager, 10000, storeManager); + private AerospikeClient client = new AerospikeClient(null, container.getContainerIpAddress(), container.getPort()); - @Test - public void shouldShutdownCorrectly() throws BackendException, InterruptedException { - WriteAheadLogManager.WalTransaction walTransaction = new WriteAheadLogManager.WalTransaction( - Value.get("transId"), 1000, null, null - ); + static final String WAL_NAMESPACE = container.getNamespace(); + static final String WAL_SET_NAME = "wal"; - when(writeAheadLogManager.getStaleTransactions()).thenReturn( - IntStream.range(0, 100) - .mapToObj(i -> walTransaction) - .collect(Collectors.toList())); + private Clock clock = mock(Clock.class); - AtomicInteger processProgress = new AtomicInteger(); + public static final long STALE_TRANSACTION_LIFETIME_THRESHOLD_IN_MS = 1000; + private WriteAheadLogManager walManager = new 
WriteAheadLogManager(client, WAL_NAMESPACE, WAL_SET_NAME, clock, STALE_TRANSACTION_LIFETIME_THRESHOLD_IN_MS); - int sleepTime = 100; + private AerospikeStoreManager storeManager = mock(AerospikeStoreManager.class); - Mockito.doAnswer(e -> { - processProgress.incrementAndGet(); + WriteAheadLogCompleter writeAheadLogCompleter = new WriteAheadLogCompleter( + client, WAL_NAMESPACE, WAL_SET_NAME, + walManager, 10000, storeManager); - long start = System.currentTimeMillis(); - while(System.currentTimeMillis() - start < 100L){} + @Test + public void shouldCompleteStaleTransactions() throws BackendException, InterruptedException { + writeTransaction(0); + writeTransaction(1); + writeTransaction(2); - return null; - }).when(storeManager).processAndDeleteTransaction(any(), any(), any(), anyBoolean()); + when(clock.millis()).thenReturn(STALE_TRANSACTION_LIFETIME_THRESHOLD_IN_MS + 5); writeAheadLogCompleter.start(); - while (processProgress.get() < 1) { - Thread.sleep(sleepTime); } + Thread.sleep(100); writeAheadLogCompleter.shutdown(); - assertThat(processProgress.get()).isEqualTo(INITIAL_WAIT_TIMEOUT_IN_SECONDS * 1000 / sleepTime + 1); + verify(storeManager, times(3)).processAndDeleteTransaction(any(), any(), any(), anyBoolean()); + } + + private void writeTransaction(long timestamp){ + Map<String, Map<Value, Map<Value, Value>>> locks = new HashMap<String, Map<Value, Map<Value, Value>>>(){{ + put("storeName", new HashMap<Value, Map<Value, Value>>(){{ + put(Value.get(new byte[]{1,2,3}), + new HashMap<Value, Value>(){{ + put(Value.get(new byte[]{0}), Value.get(new byte[]{1})); + put(Value.get(new byte[]{1}), Value.NULL); + }}); + }}); + }}; + + Map<String, Map<Value, Map<Value, Value>>> mutations = new HashMap<String, Map<Value, Map<Value, Value>>>(){{ + put("storeName", new HashMap<Value, Map<Value, Value>>(){{ + put(Value.get(new byte[]{1,2,3}), + new HashMap<Value, Value>(){{ + put(Value.get(new byte[]{0}), Value.NULL); + put(Value.get(new byte[]{1}), Value.get(new byte[]{1})); + }}); + }}); + }}; + + when(clock.millis()).thenReturn(timestamp); + walManager.writeTransaction(locks, mutations); } } diff --git 
a/aerospike-storage-backend/src/test/java/com/playtika/janusgraph/aerospike/wal/WriteAheadLogManagerTest.java b/aerospike-storage-backend/src/test/java/com/playtika/janusgraph/aerospike/wal/WriteAheadLogManagerTest.java index 8863e376..4bf812ea 100644 --- a/aerospike-storage-backend/src/test/java/com/playtika/janusgraph/aerospike/wal/WriteAheadLogManagerTest.java +++ b/aerospike-storage-backend/src/test/java/com/playtika/janusgraph/aerospike/wal/WriteAheadLogManagerTest.java @@ -33,7 +33,7 @@ public class WriteAheadLogManagerTest { private WriteAheadLogManager walManager = new WriteAheadLogManager(client, WAL_NAMESPACE, WAL_SET_NAME, clock, 1000); @Before - public void setUp() throws InterruptedException { + public void setUp() { client.truncate(null, WAL_NAMESPACE, null, null); }