Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove old id blocks - IdsCleanupOperations should be removed after f… #212

Merged
merged 1 commit into from
Apr 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.aerospike.client.Value;
import com.playtika.janusgraph.aerospike.operations.AerospikeOperations;
import com.playtika.janusgraph.aerospike.operations.ErrorMapper;
import com.playtika.janusgraph.aerospike.operations.IdsCleanupOperations;
import com.playtika.janusgraph.aerospike.operations.MutateOperations;
import com.playtika.janusgraph.aerospike.operations.ReadOperations;
import com.playtika.janusgraph.aerospike.operations.ScanOperations;
Expand Down Expand Up @@ -44,20 +45,23 @@ public class AerospikeKeyColumnValueStore implements KeyColumnValueStore {
private final BatchUpdater<BatchLocks, BatchUpdates, AerospikeLock, Value> batchUpdater;
private final MutateOperations mutateOperations;
private final ScanOperations scanOperations;
private final IdsCleanupOperations idsCleanupOperations;

protected AerospikeKeyColumnValueStore(
String storeName,
ReadOperations readOperations,
AerospikeOperations aerospikeOperations,
BatchUpdater<BatchLocks, BatchUpdates, AerospikeLock, Value> batchUpdater,
MutateOperations mutateOperations,
ScanOperations scanOperations) {
ScanOperations scanOperations,
IdsCleanupOperations idsCleanupOperations) {
this.storeName = storeName;
this.readOperations = readOperations;
this.aerospikeOperations = aerospikeOperations;
this.batchUpdater = batchUpdater;
this.mutateOperations = mutateOperations;
this.scanOperations = scanOperations;
this.idsCleanupOperations = idsCleanupOperations;
}

@Override // This method is only supported by stores which keep keys in byte-order.
Expand Down Expand Up @@ -99,12 +103,21 @@ public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> d
Map<Value, Value> mutationMap = mutationToMap(new KCVMutation(additions, deletions));
Value keyValue = getValue(key);

//no need in transactional logic
if(transaction.getLocks().isEmpty()){
mutateOperations.mutate(storeName, keyValue, mutationMap);
return;
try {
if (transaction.getLocks().isEmpty()) {
//no need in transactional logic
mutateOperations.mutate(storeName, keyValue, mutationMap);
} else {
updateBatch(keyValue, mutationMap, transaction);
}
} finally {
if(idsCleanupOperations != null) {
idsCleanupOperations.cleanUpOldIdsRanges(key);
}
}
}

private void updateBatch(Value keyValue, Map<Value, Value> mutationMap, AerospikeTransaction transaction) throws BackendException {
Map<String, Map<Value, Map<Value, Value>>> locksByStore = transaction.getLocksByStoreKeyColumn();
if(!singleton(storeName).containsAll(locksByStore.keySet())){
throw new IllegalArgumentException();
Expand All @@ -131,7 +144,7 @@ public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> d
transaction.close();
}

static Map<Value, Value> mutationToMap(KCVMutation mutation){
public static Map<Value, Value> mutationToMap(KCVMutation mutation){
Map<Value, Value> map = new HashMap<>(mutation.getAdditions().size() + mutation.getDeletions().size());
for(StaticBuffer deletion : mutation.getDeletions()){
map.put(getValue(deletion), Value.NULL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,12 @@ public KeyColumnValueStore openDatabase(String name) {
operations.getAerospikeOperations(),
operations.batchUpdater(),
operations.mutateOperations(),
operations.getScanOperations());
operations.getScanOperations(),
operations.getIdsCleanupOperations(name));
}



@Override
public KeyColumnValueStore openDatabase(String name, StoreMetaData.Container metaData) {
return openDatabase(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,9 @@ public class ConfigOptions {
public static final ConfigOption<Boolean> CHECK_ALL_MUTATIONS_LOCKED = new ConfigOption<>(STORAGE_NS,
"check-all-mutations-locked", "Checks that all mutations are locked",
ConfigOption.Type.LOCAL, false);

public static final ConfigOption<Long> IDS_BLOCK_TTL = new ConfigOption<>(STORAGE_NS,
"ids-block-ttl", "How long keep history of ids block in milliseconds, default to one week",
ConfigOption.Type.LOCAL, 604800000L);

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static nosql.batch.update.util.AsyncUtil.shutdownAndAwaitTermination;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_HOSTS;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_PORT;
import static org.janusgraph.graphdb.configuration.JanusGraphConstants.JANUSGRAPH_ID_STORE_NAME;

public class AerospikeOperations {

Expand All @@ -34,7 +33,9 @@ public class AerospikeOperations {

private final String namespace;
private final String idsNamespace;
private final String idsStoreName;
private final String graphPrefix;

private final IAerospikeClient client;
private final ExecutorService aerospikeExecutor;

Expand All @@ -45,14 +46,17 @@ public class AerospikeOperations {
private final ScheduledFuture<?> statsFuture;

public AerospikeOperations(String graphPrefix,
String namespace, String idsNamespace,
String namespace,
String idsNamespace,
String idsStoreName,
IAerospikeClient client,
AerospikePolicyProvider aerospikePolicyProvider,
ExecutorService aerospikeExecutor,
ExecutorService batchExecutor) {
this.graphPrefix = graphPrefix+".";
this.namespace = namespace;
this.idsNamespace = idsNamespace;
this.idsStoreName = idsStoreName;
this.client = client;
this.aerospikeExecutor = aerospikeExecutor;
this.aerospikePolicyProvider = aerospikePolicyProvider;
Expand Down Expand Up @@ -89,7 +93,7 @@ public static Value getValue(StaticBuffer staticBuffer) {
}

public Key getKey(String storeName, Value value) {
String namespace = JANUSGRAPH_ID_STORE_NAME.equals(storeName) ? this.idsNamespace : this.namespace;
String namespace = idsStoreName.equals(storeName) ? this.idsNamespace : this.namespace;
return new Key(namespace, getSetName(storeName), value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,24 @@

import static com.playtika.janusgraph.aerospike.ConfigOptions.AEROSPIKE_EXECUTOR_MAX_THREADS;
import static com.playtika.janusgraph.aerospike.ConfigOptions.GRAPH_PREFIX;
import static com.playtika.janusgraph.aerospike.ConfigOptions.IDS_BLOCK_TTL;
import static com.playtika.janusgraph.aerospike.ConfigOptions.IDS_NAMESPACE;
import static com.playtika.janusgraph.aerospike.ConfigOptions.NAMESPACE;
import static com.playtika.janusgraph.aerospike.ConfigOptions.PARALLEL_READ_THRESHOLD;
import static com.playtika.janusgraph.aerospike.ConfigOptions.SCAN_PARALLELISM;
import static com.playtika.janusgraph.aerospike.ConfigOptions.START_WAL_COMPLETER;
import static com.playtika.janusgraph.aerospike.operations.AerospikeOperations.buildAerospikeClient;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.IDS_STORE_NAME;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.TIMESTAMP_PROVIDER;

public class BasicOperations implements Operations {

private static final Logger logger = LoggerFactory.getLogger(BasicOperations.class);

public static final String JANUS_AEROSPIKE_THREAD_GROUP_NAME = "janus-aerospike";
public static final String JANUS_BATCH_THREAD_GROUP_NAME = "janus-batch";
public static final String JANUS_GROUP_NAME = "janus";
public static final String AEROSPIKE_PREFIX = "aerospike";
public static final String BATCH_PREFIX = "batch";
public static final String IDS_CLEANUP_PREFIX = "ids-cleanup";

private final AerospikeOperations aerospikeOperations;
private final MutateOperations mutateOperations;
Expand All @@ -51,6 +56,8 @@ public class BasicOperations implements Operations {
private final ReadOperations readOperations;
private final ScanOperations scanOperations;

private final IdsCleanupOperations idsCleanupOperations;

public BasicOperations(Configuration configuration) {
this.aerospikeOperations = buildAerospikeOperations(configuration);
WalOperations walOperations = buildWalOperations(configuration, aerospikeOperations);
Expand All @@ -68,6 +75,13 @@ aerospikeOperations, walOperations, getClock(),

this.readOperations = buildReadOperations(configuration, aerospikeOperations);
this.scanOperations = buildScanOperations(configuration, aerospikeOperations);

this.idsCleanupOperations = new IdsCleanupOperations(
configuration.get(IDS_STORE_NAME),
readOperations, mutateOperations,
configuration.get(IDS_BLOCK_TTL),
configuration.get(TIMESTAMP_PROVIDER),
executorService(1, IDS_CLEANUP_PREFIX));
}

protected BatchOperations<BatchLocks, BatchUpdates, AerospikeLock, Value> buildBatchOperations(
Expand Down Expand Up @@ -123,10 +137,10 @@ protected AerospikeOperations buildAerospikeOperations(Configuration configurati
waitForClientToConnect(client);

return new AerospikeOperations(graphPrefix,
namespace, idsNamespace,
namespace, idsNamespace, configuration.get(IDS_STORE_NAME),
client, policyProvider,
executorService(configuration.get(AEROSPIKE_EXECUTOR_MAX_THREADS)),
executorService(8, 8));
executorService(configuration.get(AEROSPIKE_EXECUTOR_MAX_THREADS), AEROSPIKE_PREFIX),
executorService(8, 8, BATCH_PREFIX));
}

private void waitForClientToConnect(IAerospikeClient client) {
Expand Down Expand Up @@ -176,7 +190,7 @@ protected ScanOperations buildScanOperations(Configuration configuration, Aerosp
Integer scanParallelism = configuration.get(SCAN_PARALLELISM);
if(scanParallelism > 0){
return new BasicScanOperations(aerospikeOperations, new NamedThreadFactory(
JANUS_AEROSPIKE_THREAD_GROUP_NAME, "scan"));
JANUS_GROUP_NAME, "scan"));
} else {
return new UnsupportedScanOperations();
}
Expand All @@ -190,18 +204,23 @@ public void close() {
aerospikeOperations.close();
}

public static ExecutorService executorService(int maxThreads){
@Override
public IdsCleanupOperations getIdsCleanupOperations(String storeName) {
return idsCleanupOperations.getIdsStoreName().equals(storeName) ? idsCleanupOperations : null;
}

public static ExecutorService executorService(int maxThreads, String prefix){
return new ThreadPoolExecutor(0, maxThreads,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(JANUS_AEROSPIKE_THREAD_GROUP_NAME, "janus-aerospike"));
new NamedThreadFactory(JANUS_GROUP_NAME, prefix));
}

public static ExecutorService executorService(int maxThreads, int queueCapacity){
public static ExecutorService executorService(int maxThreads, int queueCapacity, String prefix){
return new ThreadPoolExecutor(0, maxThreads,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueCapacity),
new NamedThreadFactory(JANUS_BATCH_THREAD_GROUP_NAME, "janus-batch"),
new NamedThreadFactory(JANUS_GROUP_NAME, prefix),
new ThreadPoolExecutor.CallerRunsPolicy());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package com.playtika.janusgraph.aerospike.operations;

import com.aerospike.client.Value;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.EntryList;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation;
import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore;
import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery;
import org.janusgraph.diskstorage.util.BufferUtil;
import org.janusgraph.diskstorage.util.time.TimestampProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

import static com.playtika.janusgraph.aerospike.AerospikeKeyColumnValueStore.mutationToMap;
import static com.playtika.janusgraph.aerospike.operations.AerospikeOperations.getValue;
import static java.util.Collections.singletonList;

//TODO remove after this issue fixed https://github.com/JanusGraph/janusgraph/issues/3007
public class IdsCleanupOperations {

private static final Logger logger = LoggerFactory.getLogger(IdsCleanupOperations.class);

private static final StaticBuffer LOWER_SLICE = BufferUtil.zeroBuffer(1);
private static final StaticBuffer UPPER_SLICE = BufferUtil.oneBuffer(17);

private static final int PROTECTED_BLOCKS_AMOUNT = 10;


private final String idsStoreName;
private final long ttl;
private final ReadOperations readOperations;
private final MutateOperations mutateOperations;
private final TimestampProvider timestampProvider;
private final ExecutorService executorService;

public IdsCleanupOperations(String idsStoreName,
ReadOperations readOperations, MutateOperations mutateOperations,
long ttl,
TimestampProvider timestampProvider,
ExecutorService executorService) {
this.idsStoreName = idsStoreName;
this.ttl = ttl;
this.readOperations = readOperations;
this.mutateOperations = mutateOperations;
this.timestampProvider = timestampProvider;
this.executorService = executorService;
}

public void cleanUpOldIdsRanges(StaticBuffer key) {
executorService.submit(() -> {
try {
cleanUpOldIdsRangesImpl(key);
} catch (BackendException e) {
logger.error("Error while running cleanup of old ranges for key=[{}]", key, e);
}
});
}

public void cleanUpOldIdsRangesImpl(StaticBuffer key) throws BackendException {
if(ttl == Long.MAX_VALUE){
return;
}

Map<StaticBuffer, EntryList> allIdBlocksMap = readOperations.getSlice(idsStoreName, singletonList(key), new SliceQuery(LOWER_SLICE, UPPER_SLICE));
EntryList blocks = allIdBlocksMap.get(key);

blocks.sort(TIMESTAMP_COMPARATOR);
List<Entry> blocksToCheck = blocks.subList(0, Math.max(blocks.size() - PROTECTED_BLOCKS_AMOUNT, 0));

List<StaticBuffer> columnsToRemove = new ArrayList<>();
for (Entry e : blocksToCheck) {
ByteBuffer byteBuffer = e.asByteBuffer();
long counterVal = byteBuffer.getLong();
long idBlockTimestamp = byteBuffer.getLong();
byte[] instanceNameData = new byte[byteBuffer.remaining()];
byteBuffer.get(instanceNameData);
String instanceName = new String(instanceNameData);

long currentTimestamp = timestampProvider.getTime(timestampProvider.getTime());
long ttlTimestamp = timestampProvider.getTime(Instant.ofEpochMilli(ttl));
if(idBlockTimestamp < currentTimestamp - ttlTimestamp){
columnsToRemove.add(e.getColumn());
logger.info("Added for removal id block - key=[{}], value=[{}], timestamp=[{}], instanceName=[{}]",
key, counterVal, idBlockTimestamp, instanceName);
} else {
logger.trace("Will retain id block - key=[{}], value=[{}], timestamp=[{}], instanceName=[{}]",
key, counterVal, idBlockTimestamp, instanceName);
}
}
if(!columnsToRemove.isEmpty()){
Map<Value, Value> mutationMap = mutationToMap(new KCVMutation(KeyColumnValueStore.NO_ADDITIONS, columnsToRemove));
mutateOperations.mutate(idsStoreName, getValue(key), mutationMap);
logger.info("Removed [{}] old id blocks for - key=[{}]", columnsToRemove.size(), key);
}
}

public String getIdsStoreName() {
return idsStoreName;
}

BlockTimestampComparator TIMESTAMP_COMPARATOR = new BlockTimestampComparator();
private static class BlockTimestampComparator implements Comparator<Entry> {

@Override
public int compare(Entry e1, Entry e2) {
return Long.compare(getTimestamp(e1), getTimestamp(e2));
}
}

private static long getTimestamp(Entry e){
ByteBuffer byteBuffer = e.asByteBuffer();
byteBuffer.getLong();
return byteBuffer.getLong();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ public interface Operations {
ScanOperations getScanOperations();

void close();

IdsCleanupOperations getIdsCleanupOperations(String storeName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class WalOperations {
private final Long staleTransactionLifetimeThresholdInMs;
private final Integer maxBatchSize;
private final String walSetName;
private AerospikeOperations aerospikeOperations;
private final AerospikeOperations aerospikeOperations;

public WalOperations(Configuration configuration, AerospikeOperations aerospikeOperations) {
this.walNamespace = configuration.get(WAL_NAMESPACE);
Expand Down
Loading