diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index a121217d4cdaa..25b745c4f499a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -107,7 +107,6 @@ import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutFilterAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; -import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; @@ -292,7 +291,6 @@ public List> getClientActions() { PostCalendarEventsAction.INSTANCE, PersistJobAction.INSTANCE, FindFileStructureAction.INSTANCE, - MlUpgradeAction.INSTANCE, // security ClearRealmCacheAction.INSTANCE, ClearRolesCacheAction.INSTANCE, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/MlUpgradeAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/MlUpgradeAction.java deleted file mode 100644 index 404f15d4f6270..0000000000000 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/MlUpgradeAction.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.core.ml.action; - -import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder; -import org.elasticsearch.action.support.master.MasterNodeReadRequest; -import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ObjectParser; -import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.tasks.CancellableTask; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; - -import java.io.IOException; -import java.util.Map; -import java.util.Objects; - - -public class MlUpgradeAction extends Action { - public static final MlUpgradeAction INSTANCE = new MlUpgradeAction(); - public static final String NAME = "cluster:admin/xpack/ml/upgrade"; - - private MlUpgradeAction() { - super(NAME); - } - - @Override - public AcknowledgedResponse newResponse() { - return new AcknowledgedResponse(); - } - - public static class Request extends MasterNodeReadRequest implements ToXContentObject { - - private static final ParseField REINDEX_BATCH_SIZE = new ParseField("reindex_batch_size"); - - public static ObjectParser PARSER = new ObjectParser<>("ml_upgrade", true, Request::new); - static { - PARSER.declareInt(Request::setReindexBatchSize, REINDEX_BATCH_SIZE); - } - - static final String INDEX = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "*"; - private int reindexBatchSize = 1000; - - /** - * Should this task store its result? - */ - private boolean shouldStoreResult; - - // for serialization - public Request() { - } - - public Request(StreamInput in) throws IOException { - super(in); - reindexBatchSize = in.readInt(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeInt(reindexBatchSize); - } - - public String[] indices() { - return new String[]{INDEX}; - } - - public IndicesOptions indicesOptions() { - return IndicesOptions.strictExpandOpenAndForbidClosed(); - } - - /** - * Should this task store its result after it has finished? - */ - public Request setShouldStoreResult(boolean shouldStoreResult) { - this.shouldStoreResult = shouldStoreResult; - return this; - } - - @Override - public boolean getShouldStoreResult() { - return shouldStoreResult; - } - - public Request setReindexBatchSize(int reindexBatchSize) { - this.reindexBatchSize = reindexBatchSize; - return this; - } - - public int getReindexBatchSize() { - return reindexBatchSize; - } - - @Override - public ActionRequestValidationException validate() { - if (reindexBatchSize <= 0) { - ActionRequestValidationException validationException = new ActionRequestValidationException(); - validationException.addValidationError("["+ REINDEX_BATCH_SIZE.getPreferredName()+"] must be greater than 0."); - return validationException; - } - return null; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - Request request = (Request) o; - return Objects.equals(reindexBatchSize, request.reindexBatchSize); - } - - @Override - public int hashCode() { - return Objects.hash(reindexBatchSize); - } - - @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new CancellableTask(id, type, action, "ml-upgrade", parentTaskId, headers) { - @Override - public boolean shouldCancelChildrenOnCancellation() { - return true; - } - }; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(REINDEX_BATCH_SIZE.getPreferredName(), reindexBatchSize); - builder.endObject(); - return builder; - } - } - - public static class RequestBuilder extends MasterNodeReadOperationRequestBuilder { - - public RequestBuilder(ElasticsearchClient client) { - super(client, INSTANCE, new Request()); - } - } - -} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/MlUpgradeRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/MlUpgradeRequestTests.java deleted file mode 100644 index 227fc20ec9688..0000000000000 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/MlUpgradeRequestTests.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.core.ml.action; - -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.test.AbstractWireSerializingTestCase; - - -public class MlUpgradeRequestTests extends AbstractWireSerializingTestCase { - - @Override - protected MlUpgradeAction.Request createTestInstance() { - MlUpgradeAction.Request request = new MlUpgradeAction.Request(); - if (randomBoolean()) { - request.setReindexBatchSize(randomIntBetween(1, 10_000)); - } - return request; - } - - @Override - protected Writeable.Reader instanceReader() { - return MlUpgradeAction.Request::new; - } - -} diff --git a/x-pack/plugin/ml/qa/ml-with-security/build.gradle b/x-pack/plugin/ml/qa/ml-with-security/build.gradle index 6e0127f614c9a..abfed3fd878d0 100644 --- a/x-pack/plugin/ml/qa/ml-with-security/build.gradle +++ b/x-pack/plugin/ml/qa/ml-with-security/build.gradle @@ -93,9 +93,7 @@ integTestRunner { 'ml/validate/Test job config that is invalid only because of the job ID', 'ml/validate_detector/Test invalid detector', 'ml/delete_forecast/Test delete on _all forecasts not allow no forecasts', - 'ml/delete_forecast/Test delete forecast on missing forecast', - 'ml/ml_upgrade/Upgrade results when there is nothing to upgrade', - 'ml/ml_upgrade/Upgrade results when there is nothing to upgrade not waiting for results' + 'ml/delete_forecast/Test delete forecast on missing forecast' ].join(',') } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlUpgradeIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlUpgradeIT.java deleted file mode 100644 index a2a05ea1686fa..0000000000000 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlUpgradeIT.java +++ /dev/null @@ -1,378 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.integration; - -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.reindex.ReindexAction; -import org.elasticsearch.index.reindex.ReindexRequest; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; -import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; -import org.elasticsearch.xpack.ml.ResultsIndexUpgradeService; -import org.junit.After; -import org.junit.Assert; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeedBuilder; -import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob; -import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.core.Is.is; - -public class MlUpgradeIT extends MlNativeAutodetectIntegTestCase { - - @After - public void cleanup() throws Exception { - cleanUp(); - } - - public void testMigrationWhenItIsNotNecessary() throws Exception { - String jobId1 = "no-migration-test1"; - String jobId2 = "no-migration-test2"; - String jobId3 = "no-migration-test3"; - - String dataIndex = createDataIndex().v2(); - List jobs = createJobsWithData(jobId1, jobId2, jobId3, dataIndex); - Job job1 = jobs.get(0); - Job job2 = jobs.get(1); - Job job3 = jobs.get(2); - - String job1Index = job1.getResultsIndexName(); - String job2Index = job2.getResultsIndexName(); - String job3Index = job3.getResultsIndexName(); - - assertThat(indexExists(job1Index), is(true)); - assertThat(indexExists(job2Index), is(true)); - assertThat(indexExists(job3Index), is(true)); - - long job1Total = getTotalDocCount(job1Index); - long job2Total = getTotalDocCount(job2Index); - long job3Total = getTotalDocCount(job3Index); - - AcknowledgedResponse resp = ESIntegTestCase.client().execute(MlUpgradeAction.INSTANCE, - new MlUpgradeAction.Request()).actionGet(); - assertThat(resp.isAcknowledged(), is(true)); - - // Migration should have done nothing - assertThat(indexExists(job1Index), is(true)); - assertThat(indexExists(job2Index), is(true)); - assertThat(indexExists(job3Index), is(true)); - - assertThat(getTotalDocCount(job1Index), equalTo(job1Total)); - assertThat(getTotalDocCount(job2Index), equalTo(job2Total)); - assertThat(getTotalDocCount(job3Index), equalTo(job3Total)); - - ClusterState state = admin().cluster().state(new ClusterStateRequest()).actionGet().getState(); - IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(); - String[] indices = indexNameExpressionResolver.concreteIndexNames(state, - IndicesOptions.strictExpandOpenAndForbidClosed(), - AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"); - - // Our backing index size should be two as we have a shared and custom index - assertThat(indices.length, equalTo(2)); - } - - public void testMigration() throws Exception { - String jobId1 = "migration-test1"; - String jobId2 = "migration-test2"; - String jobId3 = "migration-test3"; - - String dataIndex = createDataIndex().v2(); - List jobs = createJobsWithData(jobId1, jobId2, jobId3, dataIndex); - Job job1 = jobs.get(0); - Job job2 = jobs.get(1); - Job job3 = jobs.get(2); - - String job1Index = job1.getResultsIndexName(); - String job2Index = job2.getResultsIndexName(); - String job3Index = job3.getResultsIndexName(); - - assertThat(indexExists(job1Index), is(true)); - assertThat(indexExists(job2Index), is(true)); - assertThat(indexExists(job3Index), is(true)); - - long job1Total = getJobResultsCount(job1.getId()); - long job2Total = getJobResultsCount(job2.getId()); - long job3Total = getJobResultsCount(job3.getId()); - - IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(); - - ResultsIndexUpgradeService resultsIndexUpgradeService = new ResultsIndexUpgradeService(indexNameExpressionResolver, - ThreadPool.Names.SAME, - indexMetaData -> true); - - PlainActionFuture future = PlainActionFuture.newFuture(); - - resultsIndexUpgradeService.upgrade(ESIntegTestCase.client(), - new MlUpgradeAction.Request(), - ESIntegTestCase.client().admin().cluster().prepareState().get().getState(), - future); - - AcknowledgedResponse response = future.get(); - assertThat(response.isAcknowledged(), is(true)); - - assertThat(indexExists(job1Index), is(false)); - assertThat(indexExists(job2Index), is(false)); - assertThat(indexExists(job3Index), is(false)); - - ClusterState state = admin().cluster().state(new ClusterStateRequest()).actionGet().getState(); - String[] indices = indexNameExpressionResolver.concreteIndexNames(state, - IndicesOptions.strictExpandOpenAndForbidClosed(), - AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"); - - // Our backing index size should be four as we have a shared and custom index and upgrading doubles the number of indices - Assert.assertThat(indices.length, equalTo(4)); - - refresh(indices); - assertThat(getJobResultsCount(job1.getId()), equalTo(job1Total)); - assertThat(getJobResultsCount(job2.getId()), equalTo(job2Total)); - assertThat(getJobResultsCount(job3.getId()), equalTo(job3Total)); - - - // WE should still be able to write, and the aliases should allow to read from the appropriate indices - postDataToJob(jobId1); - postDataToJob(jobId2); - postDataToJob(jobId3); - // We should also be able to create new jobs and old jobs should be unaffected. - String jobId4 = "migration-test4"; - Job job4 = createAndOpenJobAndStartDataFeedWithData(jobId4, dataIndex, false); - waitUntilJobIsClosed(jobId4); - - indices = indexNameExpressionResolver.concreteIndexNames(state, - IndicesOptions.strictExpandOpenAndForbidClosed(), - AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"); - refresh(indices); - - long newJob1Total = getJobResultsCount(job1.getId()); - assertThat(newJob1Total, greaterThan(job1Total)); - - long newJob2Total = getJobResultsCount(job2.getId()); - assertThat(newJob2Total, greaterThan(job2Total)); - - long newJob3Total = getJobResultsCount(job3.getId()); - assertThat(newJob3Total, greaterThan(job3Total)); - - assertThat(getJobResultsCount(jobId4), greaterThan(0L)); - assertThat(getJobResultsCount(jobId1), equalTo(newJob1Total)); - assertThat(getJobResultsCount(jobId2), equalTo(newJob2Total)); - assertThat(getJobResultsCount(jobId3), equalTo(newJob3Total)); - } - - //I think this test name could be a little bit longer.... - public void testMigrationWithManuallyCreatedIndexThatNeedsMigrating() throws Exception { - String jobId1 = "migration-failure-test1"; - String jobId2 = "migration-failure-test2"; - String jobId3 = "migration-failure-test3"; - - String dataIndex = createDataIndex().v2(); - List jobs = createJobsWithData(jobId1, jobId2, jobId3, dataIndex); - Job job1 = jobs.get(0); - Job job2 = jobs.get(1); - Job job3 = jobs.get(2); - - String job1Index = job1.getResultsIndexName(); - String job2Index = job2.getResultsIndexName(); - String job3Index = job3.getResultsIndexName(); - - // This index name should match one of the automatically created migration indices - String manuallyCreatedIndex = job1Index + "-" + Version.CURRENT.major; - client().admin().indices().prepareCreate(manuallyCreatedIndex).execute().actionGet(); - - IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(); - - ResultsIndexUpgradeService resultsIndexUpgradeService = new ResultsIndexUpgradeService(indexNameExpressionResolver, - ThreadPool.Names.SAME, - indexMetaData -> true); //indicates that this manually created index needs migrated - - resultsIndexUpgradeService.upgrade(ESIntegTestCase.client(), - new MlUpgradeAction.Request(), - ESIntegTestCase.client().admin().cluster().prepareState().get().getState(), - ActionListener.wrap( - resp -> fail(), - exception -> { - assertThat(exception, instanceOf(IllegalStateException.class)); - assertThat(exception.getMessage(), - equalTo("Index [" + manuallyCreatedIndex + "] already exists and is not the current version.")); - } - )); - } - - public void testMigrationWithExistingIndexWithData() throws Exception { - String jobId1 = "partial-migration-test1"; - String jobId2 = "partial-migration-test2"; - String jobId3 = "partial-migration-test3"; - - String dataIndex = createDataIndex().v2(); - List jobs = createJobsWithData(jobId1, jobId2, jobId3, dataIndex); - Job job1 = jobs.get(0); - Job job2 = jobs.get(1); - Job job3 = jobs.get(2); - - String job1Index = job1.getResultsIndexName(); - String job2Index = job2.getResultsIndexName(); - String job3Index = job3.getResultsIndexName(); - - assertThat(indexExists(job1Index), is(true)); - assertThat(indexExists(job2Index), is(true)); - assertThat(indexExists(job3Index), is(true)); - - long job1Total = getJobResultsCount(job1.getId()); - long job2Total = getJobResultsCount(job2.getId()); - long job3Total = getJobResultsCount(job3.getId()); - - //lets manually create a READ index with reindexed data already - // Should still get aliased appropriately without any additional/duplicate data. - String alreadyMigratedIndex = job1Index + "-" + Version.CURRENT.major + "r"; - ReindexRequest reindexRequest = new ReindexRequest(); - reindexRequest.setSourceIndices(job1Index); - reindexRequest.setDestIndex(alreadyMigratedIndex); - client().execute(ReindexAction.INSTANCE, reindexRequest).actionGet(); - - //New write index as well, should still get aliased appropriately - String alreadyMigratedWriteIndex = job1Index + "-" + Version.CURRENT.major; - client().admin().indices().prepareCreate(alreadyMigratedWriteIndex).execute().actionGet(); - - IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(); - - ResultsIndexUpgradeService resultsIndexUpgradeService = new ResultsIndexUpgradeService(indexNameExpressionResolver, - ThreadPool.Names.SAME, - //indicates that this manually created index is already migrated and should not be included in our migration steps - indexMetaData -> !(indexMetaData.getIndex().getName().equals(alreadyMigratedIndex) || - indexMetaData.getIndex().getName().equals(alreadyMigratedWriteIndex))); - - PlainActionFuture future = PlainActionFuture.newFuture(); - - resultsIndexUpgradeService.upgrade(ESIntegTestCase.client(), - new MlUpgradeAction.Request(), - ESIntegTestCase.client().admin().cluster().prepareState().get().getState(), - future); - - AcknowledgedResponse response = future.get(); - assertThat(response.isAcknowledged(), is(true)); - - assertThat(indexExists(job1Index), is(false)); - assertThat(indexExists(job2Index), is(false)); - assertThat(indexExists(job3Index), is(false)); - - ClusterState state = admin().cluster().state(new ClusterStateRequest()).actionGet().getState(); - String[] indices = indexNameExpressionResolver.concreteIndexNames(state, - IndicesOptions.strictExpandOpenAndForbidClosed(), - AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"); - - // Our backing index size should be four as we have a shared and custom index and upgrading doubles the number of indices - Assert.assertThat(indices.length, equalTo(4)); - refresh(indices); - - assertThat(getJobResultsCount(job1.getId()), equalTo(job1Total)); - assertThat(getJobResultsCount(job2.getId()), equalTo(job2Total)); - assertThat(getJobResultsCount(job3.getId()), equalTo(job3Total)); - - // WE should still be able to write, and the aliases should allow to read from the appropriate indices - postDataToJob(jobId1); - postDataToJob(jobId2); - postDataToJob(jobId3); - - refresh(indices); - - long newJob1Total = getJobResultsCount(job1.getId()); - assertThat(newJob1Total, greaterThan(job1Total)); - - long newJob2Total = getJobResultsCount(job2.getId()); - assertThat(newJob2Total, greaterThan(job2Total)); - - long newJob3Total = getJobResultsCount(job3.getId()); - assertThat(newJob3Total, greaterThan(job3Total)); - } - - private long getTotalDocCount(String indexName) { - SearchResponse searchResponse = ESIntegTestCase.client().prepareSearch(indexName) - .setSize(10_000) - .setTrackTotalHits(true) - .setQuery(QueryBuilders.matchAllQuery()) - .execute().actionGet(); - return searchResponse.getHits().getTotalHits().value; - } - - private long getJobResultsCount(String jobId) { - String index = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + jobId; - return getTotalDocCount(index); - } - - private void postDataToJob(String jobId) throws Exception { - openJob(jobId); - ESTestCase.assertBusy(() -> Assert.assertEquals(getJobStats(jobId).get(0).getState(), JobState.OPENED)); - startDatafeed(jobId + "-datafeed", 0L, System.currentTimeMillis()); - waitUntilJobIsClosed(jobId); - } - - private Job createAndOpenJobAndStartDataFeedWithData(String jobId, String dataIndex, boolean isCustom) throws Exception { - Job.Builder jobbuilder = createScheduledJob(jobId); - if (isCustom) { - jobbuilder.setResultsIndexName(jobId); - } - registerJob(jobbuilder); - - Job job = putJob(jobbuilder).getResponse(); - - openJob(job.getId()); - ESTestCase.assertBusy(() -> Assert.assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED)); - - DatafeedConfig.Builder builder = createDatafeedBuilder(job.getId() + "-datafeed", - job.getId(), - Collections.singletonList(dataIndex)); - builder.setQueryDelay(TimeValue.timeValueSeconds(5)); - builder.setFrequency(TimeValue.timeValueSeconds(5)); - DatafeedConfig datafeedConfig = builder.build(); - registerDatafeed(datafeedConfig); - putDatafeed(datafeedConfig); - startDatafeed(datafeedConfig.getId(), 0L, System.currentTimeMillis()); - waitUntilJobIsClosed(jobId); - return job; - } - - private Tuple createDataIndex() { - ESIntegTestCase.client().admin().indices().prepareCreate("data-for-migration-1") - .addMapping("type", "time", "type=date") - .get(); - long numDocs = ESTestCase.randomIntBetween(32, 512); - long now = System.currentTimeMillis(); - long oneWeekAgo = now - 604800000; - long twoWeeksAgo = oneWeekAgo - 604800000; - indexDocs(logger, "data-for-migration-1", numDocs, twoWeeksAgo, oneWeekAgo); - return new Tuple<>(numDocs, "data-for-migration-1"); - } - - private List createJobsWithData(String sharedJobId1, String sharedJobId2, String customJobId, String dataIndex) throws Exception { - - Job job1 = createAndOpenJobAndStartDataFeedWithData(sharedJobId1, dataIndex, false); - Job job2 = createAndOpenJobAndStartDataFeedWithData(sharedJobId2, dataIndex, false); - Job job3 = createAndOpenJobAndStartDataFeedWithData(customJobId, dataIndex, true); - - return Arrays.asList(job1, job2, job3); - } -} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 43674d42a56e6..39316389b0496 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -96,7 +96,6 @@ import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutFilterAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; -import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; @@ -152,7 +151,6 @@ import org.elasticsearch.xpack.ml.action.TransportPutDatafeedAction; import org.elasticsearch.xpack.ml.action.TransportPutFilterAction; import org.elasticsearch.xpack.ml.action.TransportPutJobAction; -import org.elasticsearch.xpack.ml.action.TransportMlUpgradeAction; import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction; import org.elasticsearch.xpack.ml.action.TransportStopDatafeedAction; @@ -232,7 +230,6 @@ import org.elasticsearch.xpack.ml.rest.results.RestGetInfluencersAction; import org.elasticsearch.xpack.ml.rest.results.RestGetOverallBucketsAction; import org.elasticsearch.xpack.ml.rest.results.RestGetRecordsAction; -import org.elasticsearch.xpack.ml.rest.results.RestUpgradeMlAction; import org.elasticsearch.xpack.ml.rest.validate.RestValidateDetectorAction; import org.elasticsearch.xpack.ml.rest.validate.RestValidateJobConfigAction; @@ -545,8 +542,7 @@ public List getRestHandlers(Settings settings, RestController restC new RestPutCalendarJobAction(settings, restController), new RestGetCalendarEventsAction(settings, restController), new RestPostCalendarEventAction(settings, restController), - new RestFindFileStructureAction(settings, restController), - new RestUpgradeMlAction(settings, restController) + new RestFindFileStructureAction(settings, restController) ); } @@ -604,8 +600,7 @@ public List getRestHandlers(Settings settings, RestController restC new ActionHandler<>(GetCalendarEventsAction.INSTANCE, TransportGetCalendarEventsAction.class), new ActionHandler<>(PostCalendarEventsAction.INSTANCE, TransportPostCalendarEventsAction.class), new ActionHandler<>(PersistJobAction.INSTANCE, TransportPersistJobAction.class), - new ActionHandler<>(FindFileStructureAction.INSTANCE, TransportFindFileStructureAction.class), - new ActionHandler<>(MlUpgradeAction.INSTANCE, TransportMlUpgradeAction.class) + new ActionHandler<>(FindFileStructureAction.INSTANCE, TransportFindFileStructureAction.class) ); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/ResultsIndexUpgradeService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/ResultsIndexUpgradeService.java deleted file mode 100644 index ccbaed13feca0..0000000000000 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/ResultsIndexUpgradeService.java +++ /dev/null @@ -1,513 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceAlreadyExistsException; -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.admin.indices.alias.Alias; -import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.reindex.BulkByScrollResponse; -import org.elasticsearch.index.reindex.ReindexAction; -import org.elasticsearch.index.reindex.ReindexRequest; -import org.elasticsearch.index.reindex.ScrollableHitSource; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction; -import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; -import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; - -import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; - -/** - * ML Job results index upgrade service - */ -public class ResultsIndexUpgradeService { - - private static final Logger logger = LogManager.getLogger(ResultsIndexUpgradeService.class); - - // Adjust the following constants as necessary for various versions and backports. - private static final int INDEX_VERSION = Version.CURRENT.major; - private static final Version MIN_REQUIRED_VERSION = Version.CURRENT.minimumCompatibilityVersion(); - - private final IndexNameExpressionResolver indexNameExpressionResolver; - private final Predicate shouldUpgrade; - private final String executor; - - /** - * Construct a new upgrade service - * - * @param indexNameExpressionResolver Index expression resolver for the request - * @param executor Where to execute client calls - * @param shouldUpgrade Given IndexMetadata indicate if it should be upgraded or not - * {@code true} indicates that it SHOULD upgrade - */ - public ResultsIndexUpgradeService(IndexNameExpressionResolver indexNameExpressionResolver, - String executor, - Predicate shouldUpgrade) { - this.indexNameExpressionResolver = indexNameExpressionResolver; - this.shouldUpgrade = shouldUpgrade; - this.executor = executor; - } - - public static boolean wasIndexCreatedInCurrentMajorVersion(IndexMetaData indexMetaData) { - return indexMetaData.getCreationVersion().major == INDEX_VERSION; - } - - /** - * There are two reasons for these indices to exist: - * 1. The upgrade process has ran before and either failed for some reason, or the end user is simply running it again. - * Either way, it should be ok to proceed as this action SHOULD be idempotent, - * unless the shouldUpgrade predicate is poorly formed - * 2. This index was created manually by the user. If the index was created manually and actually needs upgrading, then - * we consider the "new index" to be invalid as the passed predicate indicates that it still needs upgrading. - * - * @param metaData Cluster metadata - * @param newIndexName The index to check - * @param shouldUpgrade Should be index be upgraded - * @return {@code true} if the "new index" is valid - */ - private static boolean validNewIndex(MetaData metaData, String newIndexName, Predicate shouldUpgrade) { - return (metaData.hasIndex(newIndexName) && shouldUpgrade.test(metaData.index(newIndexName))) == false; - } - - private static void validateMinNodeVersion(ClusterState clusterState) { - if (clusterState.nodes().getMinNodeVersion().before(MIN_REQUIRED_VERSION)) { - throw new IllegalStateException("All nodes should have at least version [" + MIN_REQUIRED_VERSION + "] to upgrade"); - } - } - - // This method copies the behavior of the normal {index}/_upgrade rest response handler - private static Tuple getStatusAndCause(BulkByScrollResponse response) { - /* - * Return the highest numbered rest status under the assumption that higher numbered statuses are "more error" - * and thus more interesting to the user. - */ - RestStatus status = RestStatus.OK; - Throwable cause = null; - if (response.isTimedOut()) { - status = RestStatus.REQUEST_TIMEOUT; - cause = new ElasticsearchTimeoutException("Reindex request timed out"); - } - for (BulkItemResponse.Failure failure : response.getBulkFailures()) { - if (failure.getStatus().getStatus() > status.getStatus()) { - status = failure.getStatus(); - cause = failure.getCause(); - } - } - for (ScrollableHitSource.SearchFailure failure : response.getSearchFailures()) { - RestStatus failureStatus = ExceptionsHelper.status(failure.getReason()); - if (failureStatus.getStatus() > status.getStatus()) { - status = failureStatus; - cause = failure.getReason(); - } - } - return new Tuple<>(status, cause); - } - - /** - * Upgrade the indices given in the request. - * - * @param client The client to use when making calls - * @param request The upgrade request - * @param state The current cluster state - * @param listener The listener to alert when actions have completed - */ - public void upgrade(Client client, MlUpgradeAction.Request request, ClusterState state, - ActionListener listener) { - try { - validateMinNodeVersion(state); - String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(state, request.indicesOptions(), request.indices()); - MetaData metaData = state.getMetaData(); - - List indicesToUpgrade = Arrays.stream(concreteIndices) - .filter(indexName -> shouldUpgrade.test(metaData.index(indexName))) - .collect(Collectors.toList()); - - // All the internal indices are up to date - if (indicesToUpgrade.isEmpty()) { - listener.onResponse(new AcknowledgedResponse(true)); - return; - } - - IndexNameAndAliasProvider indexNameAndAliasProvider = new IndexNameAndAliasProvider(indicesToUpgrade, metaData); - Exception validationException = indexNameAndAliasProvider.validate(metaData, shouldUpgrade); - if (validationException != null) { - listener.onFailure(validationException); - return; - } - - // <7> Now that we have deleted the old indices, we are complete, alert the user - ActionListener deleteIndicesListener = ActionListener.wrap( - listener::onResponse, - error -> { - String msg = "Failed to delete old indices: " + Strings.collectionToCommaDelimitedString(indicesToUpgrade); - logger.error(msg, error); - listener.onFailure(new ElasticsearchException(msg, error)); - } - ); - - // <6> Now that aliases are moved, need to delete the old indices - ActionListener readAliasListener = ActionListener.wrap( - resp -> deleteOldIndices(client, indicesToUpgrade, deleteIndicesListener), - error -> { - String msg = "Failed adjusting aliases from old indices to new."; - logger.error(msg, error); - listener.onFailure(new ElasticsearchException(msg, error)); - } - ); - - // <5> Documents are now reindexed, time to move read aliases - ActionListener reindexListener = ActionListener.wrap( - resp -> - // Need to make indices writable again so that the aliases can be removed from them - removeReadOnlyBlock(client, indicesToUpgrade, - ActionListener.wrap( - rrob -> adjustAliases(client, - indexNameAndAliasProvider.oldIndicesWithReadAliases(), - indexNameAndAliasProvider.newReadIndicesWithReadAliases(), - readAliasListener), - rrobFailure -> { - String msg = "Failed making old indices writable again so that aliases can be moved."; - logger.error(msg, rrobFailure); - listener.onFailure(new ElasticsearchException(msg, rrobFailure)); - }) - ), - error -> { - logger.error("Failed to reindex old read-only indices", error); - removeReadOnlyBlock(client, indicesToUpgrade, ActionListener.wrap( - empty -> listener.onFailure(error), - removeReadOnlyBlockError -> { - String msg = "Failed making old indices read/write again after failing to reindex: " + error.getMessage(); - logger.error(msg, removeReadOnlyBlockError); - listener.onFailure(new ElasticsearchException(msg, removeReadOnlyBlockError)); - } - )); - } - ); - - // <4> Old indexes are now readOnly, Time to reindex - ActionListener readOnlyListener = ActionListener.wrap( - ack -> reindexOldReadIndicesToNewIndices(client, indexNameAndAliasProvider.needsReindex(), request, reindexListener), - listener::onFailure - ); - - // <3> Set old indices to readOnly - ActionListener writeAliasesMovedListener = ActionListener.wrap( - resp -> setReadOnlyBlock(client, indicesToUpgrade, readOnlyListener), - listener::onFailure - ); - - // <2> Move write index alias to new write indices - ActionListener createWriteIndicesAndSetReadAliasListener = ActionListener.wrap( - resp -> adjustAliases(client, - indexNameAndAliasProvider.oldIndicesWithWriteAliases(), - indexNameAndAliasProvider.newWriteIndicesWithWriteAliases(), - writeAliasesMovedListener), - listener::onFailure - ); - - // <1> Create the new write indices and set the read aliases to include them - createNewWriteIndicesIfNecessary(client, metaData, indexNameAndAliasProvider.newWriteIndices(), - ActionListener.wrap( - indicesCreated -> adjustAliases(client, - Collections.emptyMap(), - indexNameAndAliasProvider.newWriteIndicesWithReadAliases(), - createWriteIndicesAndSetReadAliasListener), - listener::onFailure - )); - - } catch (Exception e) { - listener.onFailure(e); - } - - } - - private void createNewWriteIndicesIfNecessary(Client client, - MetaData metaData, - Collection newWriteIndices, - ActionListener createIndexListener) { - TypedChainTaskExecutor chainTaskExecutor = - new TypedChainTaskExecutor<>( - client.threadPool().executor(executor), - (createIndexResponse -> true), //We always want to complete all our tasks - (exception -> - // Short circuit execution IF the exception is NOT a ResourceAlreadyExistsException - // This should be rare, as it requires the index to be created between our previous check and this exception - exception instanceof ResourceAlreadyExistsException == false - )); - newWriteIndices.forEach((index) -> { - // If the index already exists, don't try and created it - // We have already verified that IF this index exists, that it does not require upgrading - // So, if it was created between that check and this one, we can assume it is the correct version as it was JUST created - if (metaData.hasIndex(index) == false) { - CreateIndexRequest request = new CreateIndexRequest(index); - chainTaskExecutor.add(listener -> - executeAsyncWithOrigin(client.threadPool().getThreadContext(), - ML_ORIGIN, - request, - listener, - client.admin().indices()::create)); - } - }); - - chainTaskExecutor.execute(ActionListener.wrap( - createIndexResponses -> createIndexListener.onResponse(true), - createIndexListener::onFailure - )); - } - - /** - * Makes the indices readonly if it's not set as a readonly yet - */ - private void setReadOnlyBlock(Client client, List indices, ActionListener listener) { - Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build(); - UpdateSettingsRequest request = new UpdateSettingsRequest(indices.toArray(new String[0])); - request.settings(settings); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), - ML_ORIGIN, - request, - listener, - client.admin().indices()::updateSettings); - } - - private void removeReadOnlyBlock(Client client, List indices, - ActionListener listener) { - Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build(); - UpdateSettingsRequest request = new UpdateSettingsRequest(indices.toArray(new String[0])); - request.settings(settings); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), - ML_ORIGIN, - request, - listener, - client.admin().indices()::updateSettings); - } - - private void reindexOldReadIndicesToNewIndices(Client client, - Map reindexIndices, - MlUpgradeAction.Request request, - ActionListener listener) { - TypedChainTaskExecutor chainTaskExecutor = - new TypedChainTaskExecutor<>( - client.threadPool().executor(executor), - (createIndexResponse) -> { // If there are errors in the reindex, we should stop - Tuple status = getStatusAndCause(createIndexResponse); - return status.v1().equals(RestStatus.OK); - }, - (exception -> true)); // Short circuit and call onFailure for any exception - - List newIndices = new ArrayList<>(reindexIndices.size()); - reindexIndices.forEach((oldIndex, newIndex) -> { - ReindexRequest reindexRequest = new ReindexRequest(); - reindexRequest.setSourceBatchSize(request.getReindexBatchSize()); - reindexRequest.setSourceIndices(oldIndex); - reindexRequest.setDestIndex(newIndex); - reindexRequest.setSourceDocTypes(ElasticsearchMappings.DOC_TYPE); - reindexRequest.setDestDocType(ElasticsearchMappings.DOC_TYPE); - // Don't worry if these indices already exist, we validated settings.index.created.version earlier - reindexRequest.setAbortOnVersionConflict(false); - // If the document exists already in the new index, don't want to update or overwrite as we are pulling from "old data" - reindexRequest.setDestOpType(DocWriteRequest.OpType.CREATE.getLowercase()); - newIndices.add(newIndex); - chainTaskExecutor.add(chainedListener -> - executeAsyncWithOrigin(client, - ML_ORIGIN, - ReindexAction.INSTANCE, - reindexRequest, - chainedListener)); - }); - - chainTaskExecutor.execute(ActionListener.wrap( - bulkScrollingResponses -> { - BulkByScrollResponse response = bulkScrollingResponses.get(bulkScrollingResponses.size() - 1); - Tuple status = getStatusAndCause(response); - if (status.v1().equals(RestStatus.OK)) { - listener.onResponse(true); - } else { - logger.error("Failed to reindex old results indices.", status.v2()); - listener.onFailure(new ElasticsearchException("Failed to reindex old results indices.",status.v2())); - } - }, - failure -> { - List createdIndices = newIndices.subList(0, chainTaskExecutor.getCollectedResponses().size()); - logger.error( - "Failed to reindex all old read indices. Successfully reindexed: [" + - Strings.collectionToCommaDelimitedString(createdIndices) + "]", - failure); - listener.onFailure(failure); - } - )); - - } - - private void deleteOldIndices(Client client, - List oldIndices, - ActionListener deleteIndicesListener) { - DeleteIndexRequest request = new DeleteIndexRequest(oldIndices.toArray(new String[0])); - request.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), - ML_ORIGIN, - request, - deleteIndicesListener, - client.admin().indices()::delete); - } - - private void adjustAliases(Client client, - Map> oldAliases, - Map> newAliases, - ActionListener indicesAliasListener) { - IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); - oldAliases.forEach((oldIndex, aliases) -> - { - if (aliases.isEmpty() == false) { //if the aliases are empty, that means there are none to remove - indicesAliasesRequest.addAliasAction(IndicesAliasesRequest - .AliasActions - .remove() - .index(oldIndex) - .aliases(aliases.stream().map(Alias::name).toArray(String[]::new))); - } - } - ); - newAliases.forEach((newIndex, aliases) -> - aliases.forEach(alias -> { - IndicesAliasesRequest.AliasActions action = IndicesAliasesRequest.AliasActions.add().index(newIndex); - if (alias.filter() != null) { - action.filter(alias.filter()); - } - action.alias(alias.name()); - indicesAliasesRequest.addAliasAction(action); - }) - ); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), - ML_ORIGIN, - indicesAliasesRequest, - indicesAliasListener, - client.admin().indices()::aliases); - } - - - private static class IndexNameAndAliasProvider { - - private final List oldIndices; - private final Map> writeAliases = new HashMap<>(); - private final Map> readAliases = new HashMap<>(); - - private IndexNameAndAliasProvider(List oldIndices, MetaData metaData) { - this.oldIndices = oldIndices; - oldIndices.forEach(index -> { - IndexMetaData indexMetaData = metaData.index(index); - List writes = new ArrayList<>(); - List reads = new ArrayList<>(); - indexMetaData.getAliases().forEach(aliasCursor -> { - Alias alias = new Alias(aliasCursor.value.alias()); - if (aliasCursor.value.filteringRequired()) { - alias.filter(aliasCursor.value.getFilter().string()); //Set the read alias jobId filter - } - if (alias.name().contains(".write-")) { - writes.add(alias); - } else { - reads.add(alias); - } - }); - - writeAliases.put(index, writes); - readAliases.put(index, reads); - }); - } - - private Exception validate(MetaData metaData, Predicate shouldUpgrade) { - for (String index : oldIndices) { - String newWriteName = newWriteName(index); - // If the "new" indices exist, either they were created from a previous run of the upgrade process or the end user - if (validNewIndex(metaData, newWriteName, shouldUpgrade) == false) { - return new IllegalStateException("Index [" + newWriteName + "] already exists and is not the current version."); - } - - String newReadName = newReadName(index); - if (validNewIndex(metaData, newReadName, shouldUpgrade) == false) { - return new IllegalStateException("Index [" + newReadName + "] already exists and is not the current version."); - } - } - return null; - } - - private String newReadName(String oldIndexName) { - return oldIndexName + "-" + INDEX_VERSION + "r"; - } - - private String newWriteName(String oldIndexName) { - return oldIndexName + "-" + INDEX_VERSION; - } - - private List newWriteIndices() { - return oldIndices.stream().map(this::newWriteName).collect(Collectors.toList()); - } - - private List readAliases(String oldIndex) { - return readAliases.get(oldIndex); - } - - private List writeAliases(String oldIndex) { - return writeAliases.get(oldIndex); - } - - private Map> newWriteIndicesWithReadAliases() { - return oldIndices.stream().collect(Collectors.toMap(this::newWriteName, this::readAliases)); - } - - private Map> oldIndicesWithWriteAliases() { - return writeAliases; - } - - private Map> newWriteIndicesWithWriteAliases() { - return oldIndices.stream().collect(Collectors.toMap(this::newWriteName, this::writeAliases)); - } - - private Map> oldIndicesWithReadAliases() { - return readAliases; - } - - private Map> newReadIndicesWithReadAliases() { - return oldIndices.stream().collect(Collectors.toMap(this::newReadName, this::readAliases)); - } - - private Map needsReindex() { - return oldIndices.stream().collect(Collectors.toMap(Function.identity(), this::newReadName)); - } - } -} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlUpgradeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlUpgradeAction.java deleted file mode 100644 index 2b676277aa690..0000000000000 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlUpgradeAction.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.action; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.ParentTaskAssigningClient; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction; -import org.elasticsearch.xpack.ml.ResultsIndexUpgradeService; - -import static org.elasticsearch.xpack.ml.ResultsIndexUpgradeService.wasIndexCreatedInCurrentMajorVersion; - -public class TransportMlUpgradeAction - extends TransportMasterNodeReadAction { - - private final Client client; - private final ResultsIndexUpgradeService resultsIndexUpgradeService; - - @Inject - public TransportMlUpgradeAction(TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, Client client, - IndexNameExpressionResolver indexNameExpressionResolver) { - super(MlUpgradeAction.NAME, transportService, clusterService, threadPool, - actionFilters, MlUpgradeAction.Request::new, indexNameExpressionResolver); - this.client = client; - this.resultsIndexUpgradeService = new ResultsIndexUpgradeService(indexNameExpressionResolver, - executor(), - indexMetadata -> wasIndexCreatedInCurrentMajorVersion(indexMetadata) == false); - } - - @Override - protected void masterOperation(Task task, MlUpgradeAction.Request request, ClusterState state, - ActionListener listener) { - TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId()); - ParentTaskAssigningClient parentAwareClient = new ParentTaskAssigningClient(client, taskId); - try { - resultsIndexUpgradeService.upgrade(parentAwareClient, request, state, listener); - } catch (Exception e) { - listener.onFailure(e); - } - } - - @Override - protected final void masterOperation(MlUpgradeAction.Request request, ClusterState state, - ActionListener listener) { - throw new UnsupportedOperationException("the task parameter is required"); - } - - @Override - protected String executor() { - return ThreadPool.Names.SAME; - } - - @Override - protected AcknowledgedResponse newResponse() { - return new AcknowledgedResponse(); - } - - @Override - protected ClusterBlockException checkBlock(MlUpgradeAction.Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); - } -} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestUpgradeMlAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestUpgradeMlAction.java deleted file mode 100644 index cad82ce325c27..0000000000000 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/results/RestUpgradeMlAction.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.rest.results; - -import org.apache.logging.log4j.LogManager; -import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.common.logging.DeprecationLogger; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.BytesRestResponse; -import org.elasticsearch.rest.RestController; -import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.rest.action.RestToXContentListener; -import org.elasticsearch.tasks.LoggingTaskListener; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.xpack.core.ml.action.MlUpgradeAction; -import org.elasticsearch.xpack.ml.MachineLearning; - -import java.io.IOException; - -import static org.elasticsearch.rest.RestRequest.Method.POST; - -public class RestUpgradeMlAction extends BaseRestHandler { - - private static final DeprecationLogger deprecationLogger = - new DeprecationLogger(LogManager.getLogger(RestUpgradeMlAction.class)); - - public RestUpgradeMlAction(Settings settings, RestController controller) { - super(settings); - controller.registerWithDeprecatedHandler( - POST, - MachineLearning.BASE_PATH + "_upgrade", - this, - POST, - MachineLearning.PRE_V7_BASE_PATH + "_upgrade", - deprecationLogger); - } - - @Override - public String getName() { - return "xpack_ml_upgrade_action"; - } - - @Override - protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { - MlUpgradeAction.Request parsedRequest = new MlUpgradeAction.Request(); - if (restRequest.hasContent()) { - XContentParser parser = restRequest.contentParser(); - parsedRequest = MlUpgradeAction.Request.PARSER.apply(parser, null); - } - final MlUpgradeAction.Request upgradeRequest = parsedRequest; - - if (restRequest.paramAsBoolean("wait_for_completion", false)) { - return channel -> client.execute(MlUpgradeAction.INSTANCE, upgradeRequest, new RestToXContentListener<>(channel)); - } else { - upgradeRequest.setShouldStoreResult(true); - - Task task = client.executeLocally(MlUpgradeAction.INSTANCE, upgradeRequest, LoggingTaskListener.instance()); - // Send task description id instead of waiting for the message - return channel -> { - try (XContentBuilder builder = channel.newBuilder()) { - builder.startObject(); - builder.field("task", client.getLocalNodeId() + ":" + task.getId()); - builder.endObject(); - channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); - } - }; - } - } -} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.upgrade.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.upgrade.json deleted file mode 100644 index b67b125bb692a..0000000000000 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.upgrade.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "ml.upgrade": { - "documentation": "TODO", - "methods": [ "POST" ], - "url": { - "path": "/_ml/_upgrade", - "paths": [ "/_ml/_upgrade" ], - "params": { - "wait_for_completion": { - "type": "boolean", - "description": "Should this request wait until the operation has completed before returning", - "default": false - } - } - }, - "body": { - "description" : "Upgrade options", - "required" : false - } - } -} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/ml_upgrade.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/ml_upgrade.yml deleted file mode 100644 index ee1f9f77f9325..0000000000000 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/ml_upgrade.yml +++ /dev/null @@ -1,70 +0,0 @@ -setup: - - skip: - features: headers - - do: - headers: - Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser - ml.put_job: - job_id: jobs-upgrade-results - body: > - { - "analysis_config" : { - "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] - }, - "data_description" : { - "format":"xcontent", - "time_field":"time" - } - } - - - do: - headers: - Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser - Content-Type: application/json - index: - index: .ml-anomalies-jobs-upgrade-results - type: doc - id: "jobs-upgrade-results_1464739200000_1" - body: - { - "job_id": "jobs-upgrade-results", - "result_type": "bucket", - "timestamp": "2016-06-01T00:00:00Z", - "anomaly_score": 90.0, - "bucket_span":1 - } - - - do: - headers: - Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser - indices.refresh: - index: .ml-anomalies-jobs-upgrade-results - ---- -"Upgrade results when there is nothing to upgrade": - - do: - ml.upgrade: - wait_for_completion: true - - - match: { acknowledged: true } - - - do: - indices.exists: - index: .ml-anomalies-shared - - - is_true: '' ---- -"Upgrade results when there is nothing to upgrade not waiting for results": - - do: - ml.upgrade: - wait_for_completion: false - - - match: {task: '/.+:\d+/'} - - set: {task: task} - - - do: - tasks.get: - wait_for_completion: true - task_id: $task - - match: {completed: true} - - match: {response.acknowledged: true} diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_ml_results_upgrade.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_ml_results_upgrade.yml deleted file mode 100644 index 73478be65597e..0000000000000 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/mixed_cluster/80_ml_results_upgrade.yml +++ /dev/null @@ -1,11 +0,0 @@ ---- -"Verify jobs exist": - - do: - ml.get_jobs: - job_id: old-cluster-job-to-upgrade - - match: { count: 1 } - - - do: - ml.get_jobs: - job_id: old-cluster-job-to-upgrade-custom - - match: { count: 1 } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_ml_results_upgrade.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_ml_results_upgrade.yml deleted file mode 100644 index d21b5e6def61d..0000000000000 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/old_cluster/80_ml_results_upgrade.yml +++ /dev/null @@ -1,120 +0,0 @@ ---- -"Put job on the old cluster and post some data": - - - do: - ml.put_job: - job_id: old-cluster-job-to-upgrade - body: > - { - "description":"Cluster upgrade", - "analysis_config" : { - "bucket_span": "60s", - "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] - }, - "analysis_limits" : { - "model_memory_limit": "50mb" - }, - "data_description" : { - "format":"xcontent", - "time_field":"time", - "time_format":"epoch" - } - } - - match: { job_id: old-cluster-job-to-upgrade } - - - do: - ml.open_job: - job_id: old-cluster-job-to-upgrade - - - do: - ml.post_data: - job_id: old-cluster-job-to-upgrade - body: - - airline: AAL - responsetime: 132.2046 - sourcetype: post-data-job - time: 1403481600 - - airline: JZA - responsetime: 990.4628 - sourcetype: post-data-job - time: 1403481700 - - match: { processed_record_count: 2 } - - - do: - ml.close_job: - job_id: old-cluster-job-to-upgrade - - - do: - ml.get_buckets: - job_id: old-cluster-job-to-upgrade - - match: { count: 1 } - -# Wait for indices to be fully allocated before -# killing the node - - do: - cluster.health: - index: [".ml-state", ".ml-anomalies-shared"] - wait_for_status: green - ---- -"Put job on the old cluster with a custom index": - - do: - ml.put_job: - job_id: old-cluster-job-to-upgrade-custom - body: > - { - "description":"Cluster upgrade", - "analysis_config" : { - "bucket_span": "60s", - "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] - }, - "analysis_limits" : { - "model_memory_limit": "50mb" - }, - "data_description" : { - "format":"xcontent", - "time_field":"time", - "time_format":"epoch" - }, - "results_index_name": "old-cluster-job-to-upgrade-custom" - } - - match: { job_id: old-cluster-job-to-upgrade-custom } - - - do: - ml.open_job: - job_id: old-cluster-job-to-upgrade-custom - - - do: - ml.post_data: - job_id: old-cluster-job-to-upgrade-custom - body: - - airline: AAL - responsetime: 132.2046 - sourcetype: post-data-job - time: 1403481600 - - airline: JZA - responsetime: 990.4628 - sourcetype: post-data-job - time: 1403481700 - - airline: JZA - responsetime: 423.0000 - sourcetype: post-data-job - time: 1403481800 - - match: { processed_record_count: 3 } - - - do: - ml.close_job: - job_id: old-cluster-job-to-upgrade-custom - - - do: - ml.get_buckets: - job_id: old-cluster-job-to-upgrade-custom - - match: { count: 3 } - -# Wait for indices to be fully allocated before -# killing the node - - do: - cluster.health: - index: [".ml-state", ".ml-anomalies-old-cluster-job-to-upgrade-custom"] - wait_for_status: green - diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_ml_results_upgrade.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_ml_results_upgrade.yml deleted file mode 100644 index f049b9c073ad8..0000000000000 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_ml_results_upgrade.yml +++ /dev/null @@ -1,158 +0,0 @@ ---- -"Migrate results data to latest index binary version": - # Verify that all the results are there and the typical indices exist - - do: - ml.get_buckets: - job_id: old-cluster-job-to-upgrade - - match: { count: 1 } - - - do: - ml.get_buckets: - job_id: old-cluster-job-to-upgrade-custom - - match: { count: 3 } - - - do: - indices.exists: - index: .ml-anomalies-shared - - - is_true: '' - - - do: - indices.get_settings: - index: .ml-anomalies-shared - name: index.version.created - - - match: { \.ml-anomalies-shared.settings.index.version.created: '/6\d+/' } - - - do: - indices.exists: - index: .ml-anomalies-custom-old-cluster-job-to-upgrade-custom - - - is_true: '' - - # Do the upgrade - - do: - ml.upgrade: - wait_for_completion: true - - - match: { acknowledged: true } - - # Verify that old indices are gone - - do: - indices.exists: - index: .ml-anomalies-shared - - - is_false: '' - - - do: - indices.exists: - index: .ml-anomalies-custom-old-cluster-job-to-upgrade-custom - - - is_false: '' - - # Verify that results can still be retrieved - - - do: - indices.refresh: {} - - - do: - ml.get_buckets: - job_id: old-cluster-job-to-upgrade - - match: { count: 1 } - - - do: - ml.get_buckets: - job_id: old-cluster-job-to-upgrade-custom - - match: { count: 3 } - - # Verify the created version is correct - - - do: - indices.get_settings: - index: .ml-anomalies-old-cluster-job-to-upgrade - name: index.version.created - - match: { \.ml-anomalies-shared-7.settings.index.version.created: '/7\d+/' } - - match: { \.ml-anomalies-shared-7r.settings.index.version.created: '/7\d+/' } - - - do: - indices.get_settings: - index: .ml-anomalies-old-cluster-job-to-upgrade-custom - name: index.version.created - - match: { \.ml-anomalies-custom-old-cluster-job-to-upgrade-custom-7.settings.index.version.created: '/7\d+/' } - - match: { \.ml-anomalies-custom-old-cluster-job-to-upgrade-custom-7r.settings.index.version.created: '/7\d+/' } - - # Create a new job to verify that the .ml-anomalies-shared index gets created again without issues - - - do: - ml.put_job: - job_id: upgraded-cluster-job-should-not-upgrade - body: > - { - "description":"Cluster upgrade", - "analysis_config" : { - "bucket_span": "60s", - "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] - }, - "analysis_limits" : { - "model_memory_limit": "50mb" - }, - "data_description" : { - "format":"xcontent", - "time_field":"time", - "time_format":"epoch" - } - } - - match: { job_id: upgraded-cluster-job-should-not-upgrade } - - - do: - ml.open_job: - job_id: upgraded-cluster-job-should-not-upgrade - - - do: - ml.post_data: - job_id: upgraded-cluster-job-should-not-upgrade - body: - - airline: AAL - responsetime: 132.2046 - sourcetype: post-data-job - time: 1403481600 - - airline: JZA - responsetime: 990.4628 - sourcetype: post-data-job - time: 1403481700 - - match: { processed_record_count: 2 } - - - do: - ml.close_job: - job_id: upgraded-cluster-job-should-not-upgrade - - - do: - ml.get_buckets: - job_id: upgraded-cluster-job-should-not-upgrade - - match: { count: 1 } - - - do: - indices.exists: - index: .ml-anomalies-shared - - - is_true: '' - - - do: - indices.get_settings: - index: .ml-anomalies-shared - name: index.version.created - - - match: { \.ml-anomalies-shared.settings.index.version.created: '/7\d+/' } - - # Do the upgrade Again as nothing needs upgraded now - - do: - ml.upgrade: - wait_for_completion: true - - - match: { acknowledged: true } - - - do: - indices.exists: - index: .ml-anomalies-shared - - - is_true: ''