Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a MovingFunction pipeline aggregation, deprecate MovingAvg agg #29594

Merged
merged 20 commits into from
May 16, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.painless.spi.PainlessExtension;
import org.elasticsearch.painless.spi.Whitelist;
import org.elasticsearch.painless.spi.WhitelistLoader;
import org.elasticsearch.plugins.ExtensiblePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovFnScript;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -43,18 +46,34 @@
*/
public final class PainlessPlugin extends Plugin implements ScriptPlugin, ExtensiblePlugin {

// Per-context whitelists, built once at class-load time. Contexts that need
// custom whitelists are registered in the static block below; reloadSPI may
// later add plugin-provided whitelists for additional contexts.
// (The pre-change instance field `extendedWhitelists` was replaced by this
// static map; the diff artifact that left both declarations in place is removed.)
private static final Map<ScriptContext<?>, List<Whitelist>> whitelists;

/*
 * Contexts from Core that need custom whitelists can add them to the map below.
 * Whitelist resources should be added as appropriately named, separate files
 * under Painless' resources
 */
static {
    Map<ScriptContext<?>, List<Whitelist>> map = new HashMap<>();

    // Moving Function Pipeline Agg: base whitelists plus the MovingFunctions helpers
    List<Whitelist> movFn = new ArrayList<>(Whitelist.BASE_WHITELISTS);
    movFn.add(WhitelistLoader.loadFromResourceFiles(Whitelist.class, "org.elasticsearch.aggs.movfn.txt"));
    map.put(MovFnScript.CONTEXT, movFn);

    whitelists = map;
}

@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
Map<ScriptContext<?>, List<Whitelist>> contextsWithWhitelists = new HashMap<>();
for (ScriptContext<?> context : contexts) {
// we might have a context that only uses the base whitelists, so would not have been filled in by reloadSPI
List<Whitelist> whitelists = extendedWhitelists.get(context);
if (whitelists == null) {
whitelists = new ArrayList<>(Whitelist.BASE_WHITELISTS);
List<Whitelist> contextWhitelists = whitelists.get(context);
if (contextWhitelists == null) {
contextWhitelists = new ArrayList<>(Whitelist.BASE_WHITELISTS);
}
contextsWithWhitelists.put(context, whitelists);
contextsWithWhitelists.put(context, contextWhitelists);
}
return new PainlessScriptEngine(settings, contextsWithWhitelists);
}
Expand All @@ -68,7 +87,7 @@ public List<Setting<?>> getSettings() {
public void reloadSPI(ClassLoader loader) {
for (PainlessExtension extension : ServiceLoader.load(PainlessExtension.class, loader)) {
for (Map.Entry<ScriptContext<?>, List<Whitelist>> entry : extension.getContextWhitelists().entrySet()) {
List<Whitelist> existing = extendedWhitelists.computeIfAbsent(entry.getKey(),
List<Whitelist> existing = whitelists.computeIfAbsent(entry.getKey(),
c -> new ArrayList<>(Whitelist.BASE_WHITELISTS));
existing.addAll(entry.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#
# Licensed to Elasticsearch under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# This file contains a whitelist for the Moving Function pipeline aggregator in core

# Static helpers on MovingFunctions exposed to moving_fn scripts. Each method
# takes the window of collected bucket values (a Collection) plus, where shown,
# numeric tuning parameters (alpha/beta/gamma smoothing factors, period, and a
# pad flag for Holt-Winters — presumably matching the former moving_avg models;
# confirm against MovingFunctions' Javadoc).
class org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctions {
    double windowMax(Collection)
    double windowMin(Collection)
    double windowSum(Collection)
    double simpleMovAvg(Collection)
    double linearMovAvg(Collection)
    double ewmaMovAvg(Collection, double)
    double holtMovAvg(Collection, double, double)
    double holtWintersMovAvg(Collection, double, double, double, int, boolean)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Sanity integration test to make sure the custom context and whitelist work for moving_fn pipeline agg
#
setup:
- do:
indices.create:
index: test
body:
mappings:
_doc:
properties:
value_field:
type: integer
date:
type: date

- do:
bulk:
refresh: true
body:
- index:
_index: test
_type: _doc
_id: 1
- date: "2017-01-01T00:00:00"
value_field: 1
- index:
_index: test
_type: _doc
_id: 2
- date: "2017-01-02T00:00:00"
value_field: 2
- index:
_index: test
_type: _doc
_id: 3
- date: "2017-01-03T00:00:00"
value_field: 3
- index:
_index: test
_type: _doc
_id: 4
- date: "2017-01-04T00:00:00"
value_field: 4
- index:
_index: test
_type: _doc
_id: 5
- date: "2017-01-05T00:00:00"
value_field: 5
- index:
_index: test
_type: _doc
_id: 6
- date: "2017-01-06T00:00:00"
value_field: 6

- do:
indices.refresh:
index: [test]

---
"Basic test":

- do:
search:
body:
size: 0
aggs:
the_histo:
date_histogram:
field: "date"
interval: "1d"
aggs:
the_avg:
avg:
field: "value_field"
the_mov_fn:
moving_fn:
buckets_path: "the_avg"
window: 3
script: "MovingFunctions.windowMax(values)"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should test that the other functions are exposed correctly too?


- match: { hits.total: 6 }
- length: { hits.hits: 0 }
- is_false: aggregations.the_histo.buckets.0.the_mov_fn.value
- match: { aggregations.the_histo.buckets.1.the_mov_fn.value: 1.0 }
- match: { aggregations.the_histo.buckets.2.the_mov_fn.value: 2.0 }
- match: { aggregations.the_histo.buckets.3.the_mov_fn.value: 3.0 }
- match: { aggregations.the_histo.buckets.4.the_mov_fn.value: 4.0 }
- match: { aggregations.the_histo.buckets.5.the_mov_fn.value: 5.0 }




Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
---
"Bad window":

- do:
catch: /\[window\] must be a positive, non-zero integer\./
search:
body:
size: 0
aggs:
the_histo:
date_histogram:
field: "date"
interval: "1d"
aggs:
the_avg:
avg:
field: "value_field"
the_mov_fn:
moving_fn:
buckets_path: "the_avg"
window: -1
script: "MovingFunctions.windowMax(values)"

---
"Not under date_histo":

  - do:
      # NOTE(review): this test is named "Not under date_histo" yet it still uses
      # window: -1 and expects the [window] validation error — both look
      # copy-pasted from the "Bad window" test above. Presumably it should use a
      # valid window and assert the "must be under a histogram"-style error
      # instead; confirm the intended failure mode.
      catch: /\[window\] must be a positive, non-zero integer\./
      search:
        body:
          size: 0
          aggs:
            the_avg:
              avg:
                field: "value_field"
            the_mov_fn:
              moving_fn:
                buckets_path: "the_avg"
                window: -1
                script: "MovingFunctions.windowMax(values)"

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovFnScript;

/**
* Manages building {@link ScriptService}.
Expand All @@ -48,7 +49,8 @@ public class ScriptModule {
FilterScript.CONTEXT,
SimilarityScript.CONTEXT,
SimilarityWeightScript.CONTEXT,
TemplateScript.CONTEXT
TemplateScript.CONTEXT,
MovFnScript.CONTEXT
).collect(Collectors.toMap(c -> c.name, Function.identity()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@
import org.elasticsearch.search.aggregations.pipeline.movavg.models.LinearModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovFnPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovFnPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator;
import org.elasticsearch.search.fetch.FetchPhase;
Expand Down Expand Up @@ -514,6 +516,11 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
SerialDiffPipelineAggregationBuilder::new,
SerialDiffPipelineAggregator::new,
SerialDiffPipelineAggregationBuilder::parse));
registerPipelineAggregation(new PipelineAggregationSpec(
MovFnPipelineAggregationBuilder.NAME,
MovFnPipelineAggregationBuilder::new,
MovFnPipelineAggregator::new,
MovFnPipelineAggregationBuilder::parse));

registerFromPlugin(plugins, SearchPlugin::getPipelineAggregations, this::registerPipelineAggregation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public enum GapPolicy {
* GapPolicy in string format (e.g. "ignore")
* @return GapPolicy enum
*/
public static GapPolicy parse(String text, XContentLocation tokenLocation) {
public static GapPolicy parse(String text) {
GapPolicy result = null;
for (GapPolicy policy : values()) {
if (policy.parseField.match(text, LoggingDeprecationHandler.INSTANCE)) {
Expand All @@ -79,7 +79,7 @@ public static GapPolicy parse(String text, XContentLocation tokenLocation) {
for (GapPolicy policy : values()) {
validNames.add(policy.getName());
}
throw new ParsingException(tokenLocation, "Invalid gap policy: [" + text + "], accepted values: " + validNames);
throw new IllegalArgumentException("Invalid gap policy: [" + text + "], accepted values: " + validNames);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we removing the token location here, it seems like useful information to return to the user?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it was incompatible with the new parser style (I didn't think you could get token info with the new way of writing parsers), but while writing this comment I noticed how BucketSort did it:

         PARSER.declareField(BucketSortPipelineAggregationBuilder::gapPolicy, p -> {
             if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
                return GapPolicy.parse(p.text().toLowerCase(Locale.ROOT), p.getTokenLocation());
             }
             throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
         }

which I somehow missed before. I'll put it back to the way it was and using the above snippet :)

}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovFnPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;

Expand Down Expand Up @@ -114,4 +115,9 @@ public static CumulativeSumPipelineAggregationBuilder cumulativeSum(String name,
/**
 * Creates a {@code serial_diff} pipeline aggregation builder.
 *
 * @param name the name of this aggregation
 * @param bucketsPath the path to the metric whose serial difference is computed
 * @return a new {@link SerialDiffPipelineAggregationBuilder}
 */
public static SerialDiffPipelineAggregationBuilder diff(String name, String bucketsPath) {
    return new SerialDiffPipelineAggregationBuilder(name, bucketsPath);
}

/**
 * Creates a {@code moving_fn} pipeline aggregation builder.
 *
 * @param name the name of this aggregation
 * @param script the script applied to each window of bucket values
 * @param bucketsPaths the path to the metric the function reads from
 *        (a single path despite the plural name — NOTE(review): consider
 *        renaming to {@code bucketsPath} for consistency with {@link #diff})
 * @param window the size of the sliding window, in buckets
 * @return a new {@link MovFnPipelineAggregationBuilder}
 */
public static MovFnPipelineAggregationBuilder movingFunction(String name, Script script,
    String bucketsPaths, int window) {
    return new MovFnPipelineAggregationBuilder(name, bucketsPaths, script, window);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public final BucketMetricsPipelineAggregationBuilder<?> parse(String pipelineAgg
} else if (BUCKETS_PATH.match(currentFieldName, parser.getDeprecationHandler())) {
bucketsPaths = new String[] { parser.text() };
} else if (GAP_POLICY.match(currentFieldName, parser.getDeprecationHandler())) {
gapPolicy = GapPolicy.parse(parser.text(), parser.getTokenLocation());
gapPolicy = GapPolicy.parse(parser.text());
} else {
parseToken(pipelineAggregatorName, parser, currentFieldName, token, params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public static BucketScriptPipelineAggregationBuilder parse(String reducerName, X
bucketsPathsMap = new HashMap<>();
bucketsPathsMap.put("_value", parser.text());
} else if (GAP_POLICY.match(currentFieldName, parser.getDeprecationHandler())) {
gapPolicy = GapPolicy.parse(parser.text(), parser.getTokenLocation());
gapPolicy = GapPolicy.parse(parser.text());
} else if (Script.SCRIPT_PARSE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
script = Script.parse(parser);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public static BucketSelectorPipelineAggregationBuilder parse(String reducerName,
bucketsPathsMap = new HashMap<>();
bucketsPathsMap.put("_value", parser.text());
} else if (GAP_POLICY.match(currentFieldName, parser.getDeprecationHandler())) {
gapPolicy = GapPolicy.parse(parser.text(), parser.getTokenLocation());
gapPolicy = GapPolicy.parse(parser.text());
} else if (Script.SCRIPT_PARSE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
script = Script.parse(parser);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class BucketSortPipelineAggregationBuilder extends AbstractPipelineAggreg
PARSER.declareInt(BucketSortPipelineAggregationBuilder::size, SIZE);
PARSER.declareField(BucketSortPipelineAggregationBuilder::gapPolicy, p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return GapPolicy.parse(p.text().toLowerCase(Locale.ROOT), p.getTokenLocation());
return GapPolicy.parse(p.text().toLowerCase(Locale.ROOT));
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, GAP_POLICY, ObjectParser.ValueType.STRING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public static DerivativePipelineAggregationBuilder parse(String pipelineAggregat
} else if (BUCKETS_PATH_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
bucketsPaths = new String[] { parser.text() };
} else if (GAP_POLICY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
gapPolicy = GapPolicy.parse(parser.text(), parser.getTokenLocation());
gapPolicy = GapPolicy.parse(parser.text());
} else if (UNIT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
units = parser.text();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public static MovAvgPipelineAggregationBuilder parse(
} else if (BUCKETS_PATH.match(currentFieldName, parser.getDeprecationHandler())) {
bucketsPaths = new String[] { parser.text() };
} else if (GAP_POLICY.match(currentFieldName, parser.getDeprecationHandler())) {
gapPolicy = GapPolicy.parse(parser.text(), parser.getTokenLocation());
gapPolicy = GapPolicy.parse(parser.text());
} else if (MODEL.match(currentFieldName, parser.getDeprecationHandler())) {
model = parser.text();
} else {
Expand Down
Loading