Skip to content

Commit

Permalink
Fix rollup on date fields that don't support epoch_millis (#31890)
Browse files Browse the repository at this point in the history
The rollup indexer uses a range query to select the next page
of results based on the last time bucket of the previous round
and the `delay` configured on the rollup job. This query uses
the `epoch_millis` format implicitly but doesn't set the `format`.
This result in errors during the rollup job if the field
definition doesn't allow this format. It can also miss documents
if the format is not accepted but another format in the field
definition is able to parse the query (e.g.: `epoch_second`).
This change ensures that we use `epoch_millis` as the only format
to parse the rollup range query.
  • Loading branch information
jimczi authored Jul 19, 2018
1 parent 38e2e1d commit 644a92f
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ private QueryBuilder createBoundaryQuery(Map<String, Object> position) {
assert lowerBound <= maxBoundary;
final RangeQueryBuilder query = new RangeQueryBuilder(fieldName)
.gte(lowerBound)
.lt(maxBoundary);
.lt(maxBoundary)
.format("epoch_millis");
return query;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponseSections;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.joda.DateMathParser;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.rounding.Rounding;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -506,6 +508,7 @@ private void executeTestCase(List<Map<String, Object>> docs, RollupJobConfig con
private Map<String, MappedFieldType> createFieldTypes(RollupJobConfig job) {
Map<String, MappedFieldType> fieldTypes = new HashMap<>();
MappedFieldType fieldType = new DateFieldMapper.Builder(job.getGroupConfig().getDateHisto().getField())
.dateTimeFormatter(Joda.forPattern(randomFrom("basic_date", "date_optional_time", "epoch_second")))
.build(new Mapper.BuilderContext(settings.getSettings(), new ContentPath(0)))
.fieldType();
fieldTypes.put(fieldType.name(), fieldType);
Expand Down Expand Up @@ -618,7 +621,7 @@ protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse
RangeQueryBuilder range = (RangeQueryBuilder) request.source().query();
final DateTimeZone timeZone = range.timeZone() != null ? DateTimeZone.forID(range.timeZone()) : null;
Query query = timestampField.rangeQuery(range.from(), range.to(), range.includeLower(), range.includeUpper(),
null, timeZone, null, queryShardContext);
null, timeZone, new DateMathParser(Joda.forPattern(range.format())), queryShardContext);

// extract composite agg
assertThat(request.source().aggregations().getAggregatorFactories().size(), equalTo(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
package org.elasticsearch.multi_node;

import org.apache.http.HttpStatus;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
Expand All @@ -33,8 +37,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isOneOf;
Expand Down Expand Up @@ -73,6 +77,31 @@ public void clearRollupMetadata() throws Exception {

public void testBigRollup() throws Exception {
final int numDocs = 200;
String dateFormat = "strict_date_optional_time";

// create the test-index index
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
builder.startObject("mappings").startObject("_doc")
.startObject("properties")
.startObject("timestamp")
.field("type", "date")
.field("format", dateFormat)
.endObject()
.startObject("value")
.field("type", "integer")
.endObject()
.endObject()
.endObject().endObject();
}
builder.endObject();
final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
Request req = new Request("PUT", "rollup-docs");
req.setEntity(entity);
client().performRequest(req);
}


// index documents for the rollup job
final StringBuilder bulk = new StringBuilder();
Expand All @@ -88,13 +117,15 @@ public void testBigRollup() throws Exception {
bulkRequest.addParameter("refresh", "true");
bulkRequest.setJsonEntity(bulk.toString());
client().performRequest(bulkRequest);

// create the rollup job
final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-job-test");
int pageSize = randomIntBetween(2, 50);
createRollupJobRequest.setJsonEntity("{"
+ "\"index_pattern\":\"rollup-*\","
+ "\"rollup_index\":\"results-rollup\","
+ "\"cron\":\"*/1 * * * * ?\"," // fast cron and big page size so test runs quickly
+ "\"page_size\":20,"
+ "\"cron\":\"*/1 * * * * ?\"," // fast cron so test runs quickly
+ "\"page_size\":" + pageSize + ","
+ "\"groups\":{"
+ " \"date_histogram\":{"
+ " \"field\":\"timestamp\","
Expand Down Expand Up @@ -142,7 +173,8 @@ public void testBigRollup() throws Exception {
" \"date_histo\": {\n" +
" \"date_histogram\": {\n" +
" \"field\": \"timestamp\",\n" +
" \"interval\": \"1h\"\n" +
" \"interval\": \"1h\",\n" +
" \"format\": \"date_time\"\n" +
" },\n" +
" \"aggs\": {\n" +
" \"the_max\": {\n" +
Expand Down Expand Up @@ -226,7 +258,7 @@ private void assertRollUpJob(final String rollupJob) throws Exception {

}

private void waitForRollUpJob(final String rollupJob,String[] expectedStates) throws Exception {
private void waitForRollUpJob(final String rollupJob, String[] expectedStates) throws Exception {
assertBusy(() -> {
final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/" + rollupJob);
Response getRollupJobResponse = client().performRequest(getRollupJobRequest);
Expand Down Expand Up @@ -317,10 +349,4 @@ private void deleteAllJobs() throws Exception {
}
}
}

private static String responseEntityToString(Response response) throws Exception {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
return reader.lines().collect(Collectors.joining("\n"));
}
}
}

0 comments on commit 644a92f

Please sign in to comment.