Skip to content

Commit

Permalink
parallel calls to aerospike
Browse files Browse the repository at this point in the history
  • Loading branch information
skarpenko authored and kptfh committed Mar 11, 2019
1 parent 073b086 commit 1e8d7c5
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

import static com.playtika.janusgraph.aerospike.ConfigOptions.ALLOW_SCAN;
import static com.playtika.janusgraph.aerospike.ConfigOptions.NAMESPACE;
Expand All @@ -29,30 +30,39 @@ public class AerospikeKeyColumnValueStore implements KeyColumnValueStore {
mutatePolicy.respondAllOps = true;
}


static final String ENTRIES_BIN_NAME = "entries";

private final String namespace;
private final String name; //used as set name
private final Configuration configuration;
private final AerospikeClient client;
private final Executor scanExecutor;
private final LockOperations lockOperations;

AerospikeKeyColumnValueStore(String name,
AerospikeClient client,
Configuration configuration) {
Configuration configuration,
Executor aerospikeExecutor,
Executor scanExecutor) {
this.namespace = configuration.get(NAMESPACE);
this.name = name;
this.client = client;
this.configuration = configuration;
this.lockOperations = new LockOperationsUdf(client, this, configuration);
this.scanExecutor = scanExecutor;
this.lockOperations = new LockOperationsUdf(client, this, configuration, aerospikeExecutor);
}

@Override // This method is only supported by stores which keep keys in byte-order.
public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) {
throw new UnsupportedOperationException();
}

/**
* Used to add new index on existing graph
* @param query
* @param txh
* @return
*/
@Override // This method is only supported by stores which do not keep keys in byte-order.
public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) {
if(!configuration.get(ALLOW_SCAN)){
Expand All @@ -63,14 +73,13 @@ public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) {

AerospikeKeyIterator keyIterator = new AerospikeKeyIterator(client);

Thread thread = new Thread(() -> {
scanExecutor.execute(() -> {
try {
client.scanAll(scanPolicy, namespace, name, keyIterator);
} finally {
keyIterator.terminate();
}
});
thread.start();

return keyIterator;
}
Expand Down Expand Up @@ -154,8 +163,6 @@ void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletion
}

Key aerospikeKey = getKey(key);
WritePolicy mutatePolicy = new WritePolicy();
mutatePolicy.respondAllOps = true;
Record record = client.operate(null, aerospikeKey, operations.toArray(new Operation[0]));
if(entriesNoOperationIndex != -1){
long entriesNoAfterMutation = (Long)record.getList(ENTRIES_BIN_NAME).get(entriesNoOperationIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
import org.janusgraph.diskstorage.keycolumnvalue.*;
import org.janusgraph.graphdb.configuration.PreInitializeConfigOptions;

import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.playtika.janusgraph.aerospike.ConfigOptions.*;
import static com.playtika.janusgraph.aerospike.util.AsyncUtil.allOf;
import static java.util.Collections.emptyMap;
import static java.util.concurrent.CompletableFuture.runAsync;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.*;


Expand All @@ -30,7 +32,13 @@ public class AerospikeStoreManager extends AbstractStoreManager implements KeyCo
private final AerospikeClient client;

private final Configuration configuration;
private final boolean pessimisticLocking;
private final boolean useLocking;

private final ThreadPoolExecutor scanExecutor = new ThreadPoolExecutor(0, 1,
1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());

private final ThreadPoolExecutor aerospikeExecutor = new ThreadPoolExecutor(4, 40,
1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());

public AerospikeStoreManager(Configuration configuration) {
super(configuration);
Expand All @@ -55,13 +63,13 @@ public AerospikeStoreManager(Configuration configuration) {
client = new AerospikeClient(clientPolicy, hosts);

this.configuration = configuration;
pessimisticLocking = configuration.get(PESSIMISTIC_LOCKING);
this.useLocking = configuration.get(USE_LOCKING);

features = new StandardStoreFeatures.Builder()
.keyConsistent(configuration)
.persists(true)
.locking(pessimisticLocking)
.optimisticLocking(true)
.locking(useLocking)
.optimisticLocking(true) //caused by deferred locking, actual locking happens just before transaction commit
.distributed(true)
.multiQuery(true)
.batchMutation(true)
Expand All @@ -86,7 +94,7 @@ static void registerUdfs(AerospikeClient client){
public AerospikeKeyColumnValueStore openDatabase(String name) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(name), "Database name may not be null or empty");

return new AerospikeKeyColumnValueStore(name, client, configuration);
return new AerospikeKeyColumnValueStore(name, client, configuration, aerospikeExecutor, scanExecutor);
}

@Override
Expand All @@ -96,56 +104,75 @@ public KeyColumnValueStore openDatabase(String name, StoreMetaData.Container met

@Override
public StoreTransaction beginTransaction(final BaseTransactionConfig config) {
return new AerospikeTransaction(config, this);
return new AerospikeTransaction(config);
}

@Override
public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
acquireLocks(((AerospikeTransaction) txh).getLocks(), mutations);
Map<String, AerospikeLocks> locksByStore = acquireLocks(((AerospikeTransaction) txh).getLocks(), mutations);

try {
mutateMany(mutations);
Map<String, Set<StaticBuffer>> mutatedByStore = mutateMany(mutations);
releaseLocks(locksByStore, mutatedByStore);
} catch (AerospikeException e) {
//here we just release lock on key
// locks that comes from transaction will be released by rollback
releaseLocks(mutations);
releaseLocks(locksByStore, emptyMap());
throw new PermanentBackendException(e);
}
}

private void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations) {
private Map<String, Set<StaticBuffer>> mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations) {

List<CompletableFuture<?>> futures = new ArrayList<>();
Map<String, Set<StaticBuffer>> mutatedByStore = new ConcurrentHashMap<>();

mutations.forEach((storeName, entry) -> {
final AerospikeKeyColumnValueStore store = openDatabase(storeName);
entry.forEach((key, mutation) -> store.mutate(
key, mutation.getAdditions(), mutation.getDeletions(), pessimisticLocking));
entry.forEach((key, mutation) -> futures.add(runAsync(() -> {
store.mutate(key, mutation.getAdditions(), mutation.getDeletions(), useLocking);
mutatedByStore.compute(storeName, (s, keys) -> {
Set<StaticBuffer> keysResult = keys != null ? keys : new HashSet<>();
keysResult.add(key);
return keysResult;
});
}, aerospikeExecutor)));
});

allOf(futures);

return mutatedByStore;
}

private void acquireLocks(List<AerospikeLock> locks, Map<String, Map<StaticBuffer, KCVMutation>> mutations) throws BackendException {
private Map<String, AerospikeLocks> acquireLocks(List<AerospikeLock> locks, Map<String, Map<StaticBuffer, KCVMutation>> mutations) throws BackendException {
Map<String, List<AerospikeLock>> locksByStore = locks.stream()
.collect(Collectors.groupingBy(lock -> lock.storeName));
Map<String, AerospikeLocks> locksAllByStore = new HashMap<>(locksByStore.size());
for(Map.Entry<String, List<AerospikeLock>> entry : locksByStore.entrySet()){
String storeName = entry.getKey();
List<AerospikeLock> locksForStore = entry.getValue();
Map<StaticBuffer, KCVMutation> mutationsForStore = mutations.getOrDefault(storeName, emptyMap());
AerospikeLocks locksAll = new AerospikeLocks(locksForStore.size() + mutationsForStore.size());
locksAll.addLocks(locksForStore);
if (pessimisticLocking) {
if (useLocking) {
locksAll.addLockOnKeys(mutationsForStore.keySet());
}

final AerospikeKeyColumnValueStore store = openDatabase(storeName);
store.getLockOperations().acquireLocks(locksAll.getLocksMap());
locksAllByStore.put(storeName, locksAll);
}
return locksAllByStore;
}

private void releaseLocks(Map<String, Map<StaticBuffer, KCVMutation>> mutations){
if(pessimisticLocking){
mutations.forEach((storeName, mutationsForStore) -> {
final AerospikeKeyColumnValueStore store = openDatabase(storeName);
store.getLockOperations().releaseLockOnKeys(mutationsForStore.keySet());
});
}
private void releaseLocks(Map<String, AerospikeLocks> locksByStore, Map<String, Set<StaticBuffer>> mutatedByStore){
locksByStore.forEach((storeName, locksForStore) -> {
final AerospikeKeyColumnValueStore store = openDatabase(storeName);
Set<StaticBuffer> mutatedForStore = mutatedByStore.get(storeName);
List<StaticBuffer> keysToRelease = locksForStore.getLocksMap().keySet().stream()
//ignore mutated keys as they already have been released
.filter(key -> !mutatedForStore.contains(key))
.collect(Collectors.toList());
store.getLockOperations().releaseLockOnKeys(keysToRelease);
});
}

//called from AerospikeTransaction
Expand All @@ -163,6 +190,8 @@ void releaseLocks(List<AerospikeLock> locks){
@Override
public void close() throws BackendException {
try {
scanExecutor.shutdown();
aerospikeExecutor.shutdown();
client.close();
} catch (AerospikeException e) {
throw new PermanentBackendException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.playtika.janusgraph.aerospike;

import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.BaseTransactionConfig;
import org.janusgraph.diskstorage.common.AbstractStoreTransaction;

Expand All @@ -9,23 +8,19 @@

final class AerospikeTransaction extends AbstractStoreTransaction {

private final AerospikeStoreManager storeManager;
private final List<AerospikeLock> locks = new ArrayList<>();

AerospikeTransaction(final BaseTransactionConfig config, AerospikeStoreManager storeManager) {
AerospikeTransaction(final BaseTransactionConfig config) {
super(config);
this.storeManager = storeManager;
}

@Override
public void commit() throws BackendException {
storeManager.releaseLocks(locks);
public void commit() {
locks.clear();
}

@Override
public void rollback() throws BackendException {
storeManager.releaseLocks(locks);
public void rollback() {
locks.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class ConfigOptions {
public static final ConfigOption<String> NAMESPACE = new ConfigOption<>(STORAGE_NS,
"namespace", "Aerospike namespace to use", ConfigOption.Type.LOCAL, "test");

public static final ConfigOption<Boolean> PESSIMISTIC_LOCKING = new ConfigOption<>(STORAGE_NS,
public static final ConfigOption<Boolean> USE_LOCKING = new ConfigOption<>(STORAGE_NS,
"pessimistic_locking", "Whether to use pessimistic locking", ConfigOption.Type.LOCAL, Boolean.TRUE);


Expand Down
Loading

0 comments on commit 1e8d7c5

Please sign in to comment.