[Rollup] Only allow aggregating on multiples of configured interval #32052

Merged 13 commits on Aug 29, 2018
Changes from 4 commits
4 changes: 4 additions & 0 deletions x-pack/docs/en/rollup/overview.asciidoc
@@ -20,6 +20,7 @@ So while the cost of storing a millisecond of sensor data from ten years ago is
reading often diminishes with time. It's not useless -- it could easily contribute to a useful analysis -- but its reduced
value often leads to deletion rather than paying the fixed storage cost.

[float]
=== Rollup stores historical data at reduced granularity

That's where Rollup comes into play. The Rollup functionality summarizes old, high-granularity data into a reduced
@@ -35,6 +36,7 @@ automates this process of summarizing historical data.

Details about setting up and configuring Rollup are covered in <<rollup-put-job,Create Job API>>.

[float]
=== Rollup uses standard query DSL

The Rollup feature exposes a new search endpoint (`/_rollup_search` vs the standard `/_search`) which knows how to search
@@ -48,6 +50,7 @@ are covered more in <<rollup-search-limitations, Rollup Search limitations>>.
But if your queries, aggregations and dashboards only use the available functionality, redirecting them to historical
data is trivial.

[float]
=== Rollup merges "live" and "rolled" data

A useful feature of Rollup is the ability to query both "live", realtime data and historical "rolled" data
@@ -61,6 +64,7 @@ would only see data older than a month. The RollupSearch endpoint, however, sup
It will take the results from both data sources and merge them together. If there is overlap between the "live" and
"rolled" data, live data is preferred to increase accuracy.

[float]
=== Rollup is multi-interval aware

Finally, Rollup is capable of intelligently utilizing the best interval available. If you've worked with summarizing
149 changes: 90 additions & 59 deletions x-pack/docs/en/rollup/rollup-getting-started.asciidoc
@@ -190,7 +190,7 @@ GET /sensor_rollup/_rollup_search
"timeline": {
"date_histogram": {
"field": "timestamp",
"interval": "7d"
"interval": "1w"
Contributor:

Why change to a calendar interval here?

Contributor (Author):

Oops, I was changing units to fixed and just got carried away without thinking what I was doing. Thanks for spotting.

},
"aggs": {
"nodes": {
@@ -223,70 +223,101 @@ Which returns a corresponding response:
[source,js]
----
{
"took" : 93,
"timed_out" : false,
"terminated_early" : false,
"_shards" : ... ,
"hits" : {
"total" : 0,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"timeline" : {
"meta" : { },
"buckets" : [
{
"key_as_string" : "2018-01-18T00:00:00.000Z",
"key" : 1516233600000,
"doc_count" : 6,
"nodes" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "a",
"doc_count" : 2,
"max_temperature" : {
"value" : 202.0
},
"avg_voltage" : {
"value" : 5.1499998569488525
}
},
{
"key" : "b",
"doc_count" : 2,
"max_temperature" : {
"value" : 201.0
},
"avg_voltage" : {
"value" : 5.700000047683716
}
},
{
"key" : "c",
"doc_count" : 2,
"max_temperature" : {
"value" : 202.0
},
"avg_voltage" : {
"value" : 4.099999904632568
}
}
]
}
}
]
}
}
"took" : 93,
"timed_out" : false,
"terminated_early" : false,
"_shards" : ... ,
"hits" : {
"total" : 0,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"timeline" : {
"meta" : { },
"buckets" : [
{
"key_as_string" : "2018-01-15T00:00:00.000Z",
"key" : 1515974400000,
"doc_count" : 4,
"nodes" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "c",
"doc_count" : 2,
"max_temperature" : {
"value" : 202.0
},
"avg_voltage" : {
"value" : 4.099999904632568
}
},
{
"key" : "a",
"doc_count" : 1,
"max_temperature" : {
"value" : 202.0
},
"avg_voltage" : {
"value" : 5.099999904632568
}
},
{
"key" : "b",
"doc_count" : 1,
"max_temperature" : {
"value" : 198.0
},
"avg_voltage" : {
"value" : 5.599999904632568
}
}
]
}
},
{
"key_as_string" : "2018-01-22T00:00:00.000Z",
"key" : 1516579200000,
"doc_count" : 2,
"nodes" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "a",
"doc_count" : 1,
"max_temperature" : {
"value" : 200.0
},
"avg_voltage" : {
"value" : 5.199999809265137
}
},
{
"key" : "b",
"doc_count" : 1,
"max_temperature" : {
"value" : 201.0
},
"avg_voltage" : {
"value" : 5.800000190734863
}
}
]
}
}
]
}
}
}
----
// TESTRESPONSE[s/"took" : 93/"took" : $body.$_path/]
// TESTRESPONSE[s/"_shards" : \.\.\. /"_shards" : $body.$_path/]

In addition to being more complicated (date histogram and a terms aggregation, plus an additional average metric), you'll notice
the date_histogram uses a `7d` interval instead of `1h`.
the date_histogram uses a `1w` interval instead of `1h`.

[float]
=== Conclusion
21 changes: 18 additions & 3 deletions x-pack/docs/en/rollup/rollup-search-limitations.asciidoc
@@ -80,9 +80,24 @@ The response will tell you that the field and aggregation were not possible, bec
[float]
=== Interval Granularity

Rollups are stored at a certain granularity, as defined by the `date_histogram` group in the configuration. If data is rolled up at hourly
intervals, the <<rollup-search>> API can aggregate on any time interval hourly or greater. Intervals that are less than an hour will throw
an exception, since the data simply doesn't exist for finer granularities.
Rollups are stored at a certain granularity, as defined by the `date_histogram` group in the configuration. This means you
can only search/aggregate the rollup data with an interval that is greater-than or equal to the configured rollup interval.

For example, if data is rolled up at hourly intervals, the <<rollup-search>> API can aggregate on any time interval
hourly or greater. Intervals that are less than an hour will throw an exception, since the data simply doesn't
Contributor:

nit: throwan => "throw an"

Contributor:

nit: hourlyintervals => hourly intervals

exist for finer granularities.

.Requests must be multiples of the config
**********************************
Perhaps not immediately apparent, but the interval specified in an aggregation request must be a whole
multiple of the configured interval. If the job was configured to rollup on `3d` intervals, you can only
query and aggregate on multiples of three (`3d`, `6d`, `9d`, etc).

A non-multiple wouldn't work, since the rolled up data wouldn't cleanly "overlap" with the buckets generated
by the aggregation, leading to incorrect results.

For that reason, an error is thrown if a whole multiple of the configured interval isn't found.
**********************************
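The whole-multiple rule described in the sidebar above can be sketched as a standalone check. This is a hypothetical helper (`isServable` is not an Elasticsearch API), mirroring the modulo logic the PR adds for fixed intervals:

```java
import java.util.concurrent.TimeUnit;

public class IntervalCheck {
    // A request interval can be served by rolled-up data only when it is
    // greater than or equal to the configured interval AND a whole multiple
    // of it, so rollup buckets overlap cleanly with the requested buckets.
    static boolean isServable(long requestMillis, long configMillis) {
        return requestMillis >= configMillis && requestMillis % configMillis == 0;
    }

    public static void main(String[] args) {
        long threeDays = TimeUnit.DAYS.toMillis(3); // job configured at 3d
        System.out.println(isServable(TimeUnit.DAYS.toMillis(6), threeDays)); // 6d on a 3d job: true
        System.out.println(isServable(TimeUnit.DAYS.toMillis(4), threeDays)); // 4d on a 3d job: false
        System.out.println(isServable(TimeUnit.DAYS.toMillis(2), threeDays)); // 2d is finer than config: false
    }
}
```

A `4d` request fails because `4d % 3d != 0`: its buckets would straddle the stored `3d` buckets and produce incorrect counts.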

Because the RollupSearch endpoint can "upsample" intervals, there is no need to configure jobs with multiple intervals (hourly, daily, etc).
It's recommended to just configure a single job with the smallest granularity that is needed, and allow the search endpoint to upsample
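Calendar intervals (week, month, quarter) have no fixed millisecond length, so the PR's Java changes compare them by a relative unit ordering instead of by duration. A minimal standalone sketch of that idea (hypothetical class with a trimmed unit table, not the actual Elasticsearch code):

```java
import java.util.HashMap;
import java.util.Map;

public class CalendarOrdering {
    // Relative ordering of calendar units; a larger value means a coarser
    // unit. Assumes both units being compared appear in the table.
    private static final Map<String, Integer> ORDER = new HashMap<>();
    static {
        ORDER.put("1h", 3);
        ORDER.put("1d", 4);
        ORDER.put("1w", 5);
        ORDER.put("1M", 6);
    }

    // Calendar units nest inside each other, so a request is servable
    // whenever its unit is the same as or coarser than the configured unit.
    static boolean canServe(String requestUnit, String configUnit) {
        return ORDER.getOrDefault(requestUnit, Integer.MAX_VALUE)
                >= ORDER.getOrDefault(configUnit, Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        System.out.println(canServe("1w", "1d")); // weekly query on a daily job: true
        System.out.println(canServe("1h", "1d")); // hourly query on a daily job: false
    }
}
```

This ordering is why a single fine-grained job is enough: any coarser calendar request "upsamples" cleanly from it.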
@@ -8,6 +8,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
@@ -17,7 +18,9 @@
import org.joda.time.DateTimeZone;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -32,6 +35,29 @@ public class RollupJobIdentifierUtils {

private static final Comparator<RollupJobCaps> COMPARATOR = RollupJobIdentifierUtils.getComparator();

public static final Map<String, Integer> CALENDAR_ORDERING;

static {
Map<String, Integer> dateFieldUnits = new HashMap<>(16);
dateFieldUnits.put("year", 8);
dateFieldUnits.put("1y", 8);
dateFieldUnits.put("quarter", 7);
dateFieldUnits.put("1q", 7);
dateFieldUnits.put("month", 6);
dateFieldUnits.put("1M", 6);
dateFieldUnits.put("week", 5);
dateFieldUnits.put("1w", 5);
dateFieldUnits.put("day", 4);
dateFieldUnits.put("1d", 4);
dateFieldUnits.put("hour", 3);
dateFieldUnits.put("1h", 3);
dateFieldUnits.put("minute", 2);
dateFieldUnits.put("1m", 2);
dateFieldUnits.put("second", 1);
dateFieldUnits.put("1s", 1);
CALENDAR_ORDERING = Collections.unmodifiableMap(dateFieldUnits);
}

/**
* Given the aggregation tree and a list of available job capabilities, this method will return a set
* of the "best" jobs that should be searched.
@@ -93,7 +119,8 @@ private static void checkDateHisto(DateHistogramAggregationBuilder source, List<
if (fieldCaps != null) {
for (Map<String, Object> agg : fieldCaps.getAggs()) {
if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) {
TimeValue interval = TimeValue.parseTimeValue((String)agg.get(RollupField.INTERVAL), "date_histogram.interval");
DateHistogramInterval interval = new DateHistogramInterval((String)agg.get(RollupField.INTERVAL));

String thisTimezone = (String)agg.get(DateHistoGroupConfig.TIME_ZONE.getPreferredName());
String sourceTimeZone = source.timeZone() == null ? DateTimeZone.UTC.toString() : source.timeZone().toString();

@@ -102,17 +129,20 @@ private static void checkDateHisto(DateHistogramAggregationBuilder source, List<
continue;
}
if (source.dateHistogramInterval() != null) {
TimeValue sourceInterval = TimeValue.parseTimeValue(source.dateHistogramInterval().toString(),
"source.date_histogram.interval");
//TODO should be divisor of interval
if (interval.compareTo(sourceInterval) <= 0) {
// Check if both are calendar and validate if they are.
// If not, check if both are fixed and validate
if (validateCalendarInterval(source.dateHistogramInterval(), interval)) {
localCaps.add(cap);
} else if (validateFixedInterval(source.dateHistogramInterval(), interval)) {
localCaps.add(cap);
}
} else {
if (interval.getMillis() <= source.interval()) {
// check if config is fixed and validate if it is
if (validateFixedInterval(source.interval(), interval)) {
localCaps.add(cap);
}
}
// not a candidate if we get here
break;
}
}
@@ -133,6 +163,55 @@ private static void checkDateHisto(DateHistogramAggregationBuilder source, List<
}
}

private static boolean isCalendarInterval(DateHistogramInterval interval) {
return DateHistogramAggregationBuilder.DATE_FIELD_UNITS.containsKey(interval.toString());
}

static boolean validateCalendarInterval(DateHistogramInterval requestInterval,
DateHistogramInterval configInterval) {
// Both must be calendar intervals
if (isCalendarInterval(requestInterval) == false || isCalendarInterval(configInterval) == false) {
return false;
}

// The request must be gte the config. The CALENDAR_ORDERING map values are integers representing
// relative orders between the calendar units
int requestOrder = CALENDAR_ORDERING.getOrDefault(requestInterval.toString(), Integer.MAX_VALUE);
int configOrder = CALENDAR_ORDERING.getOrDefault(configInterval.toString(), Integer.MAX_VALUE);

// All calendar units are multiples naturally, so we just care about gte
return requestOrder >= configOrder;
}

static boolean validateFixedInterval(DateHistogramInterval requestInterval,
DateHistogramInterval configInterval) {
// Neither can be calendar intervals
if (isCalendarInterval(requestInterval) || isCalendarInterval(configInterval)) {
return false;
}

// Both are fixed, good to convert to millis now
long configIntervalMillis = TimeValue.parseTimeValue(configInterval.toString(),
"date_histo.config.interval").getMillis();
long requestIntervalMillis = TimeValue.parseTimeValue(requestInterval.toString(),
"date_histo.request.interval").getMillis();

// Must be a multiple and gte the config
return requestIntervalMillis >= configIntervalMillis && requestIntervalMillis % configIntervalMillis == 0;
}

static boolean validateFixedInterval(long requestInterval, DateHistogramInterval configInterval) {
// config must not be a calendar interval
if (isCalendarInterval(configInterval)) {
return false;
}
long configIntervalMillis = TimeValue.parseTimeValue(configInterval.toString(),
"date_histo.config.interval").getMillis();

// Must be a multiple and gte the config
return requestInterval >= configIntervalMillis && requestInterval % configIntervalMillis == 0;
}

/**
* Find the set of histo's with the largest interval
*/
@@ -144,8 +223,8 @@ private static void checkHisto(HistogramAggregationBuilder source, List<RollupJo
for (Map<String, Object> agg : fieldCaps.getAggs()) {
if (agg.get(RollupField.AGG).equals(HistogramAggregationBuilder.NAME)) {
Long interval = (long)agg.get(RollupField.INTERVAL);
// TODO should be divisor of interval
if (interval <= source.interval()) {
// query interval must be gte the configured interval, and a whole multiple
if (interval <= source.interval() && source.interval() % interval == 0) {
localCaps.add(cap);
}
break;
@@ -155,8 +234,8 @@
}

if (localCaps.isEmpty()) {
throw new IllegalArgumentException("There is not a rollup job that has a [" + source.getWriteableName() + "] agg on field [" +
source.field() + "] which also satisfies all requirements of query.");
throw new IllegalArgumentException("There is not a rollup job that has a [" + source.getWriteableName()
+ "] agg on field [" + source.field() + "] which also satisfies all requirements of query.");
}

// We are a leaf, save our best caps