From 53fb91c8dd74e7fb203a562607672e7e7bafe2ea Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Mon, 17 Jun 2024 11:49:01 -0700 Subject: [PATCH 1/6] use concurrentHashMap explicitly Signed-off-by: Kiran Prakash --- CHANGELOG.md | 1 + .../opensearch/indices/IndicesRequestCache.java | 16 +++++++++++----- .../indices/IndicesRequestCacheTests.java | 10 ++++++---- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b9413d379f88b..8b8dbf59aec58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Removed ### Fixed +- Used ConcurrentHashMap explicitly in IndicesRequestCache ([#14409](https://github.com/opensearch-project/OpenSearch/pull/14409)) ### Security diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 06cd77a34fe0b..a38af48878069 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -81,6 +81,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -506,7 +507,7 @@ public int hashCode() { * */ class IndicesRequestCacheCleanupManager implements Closeable { private final Set keysToClean; - private final ConcurrentMap> cleanupKeyToCountMap; + private final ConcurrentHashMap> cleanupKeyToCountMap; private final AtomicInteger staleKeysCount; private volatile double stalenessThreshold; private final IndicesRequestCacheCleaner cacheCleaner; @@ -514,7 +515,7 @@ class IndicesRequestCacheCleanupManager implements Closeable { IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) { this.stalenessThreshold = stalenessThreshold; this.keysToClean = ConcurrentCollections.newConcurrentSet(); - this.cleanupKeyToCountMap = ConcurrentCollections.newConcurrentMap(); + this.cleanupKeyToCountMap = new ConcurrentHashMap<>(); this.staleKeysCount = new AtomicInteger(0); this.cacheCleaner = new IndicesRequestCacheCleaner(this, threadpool, cleanInterval); threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME); @@ -572,8 +573,13 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) { // pkg-private for testing void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) { - cleanupKeyToCountMap.computeIfAbsent(shardId, k -> ConcurrentCollections.newConcurrentMap()) - .merge(readerCacheKeyId, 1, Integer::sum); + cleanupKeyToCountMap.compute(shardId, (currentShardId, readerCacheKeyMap) -> { + if (readerCacheKeyMap == null) { + readerCacheKeyMap = new ConcurrentHashMap<>(); + } + readerCacheKeyMap.compute(readerCacheKeyId, (currentReaderCacheKeyId, count) -> (count == null) ? 1 : count + 1); + return readerCacheKeyMap; + }); } /** @@ -831,7 +837,7 @@ public void close() { } // for testing - ConcurrentMap> getCleanupKeyToCountMap() { + ConcurrentHashMap> getCleanupKeyToCountMap() { return cleanupKeyToCountMap; } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 205712d388cd1..40d38514a210a 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -101,7 +101,6 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -491,7 +490,8 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount( indexShard.hashCode() ); // test the mapping - ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + ConcurrentHashMap> cleanupKeyToCountMap = cache.cacheCleanupManager + .getCleanupKeyToCountMap(); // shard id should exist assertTrue(cleanupKeyToCountMap.containsKey(shardId)); // reader CacheKeyId should NOT exist @@ -554,7 +554,8 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS ); // test the mapping - ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + ConcurrentHashMap> cleanupKeyToCountMap = cache.cacheCleanupManager + .getCleanupKeyToCountMap(); // shard id should exist assertTrue(cleanupKeyToCountMap.containsKey(shardId)); // reader CacheKeyId should NOT exist @@ -722,7 +723,8 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals(1, cache.count()); // test the mappings - ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + ConcurrentHashMap> cleanupKeyToCountMap = cache.cacheCleanupManager + .getCleanupKeyToCountMap(); assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader))); cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); From 341ba71a66caccc3b601df07fc459e1d04b20f7c Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Mon, 17 Jun 2024 12:00:03 -0700 Subject: [PATCH 2/6] Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash --- .../java/org/opensearch/indices/IndicesRequestCacheTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 40d38514a210a..bcd70e02cac0a 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -843,7 +843,7 @@ public void testAddToCleanupKeyToCountMap() throws Exception { }); executorService.shutdown(); - executorService.awaitTermination(60, TimeUnit.SECONDS); + assertTrue(executorService.awaitTermination(60, TimeUnit.SECONDS)); assertFalse(exceptionDetected.get()); } From 5a56ec12ddc05def801e9fb90b2116c23c32ff39 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Mon, 17 Jun 2024 14:20:39 -0700 Subject: [PATCH 3/6] address comments Signed-off-by: Kiran Prakash --- CHANGELOG.md | 1 - .../opensearch/indices/IndicesRequestCache.java | 11 ++++++----- .../indices/IndicesRequestCacheTests.java | 14 +++++++++----- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f5744d94b7929..a43c0acf3219a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Removed ### Fixed -- Used ConcurrentHashMap explicitly in IndicesRequestCache ([#14409](https://github.com/opensearch-project/OpenSearch/pull/14409)) - Fix handling of Short and Byte data types in ScriptProcessor ingest pipeline ([#14379](https://github.com/opensearch-project/OpenSearch/issues/14379)) ### Security diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index a38af48878069..cc07e491b43b0 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -574,11 +574,12 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) { // pkg-private for testing void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) { cleanupKeyToCountMap.compute(shardId, (currentShardId, readerCacheKeyMap) -> { - if (readerCacheKeyMap == null) { - readerCacheKeyMap = new ConcurrentHashMap<>(); - } - readerCacheKeyMap.compute(readerCacheKeyId, (currentReaderCacheKeyId, count) -> (count == null) ? 1 : count + 1); - return readerCacheKeyMap; + final ConcurrentHashMap updatedReaderCacheKeyMap = Objects.requireNonNullElseGet( + readerCacheKeyMap, + ConcurrentHashMap::new + ); + updatedReaderCacheKeyMap.compute(readerCacheKeyId, (currentReaderCacheKeyId, count) -> (count == null) ? 1 : count + 1); + return updatedReaderCacheKeyMap; }); } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index bcd70e02cac0a..10688de3ab0ae 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -798,7 +798,7 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { } // test adding to cleanupKeyToCountMap with multiple threads - public void testAddToCleanupKeyToCountMap() throws Exception { + public void testAddingToCleanupKeyToCountMapWorksAppropriatelyWithMultipleThreads() throws Exception { threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); cache = getIndicesRequestCache(settings); @@ -806,7 +806,7 @@ public void testAddToCleanupKeyToCountMap() throws Exception { int numberOfThreads = 10; int numberOfIterations = 1000; Phaser phaser = new Phaser(numberOfThreads + 1); // +1 for the main thread - AtomicBoolean exceptionDetected = new AtomicBoolean(false); + AtomicBoolean concurrentModificationExceptionDetected = new AtomicBoolean(false); ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads); @@ -819,7 +819,7 @@ public void testAddToCleanupKeyToCountMap() throws Exception { } } catch (ConcurrentModificationException e) { logger.error("ConcurrentModificationException detected in thread : " + e.getMessage()); - exceptionDetected.set(true); // Set flag if exception is detected + concurrentModificationExceptionDetected.set(true); // Set flag if exception is detected } }); } @@ -838,13 +838,17 @@ public void testAddToCleanupKeyToCountMap() throws Exception { } } catch (ConcurrentModificationException e) { logger.error("ConcurrentModificationException detected in main thread : " + e.getMessage()); - exceptionDetected.set(true); // Set flag if exception is detected + concurrentModificationExceptionDetected.set(true); // Set flag if exception is detected } }); executorService.shutdown(); assertTrue(executorService.awaitTermination(60, TimeUnit.SECONDS)); - assertFalse(exceptionDetected.get()); + assertEquals( + numberOfThreads * numberOfIterations, + cache.cacheCleanupManager.getCleanupKeyToCountMap().get(indexShard.shardId()).size() + ); + assertFalse(concurrentModificationExceptionDetected.get()); } private IndicesRequestCache getIndicesRequestCache(Settings settings) { From 8a949d4404cf40d1dd20d9c10d5c9ee5a6bcfea3 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Mon, 17 Jun 2024 14:28:34 -0700 Subject: [PATCH 4/6] Update IndicesRequestCache.java Signed-off-by: Kiran Prakash --- .../org/opensearch/indices/IndicesRequestCache.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index cc07e491b43b0..c45334201e9b8 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -573,14 +573,8 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) { // pkg-private for testing void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) { - cleanupKeyToCountMap.compute(shardId, (currentShardId, readerCacheKeyMap) -> { - final ConcurrentHashMap updatedReaderCacheKeyMap = Objects.requireNonNullElseGet( - readerCacheKeyMap, - ConcurrentHashMap::new - ); - updatedReaderCacheKeyMap.compute(readerCacheKeyId, (currentReaderCacheKeyId, count) -> (count == null) ? 1 : count + 1); - return updatedReaderCacheKeyMap; - }); + cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new ConcurrentHashMap<>()) + .merge(readerCacheKeyId, 1, Integer::sum); } /** From 256e7ea9a8c403e6b133e137642e9651b91e0e17 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Mon, 17 Jun 2024 14:29:50 -0700 Subject: [PATCH 5/6] Update IndicesRequestCache.java Signed-off-by: Kiran Prakash --- .../main/java/org/opensearch/indices/IndicesRequestCache.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index c45334201e9b8..93946fa11de13 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -573,8 +573,7 @@ private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) { // pkg-private for testing void addToCleanupKeyToCountMap(ShardId shardId, String readerCacheKeyId) { - cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new ConcurrentHashMap<>()) - .merge(readerCacheKeyId, 1, Integer::sum); + cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new ConcurrentHashMap<>()).merge(readerCacheKeyId, 1, Integer::sum); } /** From 40f02692d064da0f9b913ad8d6385912c1354f18 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Mon, 17 Jun 2024 22:46:59 -0700 Subject: [PATCH 6/6] Update IndicesRequestCacheIT.java Signed-off-by: Kiran Prakash --- .../java/org/opensearch/indices/IndicesRequestCacheIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 299652e4f07a9..0383aca2de33f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -1168,7 +1168,7 @@ public void testCacheCleanupAfterIndexDeletion() throws Exception { }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); } - // when staleness threshold is lower than staleness, it should clean the cache from all indices having stale keys + // when staleness threshold is lower than staleness, it should clean cache from all indices having stale keys public void testStaleKeysCleanupWithMultipleIndices() throws Exception { int cacheCleanIntervalInMillis = 10; String node = internalCluster().startNode(