Skip to content

Commit

Permalink
[Tiered Caching] Bug fix for IndicesRequestCache StaleKey management (o…
Browse files Browse the repository at this point in the history
…pensearch-project#13070)

* Update IndicesRequestCache.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update CHANGELOG.md

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* revert

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* revert

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCache.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCache.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCache.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCache.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCache.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCache.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* code comments only

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* docs changes

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update CHANGELOG.md

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* revert catching AlreadyClosedException

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* assert

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* conflicts

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCacheTests.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCache.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* address comments

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCache.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* Update IndicesRequestCache.java

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* address conflicts

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* spotless apply

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* address comments

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* update code comments

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

* address bug & add tests

Signed-off-by: Kiran Prakash <awskiran@amazon.com>

---------

Signed-off-by: Kiran Prakash <awskiran@amazon.com>
  • Loading branch information
kiranprakash154 authored and Peter Alfonsi committed Aug 30, 2024
1 parent f1d2e72 commit acbbb2f
Show file tree
Hide file tree
Showing 2 changed files with 485 additions and 515 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
import org.opensearch.common.cache.service.CacheService;
Expand Down Expand Up @@ -216,19 +217,16 @@ void clear(CacheEntity entity) {
public void onRemoval(RemovalNotification<ICacheKey<Key>, BytesReference> notification) {
// In case this event happens for an old shard, we can safely ignore this as we don't keep track for old
// shards as part of request cache.

// Pass a new removal notification containing Key rather than ICacheKey<Key> to the CacheEntity for backwards compatibility.
Key key = notification.getKey().key;
RemovalNotification<Key, BytesReference> newNotification = new RemovalNotification<>(
key,
notification.getValue(),
notification.getRemovalReason()
);

cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(newNotification));
cacheCleanupManager.updateCleanupKeyToCountMapOnCacheEviction(
new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId)
);
CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId);
cacheCleanupManager.updateStaleCountOnEntryRemoval(cleanupKey, newNotification);
}

private ICacheKey<Key> getICacheKey(Key key) {
Expand Down Expand Up @@ -272,10 +270,11 @@ BytesReference getOrCompute(
OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey);
}
}
cacheCleanupManager.updateCleanupKeyToCountMapOnCacheInsertion(cleanupKey);
cacheCleanupManager.updateStaleCountOnCacheInsert(cleanupKey);
} else {
cacheEntity.onHit();
}

return value;
}

Expand Down Expand Up @@ -508,7 +507,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) {
*
* @param cleanupKey the CleanupKey to be updated in the map
*/
private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
return;
}
Expand All @@ -524,8 +523,29 @@ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum);
}

private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
/**
* Handles the eviction of a cache entry.
*
* <p>This method is called when an entry is evicted from the cache.
* We consider all removal notifications except with the reason Replaced
* {@link #incrementStaleKeysCount} would have removed the entries from the map and increment the {@link #staleKeysCount}
* Hence we decrement {@link #staleKeysCount} if we do not find the shardId or readerCacheKeyId in the map.
* Skip decrementing staleKeysCount if we find the shardId or readerCacheKeyId in the map since it would have not been accounted for in the staleKeysCount in
*
* @param cleanupKey the CleanupKey that has been evicted from the cache
* @param notification RemovalNotification of the cache entry evicted
*/
private void updateStaleCountOnEntryRemoval(CleanupKey cleanupKey, RemovalNotification<Key, BytesReference> notification) {
if (notification.getRemovalReason() == RemovalReason.REPLACED) {
// The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry
// does not affect the staleness count, we skip such notifications.
return;
}
if (cleanupKey.entity == null) {
// entity will only be null when the shard is closed/deleted
// we would have accounted this in staleKeysCount when the closing/deletion of shard would have closed the associated
// readers
staleKeysCount.decrementAndGet();
return;
}
IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
Expand All @@ -535,23 +555,41 @@ private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
}
ShardId shardId = indexShard.shardId();

cleanupKeyToCountMap.computeIfPresent(shardId, (shard, keyCountMap) -> {
keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, currentValue) -> {
// decrement the stale key count
cleanupKeyToCountMap.compute(shardId, (key, readerCacheKeyMap) -> {
if (readerCacheKeyMap == null || !readerCacheKeyMap.containsKey(cleanupKey.readerCacheKeyId)) {
// If ShardId is not present or readerCacheKeyId is not present
// it should have already been accounted for and hence been removed from this map
// so decrement staleKeysCount
staleKeysCount.decrementAndGet();
int newValue = currentValue - 1;
// Remove the key if the new value is zero by returning null; otherwise, update with the new value.
return newValue == 0 ? null : newValue;
});
return keyCountMap;
// Return the current map
return readerCacheKeyMap;
} else {
// If it is in the map, it is not stale yet.
// Proceed to adjust the count for the readerCacheKeyId in the map
// but do not decrement the staleKeysCount
Integer count = readerCacheKeyMap.get(cleanupKey.readerCacheKeyId);
// this should never be null
assert (count != null && count >= 0);
// Reduce the count by 1
int newCount = count - 1;
if (newCount > 0) {
// Update the map with the new count
readerCacheKeyMap.put(cleanupKey.readerCacheKeyId, newCount);
} else {
// Remove the readerCacheKeyId entry if new count is zero
readerCacheKeyMap.remove(cleanupKey.readerCacheKeyId);
}
// If after modification, the readerCacheKeyMap is empty, we return null to remove the ShardId entry
return readerCacheKeyMap.isEmpty() ? null : readerCacheKeyMap;
}
});
}

/**
* Updates the count of stale keys in the cache.
* This method is called when a CleanupKey is added to the keysToClean set.
*
* It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
* <p>It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
* If the CleanupKey's readerCacheKeyId is null or the CleanupKey's entity is not open, it increments the staleKeysCount
* by the total count of keys associated with the CleanupKey's ShardId in the cleanupKeyToCountMap and removes the ShardId from the map.
*
Expand All @@ -569,7 +607,7 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
ShardId shardId = indexShard.shardId();

// Using computeIfPresent to atomically operate on the countMap for a given shardId
cleanupKeyToCountMap.computeIfPresent(shardId, (key, countMap) -> {
cleanupKeyToCountMap.computeIfPresent(shardId, (currentShardId, countMap) -> {
if (cleanupKey.readerCacheKeyId == null) {
// Aggregate and add to staleKeysCount atomically if readerCacheKeyId is null
int totalSum = countMap.values().stream().mapToInt(Integer::intValue).sum();
Expand All @@ -578,18 +616,19 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
return null;
} else {
// Update staleKeysCount based on specific readerCacheKeyId, then remove it from the countMap
countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (k, v) -> {
staleKeysCount.addAndGet(v);
countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (readerCacheKey, count) -> {
staleKeysCount.addAndGet(count);
// Return null to remove the key after updating staleKeysCount
return null;
});

// Check if countMap is empty after removal to decide if we need to remove the shardId entry
if (countMap.isEmpty()) {
return null; // Returning null removes the entry for shardId
// Returning null removes the entry for shardId
return null;
}
}
return countMap; // Return the modified countMap to keep the mapping
// Return the modified countMap to retain updates
return countMap;
});
}

Expand Down Expand Up @@ -715,6 +754,11 @@ public void close() {
this.cacheCleaner.close();
}

// for testing
ConcurrentMap<ShardId, HashMap<String, Integer>> getCleanupKeyToCountMap() {
return cleanupKeyToCountMap;
}

private final class IndicesRequestCacheCleaner implements Runnable, Releasable {

private final IndicesRequestCacheCleanupManager cacheCleanupManager;
Expand Down
Loading

0 comments on commit acbbb2f

Please sign in to comment.