Skip to content

Commit

Permalink
Update IndicesRequestCacheIT.java
Browse files Browse the repository at this point in the history
Signed-off-by: Kiran Prakash <awskiran@amazon.com>
  • Loading branch information
kiranprakash154 committed Jun 14, 2024
1 parent 16c8806 commit e76c9b6
Showing 1 changed file with 51 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergeSchedulerConfig;
import org.opensearch.index.TieredMergePolicyProvider;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
Expand Down Expand Up @@ -126,6 +129,8 @@ public void testCacheAggs() throws Exception {
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
// Disable index refreshing to avoid cache being invalidated mid-test
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1))
)
.get()
);
Expand All @@ -134,7 +139,8 @@ public void testCacheAggs() throws Exception {
client.prepareIndex(index).setSource("f", "2014-03-10T00:00:00.000Z"),
client.prepareIndex(index).setSource("f", "2014-05-13T00:00:00.000Z")
);
ensureSearchable(index);
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
forceMerge(client, index);

// This is not a random example: serialization with time zones writes shared strings
// which used to not work well with the query cache because of the handles stream output
Expand Down Expand Up @@ -197,6 +203,8 @@ public void testQueryRewrite() throws Exception {
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5)
.put("index.number_of_routing_shards", 5)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
// Disable index refreshing to avoid cache being invalidated mid-test
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1))
)
.get()
);
Expand All @@ -216,10 +224,7 @@ public void testQueryRewrite() throws Exception {
assertCacheState(client, index, 0, 0);

// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refreshAndWaitForReplication();
ensureSearchable(index);
forceMerge(client, index);

assertCacheState(client, index, 0, 0);

Expand Down Expand Up @@ -268,6 +273,8 @@ public void testQueryRewriteMissingValues() throws Exception {
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
// Disable index refreshing to avoid cache being invalidated mid-test
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1))
)
.get()
);
Expand All @@ -287,10 +294,7 @@ public void testQueryRewriteMissingValues() throws Exception {
assertCacheState(client, index, 0, 0);

// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refreshAndWaitForReplication();
ensureSearchable(index);
forceMerge(client, index);

assertCacheState(client, index, 0, 0);

Expand Down Expand Up @@ -335,6 +339,8 @@ public void testQueryRewriteDates() throws Exception {
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
// Disable index refreshing to avoid cache being invalidated mid-test
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1))
)
.get()
);
Expand All @@ -354,10 +360,7 @@ public void testQueryRewriteDates() throws Exception {
assertCacheState(client, index, 0, 0);

// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refreshAndWaitForReplication();
ensureSearchable(index);
forceMerge(client, index);

assertCacheState(client, index, 0, 0);

Expand Down Expand Up @@ -399,6 +402,8 @@ public void testQueryRewriteDatesWithNow() throws Exception {
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
// Disable index refreshing to avoid cache being invalidated mid-test
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1))
.build();
assertAcked(client.admin().indices().prepareCreate("index-1").setMapping("d", "type=date").setSettings(settings).get());
assertAcked(client.admin().indices().prepareCreate("index-2").setMapping("d", "type=date").setSettings(settings).get());
Expand Down Expand Up @@ -480,6 +485,7 @@ public void testCanCache() throws Exception {
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
.put("index.number_of_routing_shards", 2)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1))
.build();
String index = "index";
assertAcked(client.admin().indices().prepareCreate(index).setMapping("s", "type=date").setSettings(settings).get());
Expand All @@ -499,10 +505,7 @@ public void testCanCache() throws Exception {
assertCacheState(client, index, 0, 0);

// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refreshAndWaitForReplication();
ensureSearchable(index);
forceMerge(client, index);

assertCacheState(client, index, 0, 0);

Expand Down Expand Up @@ -644,11 +647,14 @@ public void testProfileDisableCache() throws Exception {
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
// Disable index refreshing to avoid cache being invalidated mid-test
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1))
)
.get()
);
indexRandom(true, client.prepareIndex(index).setSource("k", "hello"));
ensureSearchable(index);
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
forceMerge(client, index);

int expectedHits = 0;
int expectedMisses = 0;
Expand Down Expand Up @@ -687,11 +693,14 @@ public void testCacheWithInvalidation() throws Exception {
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.refresh_interval", -1)
// Disable index refreshing to avoid cache being invalidated mid-test
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1))
)
.get()
);
indexRandom(true, client.prepareIndex(index).setSource("k", "hello"));
ensureSearchable(index);
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
forceMerge(client, index);
SearchResponse resp = client.prepareSearch(index).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
OpenSearchAssertions.assertAllSuccessful(resp);
Expand Down Expand Up @@ -1238,14 +1247,21 @@ public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception {
logger.info("Creating an index: {} with 2 shards", indexName);
createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
// Disable index refreshing to avoid cache being invalidated mid-test
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1))
.build()
);

ensureGreen(indexName);

logger.info("Writing few docs and searching those which will cache items in RequestCache");
indexRandom(true, client.prepareIndex(indexName).setSource("k", "hello"));
indexRandom(true, client.prepareIndex(indexName).setSource("y", "hello again"));
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
forceMerge(client, indexName);
SearchResponse resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("y", "hello")).get();
Expand Down Expand Up @@ -1329,11 +1345,26 @@ private void setupIndex(Client client, String index) throws Exception {
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
// Disable index refreshing to avoid cache being invalidated mid-test
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1))
// settings to reduce the chances of background segment merges invalidating the cache
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), 1000d)
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), 2)
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING.getKey(), "100gb")
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), 1)
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), 1)
)
.get()
);

indexRandom(true, client.prepareIndex(index).setSource("k", "hello"));
indexRandom(true, client.prepareIndex(index).setSource("k", "there"));
}

private void forceMerge(Client client, String index) {
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refreshAndWaitForReplication();
ensureSearchable(index);
}

Expand Down

0 comments on commit e76c9b6

Please sign in to comment.