Skip to content

Commit

Permalink
[Tiered Caching] [Bug Fix] Use concurrentMap instead of HashMap to fi…
Browse files Browse the repository at this point in the history
…x Concurrent Modification Exception (opensearch-project#14221)

* use concurrentmap

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update CHANGELOG.md

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCache.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* revert feature flags

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* changelog to releaselog

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* use concurrentmap

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update CHANGELOG.md

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCache.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* revert feature flags

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* changelog to releaselog

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* revert the test removal

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* revert the conflict resolutions

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

---------

Signed-off-by: Kiran Prakash <awskiran@amazon.com>
  • Loading branch information
kiranprakash154 authored and Peter Alfonsi committed Sep 3, 2024
1 parent f85cd00 commit a222a03
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -507,7 +506,7 @@ public int hashCode() {
* */
class IndicesRequestCacheCleanupManager implements Closeable {
private final Set<CleanupKey> keysToClean;
private final ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap;
private final ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap;
private final AtomicInteger staleKeysCount;
private volatile double stalenessThreshold;
private final IndicesRequestCacheCleaner cacheCleaner;
Expand Down Expand Up @@ -568,7 +567,13 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {

// If the key doesn't exist, it's added with a value of 1.
// If the key exists, its value is incremented by 1.
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum);
addToCleanupKeyToCountMap(shardId, cleanupKey.readerCacheKeyId);
}

// pkg-private for testing
void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) {
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap())
.merge(readerCacheKeyId, 1, Integer::sum);
}

/**
Expand Down Expand Up @@ -826,7 +831,7 @@ public void close() {
}

// for testing
ConcurrentMap<ShardId, HashMap<String, Integer>> getCleanupKeyToCountMap() {
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> getCleanupKeyToCountMap() {
return cleanupKeyToCountMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -105,7 +105,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -489,7 +491,7 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
indexShard.hashCode()
);
// test the mapping
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
Expand Down Expand Up @@ -552,7 +554,7 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS
);

// test the mapping
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
Expand Down Expand Up @@ -720,7 +722,7 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes());
assertEquals(1, cache.count());
// test the mappings
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
ConcurrentMap<ShardId, ConcurrentMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader)));

cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes());
Expand Down Expand Up @@ -793,8 +795,54 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
IOUtils.close(secondReader);
}

private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException {
return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId);
// test adding to cleanupKeyToCountMap with multiple threads
public void testAddToCleanupKeyToCountMap() throws Exception {
threadPool = getThreadPool();
Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build();
cache = getIndicesRequestCache(settings);

int numberOfThreads = 10;
int numberOfIterations = 1000;
Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread
AtomicBoolean exceptionDetected = new AtomicBoolean(false);

ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);

for (int i = 0; i < numberOfThreads; i++) {
executorService.submit(() -> {
phaser.arriveAndAwaitAdvance(); // Ensure all threads start at the same time
try {
for (int j = 0; j < numberOfIterations; j++) {
cache.cacheCleanupManager.addToCleanupKeyToCountMap(indexShard.shardId(), UUID.randomUUID().toString());
}
} catch (ConcurrentModificationException e) {
logger.error("ConcurrentModificationException detected in thread : " + e.getMessage());
exceptionDetected.set(true); // Set flag if exception is detected
}
});
}
phaser.arriveAndAwaitAdvance(); // Start all threads

// Main thread iterates over the map
executorService.submit(() -> {
try {
for (int j = 0; j < numberOfIterations; j++) {
cache.cacheCleanupManager.getCleanupKeyToCountMap().forEach((k, v) -> {
v.forEach((k1, v1) -> {
// Accessing the map to create contention
v.get(k1);
});
});
}
} catch (ConcurrentModificationException e) {
logger.error("ConcurrentModificationException detected in main thread : " + e.getMessage());
exceptionDetected.set(true); // Set flag if exception is detected
}
});

executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
assertFalse(exceptionDetected.get());
}

private IndicesRequestCache getIndicesRequestCache(Settings settings) {
Expand All @@ -808,6 +856,10 @@ private IndicesRequestCache getIndicesRequestCache(Settings settings) {
);
}

private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException {
return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId);
}

private Loader getLoader(DirectoryReader reader) {
return new Loader(reader, 0);
}
Expand Down

0 comments on commit a222a03

Please sign in to comment.