From f531d4d4fa84af47ec0928e7ce0d451612bda437 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 24 Apr 2024 14:50:39 -0700 Subject: [PATCH] [Tiered Caching] Bug fix for IndicesRequestCache StaleKey management (#13070) * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update CHANGELOG.md Signed-off-by: Kiran Prakash * revert Signed-off-by: Kiran Prakash * revert Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * code comments only Signed-off-by: Kiran Prakash * docs changes Signed-off-by: Kiran Prakash * Update CHANGELOG.md Signed-off-by: Kiran Prakash * revert catching AlreadyClosedException Signed-off-by: Kiran Prakash * assert Signed-off-by: Kiran Prakash * conflicts Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * address comments Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * address conflicts Signed-off-by: Kiran Prakash * spotless apply Signed-off-by: Kiran Prakash * address comments Signed-off-by: Kiran Prakash * update code comments Signed-off-by: Kiran Prakash * address bug & add tests Signed-off-by: Kiran Prakash --------- Signed-off-by: Kiran Prakash --- CHANGELOG.md | 1 + .../indices/IndicesRequestCache.java | 95 +- .../indices/IndicesRequestCacheTests.java | 908 ++++++++---------- 3 files changed, 490 insertions(+), 514 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 159fc52cccc4c..94e29c4e99243 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix from and size parameter can be negative when searching ([#13047](https://github.com/opensearch-project/OpenSearch/pull/13047)) - Enabled mockTelemetryPlugin for IT and fixed OOM issues ([#13054](https://github.com/opensearch-project/OpenSearch/pull/13054)) - Fix implement mark() and markSupported() in class FilterStreamInput ([#13098](https://github.com/opensearch-project/OpenSearch/pull/13098)) +- Fix IndicesRequestCache Stale calculation ([#13070](https://github.com/opensearch-project/OpenSearch/pull/13070)] - Fix snapshot _status API to return correct status for partial snapshots ([#12812](https://github.com/opensearch-project/OpenSearch/pull/12812)) - Improve the error messages for _stats with closed indices ([#13012](https://github.com/opensearch-project/OpenSearch/pull/13012)) - Ignore BaseRestHandler unconsumed content check as it's always consumed. ([#13290](https://github.com/opensearch-project/OpenSearch/pull/13290)) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 7c57c760ed1ee..2f2c2c60b6b30 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -47,6 +47,7 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.cache.serializer.BytesReferenceSerializer; import org.opensearch.common.cache.service.CacheService; @@ -216,12 +217,11 @@ void clear(CacheEntity entity) { public void onRemoval(RemovalNotification, BytesReference> notification) { // In case this event happens for an old shard, we can safely ignore this as we don't keep track for old // shards as part of request cache. - + // Pass a new removal notification containing Key rather than ICacheKey to the CacheEntity for backwards compatibility. Key key = notification.getKey().key; cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(notification)); - cacheCleanupManager.updateCleanupKeyToCountMapOnCacheEviction( - new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId) - ); + CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId); + cacheCleanupManager.updateStaleCountOnEntryRemoval(cleanupKey, notification); } private ICacheKey getICacheKey(Key key) { @@ -265,10 +265,11 @@ BytesReference getOrCompute( OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey); } } - cacheCleanupManager.updateCleanupKeyToCountMapOnCacheInsertion(cleanupKey); + cacheCleanupManager.updateStaleCountOnCacheInsert(cleanupKey); } else { cacheEntity.onHit(); } + return value; } @@ -501,7 +502,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) { * * @param cleanupKey the CleanupKey to be updated in the map */ - private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) { + private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) { if (stalenessThreshold == 0.0 || cleanupKey.entity == null) { return; } @@ -517,8 +518,32 @@ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) { cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum); } - private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) { - if (stalenessThreshold == 0.0 || cleanupKey.entity == null) { + /** + * Handles the eviction of a cache entry. + * + *

This method is called when an entry is evicted from the cache. + * We consider all removal notifications except with the reason Replaced + * {@link #incrementStaleKeysCount} would have removed the entries from the map and increment the {@link #staleKeysCount} + * Hence we decrement {@link #staleKeysCount} if we do not find the shardId or readerCacheKeyId in the map. + * Skip decrementing staleKeysCount if we find the shardId or readerCacheKeyId in the map since it would have not been accounted for in the staleKeysCount in + * + * @param cleanupKey the CleanupKey that has been evicted from the cache + * @param notification RemovalNotification of the cache entry evicted + */ + private void updateStaleCountOnEntryRemoval( + CleanupKey cleanupKey, + RemovalNotification, BytesReference> notification + ) { + if (notification.getRemovalReason() == RemovalReason.REPLACED) { + // The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry + // does not affect the staleness count, we skip such notifications. + return; + } + if (cleanupKey.entity == null) { + // entity will only be null when the shard is closed/deleted + // we would have accounted this in staleKeysCount when the closing/deletion of shard would have closed the associated + // readers + staleKeysCount.decrementAndGet(); return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); @@ -528,15 +553,33 @@ private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) { } ShardId shardId = indexShard.shardId(); - cleanupKeyToCountMap.computeIfPresent(shardId, (shard, keyCountMap) -> { - keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, currentValue) -> { - // decrement the stale key count + cleanupKeyToCountMap.compute(shardId, (key, readerCacheKeyMap) -> { + if (readerCacheKeyMap == null || !readerCacheKeyMap.containsKey(cleanupKey.readerCacheKeyId)) { + // If ShardId is not present or readerCacheKeyId is not present + // it should have already been accounted for and hence been removed from this map + // so decrement staleKeysCount staleKeysCount.decrementAndGet(); - int newValue = currentValue - 1; - // Remove the key if the new value is zero by returning null; otherwise, update with the new value. - return newValue == 0 ? null : newValue; - }); - return keyCountMap; + // Return the current map + return readerCacheKeyMap; + } else { + // If it is in the map, it is not stale yet. + // Proceed to adjust the count for the readerCacheKeyId in the map + // but do not decrement the staleKeysCount + Integer count = readerCacheKeyMap.get(cleanupKey.readerCacheKeyId); + // this should never be null + assert (count != null && count >= 0); + // Reduce the count by 1 + int newCount = count - 1; + if (newCount > 0) { + // Update the map with the new count + readerCacheKeyMap.put(cleanupKey.readerCacheKeyId, newCount); + } else { + // Remove the readerCacheKeyId entry if new count is zero + readerCacheKeyMap.remove(cleanupKey.readerCacheKeyId); + } + // If after modification, the readerCacheKeyMap is empty, we return null to remove the ShardId entry + return readerCacheKeyMap.isEmpty() ? null : readerCacheKeyMap; + } }); } @@ -544,7 +587,7 @@ private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) { * Updates the count of stale keys in the cache. * This method is called when a CleanupKey is added to the keysToClean set. * - * It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap. + *

It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap. * If the CleanupKey's readerCacheKeyId is null or the CleanupKey's entity is not open, it increments the staleKeysCount * by the total count of keys associated with the CleanupKey's ShardId in the cleanupKeyToCountMap and removes the ShardId from the map. * @@ -562,7 +605,7 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) { ShardId shardId = indexShard.shardId(); // Using computeIfPresent to atomically operate on the countMap for a given shardId - cleanupKeyToCountMap.computeIfPresent(shardId, (key, countMap) -> { + cleanupKeyToCountMap.computeIfPresent(shardId, (currentShardId, countMap) -> { if (cleanupKey.readerCacheKeyId == null) { // Aggregate and add to staleKeysCount atomically if readerCacheKeyId is null int totalSum = countMap.values().stream().mapToInt(Integer::intValue).sum(); @@ -571,18 +614,19 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) { return null; } else { // Update staleKeysCount based on specific readerCacheKeyId, then remove it from the countMap - countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (k, v) -> { - staleKeysCount.addAndGet(v); + countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (readerCacheKey, count) -> { + staleKeysCount.addAndGet(count); // Return null to remove the key after updating staleKeysCount return null; }); - // Check if countMap is empty after removal to decide if we need to remove the shardId entry if (countMap.isEmpty()) { - return null; // Returning null removes the entry for shardId + // Returning null removes the entry for shardId + return null; } } - return countMap; // Return the modified countMap to keep the mapping + // Return the modified countMap to retain updates + return countMap; }); } @@ -708,6 +752,11 @@ public void close() { this.cacheCleaner.close(); } + // for testing + ConcurrentMap> getCleanupKeyToCountMap() { + return cleanupKeyToCountMap; + } + private final class IndicesRequestCacheCleaner implements Runnable, Releasable { private final IndicesRequestCacheCleanupManager cacheCleanupManager; diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index e3dca1b7bfda2..051acfe9d085a 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -38,7 +38,6 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TermQuery; @@ -51,7 +50,6 @@ import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; import org.opensearch.common.cache.module.CacheModule; -import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.cache.stats.ImmutableCacheStats; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; @@ -77,46 +75,58 @@ import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchSingleNodeTestCase; import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase { + private ThreadPool threadPool; + private IndexWriter writer; + private Directory dir; + private IndicesRequestCache cache; + private IndexShard indexShard; + private ThreadPool getThreadPool() { return new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "default tracer tests").build()); } - public void testBasicOperationsCache() throws Exception { - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache( - Settings.EMPTY, - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + @Before + public void setup() throws IOException { + dir = newDirectory(); + writer = new IndexWriter(dir, newIndexWriterConfig()); + indexShard = createIndex("test").getShard(0); + } + @After + public void cleanup() throws IOException { + IOUtils.close(writer, dir, cache); + terminate(threadPool); + } + + public void testBasicOperationsCache() throws Exception { + threadPool = getThreadPool(); + cache = getIndicesRequestCache(Settings.EMPTY); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + DirectoryReader reader = getReader(writer, indexShard.shardId()); // initial cache IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); - BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); ShardRequestCache requestCacheStats = indexShard.requestCache(); assertEquals(0, requestCacheStats.stats().getHitCount()); @@ -128,7 +138,7 @@ public void testBasicOperationsCache() throws Exception { // cache hit entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); - value = cache.getOrCompute(entity, loader, reader, termBytes); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); requestCacheStats = indexShard.requestCache(); assertEquals(1, requestCacheStats.stats().getHitCount()); @@ -154,34 +164,21 @@ public void testBasicOperationsCache() throws Exception { assertEquals(0, cache.count()); assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); - IOUtils.close(reader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(reader); assertEquals(0, cache.numRegisteredCloseListeners()); } public void testBasicOperationsCacheWithFeatureFlag() throws Exception { - IndexShard indexShard = createIndex("test").getShard(0); - CacheService cacheService = new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache( - Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(), - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - cacheService, - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); + cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + DirectoryReader reader = getReader(writer, indexShard.shardId()); // initial cache IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); - BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); ShardRequestCache requestCacheStats = indexShard.requestCache(); assertEquals(0, requestCacheStats.stats().getHitCount()); @@ -193,7 +190,7 @@ public void testBasicOperationsCacheWithFeatureFlag() throws Exception { // cache hit entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); - value = cache.getOrCompute(entity, loader, reader, termBytes); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); requestCacheStats = indexShard.requestCache(); assertEquals(1, requestCacheStats.stats().getHitCount()); @@ -219,47 +216,28 @@ public void testBasicOperationsCacheWithFeatureFlag() throws Exception { assertEquals(0, cache.count()); assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); - IOUtils.close(reader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(reader); assertEquals(0, cache.numRegisteredCloseListeners()); } public void testCacheDifferentReaders() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + threadPool = getThreadPool(); + cache = getIndicesRequestCache(Settings.EMPTY); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + if (randomBoolean()) { writer.flush(); IOUtils.close(writer); writer = new IndexWriter(dir, newIndexWriterConfig()); } writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); // initial cache IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); - BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value = cache.getOrCompute(entity, loader, reader, getTermBytes()); ShardRequestCache requestCacheStats = entity.stats(); assertEquals("foo", value.streamInput().readString()); assertEquals(0, requestCacheStats.stats().getHitCount()); @@ -274,7 +252,7 @@ public void testCacheDifferentReaders() throws Exception { // cache the second IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(secondReader, 0); - value = cache.getOrCompute(entity, loader, secondReader, termBytes); + value = cache.getOrCompute(entity, loader, secondReader, getTermBytes()); requestCacheStats = entity.stats(); assertEquals("bar", value.streamInput().readString()); assertEquals(0, requestCacheStats.stats().getHitCount()); @@ -287,7 +265,7 @@ public void testCacheDifferentReaders() throws Exception { secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(secondReader, 0); - value = cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + value = cache.getOrCompute(secondEntity, loader, secondReader, getTermBytes()); requestCacheStats = entity.stats(); assertEquals("bar", value.streamInput().readString()); assertEquals(1, requestCacheStats.stats().getHitCount()); @@ -298,7 +276,7 @@ public void testCacheDifferentReaders() throws Exception { entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); - value = cache.getOrCompute(entity, loader, reader, termBytes); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); requestCacheStats = entity.stats(); assertEquals(2, requestCacheStats.stats().getHitCount()); @@ -331,8 +309,7 @@ public void testCacheDifferentReaders() throws Exception { assertEquals(0, cache.count()); assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(secondReader); assertEquals(0, cache.numRegisteredCloseListeners()); } @@ -359,55 +336,20 @@ public void testCacheCleanupThresholdSettingValidator_Invalid_Percentage() { assertThrows(IllegalArgumentException.class, () -> { IndicesRequestCache.validateStalenessSetting("500%"); }); } + // when staleness threshold is zero, stale keys should be cleaned up every time cache cleaner is invoked. public void testCacheCleanupBasedOnZeroThreshold() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); + threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0%").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), - new CacheModule(new ArrayList<>(), settings).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - if (randomBoolean()) { - writer.flush(); - IOUtils.close(writer); - writer = new IndexWriter(dir, newIndexWriterConfig()); - } - writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); // Get 2 entries into the cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); - - entity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); - - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(entity, loader, secondReader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); - secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals(2, cache.count()); // Close the reader, to be enqueued for cleanup @@ -419,209 +361,148 @@ public void testCacheCleanupBasedOnZeroThreshold() throws Exception { cache.cacheCleanupManager.cleanCache(); // cleanup should remove the stale-key assertEquals(1, cache.count()); - - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(secondReader); } - public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.5").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), - new CacheModule(new ArrayList<>(), settings).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + // when staleness count is higher than stale threshold, stale keys should be cleaned up. + public void testCacheCleanupBasedOnStaleThreshold_StalenessHigherThanThreshold() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.49").build(); + cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - if (randomBoolean()) { - writer.flush(); - IOUtils.close(writer); - writer = new IndexWriter(dir, newIndexWriterConfig()); - } - writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); // Get 2 entries into the cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); - entity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); + assertEquals(2, cache.count()); - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(entity, loader, secondReader, termBytes); + // no stale keys so far + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + // Close the reader, to be enqueued for cleanup + reader.close(); + // 1 out of 2 keys ie 50% are now stale. + assertEquals(1, cache.cacheCleanupManager.getStaleKeysCount().get()); + // cache count should not be affected + assertEquals(2, cache.count()); - secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + // clean cache with 49% staleness threshold + cache.cacheCleanupManager.cleanCache(); + // cleanup should have taken effect with 49% threshold + assertEquals(1, cache.count()); + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + + IOUtils.close(secondReader); + } + + // when staleness count equal to stale threshold, stale keys should be cleaned up. + public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.5").build(); + cache = getIndicesRequestCache(settings); + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); + + // Get 2 entries into the cache + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); + + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals(2, cache.count()); // Close the reader, to be enqueued for cleanup - // 1 out of 2 keys ie 50% are now stale. reader.close(); + // 1 out of 2 keys ie 50% are now stale. + assertEquals(1, cache.cacheCleanupManager.getStaleKeysCount().get()); // cache count should not be affected assertEquals(2, cache.count()); // clean cache with 50% staleness threshold cache.cacheCleanupManager.cleanCache(); // cleanup should have taken effect + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); assertEquals(1, cache.count()); - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(secondReader); } + // when a cache entry that is Stale is evicted for any reason, we have to deduct the count from our staleness count public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); + threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), - new CacheModule(new ArrayList<>(), settings).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - if (randomBoolean()) { - writer.flush(); - IOUtils.close(writer); - writer = new IndexWriter(dir, newIndexWriterConfig()); - } - writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - - // Get 2 entries into the cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); - - entity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); + ShardId shardId = indexShard.shardId(); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(entity, loader, secondReader, termBytes); + // Get 2 entries into the cache from 2 different readers + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); - secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals(2, cache.count()); - // Close the reader, to be enqueued for cleanup + // assert no stale keys are accounted so far + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + // Close the reader, this should create a stale key reader.close(); - AtomicInteger staleKeysCount = cache.cacheCleanupManager.getStaleKeysCount(); // 1 out of 2 keys ie 50% are now stale. - assertEquals(1, staleKeysCount.get()); + assertEquals(1, cache.cacheCleanupManager.getStaleKeysCount().get()); // cache count should not be affected assertEquals(2, cache.count()); - OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = - (OpenSearchDirectoryReader.DelegatingCacheHelper) secondReader.getReaderCacheHelper(); - String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId(); - IndicesRequestCache.Key key = new IndicesRequestCache.Key( - ((IndexShard) secondEntity.getCacheIdentity()).shardId(), - termBytes, - readerCacheKeyId - ); + IndicesRequestCache.Key key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(reader)); + // test the mapping + ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + // shard id should exist + assertTrue(cleanupKeyToCountMap.containsKey(shardId)); + // reader CacheKeyId should NOT exist + assertFalse(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(reader))); + // secondReader CacheKeyId should exist + assertTrue(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(secondReader))); cache.onRemoval( new RemovalNotification, BytesReference>( new ICacheKey<>(key), - termBytes, + getTermBytes(), RemovalReason.EVICTED ) ); - staleKeysCount = cache.cacheCleanupManager.getStaleKeysCount(); + + // test the mapping, it should stay the same + // shard id should exist + assertTrue(cleanupKeyToCountMap.containsKey(shardId)); + // reader CacheKeyId should NOT exist + assertFalse(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(reader))); + // secondReader CacheKeyId should exist + assertTrue(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(secondReader))); // eviction of previous stale key from the cache should decrement staleKeysCount in iRC - assertEquals(0, staleKeysCount.get()); + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(secondReader); } - public void testStaleCount_OnRemovalNotificationOfStaleKey_DoesNotDecrementsStaleCount() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); + // when a cache entry that is NOT Stale is evicted for any reason, staleness count should NOT be deducted + public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsStaleCount() throws Exception { + threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), - new CacheModule(new ArrayList<>(), settings).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - if (randomBoolean()) { - writer.flush(); - IOUtils.close(writer); - writer = new IndexWriter(dir, newIndexWriterConfig()); - } - writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + ShardId shardId = indexShard.shardId(); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); // Get 2 entries into the cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); - - entity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); - - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(entity, loader, secondReader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); - secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals(2, cache.count()); // Close the reader, to be enqueued for cleanup @@ -632,102 +513,251 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DoesNotDecrementsStal // cache count should not be affected assertEquals(2, cache.count()); - OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader - .getReaderCacheHelper(); - String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId(); - IndicesRequestCache.Key key = new IndicesRequestCache.Key( - ((IndexShard) secondEntity.getCacheIdentity()).shardId(), - termBytes, - readerCacheKeyId - ); + // evict entry from second reader (this reader is not closed) + IndicesRequestCache.Key key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(secondReader)); + + // test the mapping + ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + // shard id should exist + assertTrue(cleanupKeyToCountMap.containsKey(shardId)); + // reader CacheKeyId should NOT exist + assertFalse(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(reader))); + // secondReader CacheKeyId should exist + assertTrue(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(secondReader))); cache.onRemoval( new RemovalNotification, BytesReference>( new ICacheKey<>(key), - termBytes, + getTermBytes(), RemovalReason.EVICTED ) ); + + // test the mapping, shardId entry should be cleaned up + // shard id should NOT exist + assertFalse(cleanupKeyToCountMap.containsKey(shardId)); + staleKeysCount = cache.cacheCleanupManager.getStaleKeysCount(); // eviction of NON-stale key from the cache should NOT decrement staleKeysCount in iRC assertEquals(1, staleKeysCount.get()); - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(secondReader); } - public void testCacheCleanupBasedOnStaleThreshold_StalenessGreaterThanThreshold() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.49").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), - new CacheModule(new ArrayList<>(), settings).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) + // when a cache entry that is NOT Stale is evicted WITHOUT its reader closing, we should NOT deduct it from staleness count + public void testStaleCount_WithoutReaderClosing_DecrementsStaleCount() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + cache = getIndicesRequestCache(settings); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); + + // Get 2 entries into the cache from 2 different readers + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); + + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); + assertEquals(2, cache.count()); + + // no keys are stale + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + // create notification for removal of non-stale entry + IndicesRequestCache.Key key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(reader)); + cache.onRemoval( + new RemovalNotification, BytesReference>( + new ICacheKey<>(key), + getTermBytes(), + RemovalReason.EVICTED + ) ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + // stale keys count should stay zero + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + + IOUtils.close(reader, secondReader); + } + + // test staleness count based on removal notifications + public void testStaleCount_OnRemovalNotifications() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - if (randomBoolean()) { - writer.flush(); - IOUtils.close(writer); - writer = new IndexWriter(dir, newIndexWriterConfig()); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + + // Get 5 entries into the cache + int totalKeys = 5; + IndicesService.IndexShardCacheEntity entity = null; + TermQueryBuilder termQuery = null; + BytesReference termBytes = null; + for (int i = 1; i <= totalKeys; i++) { + termQuery = new TermQueryBuilder("id", "" + i); + termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + assertEquals(i, cache.count()); } - writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + // no keys are stale yet + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + // closing the reader should make all keys stale + reader.close(); + assertEquals(totalKeys, cache.cacheCleanupManager.getStaleKeysCount().get()); - // Get 2 entries into the cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); + String readerCacheKeyId = getReaderCacheKeyId(reader); + IndicesRequestCache.Key key = new IndicesRequestCache.Key( + ((IndexShard) entity.getCacheIdentity()).shardId(), + termBytes, + readerCacheKeyId + ); - entity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); + int staleCount = cache.cacheCleanupManager.getStaleKeysCount().get(); + // Notification for Replaced should not deduct the staleCount + cache.onRemoval( + new RemovalNotification, BytesReference>( + new ICacheKey<>(key), + getTermBytes(), + RemovalReason.REPLACED + ) + ); + // stale keys count should stay the same + assertEquals(staleCount, cache.cacheCleanupManager.getStaleKeysCount().get()); + + // Notification for all but Replaced should deduct the staleCount + RemovalReason[] reasons = { RemovalReason.INVALIDATED, RemovalReason.EVICTED, RemovalReason.EXPLICIT, RemovalReason.CAPACITY }; + for (RemovalReason reason : reasons) { + cache.onRemoval( + new RemovalNotification, BytesReference>(new ICacheKey<>(key), getTermBytes(), reason) + ); + assertEquals(--staleCount, cache.cacheCleanupManager.getStaleKeysCount().get()); + } + } - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(entity, loader, secondReader, termBytes); + // when staleness count less than the stale threshold, stale keys should NOT be cleaned up. + public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); + cache = getIndicesRequestCache(settings); - secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); + + // Get 2 entries into the cache + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); + + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals(2, cache.count()); // Close the reader, to be enqueued for cleanup - // 1 out of 2 keys ie 50% are now stale. reader.close(); + // 1 out of 2 keys ie 50% are now stale. + assertEquals(1, cache.cacheCleanupManager.getStaleKeysCount().get()); // cache count should not be affected assertEquals(2, cache.count()); - // clean cache with 49% staleness threshold + // clean cache with 51% staleness threshold cache.cacheCleanupManager.cleanCache(); - // cleanup should have taken effect with 49% threshold + // cleanup should have been ignored + assertEquals(1, cache.cacheCleanupManager.getStaleKeysCount().get()); + assertEquals(2, cache.count()); + + IOUtils.close(secondReader); + } + + // test the cleanupKeyToCountMap are set appropriately when both readers are closed + public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + cache = getIndicesRequestCache(settings); + + writer.addDocument(newDoc(0, "foo")); + ShardId shardId = indexShard.shardId(); + DirectoryReader reader = getReader(writer, shardId); + DirectoryReader secondReader = getReader(writer, shardId); + + // Get 2 entries into the cache from 2 different readers + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals(1, cache.count()); + // test the mappings + ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader))); - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); + // test the mapping + assertEquals(2, cache.count()); + assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader))); + // create another entry for the second reader + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes("id", "1")); + // test the mapping + assertEquals(3, cache.count()); + assertEquals(2, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader))); + + // Close the reader, to create stale entries + reader.close(); + // cache count should not be affected + assertEquals(3, cache.count()); + // test the mapping, first reader's entry should be removed from the mapping and accounted for in the staleKeysCount + assertFalse(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(reader))); + assertEquals(1, cache.cacheCleanupManager.getStaleKeysCount().get()); + // second reader's mapping should not be affected + assertEquals(2, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader))); + // send removal notification for first reader + IndicesRequestCache.Key key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(reader)); + cache.onRemoval( + new RemovalNotification, BytesReference>( + new ICacheKey<>(key), + getTermBytes(), + RemovalReason.EVICTED + ) + ); + // test the mapping, it should stay the same + assertFalse(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(reader))); + // staleKeysCount should be decremented + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + // second reader's mapping should not be affected + assertEquals(2, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader))); + + // Without closing the secondReader send removal notification of one of its key + key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(secondReader)); + cache.onRemoval( + new RemovalNotification, BytesReference>( + new ICacheKey<>(key), + getTermBytes(), + RemovalReason.EVICTED + ) + ); + // staleKeysCount should be the same as before + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + // secondReader's readerCacheKeyId count should be decremented by 1 + assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader))); + // Without closing the secondReader send removal notification of its last key + key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(secondReader)); + cache.onRemoval( + new RemovalNotification, BytesReference>( + new ICacheKey<>(key), + getTermBytes(), + RemovalReason.EVICTED + ) + ); + // staleKeysCount should be the same as before + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + // since all the readers of this shard is closed, the cleanupKeyToCountMap should have no entries + assertEquals(0, cleanupKeyToCountMap.size()); + + IOUtils.close(secondReader); } - public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() throws Exception { + private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException { + return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + } + + private IndicesRequestCache getIndicesRequestCache(Settings settings) { IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { + return new IndicesRequestCache(settings, (shardId -> { IndexService indexService = null; try { indexService = indicesService.indexServiceSafe(shardId.getIndex()); @@ -740,52 +770,30 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() threadPool, ClusterServiceUtils.createClusterService(threadPool) ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - - writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - if (randomBoolean()) { - writer.flush(); - IOUtils.close(writer); - writer = new IndexWriter(dir, newIndexWriterConfig()); - } - writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - - // Get 2 entries into the cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); - - entity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); + } - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(entity, loader, secondReader, termBytes); + private Loader getLoader(DirectoryReader reader) { + return new Loader(reader, 0); + } - secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); - assertEquals(2, cache.count()); + private IndicesService.IndexShardCacheEntity getEntity(IndexShard indexShard) { + return new IndicesService.IndexShardCacheEntity(indexShard); + } - // Close the reader, to be enqueued for cleanup - // 1 out of 2 keys ie 50% are now stale. - reader.close(); - // cache count should not be affected - assertEquals(2, cache.count()); + private BytesReference getTermBytes() throws IOException { + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + return XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + } - // clean cache with 51% staleness threshold - cache.cacheCleanupManager.cleanCache(); - // cleanup should have been ignored - assertEquals(2, cache.count()); + private BytesReference getTermBytes(String fieldName, String value) throws IOException { + TermQueryBuilder termQuery = new TermQueryBuilder(fieldName, value); + return XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + } - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + private String getReaderCacheKeyId(DirectoryReader reader) { + OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader + .getReaderCacheHelper(); + return delegatingCacheHelper.getDelegatingCacheKey().getId(); } public void testClosingIndexWipesStats() throws Exception { @@ -795,6 +803,8 @@ public void testClosingIndexWipesStats() throws Exception { Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build(); String indexToKeepName = "test"; String indexToCloseName = "test2"; + // delete all indices if already + assertAcked(client().admin().indices().prepareDelete("_all").get()); IndexService indexToKeep = createIndex(indexToKeepName, indexSettings); IndexService indexToClose = createIndex(indexToCloseName, indexSettings); for (int i = 0; i < numShards; i++) { @@ -802,9 +812,9 @@ public void testClosingIndexWipesStats() throws Exception { assertNotNull(indexToKeep.getShard(i)); assertNotNull(indexToClose.getShard(i)); } - ThreadPool threadPool = getThreadPool(); + threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.001%").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { + cache = new IndicesRequestCache(settings, (shardId -> { IndexService indexService = null; try { indexService = indicesService.indexServiceSafe(shardId.getIndex()); @@ -821,8 +831,6 @@ public void testClosingIndexWipesStats() throws Exception { threadPool, ClusterServiceUtils.createClusterService(threadPool) ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); writer.addDocument(newDoc(0, "foo")); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); @@ -895,47 +903,27 @@ public void testClosingIndexWipesStats() throws Exception { for (DirectoryReader reader : readersToKeep) { IOUtils.close(reader); } - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(secondReader); } public void testEviction() throws Exception { final ByteSizeValue size; { - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache( - Settings.EMPTY, - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + threadPool = getThreadPool(); + cache = getIndicesRequestCache(Settings.EMPTY); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - + DirectoryReader reader = getReader(writer, indexShard.shardId()); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader secondLoader = new Loader(secondReader, 0); - BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value1 = cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals("foo", value1.streamInput().readString()); - BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); + BytesReference value2 = cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals("bar", value2.streamInput().readString()); size = new ByteSizeValue(cache.getSizeInBytes()); IOUtils.close(reader, secondReader, writer, dir, cache); - terminate(threadPool); } - IndexShard indexShard = createIndex("test1").getShard(0); - ThreadPool threadPool = getThreadPool(); + indexShard = createIndex("test1").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( // Add 5 instead of 1; the key size now depends on the length of dimension names and values so there's more variation Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 5 + "b").build(), @@ -944,83 +932,52 @@ public void testEviction() throws Exception { threadPool, ClusterServiceUtils.createClusterService(threadPool) ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + dir = newDirectory(); + writer = new IndexWriter(dir, newIndexWriterConfig()); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - + DirectoryReader reader = getReader(writer, indexShard.shardId()); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader secondLoader = new Loader(secondReader, 0); - + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); writer.updateDocument(new Term("id", "0"), newDoc(0, "baz")); DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - IndicesService.IndexShardCacheEntity thirddEntity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader thirdLoader = new Loader(thirdReader, 0); - BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value1 = cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals("foo", value1.streamInput().readString()); - BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); + BytesReference value2 = cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals("bar", value2.streamInput().readString()); logger.info("Memory size: {}", indexShard.requestCache().stats().getMemorySize()); - BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); + BytesReference value3 = cache.getOrCompute(getEntity(indexShard), getLoader(thirdReader), thirdReader, getTermBytes()); assertEquals("baz", value3.streamInput().readString()); assertEquals(2, cache.count()); assertEquals(1, indexShard.requestCache().stats().getEvictions()); - IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(reader, secondReader, thirdReader); } public void testClearAllEntityIdentity() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + threadPool = getThreadPool(); + cache = getIndicesRequestCache(Settings.EMPTY); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + DirectoryReader reader = getReader(writer, indexShard.shardId()); IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); Loader secondLoader = new Loader(secondReader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "baz")); - DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader thirdReader = getReader(writer, indexShard.shardId()); + ; IndicesService.IndexShardCacheEntity thirddEntity = new IndicesService.IndexShardCacheEntity(createIndex("test1").getShard(0)); Loader thirdLoader = new Loader(thirdReader, 0); - BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value1 = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value1.streamInput().readString()); - BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); + BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, getTermBytes()); assertEquals("bar", value2.streamInput().readString()); logger.info("Memory size: {}", indexShard.requestCache().stats().getMemorySize()); - BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); + BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, getTermBytes()); assertEquals("baz", value3.streamInput().readString()); assertEquals(3, cache.count()); RequestCacheStats requestCacheStats = entity.stats().stats(); @@ -1031,14 +988,13 @@ public void testClearAllEntityIdentity() throws Exception { cache.cacheCleanupManager.cleanCache(); assertEquals(1, cache.count()); // third has not been validated since it's a different identity - value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); + value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, getTermBytes()); requestCacheStats = entity.stats().stats(); requestCacheStats.add(thirddEntity.stats().stats()); assertEquals(hitCount + 1, requestCacheStats.getHitCount()); assertEquals("baz", value3.streamInput().readString()); - IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(reader, secondReader, thirdReader); } public Iterable newDoc(int id, String value) { @@ -1050,7 +1006,7 @@ public Iterable newDoc(int id, String value) { private static class Loader implements CheckedSupplier { - private final DirectoryReader reader; + final DirectoryReader reader; private final int id; public boolean loadedFromCache = true; @@ -1074,38 +1030,18 @@ public BytesReference get() { throw new RuntimeException(e); } } - } public void testInvalidate() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + threadPool = getThreadPool(); + IndicesRequestCache cache = getIndicesRequestCache(Settings.EMPTY); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + DirectoryReader reader = getReader(writer, indexShard.shardId()); // initial cache IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); - BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); ShardRequestCache requestCacheStats = entity.stats(); assertEquals(0, requestCacheStats.stats().getHitCount()); @@ -1117,7 +1053,7 @@ public void testInvalidate() throws Exception { // cache hit entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); - value = cache.getOrCompute(entity, loader, reader, termBytes); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); requestCacheStats = entity.stats(); assertEquals(1, requestCacheStats.stats().getHitCount()); @@ -1131,8 +1067,8 @@ public void testInvalidate() throws Exception { // load again after invalidate entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); - cache.invalidate(entity, reader, termBytes); - value = cache.getOrCompute(entity, loader, reader, termBytes); + cache.invalidate(entity, reader, getTermBytes()); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); requestCacheStats = entity.stats(); assertEquals(1, requestCacheStats.stats().getHitCount()); @@ -1157,16 +1093,11 @@ public void testInvalidate() throws Exception { assertEquals(0, cache.count()); assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); - IOUtils.close(reader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(reader); assertEquals(0, cache.numRegisteredCloseListeners()); } public void testEqualsKey() throws IOException { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - Directory dir = newDirectory(); - IndexWriterConfig config = newIndexWriterConfig(); - IndexWriter writer = new IndexWriter(dir, config); ShardId shardId = new ShardId("foo", "bar", 1); ShardId shardId1 = new ShardId("foo1", "bar1", 2); IndexReader reader1 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); @@ -1193,13 +1124,9 @@ public void testEqualsKey() throws IOException { } public void testSerializationDeserializationOfCacheKey() throws Exception { - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - IndexService indexService = createIndex("test"); - IndexShard indexShard = indexService.getShard(0); IndicesService.IndexShardCacheEntity shardCacheEntity = new IndicesService.IndexShardCacheEntity(indexShard); String readerCacheKeyId = UUID.randomUUID().toString(); - IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(indexShard.shardId(), termBytes, readerCacheKeyId); + IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), readerCacheKeyId); BytesReference bytesReference = null; try (BytesStreamOutput out = new BytesStreamOutput()) { key1.writeTo(out); @@ -1211,8 +1138,7 @@ public void testSerializationDeserializationOfCacheKey() throws Exception { assertEquals(readerCacheKeyId, key2.readerCacheKeyId); assertEquals(((IndexShard) shardCacheEntity.getCacheIdentity()).shardId(), key2.shardId); - assertEquals(termBytes, key2.value); - + assertEquals(getTermBytes(), key2.value); } private class TestBytesReference extends AbstractBytesReference {