Skip to content

Commit

Permalink
use concurrentmap
Browse files Browse the repository at this point in the history
Signed-off-by: Kiran Prakash <awskiran@amazon.com>
  • Loading branch information
kiranprakash154 committed Jun 12, 2024
1 parent 1084ba9 commit 5a4484d
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.RatioValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
Expand All @@ -75,7 +76,6 @@
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -175,7 +175,8 @@ public final class IndicesRequestCache implements RemovalListener<ICacheKey<Indi
this.cacheCleanupManager = new IndicesRequestCacheCleanupManager(
threadPool,
INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING.get(settings),
getStalenessThreshold(settings)
getStalenessThreshold(settings),
FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings)
);
this.cacheEntityLookup = cacheEntityFunction;
this.clusterService = clusterService;
Expand Down Expand Up @@ -506,17 +507,24 @@ public int hashCode() {
* */
class IndicesRequestCacheCleanupManager implements Closeable {
private final Set<CleanupKey> keysToClean;
private final ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap;
private final ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap;
private final AtomicInteger staleKeysCount;
private volatile double stalenessThreshold;
private final IndicesRequestCacheCleaner cacheCleaner;
private final boolean pluggableCacheEnabled;

IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) {
IndicesRequestCacheCleanupManager(
ThreadPool threadpool,
TimeValue cleanInterval,
double stalenessThreshold,
boolean pluggableCacheEnabled
) {
this.stalenessThreshold = stalenessThreshold;
this.keysToClean = ConcurrentCollections.newConcurrentSet();
this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap();
this.staleKeysCount = new AtomicInteger(0);
this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval);
this.pluggableCacheEnabled = pluggableCacheEnabled;
threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME);
}

Expand Down Expand Up @@ -555,7 +563,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) {
* @param cleanupKey the CleanupKey to be updated in the map
*/
private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {
if (cleanupKey.entity == null) {
if (pluggableCacheEnabled || cleanupKey.entity == null) {
return;
}
IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
Expand All @@ -567,7 +575,13 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {

// If the key doesn't exist, it's added with a value of 1.
// If the key exists, its value is incremented by 1.
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum);
addToCleanupKeyToCountMap(shardId, cleanupKey.readerCacheKeyId);
}

// pkg-private for testing
void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) {
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap())
.merge(readerCacheKeyId, 1, Integer::sum);
}

/**
Expand All @@ -586,7 +600,7 @@ private void updateStaleCountOnEntryRemoval(
CleanupKey cleanupKey,
RemovalNotification<ICacheKey<Key>, BytesReference> notification
) {
if (notification.getRemovalReason() == RemovalReason.REPLACED) {
if (pluggableCacheEnabled || notification.getRemovalReason() == RemovalReason.REPLACED) {
// The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry
// does not affect the staleness count, we skip such notifications.
return;
Expand Down Expand Up @@ -646,7 +660,7 @@ private void updateStaleCountOnEntryRemoval(
* @param cleanupKey the CleanupKey that has been marked for cleanup
*/
private void incrementStaleKeysCount(CleanupKey cleanupKey) {
if (cleanupKey.entity == null) {
if (pluggableCacheEnabled || cleanupKey.entity == null) {
return;
}
IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
Expand Down Expand Up @@ -788,7 +802,7 @@ private synchronized void cleanCache(double stalenessThreshold) {
* @return true if the cache cleanup process can be skipped, false otherwise.
*/
private synchronized boolean canSkipCacheCleanup(double cleanThresholdPercent) {
if (cleanThresholdPercent == 0.0) {
if (pluggableCacheEnabled || cleanThresholdPercent == 0.0) {
return false;
}
double staleKeysInCachePercentage = staleKeysInCachePercentage();
Expand Down Expand Up @@ -825,7 +839,7 @@ public void close() {
}

// for testing
ConcurrentMap<ShardId, HashMap<String, Integer>> getCleanupKeyToCountMap() {
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> getCleanupKeyToCountMap() {
return cleanupKeyToCountMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -105,6 +105,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -488,7 +491,7 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
indexShard.hashCode()
);
// test the mapping
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
Expand Down Expand Up @@ -551,7 +554,7 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS
);

// test the mapping
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
Expand Down Expand Up @@ -719,7 +722,7 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes());
assertEquals(1, cache.count());
// test the mappings
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader)));

cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes());
Expand Down Expand Up @@ -792,8 +795,55 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
IOUtils.close(secondReader);
}

private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException {
return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId);
// test adding to cleanupKeyToCountMap with multiple threads
public void testAddToCleanupKeyToCountMap() throws InterruptedException {
threadPool = getThreadPool();
Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build();
cache = getIndicesRequestCache(settings);

int numberOfThreads = 10;
int numberOfIterations = 1000;
Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread
AtomicBoolean exceptionDetected = new AtomicBoolean(false);

ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

for (int i = 0; i < numberOfThreads; i++) {
executorService.submit(() -> {
phaser.arriveAndAwaitAdvance(); // Ensure all threads start at the same time
try {
for (int j = 0; j < numberOfIterations; j++) {
cache.cacheCleanupManager.addToCleanupKeyToCountMap(indexShard.shardId(), UUID.randomUUID().toString());
}
} catch (ConcurrentModificationException e) {
e.printStackTrace();
exceptionDetected.set(true); // Set flag if exception is detected
}
});
}
phaser.arriveAndAwaitAdvance(); // Start all threads

// Main thread iterates over the map
executorService.submit(() -> {
try {
for (int j = 0; j < numberOfIterations; j++) {
cache.cacheCleanupManager.getCleanupKeyToCountMap().forEach((k, v) -> {
v.forEach((k1, v1) -> {
// Accessing the map to create contention
v.get(k1);
});
});
}
} catch (ConcurrentModificationException e) {
e.printStackTrace();
exceptionDetected.set(true); // Set flag if exception is detected
}
});

executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);

assertFalse(exceptionDetected.get());
}

private IndicesRequestCache getIndicesRequestCache(Settings settings) {
Expand All @@ -813,6 +863,10 @@ private IndicesRequestCache getIndicesRequestCache(Settings settings) {
);
}

private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException {
return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId);
}

private Loader getLoader(DirectoryReader reader) {
return new Loader(reader, 0);
}
Expand Down

0 comments on commit 5a4484d

Please sign in to comment.