[Rollup] Return empty response when aggs are missing #32796

Merged: 4 commits, Aug 23, 2018

2 changes: 2 additions & 0 deletions x-pack/docs/en/rest-api/rollup/rollup-search.asciidoc
@@ -101,6 +101,7 @@ GET /sensor_rollup/_rollup_search
--------------------------------------------------
// CONSOLE
// TEST[setup:sensor_prefab_data]
// TEST[s/_rollup_search/_rollup_search?filter_path=took,timed_out,terminated_early,_shards,hits,aggregations/]
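The body of this request is collapsed in the diff view above. As a rough sketch (the aggregation name `max_temperature` is illustrative rather than copied from this diff), a rollup search with a `max` aggregation on the `temperature` field looks like:

--------------------------------------------------
GET /sensor_rollup/_rollup_search
{
    "size": 0,
    "aggregations": {
        "max_temperature": {
            "max": {
                "field": "temperature"
            }
        }
    }
}
--------------------------------------------------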

The query is targeting the `sensor_rollup` data, since this contains the rollup data as configured in the job. A `max`
aggregation has been used on the `temperature` field, yielding the following response:
@@ -194,6 +195,7 @@ GET sensor-1,sensor_rollup/_rollup_search <1>
--------------------------------------------------
// CONSOLE
// TEST[continued]
// TEST[s/_rollup_search/_rollup_search?filter_path=took,timed_out,terminated_early,_shards,hits,aggregations/]
<1> Note the URI now searches `sensor-1` and `sensor_rollup` at the same time

When the search is executed, the Rollup Search endpoint will do two things:
@@ -238,11 +238,23 @@ private static SearchResponse doCombineResponse(SearchResponse liveResponse, Lis
? (InternalAggregations)liveResponse.getAggregations()
: InternalAggregations.EMPTY;

- rolledResponses.forEach(r -> {
- if (r == null || r.getAggregations() == null || r.getAggregations().asList().size() == 0) {
- throw new RuntimeException("Expected to find aggregations in rollup response, but none found.");
- }
- });
int missingRollupAggs = rolledResponses.stream().mapToInt(searchResponse -> {
if (searchResponse == null
|| searchResponse.getAggregations() == null
|| searchResponse.getAggregations().asList().size() == 0) {
return 1;
}
return 0;
}).sum();

// We had no rollup aggs, so there is nothing to process
if (missingRollupAggs == rolledResponses.size()) {
// Return an empty response, but make sure we include all the shard, failure, etc. stats
return mergeFinalResponse(liveResponse, rolledResponses, InternalAggregations.EMPTY);
} else if (missingRollupAggs > 0 && missingRollupAggs != rolledResponses.size()) {
// We were missing some but not all of the aggs; unclear how to handle this, so bail.
throw new RuntimeException("Expected to find aggregations in rollup response, but none found.");
}

// The combination process returns a tree that is identical to the non-rolled
// which means we can use aggregation's reduce method to combine, just as if
@@ -275,27 +287,39 @@ private static SearchResponse doCombineResponse(SearchResponse liveResponse, Lis
new InternalAggregation.ReduceContext(reduceContext.bigArrays(), reduceContext.scriptService(), true));
}

- // TODO allow profiling in the future
- InternalSearchResponse combinedInternal = new InternalSearchResponse(SearchHits.empty(), currentTree, null, null,
- rolledResponses.stream().anyMatch(SearchResponse::isTimedOut),
- rolledResponses.stream().anyMatch(SearchResponse::isTimedOut),
- rolledResponses.stream().mapToInt(SearchResponse::getNumReducePhases).sum());
return mergeFinalResponse(liveResponse, rolledResponses, currentTree);
}

private static SearchResponse mergeFinalResponse(SearchResponse liveResponse, List<SearchResponse> rolledResponses,
InternalAggregations aggs) {

int totalShards = rolledResponses.stream().mapToInt(SearchResponse::getTotalShards).sum();
int sucessfulShards = rolledResponses.stream().mapToInt(SearchResponse::getSuccessfulShards).sum();
int skippedShards = rolledResponses.stream().mapToInt(SearchResponse::getSkippedShards).sum();
long took = rolledResponses.stream().mapToLong(r -> r.getTook().getMillis()).sum() ;

boolean isTimedOut = rolledResponses.stream().anyMatch(SearchResponse::isTimedOut);
boolean isTerminatedEarly = rolledResponses.stream()
.filter(r -> r.isTerminatedEarly() != null)
.anyMatch(SearchResponse::isTerminatedEarly);
int numReducePhases = rolledResponses.stream().mapToInt(SearchResponse::getNumReducePhases).sum();

if (liveResponse != null) {
totalShards += liveResponse.getTotalShards();
sucessfulShards += liveResponse.getSuccessfulShards();
skippedShards += liveResponse.getSkippedShards();
took = Math.max(took, liveResponse.getTook().getMillis());
isTimedOut = isTimedOut && liveResponse.isTimedOut();
isTerminatedEarly = isTerminatedEarly && liveResponse.isTerminatedEarly();
numReducePhases += liveResponse.getNumReducePhases();
}

InternalSearchResponse combinedInternal = new InternalSearchResponse(SearchHits.empty(), aggs, null, null,
isTimedOut, isTerminatedEarly, numReducePhases);

// Shard failures are ignored atm, so returning an empty array is fine
return new SearchResponse(combinedInternal, null, totalShards, sucessfulShards, skippedShards,
took, ShardSearchFailure.EMPTY_ARRAY, rolledResponses.get(0).getClusters());
}

/**
@@ -155,6 +155,18 @@ static MultiSearchRequest createMSearchRequest(SearchRequest request, NamedWrite
rolledSearchSource.size(0);
AggregatorFactories.Builder sourceAgg = request.source().aggregations();

// If there are no aggs in the request, our translation won't create any msearch requests.
// So just add a dummy request to the msearch and return. This is a bit silly,
// but it maintains how the regular search API behaves.
if (sourceAgg == null || sourceAgg.count() == 0) {

// Note: we can't apply any query rewriting or filtering on the query because there
// are no validated caps, so we have no idea what job is intended here. The only thing
// this affects is doc count; since hits and aggs will both be empty, it doesn't really matter.
msearch.add(new SearchRequest(context.getRollupIndices(), request.source()).types(request.types()));
return msearch;
}

// Find our list of "best" job caps
Set<RollupJobCaps> validatedCaps = new HashSet<>();
sourceAgg.getAggregatorFactories()
@@ -248,11 +260,6 @@ static void validateSearchRequest(SearchRequest request) {
if (request.source().explain() != null && request.source().explain()) {
throw new IllegalArgumentException("Rollup search does not support explaining.");
}

- // Rollup is only useful if aggregations are set, throw an exception otherwise
- if (request.source().aggregations() == null) {
- throw new IllegalArgumentException("Rollup requires at least one aggregation to be set.");
- }
}

static QueryBuilder rewriteQuery(QueryBuilder builder, Set<RollupJobCaps> jobCaps) {
@@ -198,10 +198,11 @@ public void testRolledMissingAggs() {
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
ScriptService scriptService = mock(ScriptService.class);

- Exception e = expectThrows(RuntimeException.class,
- () -> RollupResponseTranslator.combineResponses(msearch,
- new InternalAggregation.ReduceContext(bigArrays, scriptService, true)));
- assertThat(e.getMessage(), equalTo("Expected to find aggregations in rollup response, but none found."));
SearchResponse response = RollupResponseTranslator.combineResponses(msearch,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true));
assertNotNull(response);
Aggregations responseAggs = response.getAggregations();
assertThat(responseAggs.asList().size(), equalTo(0));
}

public void testMissingRolledIndex() {
@@ -307,21 +307,22 @@ public void testExplain() {
assertThat(e.getMessage(), equalTo("Rollup search does not support explaining."));
}

- public void testNoAgg() {
- String[] normalIndices = new String[]{randomAlphaOfLength(10)};
public void testNoRollupAgg() {
String[] normalIndices = new String[]{};
String[] rollupIndices = new String[]{randomAlphaOfLength(10)};
TransportRollupSearchAction.RollupSearchContext ctx
= new TransportRollupSearchAction.RollupSearchContext(normalIndices, rollupIndices, Collections.emptySet());
SearchSourceBuilder source = new SearchSourceBuilder();
source.query(new MatchAllQueryBuilder());
source.size(0);
- SearchRequest request = new SearchRequest(normalIndices, source);
SearchRequest request = new SearchRequest(rollupIndices, source);
NamedWriteableRegistry registry = mock(NamedWriteableRegistry.class);
- Exception e = expectThrows(IllegalArgumentException.class,
- () -> TransportRollupSearchAction.createMSearchRequest(request, registry, ctx));
- assertThat(e.getMessage(), equalTo("Rollup requires at least one aggregation to be set."));
MultiSearchRequest msearch = TransportRollupSearchAction.createMSearchRequest(request, registry, ctx);
assertThat(msearch.requests().size(), equalTo(1));
assertThat(msearch.requests().get(0), equalTo(request));
}


public void testNoLiveNoRollup() {
String[] normalIndices = new String[0];
String[] rollupIndices = new String[0];
@@ -152,6 +152,20 @@ setup:
- match: { aggregations.histo.buckets.3.key_as_string: "2017-01-01T08:00:00.000Z" }
- match: { aggregations.histo.buckets.3.doc_count: 20 }

---
"Empty aggregation":

  - do:
      xpack.rollup.rollup_search:
        index: "foo_rollup"
        body:
          size: 0
          aggs: {}

  - length: { hits.hits: 0 }
  - match: { hits.total: 0 }
  - is_false: aggregations


---
"Search with Metric":