Skip to content

Commit

Permalink
[Caching] Move cache removal notifications outside lru lock (#14017)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
  • Loading branch information
sgup432 committed Jun 6, 2024
1 parent 9da6170 commit 1852b7e
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add ability for Boolean and date field queries to run when only doc_values are enabled ([#11650](https://github.com/opensearch-project/OpenSearch/pull/11650))
- Refactor implementations of query phase searcher, allow QueryCollectorContext to have zero collectors ([#13481](https://github.com/opensearch-project/OpenSearch/pull/13481))
- Adds support to inject telemetry instances to plugins ([#13636](https://github.com/opensearch-project/OpenSearch/pull/13636))
- Move cache removal notifications outside lru lock ([#14017](https://github.com/opensearch-project/OpenSearch/pull/14017))

### Deprecated

Expand Down
53 changes: 46 additions & 7 deletions server/src/main/java/org/opensearch/common/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.util.concurrent.ReleasableLock;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -396,7 +398,12 @@ private V get(K key, long now, Consumer<Entry<K, V>> onExpiration) {
if (entry == null) {
return null;
} else {
promote(entry, now);
List<RemovalNotification<K, V>> removalNotifications = promote(entry, now).v2();
if (!removalNotifications.isEmpty()) {
for (RemovalNotification<K, V> removalNotification : removalNotifications) {
removalListener.onRemoval(removalNotification);
}
}
return entry.value;
}
}
Expand Down Expand Up @@ -446,8 +453,14 @@ private V compute(K key, CacheLoader<K, V> loader) throws ExecutionException {

BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
if (ok != null) {
List<RemovalNotification<K, V>> removalNotifications = new ArrayList<>();
try (ReleasableLock ignored = lruLock.acquire()) {
promote(ok, now);
removalNotifications = promote(ok, now).v2();
}
if (!removalNotifications.isEmpty()) {
for (RemovalNotification<K, V> removalNotification : removalNotifications) {
removalListener.onRemoval(removalNotification);
}
}
return ok.value;
} else {
Expand Down Expand Up @@ -512,16 +525,22 @@ private void put(K key, V value, long now) {
CacheSegment<K, V> segment = getCacheSegment(key);
Tuple<Entry<K, V>, Entry<K, V>> tuple = segment.put(key, value, now);
boolean replaced = false;
List<RemovalNotification<K, V>> removalNotifications = new ArrayList<>();
try (ReleasableLock ignored = lruLock.acquire()) {
if (tuple.v2() != null && tuple.v2().state == State.EXISTING) {
if (unlink(tuple.v2())) {
replaced = true;
}
}
promote(tuple.v1(), now);
removalNotifications = promote(tuple.v1(), now).v2();
}
if (replaced) {
removalListener.onRemoval(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED));
removalNotifications.add(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED));
}
if (!removalNotifications.isEmpty()) {
for (RemovalNotification<K, V> removalNotification : removalNotifications) {
removalListener.onRemoval(removalNotification);
}
}
}

Expand Down Expand Up @@ -767,8 +786,17 @@ public long getEvictions() {
}
}

private boolean promote(Entry<K, V> entry, long now) {
/**
* Promotes the desired entry to the head of the lru list and tries to see if it needs to evict any entries in
* case the cache size is exceeding or the entry got expired.
* @param entry Entry to be promoted
* @param now the current time
* @return Returns a tuple. v1 signifies whether an entry got promoted, v2 signifies the list of removal
* notifications that the callers needs to handle.
*/
private Tuple<Boolean, List<RemovalNotification<K, V>>> promote(Entry<K, V> entry, long now) {
boolean promoted = true;
List<RemovalNotification<K, V>> removalNotifications = new ArrayList<>();
try (ReleasableLock ignored = lruLock.acquire()) {
switch (entry.state) {
case DELETED:
Expand All @@ -782,10 +810,21 @@ private boolean promote(Entry<K, V> entry, long now) {
break;
}
if (promoted) {
evict(now);
while (tail != null && shouldPrune(tail, now)) {
Entry<K, V> entryToBeRemoved = tail;
CacheSegment<K, V> segment = getCacheSegment(entryToBeRemoved.key);
if (segment != null) {
segment.remove(entryToBeRemoved.key, entryToBeRemoved.value, f -> {});
}
if (unlink(entryToBeRemoved)) {
removalNotifications.add(
new RemovalNotification<>(entryToBeRemoved.key, entryToBeRemoved.value, RemovalReason.EVICTED)
);
}
}
}
}
return promoted;
return new Tuple<>(promoted, removalNotifications);
}

private void evict(long now) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,10 @@
@ExperimentalApi
@FunctionalInterface
public interface RemovalListener<K, V> {

/**
* This may be called from multiple threads at once. So implementation needs to be thread safe.
* @param notification removal notification for desired entry.
*/
void onRemoval(RemovalNotification<K, V> notification);
}

0 comments on commit 1852b7e

Please sign in to comment.