From e8fa1dc913a4becddf06baedc681e5c7a8549a0e Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 1 Nov 2018 10:02:24 -0500 Subject: [PATCH] ML: Add support for rollup Indexes in Datafeeds (#34654) * Adding rollup support for datafeeds * Fixing tests and adjusting formatting * minor formatting chagne * fixing some syntax and removing redundancies * Refactoring and fixing failing test * Refactoring, adding paranoid null check * Moving rollup into the aggregation package * making AggregationToJsonProcessor package private again * Addressing test failure * Fixing validations, chunking * Addressing failing test * rolling back RollupJobCaps changes * Adding comment and cleaning up test * Addressing review comments and test failures * Moving builder logic into separate methods * Addressing PR comments, adding test for rollup permissions * Fixing test failure * Adding rollup priv check on datafeed put * Handling missing index when getting caps * Fixing unused import --- .../core/ml/datafeed/DatafeedConfig.java | 2 +- .../ml/datafeed/extractor/ExtractorUtils.java | 2 +- .../rollup/action/RollupSearchAction.java | 6 +- .../ml/integration/DatafeedJobsRestIT.java | 252 +++++++++++++++++- .../ml/action/TransportPutDatafeedAction.java | 60 +++-- .../extractor/DataExtractorFactory.java | 53 +++- .../AbstractAggregationDataExtractor.java | 168 ++++++++++++ .../aggregation/AggregationDataExtractor.java | 148 +--------- .../aggregation/RollupDataExtractor.java | 32 +++ .../RollupDataExtractorFactory.java | 218 +++++++++++++++ .../chunked/ChunkedDataExtractor.java | 192 ++++++++++--- .../chunked/ChunkedDataExtractorContext.java | 7 +- .../chunked/ChunkedDataExtractorFactory.java | 5 +- .../xpack/ml/LocalStateMachineLearning.java | 57 +++- .../extractor/DataExtractorFactoryTests.java | 189 ++++++++++++- .../AggregationDataExtractorTests.java | 6 +- .../chunked/ChunkedDataExtractorTests.java | 125 ++++++++- 17 files changed, 1298 insertions(+), 224 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractor.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactory.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index afe38dc29bf7f..4437764b39650 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -430,9 +430,9 @@ private TimeValue defaultFrequencyTarget(TimeValue bucketSpan) { public static class Builder { + public static final int DEFAULT_AGGREGATION_CHUNKING_BUCKETS = 1000; private static final TimeValue MIN_DEFAULT_QUERY_DELAY = TimeValue.timeValueMinutes(1); private static final TimeValue MAX_DEFAULT_QUERY_DELAY = TimeValue.timeValueMinutes(2); - private static final int DEFAULT_AGGREGATION_CHUNKING_BUCKETS = 1000; private String id; private String jobId; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/ExtractorUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/ExtractorUtils.java index 
35999c9752fc3..f0ba07ad15c68 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/ExtractorUtils.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/ExtractorUtils.java @@ -139,7 +139,7 @@ private static long validateAndGetDateHistogramInterval(DateHistogramAggregation } } - static long validateAndGetCalendarInterval(String calendarInterval) { + public static long validateAndGetCalendarInterval(String calendarInterval) { TimeValue interval; DateTimeUnit dateTimeUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(calendarInterval); if (dateTimeUnit != null) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupSearchAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupSearchAction.java index b4d3d6efb7d47..18dfef4b7f4c4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupSearchAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupSearchAction.java @@ -30,7 +30,11 @@ public SearchResponse newResponse() { return new SearchResponse(); } - static class RequestBuilder extends ActionRequestBuilder { + public static class RequestBuilder extends ActionRequestBuilder { + public RequestBuilder(ElasticsearchClient client, SearchRequest searchRequest) { + super(client, INSTANCE, searchRequest); + } + RequestBuilder(ElasticsearchClient client) { super(client, INSTANCE, new SearchRequest()); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java index 700adef20a384..3fb223195e187 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java @@ -17,6 +17,7 @@ import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; +import org.elasticsearch.xpack.core.rollup.job.RollupJob; import org.elasticsearch.xpack.ml.MachineLearning; import org.junit.After; import org.junit.Before; @@ -27,6 +28,7 @@ import java.util.Date; import java.util.List; import java.util.Locale; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; @@ -63,6 +65,16 @@ private void setupDataAccessRole(String index) throws IOException { client().performRequest(request); } + private void setupFullAccessRole(String index) throws IOException { + Request request = new Request("PUT", "/_xpack/security/role/test_data_access"); + request.setJsonEntity("{" + + " \"indices\" : [" + + " { \"names\": [\"" + index + "\"], \"privileges\": [\"all\"] }" + + " ]" + + "}"); + client().performRequest(request); + } + private void setupUser(String user, List roles) throws IOException { String password = new String(SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING.getChars()); @@ -359,7 +371,75 @@ public void testInsufficientSearchPrivilegesOnPut() throws Exception { assertThat(e.getMessage(), containsString("Cannot create datafeed")); 
assertThat(e.getMessage(), - containsString("user ml_admin lacks permissions on the indices to be searched")); + containsString("user ml_admin lacks permissions on the indices")); + } + + public void testInsufficientSearchPrivilegesOnPutWithRollup() throws Exception { + setupDataAccessRole("airline-data-aggs-rollup"); + String jobId = "privs-put-job-rollup"; + Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId); + createJobRequest.setJsonEntity("{\n" + + " \"description\": \"Aggs job\",\n" + + " \"analysis_config\": {\n" + + " \"bucket_span\": \"1h\",\n" + + " \"summary_count_field_name\": \"doc_count\",\n" + + " \"detectors\": [\n" + + " {\n" + + " \"function\": \"mean\",\n" + + " \"field_name\": \"responsetime\",\n" + + " \"by_field_name\": \"airline\"\n" + + " }\n" + + " ]\n" + + " },\n" + + " \"data_description\": {\"time_field\": \"time stamp\"}\n" + + "}"); + client().performRequest(createJobRequest); + + String rollupJobId = "rollup-" + jobId; + Request createRollupRequest = new Request("PUT", "/_xpack/rollup/job/" + rollupJobId); + createRollupRequest.setJsonEntity("{\n" + + "\"index_pattern\": \"airline-data-aggs\",\n" + + " \"rollup_index\": \"airline-data-aggs-rollup\",\n" + + " \"cron\": \"*/30 * * * * ?\",\n" + + " \"page_size\" :1000,\n" + + " \"groups\" : {\n" + + " \"date_histogram\": {\n" + + " \"field\": \"time stamp\",\n" + + " \"interval\": \"2m\",\n" + + " \"delay\": \"7d\"\n" + + " },\n" + + " \"terms\": {\n" + + " \"fields\": [\"airline\"]\n" + + " }" + + " },\n" + + " \"metrics\": [\n" + + " {\n" + + " \"field\": \"responsetime\",\n" + + " \"metrics\": [\"avg\",\"min\",\"max\",\"sum\"]\n" + + " },\n" + + " {\n" + + " \"field\": \"time stamp\",\n" + + " \"metrics\": [\"min\",\"max\"]\n" + + " }\n" + + " ]\n" + + "}"); + client().performRequest(createRollupRequest); + + String datafeedId = "datafeed-" + jobId; + String aggregations = "{\"buckets\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":3600000}," + + "\"aggregations\":{" + + "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}}," + + "\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}"; + + + ResponseException e = expectThrows(ResponseException.class, () -> + new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs-rollup", "doc") + .setAggregations(aggregations) + .setAuthHeader(BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS) //want to search, but no admin access + .build()); + assertThat(e.getMessage(), containsString("Cannot create datafeed")); + assertThat(e.getMessage(), + containsString("user ml_admin_plus_data lacks permissions on the indices")); } public void testInsufficientSearchPrivilegesOnPreview() throws Exception { @@ -615,7 +695,7 @@ public void testLookbackWithoutPermissions() throws Exception { // There should be a notification saying that there was a problem extracting data client().performRequest(new Request("POST", "/_refresh")); Response notificationsResponse = client().performRequest( - new Request("GET", AuditorField.NOTIFICATIONS_INDEX + "/_search?q=job_id:" + jobId)); + new Request("GET", AuditorField.NOTIFICATIONS_INDEX + "/_search?size=1000&q=job_id:" + jobId)); String notificationsResponseAsString = EntityUtils.toString(notificationsResponse.getEntity()); assertThat(notificationsResponseAsString, containsString("\"message\":\"Datafeed is encountering errors extracting data: " + "action [indices:data/read/search] is unauthorized for user [ml_admin_plus_data]\"")); @@ -663,6 +743,171 @@ public void 
testLookbackWithPipelineBucketAgg() throws Exception { assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0")); } + public void testLookbackOnlyGivenAggregationsWithHistogramAndRollupIndex() throws Exception { + String jobId = "aggs-histogram-rollup-job"; + Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId); + createJobRequest.setJsonEntity("{\n" + + " \"description\": \"Aggs job\",\n" + + " \"analysis_config\": {\n" + + " \"bucket_span\": \"1h\",\n" + + " \"summary_count_field_name\": \"doc_count\",\n" + + " \"detectors\": [\n" + + " {\n" + + " \"function\": \"mean\",\n" + + " \"field_name\": \"responsetime\",\n" + + " \"by_field_name\": \"airline\"\n" + + " }\n" + + " ]\n" + + " },\n" + + " \"data_description\": {\"time_field\": \"time stamp\"}\n" + + "}"); + client().performRequest(createJobRequest); + + String rollupJobId = "rollup-" + jobId; + Request createRollupRequest = new Request("PUT", "/_xpack/rollup/job/" + rollupJobId); + createRollupRequest.setJsonEntity("{\n" + + "\"index_pattern\": \"airline-data-aggs\",\n" + + " \"rollup_index\": \"airline-data-aggs-rollup\",\n" + + " \"cron\": \"*/30 * * * * ?\",\n" + + " \"page_size\" :1000,\n" + + " \"groups\" : {\n" + + " \"date_histogram\": {\n" + + " \"field\": \"time stamp\",\n" + + " \"interval\": \"2m\",\n" + + " \"delay\": \"7d\"\n" + + " },\n" + + " \"terms\": {\n" + + " \"fields\": [\"airline\"]\n" + + " }" + + " },\n" + + " \"metrics\": [\n" + + " {\n" + + " \"field\": \"responsetime\",\n" + + " \"metrics\": [\"avg\",\"min\",\"max\",\"sum\"]\n" + + " },\n" + + " {\n" + + " \"field\": \"time stamp\",\n" + + " \"metrics\": [\"min\",\"max\"]\n" + + " }\n" + + " ]\n" + + "}"); + client().performRequest(createRollupRequest); + client().performRequest(new Request("POST", "/_xpack/rollup/job/" + rollupJobId + "/_start")); + + assertBusy(() -> { + Response getRollup = client().performRequest(new Request("GET", "/_xpack/rollup/job/" + rollupJobId)); + String body = EntityUtils.toString(getRollup.getEntity()); + assertThat(body, containsString("\"job_state\":\"started\"")); + assertThat(body, containsString("\"rollups_indexed\":4")); + }, 60, TimeUnit.SECONDS); + + client().performRequest(new Request("POST", "/_xpack/rollup/job/" + rollupJobId + "/_stop")); + assertBusy(() -> { + Response getRollup = client().performRequest(new Request("GET", "/_xpack/rollup/job/" + rollupJobId)); + assertThat(EntityUtils.toString(getRollup.getEntity()), containsString("\"job_state\":\"stopped\"")); + }, 60, TimeUnit.SECONDS); + + final Request refreshRollupIndex = new Request("POST", "airline-data-aggs-rollup/_refresh"); + client().performRequest(refreshRollupIndex); + + String datafeedId = "datafeed-" + jobId; + String aggregations = "{\"buckets\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":3600000}," + + "\"aggregations\":{" + + "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}}," + + "\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}"; + new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs-rollup", "response").setAggregations(aggregations).build(); + openJob(client(), jobId); + + startDatafeedAndWaitUntilStopped(datafeedId); + waitUntilJobIsClosed(jobId); + Response jobStatsResponse = client().performRequest(new Request("GET", + MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")); + String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity()); + assertThat(jobStatsResponseAsString, 
containsString("\"input_record_count\":2")); + assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2")); + } + + public void testLookbackWithoutPermissionsAndRollup() throws Exception { + setupFullAccessRole("airline-data-aggs-rollup"); + String jobId = "rollup-permission-test-network-job"; + Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId); + createJobRequest.setJsonEntity("{\n" + + " \"description\": \"Aggs job\",\n" + + " \"analysis_config\": {\n" + + " \"bucket_span\": \"1h\",\n" + + " \"summary_count_field_name\": \"doc_count\",\n" + + " \"detectors\": [\n" + + " {\n" + + " \"function\": \"mean\",\n" + + " \"field_name\": \"responsetime\",\n" + + " \"by_field_name\": \"airline\"\n" + + " }\n" + + " ]\n" + + " },\n" + + " \"data_description\": {\"time_field\": \"time stamp\"}\n" + + "}"); + client().performRequest(createJobRequest); + + String rollupJobId = "rollup-" + jobId; + Request createRollupRequest = new Request("PUT", "/_xpack/rollup/job/" + rollupJobId); + createRollupRequest.setJsonEntity("{\n" + + "\"index_pattern\": \"airline-data-aggs\",\n" + + " \"rollup_index\": \"airline-data-aggs-rollup\",\n" + + " \"cron\": \"*/30 * * * * ?\",\n" + + " \"page_size\" :1000,\n" + + " \"groups\" : {\n" + + " \"date_histogram\": {\n" + + " \"field\": \"time stamp\",\n" + + " \"interval\": \"2m\",\n" + + " \"delay\": \"7d\"\n" + + " },\n" + + " \"terms\": {\n" + + " \"fields\": [\"airline\"]\n" + + " }" + + " },\n" + + " \"metrics\": [\n" + + " {\n" + + " \"field\": \"responsetime\",\n" + + " \"metrics\": [\"avg\",\"min\",\"max\",\"sum\"]\n" + + " },\n" + + " {\n" + + " \"field\": \"time stamp\",\n" + + " \"metrics\": [\"min\",\"max\"]\n" + + " }\n" + + " ]\n" + + "}"); + client().performRequest(createRollupRequest); + + String datafeedId = "datafeed-" + jobId; + String aggregations = "{\"buckets\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":3600000}," + + "\"aggregations\":{" + + "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}}," + + "\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}"; + + + // At the time we create the datafeed the user can access the network-data index that we have access to + new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs-rollup", "doc") + .setAggregations(aggregations) + .setChunkingTimespan("300s") + .setAuthHeader(BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS) + .build(); + + // Change the role so that the user can no longer access network-data + setupFullAccessRole("some-other-data"); + + openJob(client(), jobId); + + startDatafeedAndWaitUntilStopped(datafeedId, BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS); + waitUntilJobIsClosed(jobId); + // There should be a notification saying that there was a problem extracting data + client().performRequest(new Request("POST", "/_refresh")); + Response notificationsResponse = client().performRequest( + new Request("GET", AuditorField.NOTIFICATIONS_INDEX + "/_search?size=1000&q=job_id:" + jobId)); + String notificationsResponseAsString = EntityUtils.toString(notificationsResponse.getEntity()); + assertThat(notificationsResponseAsString, containsString("\"message\":\"Datafeed is encountering errors extracting data: " + + "action [indices:admin/xpack/rollup/search] is unauthorized for user [ml_admin_plus_data]\"")); + } + public void testRealtime() throws Exception { String jobId = "job-realtime-1"; createJob(jobId, "airline"); @@ -882,7 +1127,8 @@ public static void openJob(RestClient client, String 
jobId) throws IOException { @After public void clearMlState() throws Exception { new MlRestTestStateCleaner(logger, adminClient()).clearMlMetadata(); - ESRestTestCase.waitForPendingTasks(adminClient()); + // Don't check rollup jobs because we clear them in the superclass. + waitForPendingTasks(adminClient(), taskName -> taskName.startsWith(RollupJob.NAME)); } private static class DatafeedBuilder { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java index 60b8235ec84b7..89ae04dcdd7d7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.tasks.Task; @@ -32,6 +33,8 @@ import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; +import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction; +import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction; import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; @@ -42,6 +45,9 @@ import java.io.IOException; import java.util.Map; +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; + public class TransportPutDatafeedAction extends TransportMasterNodeAction { private final XPackLicenseState licenseState; @@ -78,23 +84,48 @@ protected void masterOperation(PutDatafeedAction.Request request, ClusterState s // If security is enabled only create the datafeed if the user requesting creation has // permission to read the indices the datafeed is going to read from if (licenseState.isAuthAllowed()) { - final String username = securityContext.getUser().principal(); - ActionListener privResponseListener = ActionListener.wrap( - r -> handlePrivsResponse(username, request, r, listener), - listener::onFailure); - HasPrivilegesRequest privRequest = new HasPrivilegesRequest(); + final String[] indices = request.getDatafeed().getIndices().toArray(new String[0]); + + final String username = securityContext.getUser().principal(); + final HasPrivilegesRequest privRequest = new HasPrivilegesRequest(); + privRequest.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]); privRequest.username(username); privRequest.clusterPrivileges(Strings.EMPTY_ARRAY); - // We just check for permission to use the search action. In reality we'll also - // use the scroll action, but that's considered an implementation detail. 
- privRequest.indexPrivileges(RoleDescriptor.IndicesPrivileges.builder() - .indices(request.getDatafeed().getIndices().toArray(new String[0])) - .privileges(SearchAction.NAME) - .build()); - privRequest.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]); - client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener); + final RoleDescriptor.IndicesPrivileges.Builder indicesPrivilegesBuilder = RoleDescriptor.IndicesPrivileges.builder() + .indices(indices); + + ActionListener privResponseListener = ActionListener.wrap( + r -> handlePrivsResponse(username, request, r, listener), + listener::onFailure); + + ActionListener getRollupIndexCapsActionHandler = ActionListener.wrap( + response -> { + if (response.getJobs().isEmpty()) { // This means no rollup indexes are in the config + indicesPrivilegesBuilder.privileges(SearchAction.NAME); + } else { + indicesPrivilegesBuilder.privileges(SearchAction.NAME, RollupSearchAction.NAME); + } + privRequest.indexPrivileges(indicesPrivilegesBuilder.build()); + client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener); + }, + e -> { + if (e instanceof IndexNotFoundException) { + indicesPrivilegesBuilder.privileges(SearchAction.NAME); + privRequest.indexPrivileges(indicesPrivilegesBuilder.build()); + client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener); + } else { + listener.onFailure(e); + } + } + ); + + executeAsyncWithOrigin(client, + ML_ORIGIN, + GetRollupIndexCapsAction.INSTANCE, + new GetRollupIndexCapsAction.Request(indices), + getRollupIndexCapsActionHandler); } else { putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener); } @@ -115,8 +146,7 @@ private void handlePrivsResponse(String username, PutDatafeedAction.Request requ builder.endObject(); listener.onFailure(Exceptions.authorizationError("Cannot create datafeed [{}]" + - " because user {} lacks permissions on the indices to be" + - " searched: {}", + " because user {} lacks permissions on the indices: {}", request.getDatafeed().getId(), username, Strings.toString(builder))); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java index 8fd1ced17293a..77e2c695db7d5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java @@ -5,14 +5,19 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction; import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory; +import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.RollupDataExtractorFactory; import 
org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory; -import org.elasticsearch.xpack.core.ml.job.config.Job; public interface DataExtractorFactory { DataExtractor newExtractor(long start, long end); @@ -22,16 +27,44 @@ public interface DataExtractorFactory { */ static void create(Client client, DatafeedConfig datafeed, Job job, ActionListener listener) { ActionListener factoryHandler = ActionListener.wrap( - factory -> listener.onResponse(datafeed.getChunkingConfig().isEnabled() - ? new ChunkedDataExtractorFactory(client, datafeed, job, factory) : factory) - , listener::onFailure + factory -> listener.onResponse(datafeed.getChunkingConfig().isEnabled() + ? new ChunkedDataExtractorFactory(client, datafeed, job, factory) : factory) + , listener::onFailure ); - boolean isScrollSearch = datafeed.hasAggregations() == false; - if (isScrollSearch) { - ScrollDataExtractorFactory.create(client, datafeed, job, factoryHandler); - } else { - factoryHandler.onResponse(new AggregationDataExtractorFactory(client, datafeed, job)); - } + ActionListener getRollupIndexCapsActionHandler = ActionListener.wrap( + response -> { + if (response.getJobs().isEmpty()) { // This means no rollup indexes are in the config + if (datafeed.hasAggregations()) { + factoryHandler.onResponse(new AggregationDataExtractorFactory(client, datafeed, job)); + } else { + ScrollDataExtractorFactory.create(client, datafeed, job, factoryHandler); + } + } else { + if (datafeed.hasAggregations()) { // Rollup indexes require aggregations + RollupDataExtractorFactory.create(client, datafeed, job, response.getJobs(), factoryHandler); + } else { + listener.onFailure(new IllegalArgumentException("Aggregations are required when using Rollup indices")); + } + } + }, + e -> { + if (e instanceof IndexNotFoundException) { + listener.onFailure(new ResourceNotFoundException("datafeed [" + datafeed.getId() + + "] cannot retrieve data because index " + ((IndexNotFoundException)e).getIndex() + " does not exist")); + } else { + listener.onFailure(e); + } + } + ); + + GetRollupIndexCapsAction.Request request = new GetRollupIndexCapsAction.Request(datafeed.getIndices().toArray(new String[0])); + + ClientHelper.executeAsyncWithOrigin( + client, + ClientHelper.ML_ORIGIN, + GetRollupIndexCapsAction.INSTANCE, + request, + getRollupIndexCapsActionHandler); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java new file mode 100644 index 0000000000000..a520839652546 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java @@ -0,0 +1,168 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; +import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Abstract class for aggregated data extractors, e.g. {@link RollupDataExtractor} + * + * @param The request builder type for getting data from ElasticSearch + */ +abstract class AbstractAggregationDataExtractor>> + implements DataExtractor { + + private static final Logger LOGGER = LogManager.getLogger(AbstractAggregationDataExtractor.class); + + /** + * The number of key-value pairs written in each batch to process. + * This has to be a number that is small enough to allow for responsive + * cancelling and big enough to not cause overhead by calling the + * post data action too often. The value of 1000 was determined via + * such testing. + */ + private static int BATCH_KEY_VALUE_PAIRS = 1000; + + protected final Client client; + protected final AggregationDataExtractorContext context; + private boolean hasNext; + private boolean isCancelled; + private AggregationToJsonProcessor aggregationToJsonProcessor; + private ByteArrayOutputStream outputStream; + + AbstractAggregationDataExtractor(Client client, AggregationDataExtractorContext dataExtractorContext) { + this.client = Objects.requireNonNull(client); + context = Objects.requireNonNull(dataExtractorContext); + hasNext = true; + isCancelled = false; + outputStream = new ByteArrayOutputStream(); + } + + @Override + public boolean hasNext() { + return hasNext; + } + + @Override + public boolean isCancelled() { + return isCancelled; + } + + @Override + public void cancel() { + LOGGER.debug("[{}] Data extractor received cancel request", context.jobId); + isCancelled = true; + hasNext = false; + } + + @Override + public Optional next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + if (aggregationToJsonProcessor == null) { + Aggregations aggs = search(); + if (aggs == null) { + hasNext = false; + return Optional.empty(); + } + initAggregationProcessor(aggs); + } + + return Optional.ofNullable(processNextBatch()); + } + + private Aggregations search() throws IOException { + LOGGER.debug("[{}] Executing aggregated search", context.jobId); + SearchResponse searchResponse = executeSearchRequest(buildSearchRequest(buildBaseSearchSource())); + LOGGER.debug("[{}] Search response was obtained", context.jobId); + ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse); + return validateAggs(searchResponse.getAggregations()); + } + + private void initAggregationProcessor(Aggregations aggs) throws IOException { + 
aggregationToJsonProcessor = new AggregationToJsonProcessor(context.timeField, context.fields, context.includeDocCount, + context.start); + aggregationToJsonProcessor.process(aggs); + } + + protected SearchResponse executeSearchRequest(T searchRequestBuilder) { + return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get); + } + + private SearchSourceBuilder buildBaseSearchSource() { + // For derivative aggregations the first bucket will always be null + // so query one extra histogram bucket back and hope there is data + // in that bucket + long histogramSearchStartTime = Math.max(0, context.start - ExtractorUtils.getHistogramIntervalMillis(context.aggs)); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() + .size(0) + .query(ExtractorUtils.wrapInTimeRangeQuery(context.query, context.timeField, histogramSearchStartTime, context.end)); + + context.aggs.getAggregatorFactories().forEach(searchSourceBuilder::aggregation); + context.aggs.getPipelineAggregatorFactories().forEach(searchSourceBuilder::aggregation); + return searchSourceBuilder; + } + + protected abstract T buildSearchRequest(SearchSourceBuilder searchRequestBuilder); + + private Aggregations validateAggs(@Nullable Aggregations aggs) { + if (aggs == null) { + return null; + } + List aggsAsList = aggs.asList(); + if (aggsAsList.isEmpty()) { + return null; + } + if (aggsAsList.size() > 1) { + throw new IllegalArgumentException("Multiple top level aggregations not supported; found: " + + aggsAsList.stream().map(Aggregation::getName).collect(Collectors.toList())); + } + + return aggs; + } + + private InputStream processNextBatch() throws IOException { + outputStream.reset(); + + hasNext = aggregationToJsonProcessor.writeDocs(BATCH_KEY_VALUE_PAIRS, outputStream); + return new ByteArrayInputStream(outputStream.toByteArray()); + } + + protected long getHistogramInterval() { + return ExtractorUtils.getHistogramIntervalMillis(context.aggs); + } + + public AggregationDataExtractorContext getContext() { + return context; + } + +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java index df627d2d39b03..0ca92162c9920 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java @@ -5,28 +5,10 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.search.aggregations.Aggregation; -import org.elasticsearch.search.aggregations.Aggregations; -import org.elasticsearch.xpack.core.ClientHelper; -import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; -import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.List; -import 
java.util.NoSuchElementException; -import java.util.Objects; -import java.util.Optional; -import java.util.stream.Collectors; +import org.elasticsearch.search.builder.SearchSourceBuilder; /** * An implementation that extracts data from elasticsearch using search with aggregations on a client. @@ -34,132 +16,18 @@ * stored and they are then processed in batches. Cancellation is supported between batches. * Note that this class is NOT thread-safe. */ -class AggregationDataExtractor implements DataExtractor { - - private static final Logger LOGGER = LogManager.getLogger(AggregationDataExtractor.class); - - /** - * The number of key-value pairs written in each batch to process. - * This has to be a number that is small enough to allow for responsive - * cancelling and big enough to not cause overhead by calling the - * post data action too often. The value of 1000 was determined via - * such testing. - */ - private static int BATCH_KEY_VALUE_PAIRS = 1000; - - private final Client client; - private final AggregationDataExtractorContext context; - private boolean hasNext; - private boolean isCancelled; - private AggregationToJsonProcessor aggregationToJsonProcessor; - private ByteArrayOutputStream outputStream; +class AggregationDataExtractor extends AbstractAggregationDataExtractor { AggregationDataExtractor(Client client, AggregationDataExtractorContext dataExtractorContext) { - this.client = Objects.requireNonNull(client); - context = Objects.requireNonNull(dataExtractorContext); - hasNext = true; - isCancelled = false; - outputStream = new ByteArrayOutputStream(); + super(client, dataExtractorContext); } @Override - public boolean hasNext() { - return hasNext; - } - - @Override - public boolean isCancelled() { - return isCancelled; - } - - @Override - public void cancel() { - LOGGER.trace("[{}] Data extractor received cancel request", context.jobId); - isCancelled = true; - hasNext = false; - } - - @Override - public Optional next() throws IOException { - if (!hasNext()) { - throw new NoSuchElementException(); - } - - if (aggregationToJsonProcessor == null) { - Aggregations aggs = search(); - if (aggs == null) { - hasNext = false; - return Optional.empty(); - } - initAggregationProcessor(aggs); - } - - return Optional.ofNullable(processNextBatch()); - } - - private Aggregations search() throws IOException { - LOGGER.debug("[{}] Executing aggregated search", context.jobId); - SearchResponse searchResponse = executeSearchRequest(buildSearchRequest()); - LOGGER.debug("[{}] Search response was obtained", context.jobId); - ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse); - return validateAggs(searchResponse.getAggregations()); - } - - private void initAggregationProcessor(Aggregations aggs) throws IOException { - aggregationToJsonProcessor = new AggregationToJsonProcessor(context.timeField, context.fields, context.includeDocCount, - context.start); - aggregationToJsonProcessor.process(aggs); - } - - protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) { - return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get); - } - - private SearchRequestBuilder buildSearchRequest() { - // For derivative aggregations the first bucket will always be null - // so query one extra histogram bucket back and hope there is data - // in that bucket - long histogramSearchStartTime = Math.max(0, context.start - getHistogramInterval()); - - SearchRequestBuilder searchRequestBuilder = 
SearchAction.INSTANCE.newRequestBuilder(client) - .setIndices(context.indices) - .setTypes(context.types) - .setSize(0) - .setQuery(ExtractorUtils.wrapInTimeRangeQuery(context.query, context.timeField, histogramSearchStartTime, context.end)); - - context.aggs.getAggregatorFactories().forEach(searchRequestBuilder::addAggregation); - context.aggs.getPipelineAggregatorFactories().forEach(searchRequestBuilder::addAggregation); - return searchRequestBuilder; - } - - private Aggregations validateAggs(@Nullable Aggregations aggs) { - if (aggs == null) { - return null; - } - List aggsAsList = aggs.asList(); - if (aggsAsList.isEmpty()) { - return null; - } - if (aggsAsList.size() > 1) { - throw new IllegalArgumentException("Multiple top level aggregations not supported; found: " - + aggsAsList.stream().map(Aggregation::getName).collect(Collectors.toList())); - } - - return aggs; - } - - private InputStream processNextBatch() throws IOException { - outputStream.reset(); - - hasNext = aggregationToJsonProcessor.writeDocs(BATCH_KEY_VALUE_PAIRS, outputStream); - return new ByteArrayInputStream(outputStream.toByteArray()); - } - - private long getHistogramInterval() { - return ExtractorUtils.getHistogramIntervalMillis(context.aggs); - } + protected SearchRequestBuilder buildSearchRequest(SearchSourceBuilder searchSourceBuilder) { - AggregationDataExtractorContext getContext() { - return context; + return new SearchRequestBuilder(client, SearchAction.INSTANCE) + .setSource(searchSourceBuilder) + .setIndices(context.indices) + .setTypes(context.types); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractor.java new file mode 100644 index 0000000000000..f5de574e99a96 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractor.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction; + +/** + * An implementation that extracts data from elasticsearch using search with aggregations against rollup indexes on a client. + * The first time {@link #next()} is called, the search is executed. The result aggregations are + * stored and they are then processed in batches. Cancellation is supported between batches. + * Note that this class is NOT thread-safe. 
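The batching contract described above is the shared DataExtractor one, so a rollup extractor is driven exactly like the plain aggregation extractor. A rough usage sketch, assuming a factory built by RollupDataExtractorFactory and a hypothetical postData sink (neither the driver loop nor postData is part of this patch):

    DataExtractor extractor = rollupDataExtractorFactory.newExtractor(start, end);
    while (extractor.hasNext()) {
        // The first call runs the rollup search; subsequent calls drain the stored
        // aggregations in batches of key-value pairs.
        Optional<InputStream> batch = extractor.next();
        batch.ifPresent(stream -> postData(stream)); // postData is a placeholder sink
    }
    // cancel() may be called between batches; hasNext() then returns false and the loop exits.
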
+ */ +class RollupDataExtractor extends AbstractAggregationDataExtractor { + + RollupDataExtractor(Client client, AggregationDataExtractorContext dataExtractorContext) { + super(client, dataExtractorContext); + } + + @Override + protected RollupSearchAction.RequestBuilder buildSearchRequest(SearchSourceBuilder searchSourceBuilder) { + SearchRequest searchRequest = new SearchRequest().indices(context.indices).source(searchSourceBuilder); + + return new RollupSearchAction.RequestBuilder(client, searchRequest); + } + +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactory.java new file mode 100644 index 0000000000000..c8a96d6c306af --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactory.java @@ -0,0 +1,218 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.utils.Intervals; +import org.elasticsearch.xpack.core.rollup.action.RollableIndexCaps; +import org.elasticsearch.xpack.core.rollup.action.RollupJobCaps.RollupFieldCaps; +import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig; +import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils.getHistogramAggregation; +import static org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils.getHistogramIntervalMillis; +import static org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils.validateAndGetCalendarInterval; + +public class RollupDataExtractorFactory implements DataExtractorFactory { + + private final Client client; + private final DatafeedConfig datafeedConfig; + private final Job job; + + private RollupDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job) { + this.client = Objects.requireNonNull(client); + this.datafeedConfig = Objects.requireNonNull(datafeedConfig); + this.job = Objects.requireNonNull(job); + } + + @Override + public DataExtractor newExtractor(long start, long end) { + long histogramInterval = 
datafeedConfig.getHistogramIntervalMillis(); + AggregationDataExtractorContext dataExtractorContext = new AggregationDataExtractorContext( + job.getId(), + job.getDataDescription().getTimeField(), + job.getAnalysisConfig().analysisFields(), + datafeedConfig.getIndices(), + datafeedConfig.getTypes(), + datafeedConfig.getQuery(), + datafeedConfig.getAggregations(), + Intervals.alignToCeil(start, histogramInterval), + Intervals.alignToFloor(end, histogramInterval), + job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT), + datafeedConfig.getHeaders()); + return new RollupDataExtractor(client, dataExtractorContext); + } + + public static void create(Client client, + DatafeedConfig datafeed, + Job job, + Map rollupJobsWithCaps, + ActionListener listener) { + + final AggregationBuilder datafeedHistogramAggregation = getHistogramAggregation( + datafeed.getAggregations().getAggregatorFactories()); + if ((datafeedHistogramAggregation instanceof DateHistogramAggregationBuilder) == false) { + listener.onFailure( + new IllegalArgumentException("Rollup requires that the datafeed configuration use a [date_histogram] aggregation," + + " not a [histogram] aggregation over the time field.")); + return; + } + + final String timeField = ((ValuesSourceAggregationBuilder) datafeedHistogramAggregation).field(); + + Set rollupCapsSet = rollupJobsWithCaps.values() + .stream() + .flatMap(rollableIndexCaps -> rollableIndexCaps.getJobCaps().stream()) + .map(rollupJobCaps -> ParsedRollupCaps.fromJobFieldCaps(rollupJobCaps.getFieldCaps(), timeField)) + .collect(Collectors.toSet()); + + final long datafeedInterval = getHistogramIntervalMillis(datafeedHistogramAggregation); + + List validIntervalCaps = rollupCapsSet.stream() + .filter(rollupCaps -> validInterval(datafeedInterval, rollupCaps)) + .collect(Collectors.toList()); + + if (validIntervalCaps.isEmpty()) { + listener.onFailure( + new IllegalArgumentException( + "Rollup capabilities do not have a [date_histogram] aggregation with an interval " + + "that is a multiple of the datafeed's interval.") + ); + return; + } + final List flattenedAggs = new ArrayList<>(); + flattenAggregations(datafeed.getAggregations().getAggregatorFactories(), datafeedHistogramAggregation, flattenedAggs); + + if (validIntervalCaps.stream().noneMatch(rollupJobConfig -> hasAggregations(rollupJobConfig, flattenedAggs))) { + listener.onFailure( + new IllegalArgumentException("Rollup capabilities do not support all the datafeed aggregations at the desired interval.") + ); + return; + } + + listener.onResponse(new RollupDataExtractorFactory(client, datafeed, job)); + } + + private static boolean validInterval(long datafeedInterval, ParsedRollupCaps rollupJobGroupConfig) { + if (rollupJobGroupConfig.hasDatehistogram() == false) { + return false; + } + if ("UTC".equalsIgnoreCase(rollupJobGroupConfig.getTimezone()) == false) { + return false; + } + try { + long jobInterval = validateAndGetCalendarInterval(rollupJobGroupConfig.getInterval()); + return datafeedInterval % jobInterval == 0; + } catch (ElasticsearchStatusException exception) { + return false; + } + } + + private static void flattenAggregations(final Collection datafeedAggregations, + final AggregationBuilder datafeedHistogramAggregation, + final List flattenedAggregations) { + for (AggregationBuilder aggregationBuilder : datafeedAggregations) { + if (aggregationBuilder.equals(datafeedHistogramAggregation) == false) { + flattenedAggregations.add((ValuesSourceAggregationBuilder)aggregationBuilder); + } + 
flattenAggregations(aggregationBuilder.getSubAggregations(), datafeedHistogramAggregation, flattenedAggregations); + } + } + + private static boolean hasAggregations(ParsedRollupCaps rollupCaps, List datafeedAggregations) { + for (ValuesSourceAggregationBuilder aggregationBuilder : datafeedAggregations) { + String type = aggregationBuilder.getType(); + String field = aggregationBuilder.field(); + if (aggregationBuilder instanceof TermsAggregationBuilder) { + if (rollupCaps.supportedTerms.contains(field) == false) { + return false; + } + } else { + if (rollupCaps.supportedMetrics.contains(field + "_" + type) == false) { + return false; + } + } + } + return true; + } + + private static class ParsedRollupCaps { + private final Set supportedMetrics; + private final Set supportedTerms; + private final Map datehistogramAgg; + private static final List aggsToIgnore = + Arrays.asList(HistogramAggregationBuilder.NAME, DateHistogramAggregationBuilder.NAME); + + private static ParsedRollupCaps fromJobFieldCaps(Map rollupFieldCaps, String timeField) { + Map datehistogram = null; + RollupFieldCaps timeFieldCaps = rollupFieldCaps.get(timeField); + if (timeFieldCaps != null) { + for(Map agg : timeFieldCaps.getAggs()) { + if (agg.get("agg").equals(DateHistogramAggregationBuilder.NAME)) { + datehistogram = agg; + } + } + } + Set supportedMetrics = new HashSet<>(); + Set supportedTerms = new HashSet<>(); + rollupFieldCaps.forEach((field, fieldCaps) -> { + fieldCaps.getAggs().forEach(agg -> { + String type = (String)agg.get("agg"); + if (type.equals(TermsAggregationBuilder.NAME)) { + supportedTerms.add(field); + } else if (aggsToIgnore.contains(type) == false) { + supportedMetrics.add(field + "_" + type); + } + }); + }); + return new ParsedRollupCaps(supportedMetrics, supportedTerms, datehistogram); + } + + private ParsedRollupCaps(Set supportedMetrics, Set supportedTerms, Map datehistogramAgg) { + this.supportedMetrics = supportedMetrics; + this.supportedTerms = supportedTerms; + this.datehistogramAgg = datehistogramAgg; + } + + private String getInterval() { + if (datehistogramAgg == null) { + return null; + } + return (String)datehistogramAgg.get(DateHistogramGroupConfig.INTERVAL); + } + + private String getTimezone() { + if (datehistogramAgg == null) { + return null; + } + return (String)datehistogramAgg.get(DateHistogramGroupConfig.TIME_ZONE); + } + + private boolean hasDatehistogram() { + return datehistogramAgg != null; + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java index b3d8d7ab17772..89378699e2d90 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java @@ -7,7 +7,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; @@ -15,10 +17,14 @@ import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.metrics.max.Max; import 
org.elasticsearch.search.aggregations.metrics.min.Min; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; +import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; +import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.RollupDataExtractorFactory; import java.io.IOException; import java.io.InputStream; @@ -43,6 +49,13 @@ */ public class ChunkedDataExtractor implements DataExtractor { + private interface DataSummary { + long estimateChunk(); + boolean hasData(); + long earliestTime(); + long getDataTimeSpread(); + } + private static final Logger LOGGER = LogManager.getLogger(ChunkedDataExtractor.class); private static final String EARLIEST_TIME = "earliest_time"; @@ -54,6 +67,7 @@ public class ChunkedDataExtractor implements DataExtractor { private final Client client; private final DataExtractorFactory dataExtractorFactory; private final ChunkedDataExtractorContext context; + private final DataSummaryFactory dataSummaryFactory; private long currentStart; private long currentEnd; private long chunkSpan; @@ -67,6 +81,7 @@ public ChunkedDataExtractor(Client client, DataExtractorFactory dataExtractorFac this.currentStart = context.start; this.currentEnd = context.start; this.isCancelled = false; + this.dataSummaryFactory = new DataSummaryFactory(); } @Override @@ -93,48 +108,24 @@ public Optional next() throws IOException { } private void setUpChunkedSearch() throws IOException { - DataSummary dataSummary = requestDataSummary(); - if (dataSummary.totalHits > 0) { - currentStart = context.timeAligner.alignToFloor(dataSummary.earliestTime); + DataSummary dataSummary = dataSummaryFactory.buildDataSummary(); + if (dataSummary.hasData()) { + currentStart = context.timeAligner.alignToFloor(dataSummary.earliestTime()); currentEnd = currentStart; chunkSpan = context.chunkSpan == null ? 
dataSummary.estimateChunk() : context.chunkSpan.getMillis(); chunkSpan = context.timeAligner.alignToCeil(chunkSpan); - LOGGER.debug("[{}]Chunked search configured: totalHits = {}, dataTimeSpread = {} ms, chunk span = {} ms", - context.jobId, dataSummary.totalHits, dataSummary.getDataTimeSpread(), chunkSpan); + LOGGER.debug("[{}]Chunked search configured: kind = {}, dataTimeSpread = {} ms, chunk span = {} ms", + context.jobId, dataSummary.getClass().getSimpleName(), dataSummary.getDataTimeSpread(), chunkSpan); } else { // search is over currentEnd = context.end; } } - private DataSummary requestDataSummary() throws IOException { - SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client) - .setSize(0) - .setIndices(context.indices) - .setTypes(context.types) - .setQuery(ExtractorUtils.wrapInTimeRangeQuery(context.query, context.timeField, currentStart, context.end)) - .addAggregation(AggregationBuilders.min(EARLIEST_TIME).field(context.timeField)) - .addAggregation(AggregationBuilders.max(LATEST_TIME).field(context.timeField)); - - SearchResponse response = executeSearchRequest(searchRequestBuilder); - LOGGER.debug("[{}] Data summary response was obtained", context.jobId); - - ExtractorUtils.checkSearchWasSuccessful(context.jobId, response); - - Aggregations aggregations = response.getAggregations(); - long earliestTime = 0; - long latestTime = 0; - long totalHits = response.getHits().getTotalHits(); - if (totalHits > 0) { - Min min = aggregations.get(EARLIEST_TIME); - earliestTime = (long) min.getValue(); - Max max = aggregations.get(LATEST_TIME); - latestTime = (long) max.getValue(); - } - return new DataSummary(earliestTime, latestTime, totalHits); - } - - protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) { + protected SearchResponse executeSearchRequest( + ActionRequestBuilder> searchRequestBuilder) { return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get); } @@ -182,19 +173,104 @@ public void cancel() { isCancelled = true; } - private class DataSummary { + ChunkedDataExtractorContext getContext() { + return context; + } + + private class DataSummaryFactory { + + /** + * If there are aggregations, an AggregatedDataSummary object is created. It returns a ScrollingDataSummary otherwise. + * + * By default a DatafeedConfig with aggregations, should already have a manual ChunkingConfig created. + * However, the end user could have specifically set the ChunkingConfig to AUTO, which would not really work for aggregations. + * So, if we need to gather an appropriate chunked time for aggregations, we can utilize the AggregatedDataSummary + * + * @return DataSummary object + * @throws IOException when timefield range search fails + */ + private DataSummary buildDataSummary() throws IOException { + return context.hasAggregations ? 
newAggregatedDataSummary() : newScrolledDataSummary(); + } + + private DataSummary newScrolledDataSummary() throws IOException { + SearchRequestBuilder searchRequestBuilder = rangeSearchRequest().setTypes(context.types); + + SearchResponse response = executeSearchRequest(searchRequestBuilder); + LOGGER.debug("[{}] Scrolling Data summary response was obtained", context.jobId); + + ExtractorUtils.checkSearchWasSuccessful(context.jobId, response); + + Aggregations aggregations = response.getAggregations(); + long earliestTime = 0; + long latestTime = 0; + long totalHits = response.getHits().getTotalHits(); + if (totalHits > 0) { + Min min = aggregations.get(EARLIEST_TIME); + earliestTime = (long) min.getValue(); + Max max = aggregations.get(LATEST_TIME); + latestTime = (long) max.getValue(); + } + return new ScrolledDataSummary(earliestTime, latestTime, totalHits); + } + + private DataSummary newAggregatedDataSummary() throws IOException { + // TODO: once RollupSearchAction is changed from indices:admin* to indices:data/read/* this branch is not needed + ActionRequestBuilder> + searchRequestBuilder = + dataExtractorFactory instanceof RollupDataExtractorFactory ? rollupRangeSearchRequest() : rangeSearchRequest(); + SearchResponse response = executeSearchRequest(searchRequestBuilder); + LOGGER.debug("[{}] Aggregating Data summary response was obtained", context.jobId); + + ExtractorUtils.checkSearchWasSuccessful(context.jobId, response); + + Aggregations aggregations = response.getAggregations(); + Min min = aggregations.get(EARLIEST_TIME); + Max max = aggregations.get(LATEST_TIME); + return new AggregatedDataSummary(min.getValue(), max.getValue(), context.histogramInterval); + } + + private SearchSourceBuilder rangeSearchBuilder() { + return new SearchSourceBuilder() + .size(0) + .query(ExtractorUtils.wrapInTimeRangeQuery(context.query, context.timeField, currentStart, context.end)) + .aggregation(AggregationBuilders.min(EARLIEST_TIME).field(context.timeField)) + .aggregation(AggregationBuilders.max(LATEST_TIME).field(context.timeField)); + } + + private SearchRequestBuilder rangeSearchRequest() { + return new SearchRequestBuilder(client, SearchAction.INSTANCE) + .setIndices(context.indices) + .setSource(rangeSearchBuilder()); + } + + private RollupSearchAction.RequestBuilder rollupRangeSearchRequest() { + SearchRequest searchRequest = new SearchRequest().indices(context.indices).source(rangeSearchBuilder()); + return new RollupSearchAction.RequestBuilder(client, searchRequest); + } + } + + private class ScrolledDataSummary implements DataSummary { private long earliestTime; private long latestTime; private long totalHits; - private DataSummary(long earliestTime, long latestTime, long totalHits) { + private ScrolledDataSummary(long earliestTime, long latestTime, long totalHits) { this.earliestTime = earliestTime; this.latestTime = latestTime; this.totalHits = totalHits; } - private long getDataTimeSpread() { + @Override + public long earliestTime() { + return earliestTime; + } + + @Override + public long getDataTimeSpread() { return latestTime - earliestTime; } @@ -206,7 +282,8 @@ private long getDataTimeSpread() { * However, assuming this as the chunk span may often lead to half-filled pages or empty searches. * It is beneficial to take a multiple of that. Based on benchmarking, we set this to 10x. 
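As a worked example with made-up numbers (none of these figures come from the patch): with a scroll size of 1,000, a data time spread of one day (86,400,000 ms), and 1,000,000 total hits, the estimate computed below is 10 * 1,000 * 86,400,000 / 1,000,000 = 864,000 ms, i.e. chunks of roughly 14.4 minutes, subject to the MIN_CHUNK_SPAN floor.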
*/ - private long estimateChunk() { + @Override + public long estimateChunk() { long dataTimeSpread = getDataTimeSpread(); if (totalHits <= 0 || dataTimeSpread <= 0) { return context.end - currentEnd; @@ -214,9 +291,46 @@ private long estimateChunk() { long estimatedChunk = 10 * (context.scrollSize * getDataTimeSpread()) / totalHits; return Math.max(estimatedChunk, MIN_CHUNK_SPAN); } + + @Override + public boolean hasData() { + return totalHits > 0; + } } - ChunkedDataExtractorContext getContext() { - return context; + private class AggregatedDataSummary implements DataSummary { + + private final double earliestTime; + private final double latestTime; + private final long histogramIntervalMillis; + + private AggregatedDataSummary(double earliestTime, double latestTime, long histogramInterval) { + this.earliestTime = earliestTime; + this.latestTime = latestTime; + this.histogramIntervalMillis = histogramInterval; + } + + /** + * This heuristic is a direct copy of the manual chunking config auto-creation done in {@link DatafeedConfig.Builder} + */ + @Override + public long estimateChunk() { + return DatafeedConfig.Builder.DEFAULT_AGGREGATION_CHUNKING_BUCKETS * histogramIntervalMillis; + } + + @Override + public boolean hasData() { + return (Double.isInfinite(earliestTime) || Double.isInfinite(latestTime)) == false; + } + + @Override + public long earliestTime() { + return (long)earliestTime; + } + + @Override + public long getDataTimeSpread() { + return (long)latestTime - (long)earliestTime; + } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorContext.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorContext.java index 38c2efd8679c0..bb32b40f7cde3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorContext.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorContext.java @@ -31,10 +31,13 @@ interface TimeAligner { final TimeValue chunkSpan; final TimeAligner timeAligner; final Map headers; + final boolean hasAggregations; + final Long histogramInterval; ChunkedDataExtractorContext(String jobId, String timeField, List indices, List types, QueryBuilder query, int scrollSize, long start, long end, @Nullable TimeValue chunkSpan, - TimeAligner timeAligner, Map headers) { + TimeAligner timeAligner, Map headers, boolean hasAggregations, + @Nullable Long histogramInterval) { this.jobId = Objects.requireNonNull(jobId); this.timeField = Objects.requireNonNull(timeField); this.indices = indices.toArray(new String[indices.size()]); @@ -46,5 +49,7 @@ interface TimeAligner { this.chunkSpan = chunkSpan; this.timeAligner = Objects.requireNonNull(timeAligner); this.headers = headers; + this.hasAggregations = hasAggregations; + this.histogramInterval = histogramInterval; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java index 7b5bac64740d6..67079cf2e6777 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorFactory.java @@ -42,7 +42,10 @@ public DataExtractor 
newExtractor(long start, long end) { timeAligner.alignToFloor(end), datafeedConfig.getChunkingConfig().getTimeSpan(), timeAligner, - datafeedConfig.getHeaders()); + datafeedConfig.getHeaders(), + datafeedConfig.hasAggregations(), + datafeedConfig.hasAggregations() ? datafeedConfig.getHistogramIntervalMillis() : null + ); return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/LocalStateMachineLearning.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/LocalStateMachineLearning.java index 485556d8441ea..8cf2c9e47b1bb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/LocalStateMachineLearning.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/LocalStateMachineLearning.java @@ -5,15 +5,31 @@ */ package org.elasticsearch.xpack.ml; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.license.LicenseService; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction; import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.monitoring.Monitoring; import org.elasticsearch.xpack.security.Security; import java.nio.file.Path; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; public class LocalStateMachineLearning extends LocalStateCompositeXPackPlugin { @@ -50,6 +66,45 @@ protected XPackLicenseState getLicenseState() { @Override protected XPackLicenseState getLicenseState() { return thisVar.getLicenseState(); } }); + plugins.add(new MockedRollupPlugin()); } -} + /** + * This is only required as we now have to have the GetRollupIndexCapsAction as a valid action in our node. + * The MachineLearningLicenseTests attempt to create a datafeed referencing this LocalStateMachineLearning object. + * Consequently, we need to be able to take this rollup action (response does not matter) + * as the datafeed extractor now depends on it. 
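(Editor's note: for orientation, the production-side call that this mock satisfies looks roughly like the sketch below. It is not code from the patch; the request constructor and the chooseExtractor helper are assumptions, while GetRollupIndexCapsAction.Response#getJobs() and the "Aggregations are required when using Rollup indices" message are taken from the tests later in this diff.)

    // Illustrative sketch (assumptions noted above): a datafeed-side caller asks for rollup
    // capabilities before picking an extractor. An empty getJobs() map means the indices are
    // not rollup indices, so the ordinary scroll/aggregation extractors can be used.
    static void chooseExtractor(Client client, String[] indices, boolean hasAggregations,
                                ActionListener<Boolean> useRollupListener) {
        GetRollupIndexCapsAction.Request request = new GetRollupIndexCapsAction.Request(indices); // assumed ctor
        client.execute(GetRollupIndexCapsAction.INSTANCE, request, ActionListener.wrap(
            response -> {
                boolean isRollup = response.getJobs().isEmpty() == false;
                if (isRollup && hasAggregations == false) {
                    useRollupListener.onFailure(
                        new IllegalArgumentException("Aggregations are required when using Rollup indices"));
                } else {
                    useRollupListener.onResponse(isRollup);
                }
            },
            useRollupListener::onFailure));
    }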
+ */ + public static class MockedRollupPlugin extends Plugin implements ActionPlugin { + + @Override + public List> getActions() { + return Collections.singletonList( + new ActionHandler<>(GetRollupIndexCapsAction.INSTANCE, MockedRollupIndexCapsTransport.class) + ); + } + + public static class MockedRollupIndexCapsTransport + extends TransportAction { + + @Inject + public MockedRollupIndexCapsTransport(Settings settings, TransportService transportService, + ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, GetRollupIndexCapsAction.NAME, threadPool, new ActionFilters(new HashSet<>()), + indexNameExpressionResolver, transportService.getTaskManager()); + } + + @Override + protected void doExecute(Task task, + GetRollupIndexCapsAction.Request request, + ActionListener listener) { + listener.onResponse(new GetRollupIndexCapsAction.Response()); + } + + @Override + protected void doExecute(GetRollupIndexCapsAction.Request request, ActionListener listener) { + listener.onResponse(new GetRollupIndexCapsAction.Response()); + } + } + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java index 52e38a70abdb5..7399d31f7c6ac 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java @@ -14,23 +14,39 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction; +import org.elasticsearch.xpack.core.rollup.action.RollableIndexCaps; +import org.elasticsearch.xpack.core.rollup.action.RollupJobCaps; +import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig; +import org.elasticsearch.xpack.core.rollup.job.GroupConfig; +import org.elasticsearch.xpack.core.rollup.job.MetricConfig; +import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig; +import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests; import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory; +import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.RollupDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory; -import org.elasticsearch.xpack.core.ml.job.config.DataDescription; -import org.elasticsearch.xpack.core.ml.job.config.Job; import org.junit.Before; +import java.util.Arrays; 
+import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Matchers.any; import static org.mockito.Matchers.same; @@ -41,6 +57,7 @@ public class DataExtractorFactoryTests extends ESTestCase { private FieldCapabilitiesResponse fieldsCapabilities; + private GetRollupIndexCapsAction.Response getRollupIndexResponse; private Client client; @@ -54,12 +71,22 @@ public void setUpTests() { givenAggregatableField("time", "date"); givenAggregatableField("field", "keyword"); + getRollupIndexResponse = mock(GetRollupIndexCapsAction.Response.class); + when(getRollupIndexResponse.getJobs()).thenReturn(new HashMap<>()); + doAnswer(invocationMock -> { @SuppressWarnings("raw_types") ActionListener listener = (ActionListener) invocationMock.getArguments()[2]; listener.onResponse(fieldsCapabilities); return null; }).when(client).execute(same(FieldCapabilitiesAction.INSTANCE), any(), any()); + + doAnswer(invocationMock -> { + @SuppressWarnings("raw_types") + ActionListener listener = (ActionListener) invocationMock.getArguments()[2]; + listener.onResponse(getRollupIndexResponse); + return null; + }).when(client).execute(same(GetRollupIndexCapsAction.INSTANCE), any(), any()); } public void testCreateDataExtractorFactoryGivenDefaultScroll() { @@ -165,6 +192,162 @@ public void testCreateDataExtractorFactoryGivenDefaultAggregationWithAutoChunk() DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener); } + public void testCreateDataExtractorFactoryGivenRollupAndValidAggregation() { + givenAggregatableRollup("myField", "max", 5, "termField"); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeField("time"); + Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); + jobBuilder.setDataDescription(dataDescription); + DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo"); + datafeedConfig.setChunkingConfig(ChunkingConfig.newOff()); + MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); + MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField"); + TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField); + datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( + AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time"))); + ActionListener listener = ActionListener.wrap( + dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(RollupDataExtractorFactory.class)), + e -> fail() + ); + DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener); + } + + public void testCreateDataExtractorFactoryGivenRollupAndValidAggregationAndAutoChunk() { + givenAggregatableRollup("myField", "max", 5, "termField"); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeField("time"); + Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); + jobBuilder.setDataDescription(dataDescription); + DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo"); + datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto()); + 
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); + MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField"); + TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField); + datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( + AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time"))); + ActionListener listener = ActionListener.wrap( + dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)), + e -> fail() + ); + DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener); + } + + public void testCreateDataExtractorFactoryGivenRollupButNoAggregations() { + givenAggregatableRollup("myField", "max", 5); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeField("time"); + Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); + jobBuilder.setDataDescription(dataDescription); + DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo"); + datafeedConfig.setChunkingConfig(ChunkingConfig.newOff()); + + ActionListener listener = ActionListener.wrap( + dataExtractorFactory -> fail(), + e -> { + assertThat(e.getMessage(), equalTo("Aggregations are required when using Rollup indices")); + assertThat(e, instanceOf(IllegalArgumentException.class)); + } + ); + + DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener); + } + + public void testCreateDataExtractorFactoryGivenRollupWithBadInterval() { + givenAggregatableRollup("myField", "max", 7, "termField"); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeField("time"); + Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); + jobBuilder.setDataDescription(dataDescription); + DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo"); + datafeedConfig.setChunkingConfig(ChunkingConfig.newOff()); + MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); + MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField"); + TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField); + datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( + AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time"))); + ActionListener listener = ActionListener.wrap( + dataExtractorFactory -> fail(), + e -> { + assertThat(e.getMessage(), + containsString("Rollup capabilities do not have a [date_histogram] aggregation with an interval " + + "that is a multiple of the datafeed's interval.")); + assertThat(e, instanceOf(IllegalArgumentException.class)); + } + ); + DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener); + } + + public void testCreateDataExtractorFactoryGivenRollupMissingTerms() { + givenAggregatableRollup("myField", "max", 5); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeField("time"); + Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); + jobBuilder.setDataDescription(dataDescription); + DatafeedConfig.Builder datafeedConfig = 
DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo"); + datafeedConfig.setChunkingConfig(ChunkingConfig.newOff()); + MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); + MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField"); + TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField); + datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( + AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time"))); + ActionListener listener = ActionListener.wrap( + dataExtractorFactory -> fail(), + e -> { + assertThat(e.getMessage(), + containsString("Rollup capabilities do not support all the datafeed aggregations at the desired interval.")); + assertThat(e, instanceOf(IllegalArgumentException.class)); + } + ); + DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener); + } + + public void testCreateDataExtractorFactoryGivenRollupMissingMetric() { + givenAggregatableRollup("myField", "max", 5, "termField"); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeField("time"); + Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); + jobBuilder.setDataDescription(dataDescription); + DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo"); + datafeedConfig.setChunkingConfig(ChunkingConfig.newOff()); + MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); + MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("otherField"); + TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField); + datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator( + AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time"))); + ActionListener listener = ActionListener.wrap( + dataExtractorFactory -> fail(), + e -> { + assertThat(e.getMessage(), + containsString("Rollup capabilities do not support all the datafeed aggregations at the desired interval.")); + assertThat(e, instanceOf(IllegalArgumentException.class)); + } + ); + DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener); + } + + private void givenAggregatableRollup(String field, String type, int minuteInterval, String... 
groupByTerms) { + List metricConfigs = Arrays.asList(new MetricConfig(field, Collections.singletonList(type)), + new MetricConfig("time", Arrays.asList("min", "max"))); + TermsGroupConfig termsGroupConfig = null; + if (groupByTerms.length > 0) { + termsGroupConfig = new TermsGroupConfig(groupByTerms); + } + RollupJobConfig rollupJobConfig = new RollupJobConfig("rollupJob1", + "myIndexes*", + "myIndex_rollup", + "*/30 * * * * ?", + 300, + new GroupConfig( + new DateHistogramGroupConfig("time", DateHistogramInterval.minutes(minuteInterval)), null, termsGroupConfig), + metricConfigs, + null); + RollupJobCaps rollupJobCaps = new RollupJobCaps(rollupJobConfig); + RollableIndexCaps rollableIndexCaps = new RollableIndexCaps("myIndex_rollup", Collections.singletonList(rollupJobCaps)); + Map jobs = new HashMap<>(1); + jobs.put("rollupJob1", rollableIndexCaps); + when(getRollupIndexResponse.getJobs()).thenReturn(jobs); + } + private void givenAggregatableField(String field, String type) { FieldCapabilities fieldCaps = mock(FieldCapabilities.class); when(fieldCaps.isSearchable()).thenReturn(true); @@ -173,4 +356,4 @@ private void givenAggregatableField(String field, String type) { fieldCapsMap.put(type, fieldCaps); when(fieldsCapabilities.getField(field)).thenReturn(fieldCapsMap); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java index 903ab4af1157f..af2b3e7750b63 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java @@ -46,7 +46,7 @@ public class AggregationDataExtractorTests extends ESTestCase { - private Client client; + private Client testClient; private List capturedSearchRequests; private String jobId; private String timeField; @@ -61,7 +61,7 @@ private class TestDataExtractor extends AggregationDataExtractor { private SearchResponse nextResponse; TestDataExtractor(long start, long end) { - super(client, createContext(start, end)); + super(testClient, createContext(start, end)); } @Override @@ -77,7 +77,7 @@ void setNextResponse(SearchResponse searchResponse) { @Before public void setUpTests() { - client = mock(Client.class); + testClient = mock(Client.class); capturedSearchRequests = new ArrayList<>(); jobId = "test-job"; timeField = "time"; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java index 18c35155b6f5d..766f35fce6ff0 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractorTests.java @@ -5,7 +5,8 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor.chunked; -import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import 
org.elasticsearch.client.Client; @@ -44,7 +45,7 @@ public class ChunkedDataExtractorTests extends ESTestCase { private Client client; - private List capturedSearchRequests; + private List capturedSearchRequests; private String jobId; private String timeField; private List types; @@ -62,9 +63,15 @@ private class TestDataExtractor extends ChunkedDataExtractor { super(client, dataExtractorFactory, createContext(start, end)); } + TestDataExtractor(long start, long end, boolean hasAggregations, Long histogramInterval) { + super(client, dataExtractorFactory, createContext(start, end, hasAggregations, histogramInterval)); + } + @Override - protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) { - capturedSearchRequests.add(searchRequestBuilder); + protected SearchResponse executeSearchRequest( + ActionRequestBuilder> + searchRequestBuilder) { + capturedSearchRequests.add(searchRequestBuilder.request()); return nextResponse; } @@ -136,6 +143,89 @@ public void testExtractionGivenSpecifiedChunk() throws IOException { assertThat(searchRequest, not(containsString("\"sort\""))); } + public void testExtractionGivenSpecifiedChunkAndAggs() throws IOException { + chunkSpan = TimeValue.timeValueSeconds(1); + TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L, true, 1000L); + extractor.setNextResponse(createSearchResponse(0L, 1000L, 2200L)); + + InputStream inputStream1 = mock(InputStream.class); + InputStream inputStream2 = mock(InputStream.class); + InputStream inputStream3 = mock(InputStream.class); + + DataExtractor subExtactor1 = new StubSubExtractor(inputStream1, inputStream2); + when(dataExtractorFactory.newExtractor(1000L, 2000L)).thenReturn(subExtactor1); + + DataExtractor subExtactor2 = new StubSubExtractor(inputStream3); + when(dataExtractorFactory.newExtractor(2000L, 2300L)).thenReturn(subExtactor2); + + assertThat(extractor.hasNext(), is(true)); + assertEquals(inputStream1, extractor.next().get()); + assertThat(extractor.hasNext(), is(true)); + assertEquals(inputStream2, extractor.next().get()); + assertThat(extractor.hasNext(), is(true)); + assertEquals(inputStream3, extractor.next().get()); + assertThat(extractor.hasNext(), is(true)); + assertThat(extractor.next().isPresent(), is(false)); + + verify(dataExtractorFactory).newExtractor(1000L, 2000L); + verify(dataExtractorFactory).newExtractor(2000L, 2300L); + Mockito.verifyNoMoreInteractions(dataExtractorFactory); + + assertThat(capturedSearchRequests.size(), equalTo(1)); + String searchRequest = capturedSearchRequests.get(0).toString().replaceAll("\\s", ""); + assertThat(searchRequest, containsString("\"size\":0")); + assertThat(searchRequest, containsString("\"query\":{\"bool\":{\"filter\":[{\"match_all\":{\"boost\":1.0}}," + + "{\"range\":{\"time\":{\"from\":1000,\"to\":2300,\"include_lower\":true,\"include_upper\":false," + + "\"format\":\"epoch_millis\",\"boost\":1.0}}}]")); + assertThat(searchRequest, containsString("\"aggregations\":{\"earliest_time\":{\"min\":{\"field\":\"time\"}}," + + "\"latest_time\":{\"max\":{\"field\":\"time\"}}}}")); + assertThat(searchRequest, not(containsString("\"sort\""))); + } + + public void testExtractionGivenAutoChunkAndAggs() throws IOException { + chunkSpan = null; + TestDataExtractor extractor = new TestDataExtractor(100_000L, 450_000L, true, 200L); + + extractor.setNextResponse(createSearchResponse(0L, 100_000L, 400_000L)); + + InputStream inputStream1 = mock(InputStream.class); + InputStream inputStream2 = mock(InputStream.class); + + // 200 * 1_000 == 
200_000 + DataExtractor subExtactor1 = new StubSubExtractor(inputStream1); + when(dataExtractorFactory.newExtractor(100_000L, 300_000L)).thenReturn(subExtactor1); + + DataExtractor subExtactor2 = new StubSubExtractor(inputStream2); + when(dataExtractorFactory.newExtractor(300_000L, 450_000L)).thenReturn(subExtactor2); + + assertThat(extractor.hasNext(), is(true)); + assertEquals(inputStream1, extractor.next().get()); + assertThat(extractor.hasNext(), is(true)); + assertEquals(inputStream2, extractor.next().get()); + assertThat(extractor.next().isPresent(), is(false)); + assertThat(extractor.hasNext(), is(false)); + + verify(dataExtractorFactory).newExtractor(100_000L, 300_000L); + verify(dataExtractorFactory).newExtractor(300_000L, 450_000L); + Mockito.verifyNoMoreInteractions(dataExtractorFactory); + + assertThat(capturedSearchRequests.size(), equalTo(1)); + } + + public void testExtractionGivenAutoChunkAndAggsAndNoData() throws IOException { + chunkSpan = null; + TestDataExtractor extractor = new TestDataExtractor(100L, 500L, true, 200L); + + extractor.setNextResponse(createNullSearchResponse()); + + assertThat(extractor.next().isPresent(), is(false)); + assertThat(extractor.hasNext(), is(false)); + + Mockito.verifyNoMoreInteractions(dataExtractorFactory); + + assertThat(capturedSearchRequests.size(), equalTo(1)); + } + public void testExtractionGivenAutoChunkAndScrollSize1000() throws IOException { chunkSpan = null; scrollSize = 1000; @@ -430,6 +520,27 @@ private SearchResponse createSearchResponse(long totalHits, long earliestTime, l return searchResponse; } + private SearchResponse createNullSearchResponse() { + SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.status()).thenReturn(RestStatus.OK); + SearchHit[] hits = new SearchHit[0]; + SearchHits searchHits = new SearchHits(hits, 0, 1); + when(searchResponse.getHits()).thenReturn(searchHits); + + List aggs = new ArrayList<>(); + Min min = mock(Min.class); + when(min.getValue()).thenReturn(Double.POSITIVE_INFINITY); + when(min.getName()).thenReturn("earliest_time"); + aggs.add(min); + Max max = mock(Max.class); + when(max.getValue()).thenReturn(Double.POSITIVE_INFINITY); + when(max.getName()).thenReturn("latest_time"); + aggs.add(max); + Aggregations aggregations = new Aggregations(aggs) {}; + when(searchResponse.getAggregations()).thenReturn(aggregations); + return searchResponse; + } + private SearchResponse createErrorResponse() { SearchResponse searchResponse = mock(SearchResponse.class); when(searchResponse.status()).thenReturn(RestStatus.INTERNAL_SERVER_ERROR); @@ -445,8 +556,12 @@ private SearchResponse createResponseWithShardFailures() { } private ChunkedDataExtractorContext createContext(long start, long end) { + return createContext(start, end, false, null); + } + + private ChunkedDataExtractorContext createContext(long start, long end, boolean hasAggregations, Long histogramInterval) { return new ChunkedDataExtractorContext(jobId, timeField, indices, types, query, scrollSize, start, end, chunkSpan, - ChunkedDataExtractorFactory.newIdentityTimeAligner(), Collections.emptyMap()); + ChunkedDataExtractorFactory.newIdentityTimeAligner(), Collections.emptyMap(), hasAggregations, histogramInterval); } private static class StubSubExtractor implements DataExtractor {