Skip to content

Commit

Permalink
batch reads options added
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergii Karpenko authored and kptfh committed Sep 25, 2019
1 parent a3df229 commit dceff4c
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) {

@Override
public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
logger.trace("getSlice({}, tx:{}, {}, {})", storeName, txh, keys, query);
logger.trace("getSlice({}, tx:{}, {}, start:{}, end:{})",
storeName, txh, keys, query.getSliceStart(), query.getSliceEnd());

return readOperations.getSlice(storeName, keys, query, txh);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,8 @@ public class ConfigOptions {
"You may consider to start WAL Completer externally than you will be able to suspend and resume it manually",
ConfigOption.Type.LOCAL, true);

public static final ConfigOption<Integer> BATCH_READ_THRESHOLD = new ConfigOption<>(STORAGE_NS,
"batch-read-threshold", "Number of keys when we should start using batch reads instead of parallel",
ConfigOption.Type.LOCAL, 2);

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public BasicOperations(Configuration configuration) {
this.writeAheadLogCompleter = buildWriteAheadLogCompleter(walOperations,
() -> writeAheadLogManager, () -> lockOperations, () -> mutateOperations);

this.readOperations = buildReadOperations(aerospikeOperations);
this.readOperations = buildReadOperations(configuration, aerospikeOperations);
this.scanOperations = buildScanOperations(configuration, aerospikeOperations);
}

Expand Down Expand Up @@ -143,8 +143,8 @@ protected TransactionalOperations buildWalCompleterTransactionalOperations(
return new TransactionalOperations(writeAheadLogManager.get(), lockOperations.get(), mutateOperations.get());
}

protected ReadOperations buildReadOperations(AerospikeOperations aerospikeOperations) {
return new ReadOperations(aerospikeOperations);
protected ReadOperations buildReadOperations(Configuration configuration, AerospikeOperations aerospikeOperations) {
return new ReadOperations(aerospikeOperations, configuration.get(BATCH_READ_THRESHOLD));
}

protected ScanOperations buildScanOperations(Configuration configuration, AerospikeOperations aerospikeOperations){
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.playtika.janusgraph.aerospike.operations;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.cdt.MapOperation;
import com.aerospike.client.cdt.MapReturnType;
import com.aerospike.client.policy.WritePolicy;
import com.playtika.janusgraph.aerospike.util.AsyncUtil;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.EntryList;
import org.janusgraph.diskstorage.PermanentBackendException;
import org.janusgraph.diskstorage.StaticBuffer;
Expand All @@ -17,8 +19,14 @@
import org.janusgraph.diskstorage.util.StaticArrayBuffer;
import org.janusgraph.diskstorage.util.StaticArrayEntry;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static com.playtika.janusgraph.aerospike.operations.AerospikeOperations.ENTRIES_BIN_NAME;
import static com.playtika.janusgraph.aerospike.operations.AerospikeOperations.getValue;
Expand All @@ -28,13 +36,55 @@ public class ReadOperations {
private final AerospikeOperations aerospikeOperations;

private final WritePolicy getPolicy;
private final int bathReadThreshold;

public ReadOperations(AerospikeOperations aerospikeOperations) {
public ReadOperations(AerospikeOperations aerospikeOperations, int bathReadThreshold) {
this.aerospikeOperations = aerospikeOperations;
this.getPolicy = aerospikeOperations.getAerospikePolicyProvider().writePolicy();
this.bathReadThreshold = bathReadThreshold;
}

public Map<StaticBuffer,EntryList> getSlice(String storeName, List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
if(keys.size() == 1){
return getSliceOfOneKey(storeName, keys, query, txh);
} else if(keys.size() < bathReadThreshold){
return getSliceInParallel(storeName, keys, query, txh);
} else {
return getSliceByBatch(storeName, keys, query, txh);
}
}

private Map<StaticBuffer,EntryList> getSliceOfOneKey(String storeName, List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
StaticBuffer key = keys.get(0);
return Collections.singletonMap(key,
getSlice(storeName, new KeySliceQuery(key, query.getSliceStart(), query.getSliceEnd()), txh));
}

private Map<StaticBuffer,EntryList> getSliceByBatch(String storeName, List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
Key[] aerospikeKeys = keys.stream()
.map(keyBuffer -> aerospikeOperations.getKey(storeName, keyBuffer))
.toArray(Key[]::new);
Record[] records = aerospikeOperations.getClient().get(null, aerospikeKeys, ENTRIES_BIN_NAME);

Map<StaticBuffer,EntryList> resultMap = new HashMap<>(keys.size());
for(int i = 0, n = records.length; i < n; i++){
Record record = records[i];
if(record != null) {
SortedMap<?, ?> map = ((SortedMap) record.getMap(ENTRIES_BIN_NAME));
resultMap.put(keys.get(i), recordToEntries(map, isInSlice(query), query.getLimit()));
} else {
resultMap.put(keys.get(i), EntryList.EMPTY_LIST);
}
}
return resultMap;
}

private static Predicate<Entry> isInSlice(SliceQuery query) {
return entry -> entry.getColumn().compareTo(query.getSliceStart()) >= 0
&& entry.getColumn().compareTo(query.getSliceEnd()) < 0;
}

private Map<StaticBuffer,EntryList> getSliceInParallel(String storeName, List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
return AsyncUtil.mapAll(keys, key -> {
try {
return getSlice(storeName, new KeySliceQuery(key, query), txh);
Expand All @@ -44,6 +94,20 @@ public Map<StaticBuffer,EntryList> getSlice(String storeName, List<StaticBuffer>
}, aerospikeOperations.getAerospikeExecutor());
}

private EntryList recordToEntries(Map<?,?> entriesMap, Predicate<? super Entry> entryPredicate, int entriesNo) {

if(entriesMap != null && !entriesMap.isEmpty()){
return entriesMap.entrySet().stream()
.map(entry -> StaticArrayEntry.of(
StaticArrayBuffer.of((ByteBuffer) entry.getKey()),
StaticArrayBuffer.of((byte[]) entry.getValue())))
.filter(entryPredicate)
.limit(entriesNo)
.collect(Collectors.toCollection(EntryArrayList::new));
}
return EntryList.EMPTY_LIST;
}

public EntryList getSlice(String storeName, KeySliceQuery query, StoreTransaction txh) throws BackendException {

try {
Expand Down Expand Up @@ -80,5 +144,4 @@ private EntryList recordToEntries(Record record, int entriesNo) {
}
}


}
2 changes: 1 addition & 1 deletion aerospike-storage-backend/src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
<Logger name="com.playtika.janusgraph.aerospike" level="trace">
<Logger name="com.playtika.janusgraph.aerospike" level="info" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
</Loggers>
Expand Down

0 comments on commit dceff4c

Please sign in to comment.