Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered Cache] Use ConcurrentHashMap explicitly in IndicesRequestCache #14409

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -506,15 +507,15 @@ public int hashCode() {
* */
class IndicesRequestCacheCleanupManager implements Closeable {
private final Set<CleanupKey> keysToClean;
private final ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap;
private final ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap;
private final AtomicInteger staleKeysCount;
private volatile double stalenessThreshold;
private final IndicesRequestCacheCleaner cacheCleaner;

IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) {
this.stalenessThreshold = stalenessThreshold;
this.keysToClean = ConcurrentCollections.newConcurrentSet();
this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap();
this.cleanupKeyToCountMap = new ConcurrentHashMap<>();
this.staleKeysCount = new AtomicInteger(0);
this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval);
threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME);
Expand Down Expand Up @@ -572,8 +573,7 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {

// pkg-private for testing
void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) {
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap())
.merge(readerCacheKeyId, 1, Integer::sum);
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new ConcurrentHashMap<>()).merge(readerCacheKeyId, 1, Integer::sum);
}

/**
Expand Down Expand Up @@ -831,7 +831,7 @@ public void close() {
}

// for testing
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> getCleanupKeyToCountMap() {
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> getCleanupKeyToCountMap() {
return cleanupKeyToCountMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -491,7 +490,8 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
indexShard.hashCode()
);
// test the mapping
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
Expand Down Expand Up @@ -554,7 +554,8 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS
);

// test the mapping
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
Expand Down Expand Up @@ -722,7 +723,8 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes());
assertEquals(1, cache.count());
// test the mappings
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentHashMap<ShardId, ConcurrentHashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager
.getCleanupKeyToCountMap();
assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader)));

cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes());
Expand Down Expand Up @@ -796,15 +798,15 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
}

// test adding to cleanupKeyToCountMap with multiple threads
public void testAddToCleanupKeyToCountMap() throws Exception {
public void testAddingToCleanupKeyToCountMapWorksAppropriatelyWithMultipleThreads() throws Exception {
threadPool = getThreadPool();
Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build();
cache = getIndicesRequestCache(settings);

int numberOfThreads = 10;
int numberOfIterations = 1000;
Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread
AtomicBoolean exceptionDetected = new AtomicBoolean(false);
AtomicBoolean concurrentModificationExceptionDetected = new AtomicBoolean(false);

ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

Expand All @@ -817,7 +819,7 @@ public void testAddToCleanupKeyToCountMap() throws Exception {
}
} catch (ConcurrentModificationException e) {
logger.error("ConcurrentModificationException detected in thread : " + e.getMessage());
exceptionDetected.set(true); // Set flag if exception is detected
concurrentModificationExceptionDetected.set(true); // Set flag if exception is detected
}
});
}
Expand All @@ -836,13 +838,17 @@ public void testAddToCleanupKeyToCountMap() throws Exception {
}
} catch (ConcurrentModificationException e) {
logger.error("ConcurrentModificationException detected in main thread : " + e.getMessage());
exceptionDetected.set(true); // Set flag if exception is detected
concurrentModificationExceptionDetected.set(true); // Set flag if exception is detected
}
});

executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
assertFalse(exceptionDetected.get());
assertTrue(executorService.awaitTermination(60, TimeUnit.SECONDS));
assertEquals(
numberOfThreads * numberOfIterations,
cache.cacheCleanupManager.getCleanupKeyToCountMap().get(indexShard.shardId()).size()
);
assertFalse(concurrentModificationExceptionDetected.get());
}

private IndicesRequestCache getIndicesRequestCache(Settings settings) {
Expand Down
Loading