Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered Caching] Expose a dynamic setting to disable/enable disk cache #13373

Merged
merged 7 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
- [Tiered Caching] Add a dynamic setting to disable/enable disk cache. ([#13373](https://github.com/opensearch-project/OpenSearch/pull/13373))
- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))
- Batch mode for async fetching shard information in GatewayAllocator for unassigned shards ([#8746](https://github.com/opensearch-project/OpenSearch/pull/8746))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,121 @@ public void testWithExplicitCacheClear() throws Exception {
}, 1, TimeUnit.SECONDS);
}

public void testWithDynamicDiskCacheSetting() throws Exception {
int onHeapCacheSizeInBytes = 10; // Keep it low so that all items are cached onto disk.
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("k", "type=keyword")
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.refresh_interval", -1)
)
.get()
);
// Update took time policy to zero so that all entries are eligible to be cached on disk.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.MILLISECONDS)
)
.build()
);
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
int numberOfIndexedItems = randomIntBetween(5, 10);
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator));
}
ensureSearchable("index");
refreshAndWaitForReplication();
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
long perQuerySizeInCacheInBytes = -1;
// Step 1: Hit some queries. We will see misses and queries will be cached(onto disk cache) for subsequent
// requests.
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
if (perQuerySizeInCacheInBytes == -1) {
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes();
}
assertSearchResponse(resp);
}

RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes());
assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
assertEquals(0, requestCacheStats.getHitCount());
assertEquals(0, requestCacheStats.getEvictions());

// Step 2: Hit same queries again. We will see hits now.
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
assertSearchResponse(resp);
}
requestCacheStats = getRequestCacheStats(client, "index");
assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes());
sohami marked this conversation as resolved.
Show resolved Hide resolved
assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount());
assertEquals(0, requestCacheStats.getEvictions());
long lastKnownHitCount = requestCacheStats.getHitCount();
long lastKnownMissCount = requestCacheStats.getMissCount();

// Step 3: Turn off disk cache now. And hit same queries again. We should not see hits now as all queries
// were cached onto disk cache.
updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), false)
.build()
);
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());

for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
assertSearchResponse(resp);
}
requestCacheStats = getRequestCacheStats(client, "index");
assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes()); //
// Still shows disk cache entries as explicit clear or invalidation is required to clean up disk cache.
assertEquals(lastKnownMissCount + numberOfIndexedItems, requestCacheStats.getMissCount());
assertEquals(0, lastKnownHitCount - requestCacheStats.getHitCount()); // No new hits being seen.
lastKnownMissCount = requestCacheStats.getMissCount();
lastKnownHitCount = requestCacheStats.getHitCount();

// Step 4: Invalidate entries via refresh.
// Explicit refresh would invalidate cache entries.
refreshAndWaitForReplication();
assertBusy(() -> {
// Explicit refresh should clear up cache entries
assertTrue(getRequestCacheStats(client, "index").getMemorySizeInBytes() == 0);
}, 1, TimeUnit.SECONDS);
requestCacheStats = getRequestCacheStats(client, "index");
assertEquals(0, lastKnownMissCount - requestCacheStats.getMissCount());
assertEquals(0, lastKnownHitCount - requestCacheStats.getHitCount());
}

private RequestCacheStats getRequestCacheStats(Client client, String indexName) {
return client.admin().indices().prepareStats(indexName).setRequestCache(true).get().getTotal().getRequestCache();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand All @@ -38,6 +39,8 @@
import java.util.function.Function;
import java.util.function.Predicate;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;

/**
* This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap
* and the items evicted from on heap cache are moved to disk based cache. If disk based cache also gets full,
Expand Down Expand Up @@ -67,20 +70,23 @@
/**
* Maintains caching tiers in ascending order of cache latency.
*/
private final List<ICache<K, V>> cacheList;
private final Map<ICache<K, V>, Boolean> caches;
private final List<Predicate<V>> policies;

TieredSpilloverCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
Objects.requireNonNull(builder.cacheConfig, "cache config can't be null");
Objects.requireNonNull(builder.cacheConfig.getClusterSettings(), "cluster settings can't be null");
this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null");

this.onHeapCache = builder.onHeapCacheFactory.create(
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<ICacheKey<K>, V>() {
@Override
public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
if (SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason())
if (caches.get(diskCache)
&& SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason())
&& evaluatePolicies(notification.getValue())) {
diskCache.put(notification.getKey(), notification.getValue());
} else {
Expand All @@ -103,9 +109,15 @@

);
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
this.cacheList = Arrays.asList(onHeapCache, diskCache);
Boolean isDiskCacheEnabled = DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType).get(builder.cacheConfig.getSettings());
LinkedHashMap<ICache<K, V>, Boolean> cacheListMap = new LinkedHashMap<>();
cacheListMap.put(onHeapCache, true);
cacheListMap.put(diskCache, isDiskCacheEnabled);
this.caches = Collections.synchronizedMap(cacheListMap);
this.dimensionNames = builder.cacheConfig.getDimensionNames();
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
builder.cacheConfig.getClusterSettings()
.addSettingsUpdateConsumer(DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType), this::enableDisableDiskCache);
}

// Package private for testing
Expand All @@ -118,6 +130,13 @@
return diskCache;
}

// Package private for testing.
void enableDisableDiskCache(Boolean isDiskCacheEnabled) {
// When disk cache is disabled, we are not clearing up the disk cache entries yet as that should be part of
// separate cache/clear API.
sohami marked this conversation as resolved.
Show resolved Hide resolved
this.caches.put(diskCache, isDiskCacheEnabled);
}

@Override
public V get(ICacheKey<K> key) {
return getValueFromTieredCache().apply(key);
Expand All @@ -132,7 +151,6 @@

@Override
public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {

V cacheValue = getValueFromTieredCache().apply(key);
if (cacheValue == null) {
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
Expand All @@ -151,19 +169,19 @@
public void invalidate(ICacheKey<K> key) {
// We are trying to invalidate the key from all caches though it would be present in only of them.
// Doing this as we don't know where it is located. We could do a get from both and check that, but what will
// also trigger a hit/miss listener event, so ignoring it for now.
// also count hits/misses stats, so ignoring it for now.
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.invalidate(key);
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
cacheEntry.getKey().invalidate(key);
}
}
}

@Override
public void invalidateAll() {
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.invalidateAll();
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
cacheEntry.getKey().invalidateAll();
}
}
}
Expand All @@ -175,32 +193,39 @@
@SuppressWarnings({ "unchecked" })
@Override
public Iterable<ICacheKey<K>> keys() {
Iterable<ICacheKey<K>>[] iterables = (Iterable<ICacheKey<K>>[]) new Iterable<?>[] { onHeapCache.keys(), diskCache.keys() };
return new ConcatenatedIterables<ICacheKey<K>>(iterables);
List<Iterable<ICacheKey<K>>> iterableList = new ArrayList<>();
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
iterableList.add(cacheEntry.getKey().keys());
}
Iterable<ICacheKey<K>>[] iterables = (Iterable<ICacheKey<K>>[]) iterableList.toArray(new Iterable<?>[0]);
return new ConcatenatedIterables<>(iterables);
}

@Override
public long count() {
long count = 0;
for (ICache<K, V> cache : cacheList) {
count += cache.count();
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
// Count for all the tiers irrespective of whether they are enabled or not. As eventually
// this will turn to zero once cache is cleared up either via invalidation or manually.
count += cacheEntry.getKey().count();
}
return count;
}

@Override
public void refresh() {
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.refresh();
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
cacheEntry.getKey().refresh();
}
}
}

@Override
public void close() throws IOException {
for (ICache<K, V> cache : cacheList) {
cache.close();
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
// Close all the caches here irrespective of whether they are enabled or not.
cacheEntry.getKey().close();

Check warning on line 228 in modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java

View check run for this annotation

Codecov / codecov/patch

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java#L228

Added line #L228 was not covered by tests
}
}

Expand All @@ -212,13 +237,12 @@
private Function<ICacheKey<K>, V> getValueFromTieredCache() {
return key -> {
try (ReleasableLock ignore = readLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
V value = cache.get(key);
if (value != null) {
// update hit stats
return value;
} else {
// update miss stats
for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
if (cacheEntry.getValue()) {
V value = cacheEntry.getKey().get(key);
if (value != null) {
return value;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.plugins.CachePlugin;
import org.opensearch.plugins.Plugin;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

/**
Expand All @@ -30,10 +33,15 @@ public class TieredSpilloverCachePlugin extends Plugin implements CachePlugin {
*/
public static final String TIERED_CACHE_SPILLOVER_PLUGIN_NAME = "tieredSpilloverCachePlugin";

private final Settings settings;

/**
* Default constructor
* @param settings settings
*/
public TieredSpilloverCachePlugin() {}
public TieredSpilloverCachePlugin(Settings settings) {
this.settings = settings;
}

@Override
public Map<String, ICache.Factory> getCacheFactoryMap() {
Expand All @@ -54,6 +62,9 @@ public List<Setting<?>> getSettings() {
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType));
if (FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings)) {
settingList.add(DISK_CACHE_ENABLED_SETTING_MAP.get(cacheType));
}
}
return settingList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ public class TieredSpilloverCacheSettings {
(key) -> Setting.simpleString(key, "", NodeScope)
);

/**
* Setting to disable/enable disk cache dynamically.
*/
public static final Setting.AffixSetting<Boolean> TIERED_SPILLOVER_DISK_CACHE_SETTING = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.enabled",
(key) -> Setting.boolSetting(key, true, NodeScope, Setting.Property.Dynamic)
);

/**
* Setting defining the minimum took time for a query to be allowed into the disk cache.
*/
Expand All @@ -63,17 +71,29 @@ public class TieredSpilloverCacheSettings {
public static final Map<CacheType, Setting<TimeValue>> TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

/**
* Fetches concrete took time policy settings.
* Stores disk cache enabled settings for various cache types as these are dynamic so that can be registered and
* retrieved accordingly.
*/
public static final Map<CacheType, Setting<Boolean>> DISK_CACHE_ENABLED_SETTING_MAP;

/**
* Fetches concrete took time policy and disk cache settings.
*/
static {
Map<CacheType, Setting<TimeValue>> concreteTookTimePolicySettingMap = new HashMap<>();
Map<CacheType, Setting<Boolean>> diskCacheSettingMap = new HashMap<>();
for (CacheType cacheType : CacheType.values()) {
concreteTookTimePolicySettingMap.put(
cacheType,
TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
diskCacheSettingMap.put(
cacheType,
TIERED_SPILLOVER_DISK_CACHE_SETTING.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
}
TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP = concreteTookTimePolicySettingMap;
DISK_CACHE_ENABLED_SETTING_MAP = diskCacheSettingMap;
}

/**
Expand Down
Loading
Loading