Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix rollup on date fields that don't support epoch_millis #31890

Merged
merged 8 commits into from
Jul 19, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ private QueryBuilder createBoundaryQuery(Map<String, Object> position) {
assert lowerBound <= maxBoundary;
final RangeQueryBuilder query = new RangeQueryBuilder(fieldName)
.gte(lowerBound)
.lt(maxBoundary);
.lt(maxBoundary)
.format("epoch_millis");
return query;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponseSections;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.joda.DateMathParser;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.rounding.Rounding;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -506,6 +509,7 @@ private void executeTestCase(List<Map<String, Object>> docs, RollupJobConfig con
private Map<String, MappedFieldType> createFieldTypes(RollupJobConfig job) {
Map<String, MappedFieldType> fieldTypes = new HashMap<>();
MappedFieldType fieldType = new DateFieldMapper.Builder(job.getGroupConfig().getDateHisto().getField())
.dateTimeFormatter(Joda.forPattern(randomFrom("basic_date", "date_optional_time", "epoch_second")))
.build(new Mapper.BuilderContext(settings.getSettings(), new ContentPath(0)))
.fieldType();
fieldTypes.put(fieldType.name(), fieldType);
Expand Down Expand Up @@ -618,7 +622,7 @@ protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse
RangeQueryBuilder range = (RangeQueryBuilder) request.source().query();
final DateTimeZone timeZone = range.timeZone() != null ? DateTimeZone.forID(range.timeZone()) : null;
Query query = timestampField.rangeQuery(range.from(), range.to(), range.includeLower(), range.includeUpper(),
null, timeZone, null, queryShardContext);
null, timeZone, new DateMathParser(Joda.forPattern(range.format())), queryShardContext);

// extract composite agg
assertThat(request.source().aggregations().getAggregatorFactories().size(), equalTo(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@
package org.elasticsearch.multi_node;

import org.apache.http.HttpStatus;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.CompoundDateTimeFormatter;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
Expand All @@ -27,14 +33,11 @@
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isOneOf;
Expand Down Expand Up @@ -73,13 +76,39 @@ public void clearRollupMetadata() throws Exception {

public void testBigRollup() throws Exception {
final int numDocs = 200;
String dateFormat = randomFrom("epoch_second", "date_time");

// create the test-index index
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
builder.startObject("mappings").startObject("_doc")
.startObject("properties")
.startObject("timestamp")
.field("type", "date")
.field("format", dateFormat)
.endObject()
.startObject("value")
.field("type", "integer")
.endObject()
.endObject()
.endObject().endObject();
}
builder.endObject();
final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
Request req = new Request("PUT", "rollup-docs");
req.setEntity(entity);
client().performRequest(req);
}


// index documents for the rollup job
final StringBuilder bulk = new StringBuilder();
final CompoundDateTimeFormatter dateTimeFormatter = DateFormatters.forPattern(dateFormat);
for (int i = 0; i < numDocs; i++) {
bulk.append("{\"index\":{\"_index\":\"rollup-docs\",\"_type\":\"_doc\"}}\n");
ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochSecond(1531221196 + (60*i)), ZoneId.of("UTC"));
String date = zdt.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
Instant dt= Instant.ofEpochSecond(1531221196 + (60 * i));
String date = dateTimeFormatter.format(dt);
bulk.append("{\"timestamp\":\"").append(date).append("\",\"value\":").append(i).append("}\n");
}
bulk.append("\r\n");
Expand All @@ -88,13 +117,15 @@ public void testBigRollup() throws Exception {
bulkRequest.addParameter("refresh", "true");
bulkRequest.setJsonEntity(bulk.toString());
client().performRequest(bulkRequest);

// create the rollup job
final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-job-test");
int pageSize = randomIntBetween(2, 50);
createRollupJobRequest.setJsonEntity("{"
+ "\"index_pattern\":\"rollup-*\","
+ "\"rollup_index\":\"results-rollup\","
+ "\"cron\":\"*/1 * * * * ?\"," // fast cron and big page size so test runs quickly
+ "\"page_size\":20,"
+ "\"cron\":\"*/1 * * * * ?\"," // fast cron so test runs quickly
+ "\"page_size\":" + pageSize + ","
+ "\"groups\":{"
+ " \"date_histogram\":{"
+ " \"field\":\"timestamp\","
Expand Down Expand Up @@ -142,7 +173,8 @@ public void testBigRollup() throws Exception {
" \"date_histo\": {\n" +
" \"date_histogram\": {\n" +
" \"field\": \"timestamp\",\n" +
" \"interval\": \"1h\"\n" +
" \"interval\": \"1h\",\n" +
" \"format\": \"date_time\"\n" +
" },\n" +
" \"aggs\": {\n" +
" \"the_max\": {\n" +
Expand Down Expand Up @@ -226,7 +258,7 @@ private void assertRollUpJob(final String rollupJob) throws Exception {

}

private void waitForRollUpJob(final String rollupJob,String[] expectedStates) throws Exception {
private void waitForRollUpJob(final String rollupJob, String[] expectedStates) throws Exception {
assertBusy(() -> {
final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/" + rollupJob);
Response getRollupJobResponse = client().performRequest(getRollupJobRequest);
Expand Down Expand Up @@ -317,10 +349,4 @@ private void deleteAllJobs() throws Exception {
}
}
}

private static String responseEntityToString(Response response) throws Exception {
    // Reads the HTTP response body as UTF-8 text and returns it with lines
    // joined by '\n' (no trailing newline), matching BufferedReader line handling.
    final StringBuilder body = new StringBuilder();
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
        String line;
        boolean first = true;
        while ((line = reader.readLine()) != null) {
            if (first == false) {
                body.append('\n');
            }
            body.append(line);
            first = false;
        }
    }
    return body.toString();
}
}