Skip to content

Commit

Permalink
address bug & add tests
Browse files Browse the repository at this point in the history
Signed-off-by: Kiran Prakash <awskiran@amazon.com>
  • Loading branch information
kiranprakash154 committed Apr 24, 2024
1 parent 430e57c commit 581ea2a
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,8 @@ private void updateStaleCountOnEntryRemoval(CleanupKey cleanupKey, RemovalNotifi
// it should have already been accounted for and hence been removed from this map
// so decrement staleKeysCount
staleKeysCount.decrementAndGet();
// Returning the current value null
return null;
// Return the current map
return readerCacheKeyMap;
} else {
// If it is in the map, it is not stale yet.
// Proceed to adjust the count for the readerCacheKeyId in the map
Expand All @@ -572,12 +572,12 @@ private void updateStaleCountOnEntryRemoval(CleanupKey cleanupKey, RemovalNotifi
assert (count != null && count >= 0);
// Reduce the count by 1
int newCount = count - 1;
if (newCount <= 0) {
// Remove the readerCacheKeyId entry if new count is zero or less
readerCacheKeyMap.remove(cleanupKey.readerCacheKeyId);
} else {
if (newCount > 0) {
// Update the map with the new count
readerCacheKeyMap.put(cleanupKey.readerCacheKeyId, newCount);
} else {
// Remove the readerCacheKeyId entry if new count is zero
readerCacheKeyMap.remove(cleanupKey.readerCacheKeyId);
}
// If after modification, the readerCacheKeyMap is empty, we return null to remove the ShardId entry
return readerCacheKeyMap.isEmpty() ? null : readerCacheKeyMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build();
cache = getIndicesRequestCache(settings);
writer.addDocument(newDoc(0, "foo"));
ShardId shardId = indexShard.shardId();
DirectoryReader reader = getReader(writer, indexShard.shardId());
DirectoryReader secondReader = getReader(writer, indexShard.shardId());

Expand All @@ -457,6 +458,14 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
assertEquals(2, cache.count());

IndicesRequestCache.Key key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(reader));
// test the mapping
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
assertFalse(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(reader)));
// secondReader CacheKeyId should exist
assertTrue(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(secondReader)));

cache.onRemoval(
new RemovalNotification<ICacheKey<IndicesRequestCache.Key>, BytesReference>(
Expand All @@ -465,18 +474,27 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
RemovalReason.EVICTED
)
);

// test the mapping, it should stay the same
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
assertFalse(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(reader)));
// secondReader CacheKeyId should exist
assertTrue(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(secondReader)));
// eviction of previous stale key from the cache should decrement staleKeysCount in iRC
assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get());

IOUtils.close(secondReader);
}

// when a cache entry that is NOT Stale is evicted for any reason, staleness count should NOT be deducted
public void testStaleCount_OnRemovalNotificationOfStaleKey_DoesNotDecrementsStaleCount() throws Exception {
public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsStaleCount() throws Exception {
threadPool = getThreadPool();
Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build();
cache = getIndicesRequestCache(settings);
writer.addDocument(newDoc(0, "foo"));
ShardId shardId = indexShard.shardId();
DirectoryReader reader = getReader(writer, indexShard.shardId());
DirectoryReader secondReader = getReader(writer, indexShard.shardId());

Expand All @@ -498,13 +516,27 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DoesNotDecrementsStal
// evict entry from second reader (this reader is not closed)
IndicesRequestCache.Key key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(secondReader));

// test the mapping
ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap();
// shard id should exist
assertTrue(cleanupKeyToCountMap.containsKey(shardId));
// reader CacheKeyId should NOT exist
assertFalse(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(reader)));
// secondReader CacheKeyId should exist
assertTrue(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(secondReader)));

cache.onRemoval(
new RemovalNotification<ICacheKey<IndicesRequestCache.Key>, BytesReference>(
new ICacheKey<>(key),
getTermBytes(),
RemovalReason.EVICTED
)
);

// test the mapping, shardId entry should be cleaned up
// shard id should NOT exist
assertFalse(cleanupKeyToCountMap.containsKey(shardId));

staleKeysCount = cache.cacheCleanupManager.getStaleKeysCount();
// eviction of NON-stale key from the cache should NOT decrement staleKeysCount in iRC
assertEquals(1, staleKeysCount.get());
Expand Down Expand Up @@ -658,27 +690,65 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
// test the mapping
assertEquals(2, cache.count());
assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader)));
// create another entry for the second reader
cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes("id", "1"));
// test the mapping
assertEquals(3, cache.count());
assertEquals(2, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader)));

// Close the reader, to create stale entries
reader.close();
// 1 out of 2 keys ie 50% are now stale.
assertEquals(1, cache.cacheCleanupManager.getStaleKeysCount().get());
// cache count should not be affected
assertEquals(2, cache.count());
// test the mapping
assertEquals(3, cache.count());
// test the mapping, first reader's entry should be removed from the mapping and accounted for in the staleKeysCount
assertFalse(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(reader)));
assertEquals(1, cache.cacheCleanupManager.getStaleKeysCount().get());
// second reader's mapping should not be affected
assertEquals(2, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader)));
// send removal notification for first reader
IndicesRequestCache.Key key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(reader));
cache.onRemoval(
new RemovalNotification<ICacheKey<IndicesRequestCache.Key>, BytesReference>(
new ICacheKey<>(key),
getTermBytes(),
RemovalReason.EVICTED
)
);
// test the mapping, it should stay the same
assertFalse(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(reader)));
// staleKeysCount should be decremented
assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get());
// second reader's mapping should not be affected
assertEquals(2, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader)));

// Without closing the secondReader send removal notification of one of its key
key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(secondReader));
cache.onRemoval(
new RemovalNotification<ICacheKey<IndicesRequestCache.Key>, BytesReference>(
new ICacheKey<>(key),
getTermBytes(),
RemovalReason.EVICTED
)
);
// staleKeysCount should be the same as before
assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get());
// secondReader's readerCacheKeyId count should be decremented by 1
assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader)));
// Close the second reader
secondReader.close();
// both keys should now be stale
assertEquals(2, cache.cacheCleanupManager.getStaleKeysCount().get());
// cache count should not be affected
assertEquals(2, cache.count());
// test the mapping
// since all the readers of this shard is closed,
// the cleanupKeyToCountMap should have no entries
// Without closing the secondReader send removal notification of its last key
key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(secondReader));
cache.onRemoval(
new RemovalNotification<ICacheKey<IndicesRequestCache.Key>, BytesReference>(
new ICacheKey<>(key),
getTermBytes(),
RemovalReason.EVICTED
)
);
// staleKeysCount should be the same as before
assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get());
// since all the readers of this shard is closed, the cleanupKeyToCountMap should have no entries
assertEquals(0, cleanupKeyToCountMap.size());

IOUtils.close(secondReader);
}

private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException {
Expand Down Expand Up @@ -715,6 +785,11 @@ private BytesReference getTermBytes() throws IOException {
return XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
}

private BytesReference getTermBytes(String fieldName, String value) throws IOException {
TermQueryBuilder termQuery = new TermQueryBuilder(fieldName, value);
return XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
}

private String getReaderCacheKeyId(DirectoryReader reader) {
OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader
.getReaderCacheHelper();
Expand Down

0 comments on commit 581ea2a

Please sign in to comment.