Skip to content

Commit

Permalink
Added Point In Time Node Stats API ServiceLayer Changes (opensearch-p…
Browse files Browse the repository at this point in the history
…roject#4030)

* Adds Node Stats api changes for the Point in time

Signed-off-by: Ajay Kumar Movva <movvaam@amazon.com>
  • Loading branch information
ajaymovva committed Aug 8, 2022
1 parent c665e7c commit 6993ac9
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.index.search.stats;

import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -77,6 +78,10 @@ public static class Stats implements Writeable, ToXContentFragment {
private long suggestTimeInMillis;
private long suggestCurrent;

private long pitCount;
private long pitTimeInMillis;
private long pitCurrent;

private Stats() {
// for internal use, initializes all counts to 0
}
Expand All @@ -91,6 +96,9 @@ public Stats(
long scrollCount,
long scrollTimeInMillis,
long scrollCurrent,
long pitCount,
long pitTimeInMillis,
long pitCurrent,
long suggestCount,
long suggestTimeInMillis,
long suggestCurrent
Expand All @@ -110,6 +118,10 @@ public Stats(
this.suggestCount = suggestCount;
this.suggestTimeInMillis = suggestTimeInMillis;
this.suggestCurrent = suggestCurrent;

this.pitCount = pitCount;
this.pitTimeInMillis = pitTimeInMillis;
this.pitCurrent = pitCurrent;
}

private Stats(StreamInput in) throws IOException {
Expand All @@ -128,6 +140,12 @@ private Stats(StreamInput in) throws IOException {
suggestCount = in.readVLong();
suggestTimeInMillis = in.readVLong();
suggestCurrent = in.readVLong();

if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
pitCount = in.readVLong();
pitTimeInMillis = in.readVLong();
pitCurrent = in.readVLong();
}
}

public void add(Stats stats) {
Expand All @@ -146,6 +164,10 @@ public void add(Stats stats) {
suggestCount += stats.suggestCount;
suggestTimeInMillis += stats.suggestTimeInMillis;
suggestCurrent += stats.suggestCurrent;

pitCount += stats.pitCount;
pitTimeInMillis += stats.pitTimeInMillis;
pitCurrent += stats.pitCurrent;
}

public void addForClosingShard(Stats stats) {
Expand All @@ -162,6 +184,10 @@ public void addForClosingShard(Stats stats) {

suggestCount += stats.suggestCount;
suggestTimeInMillis += stats.suggestTimeInMillis;

pitCount += stats.pitCount;
pitTimeInMillis += stats.pitTimeInMillis;
pitCurrent += stats.pitCurrent;
}

public long getQueryCount() {
Expand Down Expand Up @@ -212,6 +238,22 @@ public long getScrollCurrent() {
return scrollCurrent;
}

public long getPitCount() {
return pitCount;
}

public TimeValue getPitTime() {
return new TimeValue(pitTimeInMillis);
}

public long getPitTimeInMillis() {
return pitTimeInMillis;
}

public long getPitCurrent() {
return pitCurrent;
}

public long getSuggestCount() {
return suggestCount;
}
Expand Down Expand Up @@ -249,6 +291,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(suggestCount);
out.writeVLong(suggestTimeInMillis);
out.writeVLong(suggestCurrent);

if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeVLong(pitCount);
out.writeVLong(pitTimeInMillis);
out.writeVLong(pitCurrent);
}
}

@Override
Expand All @@ -265,6 +313,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.humanReadableField(Fields.SCROLL_TIME_IN_MILLIS, Fields.SCROLL_TIME, getScrollTime());
builder.field(Fields.SCROLL_CURRENT, scrollCurrent);

builder.field(Fields.PIT_TOTAL, pitCount);
builder.humanReadableField(Fields.PIT_TIME_IN_MILLIS, Fields.PIT_TIME, getPitTime());
builder.field(Fields.PIT_CURRENT, pitCurrent);

builder.field(Fields.SUGGEST_TOTAL, suggestCount);
builder.humanReadableField(Fields.SUGGEST_TIME_IN_MILLIS, Fields.SUGGEST_TIME, getSuggestTime());
builder.field(Fields.SUGGEST_CURRENT, suggestCurrent);
Expand Down Expand Up @@ -385,6 +437,10 @@ static final class Fields {
static final String SCROLL_TIME = "scroll_time";
static final String SCROLL_TIME_IN_MILLIS = "scroll_time_in_millis";
static final String SCROLL_CURRENT = "scroll_current";
static final String PIT_TOTAL = "point_in_time_total";
static final String PIT_TIME = "point_in_time_time";
static final String PIT_TIME_IN_MILLIS = "point_in_time_time_in_millis";
static final String PIT_CURRENT = "point_in_time_current";
static final String SUGGEST_TOTAL = "suggest_total";
static final String SUGGEST_TIME = "suggest_time";
static final String SUGGEST_TIME_IN_MILLIS = "suggest_time_in_millis";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,18 @@ public void onFreeScrollContext(ReaderContext readerContext) {
totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano()));
}

@Override
public void onNewPitContext(ReaderContext readerContext) {
totalStats.pitCurrent.inc();
}

@Override
public void onFreePitContext(ReaderContext readerContext) {
totalStats.pitCurrent.dec();
assert totalStats.pitCurrent.count() >= 0;
totalStats.pitMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano()));
}

/**
* Holder of statistics values
*
Expand All @@ -203,10 +215,12 @@ static final class StatsHolder {
* for one-thousand times as long (i.e., scrolls that execute for almost twelve days on average).
*/
final MeanMetric scrollMetric = new MeanMetric();
final MeanMetric pitMetric = new MeanMetric();
final MeanMetric suggestMetric = new MeanMetric();
final CounterMetric queryCurrent = new CounterMetric();
final CounterMetric fetchCurrent = new CounterMetric();
final CounterMetric scrollCurrent = new CounterMetric();
final CounterMetric pitCurrent = new CounterMetric();
final CounterMetric suggestCurrent = new CounterMetric();

SearchStats.Stats stats() {
Expand All @@ -220,6 +234,9 @@ SearchStats.Stats stats() {
scrollMetric.count(),
TimeUnit.MICROSECONDS.toMillis(scrollMetric.sum()),
scrollCurrent.count(),
pitMetric.count(),
TimeUnit.MICROSECONDS.toMillis(pitMetric.sum()),
pitCurrent.count(),
suggestMetric.count(),
TimeUnit.NANOSECONDS.toMillis(suggestMetric.sum()),
suggestCurrent.count()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public void testShardLevelSearchGroupStats() throws Exception {
// let's create two dummy search stats with groups
Map<String, Stats> groupStats1 = new HashMap<>();
Map<String, Stats> groupStats2 = new HashMap<>();
groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1));
SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1);
SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2);
groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1));
SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1);
SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2);

// adding these two search stats and checking group stats are correct
searchStats1.add(searchStats2);
Expand Down Expand Up @@ -75,6 +75,9 @@ private static void assertStats(Stats stats, long equalTo) {
assertEquals(equalTo, stats.getScrollCount());
assertEquals(equalTo, stats.getScrollTimeInMillis());
assertEquals(equalTo, stats.getScrollCurrent());
assertEquals(equalTo, stats.getPitCount());
assertEquals(equalTo, stats.getPitTimeInMillis());
assertEquals(equalTo, stats.getPitCurrent());
assertEquals(equalTo, stats.getSuggestCount());
assertEquals(equalTo, stats.getSuggestTimeInMillis());
assertEquals(equalTo, stats.getSuggestCurrent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -74,7 +77,11 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti

SearchService service = getInstanceFromNode(SearchService.class);
assertEquals(2, service.getActiveContexts());
validatePitStats("index", 1, 0, 0);
validatePitStats("index", 1, 0, 1);
service.doClose(); // this kills the keep-alive reaper we have to reset the node after this test
validatePitStats("index", 0, 1, 0);
validatePitStats("index", 0, 1, 1);
}

public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException, InterruptedException {
Expand All @@ -91,7 +98,12 @@ public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException,
PitTestsUtil.assertUsingGetAllPits(client(), response.getId(), response.getCreationTime());
assertEquals(4, response.getSuccessfulShards());
assertEquals(4, service.getActiveContexts());

validatePitStats("index", 1, 0, 0);
validatePitStats("index1", 1, 0, 0);
service.doClose();
validatePitStats("index", 0, 1, 0);
validatePitStats("index1", 0, 1, 0);
}

public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, InterruptedException {
Expand All @@ -112,7 +124,11 @@ public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, I

SearchService service = getInstanceFromNode(SearchService.class);
assertEquals(2, service.getActiveContexts());
validatePitStats("index", 1, 0, 0);
validatePitStats("index", 1, 0, 1);
service.doClose();
validatePitStats("index", 0, 1, 0);
validatePitStats("index", 0, 1, 1);
}

public void testCreatePITWithNonExistentIndex() {
Expand Down Expand Up @@ -198,6 +214,9 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime());
SearchService service = getInstanceFromNode(SearchService.class);
assertEquals(2, service.getActiveContexts());
validatePitStats("index", 1, 0, 0);
validatePitStats("index", 1, 0, 1);

client().admin().indices().prepareClose("index").get();
SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, () -> {
SearchResponse searchResponse = client().prepareSearch()
Expand Down Expand Up @@ -246,7 +265,10 @@ public void testMaxOpenPitContexts() throws Exception {
+ "This limit can be set by changing the [search.max_open_pit_context] setting."
)
);
final int maxPitContexts = SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY);
validatePitStats("index", maxPitContexts, 0, 0);
service.doClose();
validatePitStats("index", 0, maxPitContexts, 0);
}

public void testOpenPitContextsConcurrently() throws Exception {
Expand Down Expand Up @@ -292,7 +314,9 @@ public void testOpenPitContextsConcurrently() throws Exception {
thread.join();
}
assertThat(service.getActiveContexts(), equalTo(maxPitContexts));
validatePitStats("index", maxPitContexts, 0, 0);
service.doClose();
validatePitStats("index", 0, maxPitContexts, 0);
}

/**
Expand Down Expand Up @@ -461,9 +485,11 @@ public void testPitAfterUpdateIndex() throws Exception {
.getTotalHits().value,
Matchers.equalTo(0L)
);
validatePitStats("test", 1, 0, 0);
} finally {
service.doClose();
assertEquals(0, service.getActiveContexts());
validatePitStats("test", 0, 1, 0);
PitTestsUtil.assertGetAllPitsEmpty(client());
}
}
Expand Down Expand Up @@ -505,8 +531,21 @@ public void testConcurrentSearches() throws Exception {

SearchService service = getInstanceFromNode(SearchService.class);
assertEquals(2, service.getActiveContexts());
validatePitStats("index", 1, 0, 0);
validatePitStats("index", 1, 0, 1);
service.doClose();
assertEquals(0, service.getActiveContexts());
validatePitStats("index", 0, 1, 0);
validatePitStats("index", 0, 1, 1);
PitTestsUtil.assertGetAllPitsEmpty(client());
}

public void validatePitStats(String index, long expectedPitCurrent, long expectedPitCount, int shardId) throws ExecutionException,
InterruptedException {
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexServiceSafe(resolveIndex(index));
IndexShard indexShard = indexService.getShard(shardId);
assertEquals(expectedPitCurrent, indexShard.searchStats().getTotal().getPitCurrent());
assertEquals(expectedPitCount, indexShard.searchStats().getTotal().getPitCount());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.junit.After;
import org.junit.Before;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.ActionListener;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.search.CreatePitAction;
Expand Down Expand Up @@ -76,6 +78,7 @@ public void testDeletePit() throws Exception {
execute = client().execute(CreatePitAction.INSTANCE, request);
pitResponse = execute.get();
pitIds.add(pitResponse.getId());
validatePitStats("index", 10, 0);
DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds);
ActionFuture<DeletePitResponse> deleteExecute = client().execute(DeletePitAction.INSTANCE, deletePITRequest);
DeletePitResponse deletePITResponse = deleteExecute.get();
Expand All @@ -84,6 +87,7 @@ public void testDeletePit() throws Exception {
assertTrue(pitIds.contains(deletePitInfo.getPitId()));
assertTrue(deletePitInfo.isSuccessful());
}
validatePitStats("index", 0, 10);
/**
* Checking deleting the same PIT id again results in succeeded
*/
Expand All @@ -102,6 +106,7 @@ public void testDeletePitWithValidAndDeletedIds() throws Exception {
CreatePitResponse pitResponse = execute.get();
List<String> pitIds = new ArrayList<>();
pitIds.add(pitResponse.getId());
validatePitStats("index", 5, 0);

/**
* Delete Pit #1
Expand All @@ -113,9 +118,11 @@ public void testDeletePitWithValidAndDeletedIds() throws Exception {
assertTrue(pitIds.contains(deletePitInfo.getPitId()));
assertTrue(deletePitInfo.isSuccessful());
}
validatePitStats("index", 0, 5);
execute = client().execute(CreatePitAction.INSTANCE, request);
pitResponse = execute.get();
pitIds.add(pitResponse.getId());
validatePitStats("index", 5, 5);
/**
* Delete PIT with both Ids #1 (which is deleted) and #2 (which is present)
*/
Expand All @@ -126,6 +133,7 @@ public void testDeletePitWithValidAndDeletedIds() throws Exception {
assertTrue(pitIds.contains(deletePitInfo.getPitId()));
assertTrue(deletePitInfo.isSuccessful());
}
validatePitStats("index", 0, 10);
}

public void testDeletePitWithValidAndInvalidIds() throws Exception {
Expand All @@ -148,6 +156,8 @@ public void testDeleteAllPits() throws Exception {
client().prepareIndex("index1").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).execute().get();
ensureGreen();
createPitOnIndex("index1");
validatePitStats("index", 5, 0);
validatePitStats("index1", 5, 0);
DeletePitRequest deletePITRequest = new DeletePitRequest("_all");

/**
Expand All @@ -160,6 +170,8 @@ public void testDeleteAllPits() throws Exception {
assertThat(deletePitInfo.getPitId(), not(blankOrNullString()));
assertTrue(deletePitInfo.isSuccessful());
}
validatePitStats("index", 0, 5);
validatePitStats("index1", 0, 5);
client().admin().indices().prepareDelete("index1").get();
}

Expand Down Expand Up @@ -324,4 +336,16 @@ public void onFailure(Exception e) {}
}
}

public void validatePitStats(String index, long expectedPitCurrent, long expectedPitCount) throws ExecutionException,
InterruptedException {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.indices(index);
indicesStatsRequest.all();
IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get();
long pitCurrent = indicesStatsResponse.getIndex(index).getTotal().search.getTotal().getPitCurrent();
long pitCount = indicesStatsResponse.getIndex(index).getTotal().search.getTotal().getPitCount();
assertEquals(expectedPitCurrent, pitCurrent);
assertEquals(expectedPitCount, pitCount);
}

}
Loading

0 comments on commit 6993ac9

Please sign in to comment.