ML: Add support for rollup Indexes in Datafeeds (#34654)
* Adding rollup support for datafeeds

* Fixing tests and adjusting formatting

* minor formatting change

* fixing some syntax and removing redundancies

* Refactoring and fixing failing test

* Refactoring, adding paranoid null check

* Moving rollup into the aggregation package

* making AggregationToJsonProcessor package private again

* Addressing test failure

* Fixing validations, chunking

* Addressing failing test

* rolling back RollupJobCaps changes

* Adding comment and cleaning up test

* Addressing review comments and test failures

* Moving builder logic into separate methods

* Addressing PR comments, adding test for rollup permissions

* Fixing test failure

* Adding rollup priv check on datafeed put

* Handling missing index when getting caps

* Fixing unused import
benwtrent committed Nov 1, 2018
1 parent 4add939 commit e8fa1dc
Showing 17 changed files with 1,298 additions and 224 deletions.
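The bullets above track individual pushes; concretely, the commit lets a datafeed read pre-aggregated data from a rollup index. A minimal sketch of the resulting API surface, modeled on the REST calls exercised in the tests below — the endpoint, job, index, and field names are taken from the test fixtures, and client() is the low-level REST client that ESRestTestCase provides:

// A datafeed whose index is a rollup index ("airline-data-aggs-rollup").
// The aggregation mirrors the one used in the tests below; field names with
// spaces ("time stamp") come from the test fixtures.
Request createDatafeed = new Request("PUT", "/_xpack/ml/datafeeds/datafeed-aggs-histogram-rollup-job");
createDatafeed.setJsonEntity("{"
        + "\"job_id\": \"aggs-histogram-rollup-job\","
        + "\"indices\": [\"airline-data-aggs-rollup\"],"
        + "\"aggregations\": {"
        +   "\"buckets\": {"
        +     "\"date_histogram\": {\"field\": \"time stamp\", \"interval\": 3600000},"
        +     "\"aggregations\": {"
        +       "\"time stamp\": {\"max\": {\"field\": \"time stamp\"}},"
        +       "\"responsetime\": {\"avg\": {\"field\": \"responsetime\"}}"
        +     "}"
        +   "}"
        + "}"
        + "}");
client().performRequest(createDatafeed);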
@@ -430,9 +430,9 @@ private TimeValue defaultFrequencyTarget(TimeValue bucketSpan) {

public static class Builder {

+ public static final int DEFAULT_AGGREGATION_CHUNKING_BUCKETS = 1000;
private static final TimeValue MIN_DEFAULT_QUERY_DELAY = TimeValue.timeValueMinutes(1);
private static final TimeValue MAX_DEFAULT_QUERY_DELAY = TimeValue.timeValueMinutes(2);
- private static final int DEFAULT_AGGREGATION_CHUNKING_BUCKETS = 1000;

private String id;
private String jobId;
@@ -139,7 +139,7 @@ private static long validateAndGetDateHistogramInterval(DateHistogramAggregation
}
}

- static long validateAndGetCalendarInterval(String calendarInterval) {
+ public static long validateAndGetCalendarInterval(String calendarInterval) {
TimeValue interval;
DateTimeUnit dateTimeUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(calendarInterval);
if (dateTimeUnit != null) {
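Promoting validateAndGetCalendarInterval to public lets the rollup-capability checks added in this commit reuse the existing calendar-interval validation. A hedged usage sketch — the owning utility class sits outside this excerpt, so the call is shown unqualified:

// Hedged sketch: calendar units are resolved through
// DateHistogramAggregationBuilder.DATE_FIELD_UNITS, so each known unit maps
// to a fixed millisecond length; unknown strings appear to fall back to
// TimeValue parsing (the else branch is cut off in this hunk).
long hourMillis = validateAndGetCalendarInterval("1h"); // 3,600,000
long dayMillis = validateAndGetCalendarInterval("1d");  // 86,400,000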
@@ -30,7 +30,11 @@ public SearchResponse newResponse() {
return new SearchResponse();
}

- static class RequestBuilder extends ActionRequestBuilder<SearchRequest, SearchResponse, RequestBuilder> {
+ public static class RequestBuilder extends ActionRequestBuilder<SearchRequest, SearchResponse, RequestBuilder> {
+     public RequestBuilder(ElasticsearchClient client, SearchRequest searchRequest) {
+         super(client, INSTANCE, searchRequest);
+     }

RequestBuilder(ElasticsearchClient client) {
super(client, INSTANCE, new SearchRequest());
}
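The new public constructor takes a caller-supplied SearchRequest, which is what lets the ML data extractor submit its own search through the rollup search action instead of a plain search. A hedged sketch — naming the enclosing class RollupSearchAction is an assumption, consistent with the indices:admin/xpack/rollup/search action asserted in the tests below:

// Hedged sketch: run a rollup search against a rollup index via the
// now-public builder; "client" is any ElasticsearchClient available to the caller.
SearchRequest searchRequest = new SearchRequest("airline-data-aggs-rollup");
searchRequest.source(new SearchSourceBuilder().size(0)
        .aggregation(AggregationBuilders.dateHistogram("buckets")
                .field("time stamp")
                .interval(3600000)));
SearchResponse response = new RollupSearchAction.RequestBuilder(client, searchRequest).get();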
@@ -17,6 +17,7 @@
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.junit.After;
import org.junit.Before;
@@ -27,6 +28,7 @@
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
@@ -63,6 +65,16 @@ private void setupDataAccessRole(String index) throws IOException {
client().performRequest(request);
}

private void setupFullAccessRole(String index) throws IOException {
Request request = new Request("PUT", "/_xpack/security/role/test_data_access");
request.setJsonEntity("{"
+ " \"indices\" : ["
+ " { \"names\": [\"" + index + "\"], \"privileges\": [\"all\"] }"
+ " ]"
+ "}");
client().performRequest(request);
}

private void setupUser(String user, List<String> roles) throws IOException {
String password = new String(SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING.getChars());

@@ -359,7 +371,75 @@ public void testInsufficientSearchPrivilegesOnPut() throws Exception {

assertThat(e.getMessage(), containsString("Cannot create datafeed"));
assertThat(e.getMessage(),
containsString("user ml_admin lacks permissions on the indices to be searched"));
containsString("user ml_admin lacks permissions on the indices"));
}

public void testInsufficientSearchPrivilegesOnPutWithRollup() throws Exception {
setupDataAccessRole("airline-data-aggs-rollup");
String jobId = "privs-put-job-rollup";
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("{\n"
+ " \"description\": \"Aggs job\",\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"1h\",\n"
+ " \"summary_count_field_name\": \"doc_count\",\n"
+ " \"detectors\": [\n"
+ " {\n"
+ " \"function\": \"mean\",\n"
+ " \"field_name\": \"responsetime\",\n"
+ " \"by_field_name\": \"airline\"\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"data_description\": {\"time_field\": \"time stamp\"}\n"
+ "}");
client().performRequest(createJobRequest);

String rollupJobId = "rollup-" + jobId;
Request createRollupRequest = new Request("PUT", "/_xpack/rollup/job/" + rollupJobId);
createRollupRequest.setJsonEntity("{\n"
+ "\"index_pattern\": \"airline-data-aggs\",\n"
+ " \"rollup_index\": \"airline-data-aggs-rollup\",\n"
+ " \"cron\": \"*/30 * * * * ?\",\n"
+ " \"page_size\" :1000,\n"
+ " \"groups\" : {\n"
+ " \"date_histogram\": {\n"
+ " \"field\": \"time stamp\",\n"
+ " \"interval\": \"2m\",\n"
+ " \"delay\": \"7d\"\n"
+ " },\n"
+ " \"terms\": {\n"
+ " \"fields\": [\"airline\"]\n"
+ " }"
+ " },\n"
+ " \"metrics\": [\n"
+ " {\n"
+ " \"field\": \"responsetime\",\n"
+ " \"metrics\": [\"avg\",\"min\",\"max\",\"sum\"]\n"
+ " },\n"
+ " {\n"
+ " \"field\": \"time stamp\",\n"
+ " \"metrics\": [\"min\",\"max\"]\n"
+ " }\n"
+ " ]\n"
+ "}");
client().performRequest(createRollupRequest);

String datafeedId = "datafeed-" + jobId;
String aggregations = "{\"buckets\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":3600000},"
+ "\"aggregations\":{"
+ "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}},"
+ "\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}";


ResponseException e = expectThrows(ResponseException.class, () ->
new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs-rollup", "doc")
.setAggregations(aggregations)
.setAuthHeader(BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS) //want to search, but no admin access
.build());
assertThat(e.getMessage(), containsString("Cannot create datafeed"));
assertThat(e.getMessage(),
containsString("user ml_admin_plus_data lacks permissions on the indices"));
}

public void testInsufficientSearchPrivilegesOnPreview() throws Exception {
@@ -615,7 +695,7 @@ public void testLookbackWithoutPermissions() throws Exception {
// There should be a notification saying that there was a problem extracting data
client().performRequest(new Request("POST", "/_refresh"));
Response notificationsResponse = client().performRequest(
new Request("GET", AuditorField.NOTIFICATIONS_INDEX + "/_search?q=job_id:" + jobId));
new Request("GET", AuditorField.NOTIFICATIONS_INDEX + "/_search?size=1000&q=job_id:" + jobId));
String notificationsResponseAsString = EntityUtils.toString(notificationsResponse.getEntity());
assertThat(notificationsResponseAsString, containsString("\"message\":\"Datafeed is encountering errors extracting data: " +
"action [indices:data/read/search] is unauthorized for user [ml_admin_plus_data]\""));
@@ -663,6 +743,171 @@ public void testLookbackWithPipelineBucketAgg() throws Exception {
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
}

public void testLookbackOnlyGivenAggregationsWithHistogramAndRollupIndex() throws Exception {
String jobId = "aggs-histogram-rollup-job";
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("{\n"
+ " \"description\": \"Aggs job\",\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"1h\",\n"
+ " \"summary_count_field_name\": \"doc_count\",\n"
+ " \"detectors\": [\n"
+ " {\n"
+ " \"function\": \"mean\",\n"
+ " \"field_name\": \"responsetime\",\n"
+ " \"by_field_name\": \"airline\"\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"data_description\": {\"time_field\": \"time stamp\"}\n"
+ "}");
client().performRequest(createJobRequest);

String rollupJobId = "rollup-" + jobId;
Request createRollupRequest = new Request("PUT", "/_xpack/rollup/job/" + rollupJobId);
createRollupRequest.setJsonEntity("{\n"
+ "\"index_pattern\": \"airline-data-aggs\",\n"
+ " \"rollup_index\": \"airline-data-aggs-rollup\",\n"
+ " \"cron\": \"*/30 * * * * ?\",\n"
+ " \"page_size\" :1000,\n"
+ " \"groups\" : {\n"
+ " \"date_histogram\": {\n"
+ " \"field\": \"time stamp\",\n"
+ " \"interval\": \"2m\",\n"
+ " \"delay\": \"7d\"\n"
+ " },\n"
+ " \"terms\": {\n"
+ " \"fields\": [\"airline\"]\n"
+ " }"
+ " },\n"
+ " \"metrics\": [\n"
+ " {\n"
+ " \"field\": \"responsetime\",\n"
+ " \"metrics\": [\"avg\",\"min\",\"max\",\"sum\"]\n"
+ " },\n"
+ " {\n"
+ " \"field\": \"time stamp\",\n"
+ " \"metrics\": [\"min\",\"max\"]\n"
+ " }\n"
+ " ]\n"
+ "}");
client().performRequest(createRollupRequest);
client().performRequest(new Request("POST", "/_xpack/rollup/job/" + rollupJobId + "/_start"));

assertBusy(() -> {
Response getRollup = client().performRequest(new Request("GET", "/_xpack/rollup/job/" + rollupJobId));
String body = EntityUtils.toString(getRollup.getEntity());
assertThat(body, containsString("\"job_state\":\"started\""));
assertThat(body, containsString("\"rollups_indexed\":4"));
}, 60, TimeUnit.SECONDS);

client().performRequest(new Request("POST", "/_xpack/rollup/job/" + rollupJobId + "/_stop"));
assertBusy(() -> {
Response getRollup = client().performRequest(new Request("GET", "/_xpack/rollup/job/" + rollupJobId));
assertThat(EntityUtils.toString(getRollup.getEntity()), containsString("\"job_state\":\"stopped\""));
}, 60, TimeUnit.SECONDS);

final Request refreshRollupIndex = new Request("POST", "airline-data-aggs-rollup/_refresh");
client().performRequest(refreshRollupIndex);

String datafeedId = "datafeed-" + jobId;
String aggregations = "{\"buckets\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":3600000},"
+ "\"aggregations\":{"
+ "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}},"
+ "\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}";
new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs-rollup", "response").setAggregations(aggregations).build();
openJob(client(), jobId);

startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest(new Request("GET",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2"));
}

public void testLookbackWithoutPermissionsAndRollup() throws Exception {
setupFullAccessRole("airline-data-aggs-rollup");
String jobId = "rollup-permission-test-network-job";
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("{\n"
+ " \"description\": \"Aggs job\",\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"1h\",\n"
+ " \"summary_count_field_name\": \"doc_count\",\n"
+ " \"detectors\": [\n"
+ " {\n"
+ " \"function\": \"mean\",\n"
+ " \"field_name\": \"responsetime\",\n"
+ " \"by_field_name\": \"airline\"\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"data_description\": {\"time_field\": \"time stamp\"}\n"
+ "}");
client().performRequest(createJobRequest);

String rollupJobId = "rollup-" + jobId;
Request createRollupRequest = new Request("PUT", "/_xpack/rollup/job/" + rollupJobId);
createRollupRequest.setJsonEntity("{\n"
+ "\"index_pattern\": \"airline-data-aggs\",\n"
+ " \"rollup_index\": \"airline-data-aggs-rollup\",\n"
+ " \"cron\": \"*/30 * * * * ?\",\n"
+ " \"page_size\" :1000,\n"
+ " \"groups\" : {\n"
+ " \"date_histogram\": {\n"
+ " \"field\": \"time stamp\",\n"
+ " \"interval\": \"2m\",\n"
+ " \"delay\": \"7d\"\n"
+ " },\n"
+ " \"terms\": {\n"
+ " \"fields\": [\"airline\"]\n"
+ " }"
+ " },\n"
+ " \"metrics\": [\n"
+ " {\n"
+ " \"field\": \"responsetime\",\n"
+ " \"metrics\": [\"avg\",\"min\",\"max\",\"sum\"]\n"
+ " },\n"
+ " {\n"
+ " \"field\": \"time stamp\",\n"
+ " \"metrics\": [\"min\",\"max\"]\n"
+ " }\n"
+ " ]\n"
+ "}");
client().performRequest(createRollupRequest);

String datafeedId = "datafeed-" + jobId;
String aggregations = "{\"buckets\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":3600000},"
+ "\"aggregations\":{"
+ "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}},"
+ "\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}";


// At the time we create the datafeed the user can access the rollup index
new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs-rollup", "doc")
.setAggregations(aggregations)
.setChunkingTimespan("300s")
.setAuthHeader(BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS)
.build();

// Change the role so that the user can no longer access the rollup index
setupFullAccessRole("some-other-data");

openJob(client(), jobId);

startDatafeedAndWaitUntilStopped(datafeedId, BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS);
waitUntilJobIsClosed(jobId);
// There should be a notification saying that there was a problem extracting data
client().performRequest(new Request("POST", "/_refresh"));
Response notificationsResponse = client().performRequest(
new Request("GET", AuditorField.NOTIFICATIONS_INDEX + "/_search?size=1000&q=job_id:" + jobId));
String notificationsResponseAsString = EntityUtils.toString(notificationsResponse.getEntity());
assertThat(notificationsResponseAsString, containsString("\"message\":\"Datafeed is encountering errors extracting data: " +
"action [indices:admin/xpack/rollup/search] is unauthorized for user [ml_admin_plus_data]\""));
}

public void testRealtime() throws Exception {
String jobId = "job-realtime-1";
createJob(jobId, "airline");
@@ -882,7 +1127,8 @@ public static void openJob(RestClient client, String jobId) throws IOException {
@After
public void clearMlState() throws Exception {
new MlRestTestStateCleaner(logger, adminClient()).clearMlMetadata();
- ESRestTestCase.waitForPendingTasks(adminClient());
+ // Don't check rollup jobs because we clear them in the superclass.
+ waitForPendingTasks(adminClient(), taskName -> taskName.startsWith(RollupJob.NAME));
}

private static class DatafeedBuilder {