Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make delete empty key idempotant #108

Merged
merged 1 commit into from
Jun 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> d

//no need in transactional logic
if(transaction.getLocks().isEmpty()){
mutateOperations.mutate(storeName, keyValue, mutationMap);
mutateOperations.mutate(storeName, keyValue, mutationMap, false);
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.playtika.janusgraph.aerospike.operations;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.ResultCode;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.MapOperation;
import com.aerospike.client.cdt.MapOrder;
Expand Down Expand Up @@ -40,15 +42,15 @@ public BasicMutateOperations(AerospikeOperations aerospikeOperations) {
}

@Override
public void mutateMany(Map<String, Map<Value, Map<Value, Value>>> mutationsByStore) {
public void mutateMany(Map<String, Map<Value, Map<Value, Value>>> mutationsByStore, boolean wal) {

List<CompletableFuture<?>> mutations = new ArrayList<>();

mutationsByStore.forEach((storeName, storeMutations) -> {
for(Map.Entry<Value, Map<Value, Value>> mutationEntry : storeMutations.entrySet()){
Value key = mutationEntry.getKey();
Map<Value, Value> mutation = mutationEntry.getValue();
mutations.add(runAsync(() -> mutate(storeName, key, mutation),
mutations.add(runAsync(() -> mutate(storeName, key, mutation, wal),
aerospikeOperations.getAerospikeExecutor()));
}
});
Expand All @@ -57,7 +59,7 @@ public void mutateMany(Map<String, Map<Value, Map<Value, Value>>> mutationsBySto
}

@Override
public void mutate(String storeName, Value keyValue, Map<Value, Value> mutation) {
public void mutate(String storeName, Value keyValue, Map<Value, Value> mutation, boolean wal) {
Key key = aerospikeOperations.getKey(storeName, keyValue);
List<Operation> operations = new ArrayList<>(3);

Expand Down Expand Up @@ -86,7 +88,17 @@ public void mutate(String storeName, Value keyValue, Map<Value, Value> mutation)
}

IAerospikeClient client = aerospikeOperations.getClient();
Record record = client.operate(mutatePolicy, key, operations.toArray(new Operation[0]));
Record record;
try {
record = client.operate(mutatePolicy, key, operations.toArray(new Operation[0]));
} catch (AerospikeException ae){
//need to check to guarantee idempotency
if(wal && ae.getResultCode() == ResultCode.KEY_NOT_FOUND_ERROR){
return;
}
throw ae;
}

if(entriesNoOperationIndex != -1){
long entriesNoAfterMutation = (Long)record.getList(ENTRIES_BIN_NAME).get(entriesNoOperationIndex);
if(entriesNoAfterMutation == 0){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

public interface MutateOperations {

void mutateMany(Map<String, Map<Value, Map<Value, Value>>> mutationsByStore) throws PermanentBackendException;
void mutateMany(Map<String, Map<Value, Map<Value, Value>>> mutationsByStore, boolean wal) throws PermanentBackendException;

void mutate(String storeName, Value key, Map<Value, Value> mutation);
void mutate(String storeName, Value key, Map<Value, Value> mutation, boolean wal);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ public void mutateTransactionally(Map<String, Map<Value, Map<Value, Value>>> loc
}

void processAndDeleteTransaction(Value transactionId,
Map<String, Map<Value, Map<Value, Value>>> locksByStore,
Map<String, Map<Value, Map<Value, Value>>> mutationsByStore,
boolean checkTransactionId) throws BackendException {
Set<Key> keysLocked = lockOperations.acquireLocks(transactionId, locksByStore, checkTransactionId,
Map<String, Map<Value, Map<Value, Value>>> locksByStore,
Map<String, Map<Value, Map<Value, Value>>> mutationsByStore,
boolean wal) throws BackendException {
Set<Key> keysLocked = lockOperations.acquireLocks(transactionId, locksByStore, wal,
keyLockTypeMap -> releaseLocksAndDeleteWalTransaction(keyLockTypeMap.keySet(), transactionId));
try {
mutateOperations.mutateMany(mutationsByStore);
mutateOperations.mutateMany(mutationsByStore, wal);
releaseLocksAndDeleteWalTransaction(keysLocked, transactionId);
}
catch (AerospikeException e) {
Expand All @@ -53,7 +53,7 @@ private void releaseLocksAndDeleteWalTransaction(Collection<Key> keysLocked, Val
writeAheadLogManager.deleteTransaction(transactionId);
}

void releaseLocksAndDeleteWalTransactionOnError(Map<String, Map<Value, Map<Value, Value>>> locksByStore, Value transactionId) throws BackendException {
void releaseLocksAndDeleteWalTransactionOnError(Map<String, Map<Value, Map<Value, Value>>> locksByStore, Value transactionId) {
List<Key> transactionLockKeys = lockOperations.filterKeysLockedByTransaction(locksByStore, transactionId);
releaseLocksAndDeleteWalTransaction(transactionLockKeys, transactionId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.aerospike.client.policy.RecordExistsAction;
import com.aerospike.client.policy.WritePolicy;
import com.playtika.janusgraph.aerospike.util.NamedThreadFactory;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.locking.PermanentLockingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -143,10 +142,6 @@ private void completeHangedTransactions() {
}
}
}
catch (BackendException t) {
logger.error("Error while running completeHangedTransactions()", t);
throw new RuntimeException(t);
}
catch (Throwable t) {
logger.error("Error while running completeHangedTransactions()", t);
throw t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static com.playtika.janusgraph.aerospike.AerospikeTestUtils.*;
import static com.playtika.janusgraph.aerospike.AerospikeTestUtils.AEROSPIKE_PROPERTIES;
import static com.playtika.janusgraph.aerospike.AerospikeTestUtils.deleteAllRecords;
import static com.playtika.janusgraph.aerospike.AerospikeTestUtils.getAerospikeConfiguration;
import static com.playtika.janusgraph.aerospike.AerospikeTestUtils.getAerospikeContainer;
import static com.playtika.janusgraph.aerospike.GraphConsistencyAfterFailureTest.buildGraph;
import static com.playtika.janusgraph.aerospike.GraphConsistencyAfterFailureTest.defineSchema;
import static com.playtika.janusgraph.aerospike.TransactionRetentionOnFailureTest.FailingAerospikeStoreManager.*;
import static com.playtika.janusgraph.aerospike.TransactionRetentionOnFailureTest.FailingAerospikeStoreManager.failsCheckValue;
import static com.playtika.janusgraph.aerospike.TransactionRetentionOnFailureTest.FailingAerospikeStoreManager.failsMutate;
import static com.playtika.janusgraph.aerospike.TransactionRetentionOnFailureTest.FailingAerospikeStoreManager.fixAll;
import static com.playtika.janusgraph.aerospike.TransactionRetentionOnFailureTest.FailingAerospikeStoreManager.time;
import static org.assertj.core.api.Assertions.assertThat;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_BACKEND;

Expand Down Expand Up @@ -162,11 +168,11 @@ protected void checkExpectedValues(final Map<String, Map<Value, Map<Value, Value
protected MutateOperations buildMutateOperations(AerospikeOperations aerospikeOperations) {
return new BasicMutateOperations(aerospikeOperations){
@Override
public void mutateMany(Map<String, Map<Value, Map<Value, Value>>> mutationsByStore) {
public void mutateMany(Map<String, Map<Value, Map<Value, Value>>> mutationsByStore, boolean wal) {
if(failsMutate.get()){
throw new RuntimeException();
} else {
super.mutateMany(mutationsByStore);
super.mutateMany(mutationsByStore, wal);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ public FlakingMutateOperations(MutateOperations mutateOperations, AtomicBoolean
}

@Override
public void mutateMany(Map<String, Map<Value, Map<Value, Value>>> mutationsByStore) throws PermanentBackendException {
public void mutateMany(Map<String, Map<Value, Map<Value, Value>>> mutationsByStore, boolean wal) throws PermanentBackendException {
if(fails.get()){
Map<String, Map<Value, Map<Value, Value>>> mutationsByStorePartial = selectFlaking(mutationsByStore,
"mutateMany failed flaking in [{}] for key [{}]");

mutateOperations.mutateMany(mutationsByStorePartial);
mutateOperations.mutateMany(mutationsByStorePartial, wal);
throw new RuntimeException();

} else {
mutateOperations.mutateMany(mutationsByStore);
mutateOperations.mutateMany(mutationsByStore, wal);
}
}

@Override
public void mutate(String storeName, Value key, Map<Value, Value> mutation) {
mutateOperations.mutate(storeName, key, mutation);
public void mutate(String storeName, Value key, Map<Value, Value> mutation, boolean wal) {
mutateOperations.mutate(storeName, key, mutation, wal);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.playtika.janusgraph.aerospike.operations;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Value;
import com.playtika.janusgraph.aerospike.AerospikePolicyProvider;
import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.containers.GenericContainer;

import java.util.concurrent.Executors;

import static com.playtika.janusgraph.aerospike.AerospikeTestUtils.AEROSPIKE_PROPERTIES;
import static com.playtika.janusgraph.aerospike.AerospikeTestUtils.getAerospikeConfiguration;
import static com.playtika.janusgraph.aerospike.AerospikeTestUtils.getAerospikeContainer;
import static java.util.Collections.singletonMap;

public class MutateOperationsTest {

@ClassRule
public static GenericContainer container = getAerospikeContainer();

public static final String STORE_NAME = "testStore";
public static final Value KEY = Value.get("testKey");
public static final Value COLUMN_NAME = Value.get("column_name");
public static final Value COLUMN_VALUE = Value.get(new byte[]{1, 2, 3});

private AerospikeClient client = new AerospikeClient(null, container.getContainerIpAddress(),
container.getMappedPort(AEROSPIKE_PROPERTIES.getPort()));

private MutateOperations mutateOperations = new BasicMutateOperations(
new AerospikeOperations("test", AEROSPIKE_PROPERTIES.getNamespace(), client,
new AerospikePolicyProvider(getAerospikeConfiguration(container)),
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));


@Test
public void shouldDeleteKeyIdempotentlyIfWal() {
//when
mutateOperations.mutate(STORE_NAME, KEY,
singletonMap(COLUMN_NAME, COLUMN_VALUE), true);

//then
mutateOperations.mutate(STORE_NAME, KEY,
singletonMap(COLUMN_NAME, Value.NULL), true);

//expect
mutateOperations.mutate(STORE_NAME, KEY,
singletonMap(COLUMN_NAME, Value.NULL), true);
}

@Test(expected = AerospikeException.class)
public void shouldFailOnDeleteIfNotWal() {
//when
mutateOperations.mutate(STORE_NAME, KEY,
singletonMap(COLUMN_NAME, COLUMN_VALUE), false);

//then
mutateOperations.mutate(STORE_NAME, KEY,
singletonMap(COLUMN_NAME, Value.NULL), false);

//expect
mutateOperations.mutate(STORE_NAME, KEY,
singletonMap(COLUMN_NAME, Value.NULL), false);
}

}