Skip to content

Commit

Permalink
Require combine and reduce scripts in scripted metrics aggregation (#…
Browse files Browse the repository at this point in the history
…33452)

* Make text message not required in constructor for slack

* Remove unnecessary comments in test file

* Throw exception when reduce or combine is not provided; update tests

* Update integration tests for scripted metrics to always include reduce and combine

* Remove some old changes from previous branches

* Rearrange script presence checks to be earlier in build

* Change null check order in script builder for aggregated metrics; correct test scripts in IT

* Add breaking change details to PR
  • Loading branch information
albendz authored and kcm committed Oct 30, 2018
1 parent 53bd71c commit 31d319b
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 36 deletions.
6 changes: 6 additions & 0 deletions docs/reference/migration/migrate_7_0/aggregations.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,9 @@ has been removed. `missing_bucket` should be used instead.
The object used to share aggregation state between the scripts in a Scripted Metric
Aggregation is now a variable called `state` available in the script context, rather than
being provided via the `params` object as `params._agg`.

[float]
==== Make metric aggregation script parameters `reduce_script` and `combine_script` mandatory

The metric aggregation has been changed to require these two script parameters to ensure users are
explicitly defining how their data is processed.
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,14 @@ public Map<String, Object> params() {
protected ScriptedMetricAggregatorFactory doBuild(SearchContext context, AggregatorFactory<?> parent,
Builder subfactoriesBuilder) throws IOException {

if (combineScript == null) {
throw new IllegalArgumentException("[combineScript] must not be null: [" + name + "]");
}

if(reduceScript == null) {
throw new IllegalArgumentException("[reduceScript] must not be null: [" + name + "]");
}

QueryShardContext queryShardContext = context.getQueryShardContext();

// Extract params from scripts and pass them along to ScriptedMetricAggregatorFactory, since it won't have
Expand All @@ -215,16 +223,14 @@ protected ScriptedMetricAggregatorFactory doBuild(SearchContext context, Aggrega
ScriptedMetricAggContexts.MapScript.CONTEXT);
Map<String, Object> mapScriptParams = mapScript.getParams();


ScriptedMetricAggContexts.CombineScript.Factory compiledCombineScript;
Map<String, Object> combineScriptParams;
if (combineScript != null) {
compiledCombineScript = queryShardContext.getScriptService().compile(combineScript,
ScriptedMetricAggContexts.CombineScript.CONTEXT);
combineScriptParams = combineScript.getParams();
} else {
compiledCombineScript = (p, a) -> null;
combineScriptParams = Collections.emptyMap();
}

compiledCombineScript = queryShardContext.getScriptService().compile(combineScript,
ScriptedMetricAggContexts.CombineScript.CONTEXT);
combineScriptParams = combineScript.getParams();

return new ScriptedMetricAggregatorFactory(name, compiledMapScript, mapScriptParams, compiledInitScript,
initScriptParams, compiledCombineScript, combineScriptParams, reduceScript,
params, queryShardContext.lookup(), context, parent, subfactoriesBuilder, metaData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,17 @@ public class ScriptedMetricAggregatorTests extends AggregatorTestCase {
private static final Script MAP_SCRIPT = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "mapScript", Collections.emptyMap());
private static final Script COMBINE_SCRIPT = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "combineScript",
Collections.emptyMap());
private static final Script REDUCE_SCRIPT = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "reduceScript",
Collections.emptyMap());

private static final Script INIT_SCRIPT_SCORE = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "initScriptScore",
Collections.emptyMap());
private static final Script MAP_SCRIPT_SCORE = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "mapScriptScore",
Collections.emptyMap());
private static final Script COMBINE_SCRIPT_SCORE = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "combineScriptScore",
Collections.emptyMap());
private static final Script COMBINE_SCRIPT_NOOP = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "combineScriptNoop",
Collections.emptyMap());

private static final Script INIT_SCRIPT_PARAMS = new Script(ScriptType.INLINE, MockScriptEngine.NAME, "initScriptParams",
Collections.singletonMap("initialValue", 24));
Expand Down Expand Up @@ -96,6 +100,14 @@ public static void initMockScripts() {
Map<String, Object> state = (Map<String, Object>) params.get("state");
return ((List<Integer>) state.get("collector")).stream().mapToInt(Integer::intValue).sum();
});
SCRIPTS.put("combineScriptNoop", params -> {
Map<String, Object> state = (Map<String, Object>) params.get("state");
return state;
});
SCRIPTS.put("reduceScript", params -> {
Map<String, Object> state = (Map<String, Object>) params.get("state");
return state;
});

SCRIPTS.put("initScriptScore", params -> {
Map<String, Object> state = (Map<String, Object>) params.get("state");
Expand Down Expand Up @@ -160,7 +172,7 @@ public void testNoDocs() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.mapScript(MAP_SCRIPT); // map script is mandatory, even if its not used in this case
aggregationBuilder.mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT_NOOP).reduceScript(REDUCE_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
assertEquals(AGG_NAME, scriptedMetric.getName());
assertNotNull(scriptedMetric.aggregation());
Expand All @@ -169,9 +181,6 @@ public void testNoDocs() throws IOException {
}
}

/**
* without combine script, the "states" map should contain a list of the size of the number of documents matched
*/
public void testScriptedMetricWithoutCombine() throws IOException {
try (Directory directory = newDirectory()) {
int numDocs = randomInt(100);
Expand All @@ -182,15 +191,28 @@ public void testScriptedMetricWithoutCombine() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
assertEquals(AGG_NAME, scriptedMetric.getName());
assertNotNull(scriptedMetric.aggregation());
@SuppressWarnings("unchecked")
Map<String, Object> agg = (Map<String, Object>) scriptedMetric.aggregation();
@SuppressWarnings("unchecked")
List<Integer> list = (List<Integer>) agg.get("collector");
assertEquals(numDocs, list.size());
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).reduceScript(REDUCE_SCRIPT);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder));
assertEquals(exception.getMessage(), "[combineScript] must not be null: [scriptedMetric]");
}
}
}

public void testScriptedMetricWithoutReduce() throws IOException {
try (Directory directory = newDirectory()) {
int numDocs = randomInt(100);
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
for (int i = 0; i < numDocs; i++) {
indexWriter.addDocument(singleton(new SortedNumericDocValuesField("number", i)));
}
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder));
assertEquals(exception.getMessage(), "[reduceScript] must not be null: [scriptedMetric]");
}
}
}
Expand All @@ -208,7 +230,8 @@ public void testScriptedMetricWithCombine() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT)
.combineScript(COMBINE_SCRIPT).reduceScript(REDUCE_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
assertEquals(AGG_NAME, scriptedMetric.getName());
assertNotNull(scriptedMetric.aggregation());
Expand All @@ -230,7 +253,8 @@ public void testScriptedMetricWithCombineAccessesScores() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT_SCORE).mapScript(MAP_SCRIPT_SCORE).combineScript(COMBINE_SCRIPT_SCORE);
aggregationBuilder.initScript(INIT_SCRIPT_SCORE).mapScript(MAP_SCRIPT_SCORE)
.combineScript(COMBINE_SCRIPT_SCORE).reduceScript(REDUCE_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);
assertEquals(AGG_NAME, scriptedMetric.getName());
assertNotNull(scriptedMetric.aggregation());
Expand All @@ -250,7 +274,8 @@ public void testScriptParamsPassedThrough() throws IOException {

try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS).combineScript(COMBINE_SCRIPT_PARAMS);
aggregationBuilder.initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS)
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT);
ScriptedMetric scriptedMetric = search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder);

// The result value depends on the script params.
Expand All @@ -270,8 +295,8 @@ public void testConflictingAggAndScriptParams() throws IOException {
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
Map<String, Object> aggParams = Collections.singletonMap(CONFLICTING_PARAM_NAME, "blah");
aggregationBuilder.params(aggParams).initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS).
combineScript(COMBINE_SCRIPT_PARAMS);
aggregationBuilder.params(aggParams).initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS)
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT);

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder)
Expand All @@ -289,7 +314,8 @@ public void testSelfReferencingAggStateAfterInit() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT_SELF_REF).mapScript(MAP_SCRIPT);
aggregationBuilder.initScript(INIT_SCRIPT_SELF_REF).mapScript(MAP_SCRIPT)
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT);

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder)
Expand All @@ -309,7 +335,8 @@ public void testSelfReferencingAggStateAfterMap() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT_SELF_REF);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT_SELF_REF)
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT);

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder)
Expand All @@ -326,7 +353,8 @@ public void testSelfReferencingAggStateAfterCombine() throws IOException {
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
ScriptedMetricAggregationBuilder aggregationBuilder = new ScriptedMetricAggregationBuilder(AGG_NAME);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT).combineScript(COMBINE_SCRIPT_SELF_REF);
aggregationBuilder.initScript(INIT_SCRIPT).mapScript(MAP_SCRIPT)
.combineScript(COMBINE_SCRIPT_SELF_REF).reduceScript(REDUCE_SCRIPT);

IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
search(newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder)
Expand Down
Loading

0 comments on commit 31d319b

Please sign in to comment.