Skip to content

Commit

Permalink
[Test] Add full cluster restart test for Rollup (#31533)
Browse files Browse the repository at this point in the history
This pull request adds a full cluster restart test for a Rollup job. 
The test creates and starts a Rollup job on the cluster and checks 
that the job already exists and is correctly started on the upgraded 
cluster.

This test allows to test that the persistent task state is correctly 
parsed from the cluster state after the upgrade, as the status field 
has been renamed to state in #31031.

The test undercovers a ClassCastException that can be thrown in 
the RollupIndexer when the timestamp as a very low value that fits 
into an integer. When it's the case, the value is parsed back as an 
Integer instead of Long object and (long) position.get(rollupFieldName) 
fails.
  • Loading branch information
tlrx authored Jun 26, 2018
1 parent 424be02 commit be9292c
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,11 @@ private QueryBuilder createBoundaryQuery(Map<String, Object> position) {
DateHistoGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHisto();
String fieldName = dateHisto.getField();
String rollupFieldName = fieldName + "." + DateHistogramAggregationBuilder.NAME;
long lowerBound = position != null ? (long) position.get(rollupFieldName) : 0;
long lowerBound = 0L;
if (position != null) {
Number value = (Number) position.get(rollupFieldName);
lowerBound = value.longValue();
}
assert lowerBound <= maxBoundary;
final RangeQueryBuilder query = new RangeQueryBuilder(fieldName)
.gte(lowerBound)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.Booleans;
Expand All @@ -20,7 +21,6 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.StreamsUtils;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
import org.elasticsearch.xpack.core.watcher.client.WatchSourceBuilder;
import org.elasticsearch.xpack.core.watcher.support.xcontent.ObjectPath;
import org.elasticsearch.xpack.security.support.SecurityIndexManager;
Expand All @@ -30,6 +30,7 @@
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import org.hamcrest.Matcher;
import org.junit.Before;

import java.io.IOException;
Expand All @@ -38,6 +39,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -254,6 +256,71 @@ public void testWatcher() throws Exception {
}
}

/**
* Tests that a RollUp job created on a old cluster is correctly restarted after the upgrade.
*/
public void testRollupAfterRestart() throws Exception {
assumeTrue("Rollup can be tested with 6.3.0 and onwards", oldClusterVersion.onOrAfter(Version.V_6_3_0));
if (runningAgainstOldCluster) {
final int numDocs = 59;
final int year = randomIntBetween(1970, 2018);

// index documents for the rollup job
final StringBuilder bulk = new StringBuilder();
for (int i = 0; i < numDocs; i++) {
bulk.append("{\"index\":{\"_index\":\"rollup-docs\",\"_type\":\"doc\"}}\n");
String date = String.format(Locale.ROOT, "%04d-01-01T00:%02d:00Z", year, i);
bulk.append("{\"timestamp\":\"").append(date).append("\",\"value\":").append(i).append("}\n");
}
bulk.append("\r\n");

final Request bulkRequest = new Request("POST", "/_bulk");
bulkRequest.setJsonEntity(bulk.toString());
client().performRequest(bulkRequest);

// create the rollup job
final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-job-test");
createRollupJobRequest.setJsonEntity("{"
+ "\"index_pattern\":\"rollup-*\","
+ "\"rollup_index\":\"results-rollup\","
+ "\"cron\":\"*/30 * * * * ?\","
+ "\"page_size\":100,"
+ "\"groups\":{"
+ " \"date_histogram\":{"
+ " \"field\":\"timestamp\","
+ " \"interval\":\"5m\""
+ " }"
+ "},"
+ "\"metrics\":["
+ " {\"field\":\"value\",\"metrics\":[\"min\",\"max\",\"sum\"]}"
+ "]"
+ "}");

Map<String, Object> createRollupJobResponse = toMap(client().performRequest(createRollupJobRequest));
assertThat(createRollupJobResponse.get("acknowledged"), equalTo(Boolean.TRUE));

// start the rollup job
final Request startRollupJobRequest = new Request("POST", "_xpack/rollup/job/rollup-job-test/_start");
Map<String, Object> startRollupJobResponse = toMap(client().performRequest(startRollupJobRequest));
assertThat(startRollupJobResponse.get("started"), equalTo(Boolean.TRUE));

assertRollUpJob("rollup-job-test");

} else {

final Request clusterHealthRequest = new Request("GET", "/_cluster/health");
clusterHealthRequest.addParameter("wait_for_status", "yellow");
clusterHealthRequest.addParameter("wait_for_no_relocating_shards", "true");
if (oldClusterVersion.onOrAfter(Version.V_6_2_0)) {
clusterHealthRequest.addParameter("wait_for_no_initializing_shards", "true");
}
Map<String, Object> clusterHealthResponse = toMap(client().performRequest(clusterHealthRequest));
assertThat(clusterHealthResponse.get("timed_out"), equalTo(Boolean.FALSE));

assertRollUpJob("rollup-job-test");
}
}

public void testSqlFailsOnIndexWithTwoTypes() throws IOException {
// TODO this isn't going to trigger until we backport to 6.1
assumeTrue("It is only possible to build an index that sql doesn't like before 6.0.0",
Expand Down Expand Up @@ -393,43 +460,6 @@ private void waitForHits(String indexName, int expectedHits) throws Exception {
}, 30, TimeUnit.SECONDS);
}

@SuppressWarnings("unchecked")
private void waitForMonitoringTemplates() throws Exception {
assertBusy(() -> {
final Map<String, Object> templates = toMap(client().performRequest("GET", "/_template/.monitoring-*"));

// in earlier versions, we published legacy templates in addition to the current ones to support transitioning
assertThat(templates.size(), greaterThanOrEqualTo(MonitoringTemplateUtils.TEMPLATE_IDS.length));

// every template should be updated to whatever the current version is
for (final String templateId : MonitoringTemplateUtils.TEMPLATE_IDS) {
final String templateName = MonitoringTemplateUtils.templateName(templateId);
final Map<String, Object> template = (Map<String, Object>) templates.get(templateName);

assertThat(template.get("version"), is(MonitoringTemplateUtils.LAST_UPDATED_VERSION));
}
}, 30, TimeUnit.SECONDS);
}

@SuppressWarnings("unchecked")
private void waitForClusterStats(final String expectedVersion) throws Exception {
assertBusy(() -> {
final Map<String, String> params = new HashMap<>(3);
params.put("q", "type:cluster_stats");
params.put("size", "1");
params.put("sort", "timestamp:desc");

final Map<String, Object> response = toMap(client().performRequest("GET", "/.monitoring-es-*/_search", params));
final Map<String, Object> hits = (Map<String, Object>) response.get("hits");

assertThat("No cluster_stats documents found.", (int)hits.get("total"), greaterThanOrEqualTo(1));

final Map<String, Object> hit = (Map<String, Object>) ((List<Object>) hits.get("hits")).get(0);
final Map<String, Object> source = (Map<String, Object>) hit.get("_source");
assertThat(source.get("version"), is(expectedVersion));
}, 30, TimeUnit.SECONDS);
}

static Map<String, Object> toMap(Response response) throws IOException {
return toMap(EntityUtils.toString(response.getEntity()));
}
Expand Down Expand Up @@ -492,4 +522,48 @@ private void assertRoleInfo(final String role) throws Exception {
assertNotNull(response.get("cluster"));
assertNotNull(response.get("indices"));
}

@SuppressWarnings("unchecked")
private void assertRollUpJob(final String rollupJob) throws Exception {
final Matcher<?> expectedStates = anyOf(equalTo("indexing"), equalTo("started"));
waitForRollUpJob(rollupJob, expectedStates);

// check that the rollup job is started using the RollUp API
final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/" + rollupJob);
Map<String, Object> getRollupJobResponse = toMap(client().performRequest(getRollupJobRequest));
assertThat(ObjectPath.eval("jobs.0.status.job_state", getRollupJobResponse), expectedStates);

// check that the rollup job is started using the Tasks API
final Request taskRequest = new Request("GET", "_tasks");
taskRequest.addParameter("detailed", "true");
taskRequest.addParameter("actions", "xpack/rollup/*");
Map<String, Object> taskResponse = toMap(client().performRequest(taskRequest));
Map<String, Object> taskResponseNodes = (Map<String, Object>) taskResponse.get("nodes");
Map<String, Object> taskResponseNode = (Map<String, Object>) taskResponseNodes.values().iterator().next();
Map<String, Object> taskResponseTasks = (Map<String, Object>) taskResponseNode.get("tasks");
Map<String, Object> taskResponseStatus = (Map<String, Object>) taskResponseTasks.values().iterator().next();
assertThat(ObjectPath.eval("status.job_state", taskResponseStatus), expectedStates);

// check that the rollup job is started using the Cluster State API
final Request clusterStateRequest = new Request("GET", "_cluster/state/metadata");
Map<String, Object> clusterStateResponse = toMap(client().performRequest(clusterStateRequest));
Map<String, Object> rollupJobTask = ObjectPath.eval("metadata.persistent_tasks.tasks.0", clusterStateResponse);
assertThat(ObjectPath.eval("id", rollupJobTask), equalTo("rollup-job-test"));

// Persistent task state field has been renamed in 6.4.0 from "status" to "state"
final String stateFieldName = (runningAgainstOldCluster && oldClusterVersion.before(Version.V_6_4_0)) ? "status" : "state";

final String jobStateField = "task.xpack/rollup/job." + stateFieldName + ".job_state";
assertThat("Expected field [" + jobStateField + "] to be started or indexing in " + rollupJobTask,
ObjectPath.eval(jobStateField, rollupJobTask), expectedStates);
}

private void waitForRollUpJob(final String rollupJob, final Matcher<?> expectedStates) throws Exception {
assertBusy(() -> {
final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/" + rollupJob);
Response getRollupJobResponse = client().performRequest(getRollupJobRequest);
assertThat(getRollupJobResponse.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus()));
assertThat(ObjectPath.eval("jobs.0.status.job_state", toMap(getRollupJobResponse)), expectedStates);
}, 30L, TimeUnit.SECONDS);
}
}

0 comments on commit be9292c

Please sign in to comment.