From e6e78fe09ede39d47d8b0748344234c619b43d84 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 16 Oct 2018 11:46:56 +0100 Subject: [PATCH] [ML] Adjust finalize job action to work with documents (#34226) --- .../elasticsearch/xpack/core/ml/MlTasks.java | 2 +- .../xpack/core/ml/action/UpdateJobAction.java | 21 ++--- .../xpack/core/ml/job/config/JobUpdate.java | 4 +- .../action/UpdateJobActionRequestTests.java | 10 ++- .../core/ml/job/config/JobUpdateTests.java | 2 +- .../ml/action/TransportCloseJobAction.java | 9 +- .../TransportFinalizeJobExecutionAction.java | 46 +--------- .../ml/action/TransportOpenJobAction.java | 2 +- .../ml/job/persistence/JobConfigProvider.java | 5 +- .../job/persistence/JobResultsPersister.java | 4 +- .../output/AutoDetectResultProcessor.java | 86 ++++++++++++------- .../action/TransportCloseJobActionTests.java | 2 +- .../ml/job/persistence/MockClientBuilder.java | 2 +- .../AutoDetectResultProcessorTests.java | 23 +++-- 14 files changed, 100 insertions(+), 118 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index a56d3d639239d..46685001153d7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -97,7 +97,7 @@ public static Set openJobIds(PersistentTasksCustomMetaData tasks) { * Is there an ml anomaly detector job task for the job {@code jobId}? * @param jobId The job id * @param tasks Persistent tasks - * @return + * @return True if the job has a task */ public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) { return openJobIds(tasks).contains(jobId); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java index d4fe804c451af..f7e2e514e5769 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java @@ -49,7 +49,6 @@ public static UpdateJobAction.Request parseRequest(String jobId, XContentParser /** Indicates an update that was not triggered by a user */ private boolean isInternal; - private boolean waitForAck = true; public Request(String jobId, JobUpdate update) { this(jobId, update, false); @@ -83,14 +82,6 @@ public boolean isInternal() { return isInternal; } - public boolean isWaitForAck() { - return waitForAck; - } - - public void setWaitForAck(boolean waitForAck) { - this.waitForAck = waitForAck; - } - @Override public ActionRequestValidationException validate() { return null; @@ -106,10 +97,9 @@ public void readFrom(StreamInput in) throws IOException { } else { isInternal = false; } - if (in.getVersion().onOrAfter(Version.V_6_3_0)) { - waitForAck = in.readBoolean(); - } else { - waitForAck = true; + // TODO jindex change CURRENT to specific version when feature branch is merged + if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.getVersion().before(Version.CURRENT)) { + in.readBoolean(); // was waitForAck } } @@ -121,8 +111,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_2_2)) { out.writeBoolean(isInternal); } - if (out.getVersion().onOrAfter(Version.V_6_3_0)) { - out.writeBoolean(waitForAck); + // TODO jindex change CURRENT to specific version when feature branch is merged + if (out.getVersion().onOrAfter(Version.V_6_3_0) && out.getVersion().before(Version.CURRENT)) { + out.writeBoolean(false); // was waitForAck } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java index 37fbf3dab14d8..44e20846f9aa3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java @@ -60,7 +60,7 @@ public class JobUpdate implements Writeable, ToXContentObject { INTERNAL_PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY); INTERNAL_PARSER.declareString(Builder::setModelSnapshotMinVersion, Job.MODEL_SNAPSHOT_MIN_VERSION); INTERNAL_PARSER.declareString(Builder::setJobVersion, Job.JOB_VERSION); - INTERNAL_PARSER.declareBoolean(Builder::setClearJobFinishTime, CLEAR_JOB_FINISH_TIME); + INTERNAL_PARSER.declareBoolean(Builder::setClearFinishTime, CLEAR_JOB_FINISH_TIME); } private final String jobId; @@ -753,7 +753,7 @@ public Builder setJobVersion(String version) { return this; } - public Builder setClearJobFinishTime(boolean clearJobFinishTime) { + public Builder setClearFinishTime(boolean clearJobFinishTime) { this.clearJobFinishTime = clearJobFinishTime; return this; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateJobActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateJobActionRequestTests.java index 3b09017147886..20d27f03d0c29 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateJobActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateJobActionRequestTests.java @@ -18,8 +18,14 @@ protected UpdateJobAction.Request createTestInstance() { // no need to randomize JobUpdate this is already tested in: JobUpdateTests JobUpdate.Builder jobUpdate = new JobUpdate.Builder(jobId); jobUpdate.setAnalysisLimits(new AnalysisLimits(100L, 100L)); - UpdateJobAction.Request request = new UpdateJobAction.Request(jobId, jobUpdate.build()); - request.setWaitForAck(randomBoolean()); + UpdateJobAction.Request request; + if (randomBoolean()) { + request = new UpdateJobAction.Request(jobId, jobUpdate.build()); + } else { + // this call sets isInternal = true + request = UpdateJobAction.Request.internal(jobId, jobUpdate.build()); + } + return request; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java index 35f72ced1e13b..c6eb42038901b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java @@ -97,7 +97,7 @@ public JobUpdate createRandom(String jobId, @Nullable Job job) { update.setJobVersion(randomFrom(Version.CURRENT, Version.V_6_2_0, Version.V_6_1_0)); } if (useInternalParser) { - update.setClearJobFinishTime(randomBoolean()); + update.setClearFinishTime(randomBoolean()); } return update.build(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index a31be5f3d7d5b..e6cc8b69e1bd1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; -import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; @@ -49,9 +48,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; - public class TransportCloseJobAction extends TransportTasksAction { @@ -427,10 +423,7 @@ void waitForJobClosed(CloseJobAction.Request request, WaitForCloseRequest waitFo }, request.getCloseTimeout(), new ActionListener() { @Override public void onResponse(Boolean result) { - FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request( - waitForCloseRequest.jobsToFinalize.toArray(new String[0])); - executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest, - ActionListener.wrap(r -> listener.onResponse(response), listener::onFailure)); + listener.onResponse(response); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java index fb56e61983973..c9fdd7b18fb53 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java @@ -10,22 +10,15 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.XPackPlugin; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction; -import org.elasticsearch.xpack.core.ml.job.config.Job; - -import java.util.Date; public class TransportFinalizeJobExecutionAction extends TransportMasterNodeAction { @@ -51,41 +44,10 @@ protected AcknowledgedResponse newResponse() { @Override protected void masterOperation(FinalizeJobExecutionAction.Request request, ClusterState state, - ActionListener listener) throws Exception { - String jobIdString = String.join(",", request.getJobIds()); - String source = "finalize_job_execution [" + jobIdString + "]"; - logger.debug("finalizing jobs [{}]", jobIdString); - clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - XPackPlugin.checkReadyForXPackCustomMetadata(currentState); - MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata); - Date finishedTime = new Date(); - - for (String jobId : request.getJobIds()) { - Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId)); - jobBuilder.setFinishedTime(finishedTime); - mlMetadataBuilder.putJob(jobBuilder.build(), true); - } - ClusterState.Builder builder = ClusterState.builder(currentState); - return builder.metaData(new MetaData.Builder(currentState.metaData()) - .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) - .build(); - } - - @Override - public void onFailure(String source, Exception e) { - listener.onFailure(e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, - ClusterState newState) { - logger.debug("finalized job [{}]", jobIdString); - listener.onResponse(new AcknowledgedResponse(true)); - } - }); + ActionListener listener) { + // This action is no longer required but needs to be preserved + // in case it is called by an old node in a mixed cluster + listener.onResponse(new AcknowledgedResponse(true)); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index ebabcab59d674..daa1c789f12ab 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -563,7 +563,7 @@ public void onTimeout(TimeValue timeout) { } private void clearJobFinishedTime(String jobId, ActionListener listener) { - JobUpdate update = new JobUpdate.Builder(jobId).setClearJobFinishTime(true).build(); + JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build(); jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap( job -> listener.onResponse(new AcknowledgedResponse(true)), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java index feab1e84a0146..26e9ee3019b04 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java @@ -237,7 +237,9 @@ public void onFailure(Exception e) { * * @param jobId The Id of the job to update * @param update The job update - * @param maxModelMemoryLimit The maximum model memory allowed + * @param maxModelMemoryLimit The maximum model memory allowed. This can be {@code null} + * if the job's {@link org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits} + * are not changed. * @param updatedJobListener Updated job listener */ public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, ActionListener updatedJobListener) { @@ -373,7 +375,6 @@ private void indexUpdatedJob(Job updatedJob, long version, ActionListener u } } - /** * Check a job exists. A job exists if it has a configuration document. * If the .ml-config index does not exist it is treated as a missing job diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index 233a2b4078ac7..9efdbc1975716 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -242,10 +242,10 @@ public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy ref /** * Persist a model snapshot description */ - public void persistModelSnapshot(ModelSnapshot modelSnapshot, WriteRequest.RefreshPolicy refreshPolicy) { + public IndexResponse persistModelSnapshot(ModelSnapshot modelSnapshot, WriteRequest.RefreshPolicy refreshPolicy) { Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.documentId(modelSnapshot)); persistable.setRefreshPolicy(refreshPolicy); - persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId())).actionGet(); + return persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId())).actionGet(); } /** diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index b86ec4de8257f..e07e6d6bc7e6d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -8,7 +8,12 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.update.UpdateAction; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; @@ -18,10 +23,10 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.xpack.core.ml.MachineLearningField; -import org.elasticsearch.xpack.core.ml.action.PutJobAction; -import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; -import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; @@ -34,17 +39,20 @@ import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.time.Duration; +import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -164,7 +172,7 @@ public void process(AutodetectProcess process) { } LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount); - runEstablishedModelMemoryUpdate(true); + onAutodetectClose(); } catch (Exception e) { failed = true; @@ -269,8 +277,10 @@ void processResult(Context context, AutodetectResult result) { ModelSnapshot modelSnapshot = result.getModelSnapshot(); if (modelSnapshot != null) { // We need to refresh in order for the snapshot to be available when we try to update the job with it - persister.persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE); - updateModelSnapshotOnJob(modelSnapshot); + IndexResponse indexResponse = persister.persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE); + if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { + updateModelSnapshotOnJob(modelSnapshot); + } } Quantiles quantiles = result.getQuantiles(); if (quantiles != null) { @@ -334,12 +344,6 @@ private void notifyModelMemoryStatusChange(Context context, ModelSizeStats model } protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { - JobUpdate update = new JobUpdate.Builder(jobId) - .setModelSnapshotId(modelSnapshot.getSnapshotId()) - .setModelSnapshotMinVersion(modelSnapshot.getMinVersion()) - .build(); - UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); - try { // This blocks the main processing thread in the unlikely event // there are 2 model snapshots queued up. But it also has the @@ -351,20 +355,25 @@ protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { return; } - executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener() { - @Override - public void onResponse(PutJobAction.Response response) { - updateModelSnapshotSemaphore.release(); - LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId()); - } + Map update = new HashMap<>(); + update.put(Job.MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshot.getSnapshotId()); + update.put(Job.MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshot.getMinVersion().toString()); - @Override - public void onFailure(Exception e) { - updateModelSnapshotSemaphore.release(); - LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" + - modelSnapshot.getSnapshotId() + "]", e); - } - }); + updateJob(jobId, Collections.singletonMap(Job.MODEL_SNAPSHOT_ID.getPreferredName(), modelSnapshot.getSnapshotId()), + new ActionListener() { + @Override + public void onResponse(UpdateResponse updateResponse) { + updateModelSnapshotSemaphore.release(); + LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId()); + } + + @Override + public void onFailure(Exception e) { + updateModelSnapshotSemaphore.release(); + LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" + + modelSnapshot.getSnapshotId() + "]", e); + } + }); } /** @@ -422,6 +431,13 @@ private synchronized void runEstablishedModelMemoryUpdate(boolean cancelExisting } } + private void onAutodetectClose() { + updateJob(jobId, Collections.singletonMap(Job.FINISHED_TIME.getPreferredName(), new Date()), ActionListener.wrap( + r -> runEstablishedModelMemoryUpdate(true), + e -> LOGGER.error("[" + jobId + "] Failed to finalize job on autodetect close", e)) + ); + } + private void updateEstablishedModelMemoryOnJob() { // Copy these before committing writes, so the calculation is done based on committed documents @@ -433,14 +449,10 @@ private void updateEstablishedModelMemoryOnJob() { jobResultsProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> { if (latestEstablishedModelMemory != establishedModelMemory) { - JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build(); - UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); - updateRequest.setWaitForAck(false); - - executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, - new ActionListener() { + updateJob(jobId, Collections.singletonMap(Job.ESTABLISHED_MODEL_MEMORY.getPreferredName(), establishedModelMemory), + new ActionListener() { @Override - public void onResponse(PutJobAction.Response response) { + public void onResponse(UpdateResponse response) { latestEstablishedModelMemory = establishedModelMemory; LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory); } @@ -455,6 +467,14 @@ public void onFailure(Exception e) { }, e -> LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e)); } + private void updateJob(String jobId, Map update, ActionListener listener) { + UpdateRequest updateRequest = new UpdateRequest(AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId)); + updateRequest.retryOnConflict(3); + updateRequest.doc(update); + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, listener); + } + public void awaitCompletion() throws TimeoutException { try { // Although the results won't take 30 minutes to finish, the pipe won't be closed diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java index 81dfa38148fad..b7d73b79829ce 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java @@ -272,7 +272,7 @@ public static void addTask(String datafeedId, long startTime, String nodeId, Dat private TransportCloseJobAction createAction() { return new TransportCloseJobAction(Settings.EMPTY, - mock(TransportService.class), mock(ThreadPool.class), mock(ActionFilters.class), + mock(TransportService.class), mock(ThreadPool.class), mock(ActionFilters.class), clusterService, mock(Client.class), mock(Auditor.class), mock(PersistentTasksService.class), jobConfigProvider, datafeedConfigProvider); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java index 726b815728f52..c10af20aba79f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java @@ -274,7 +274,7 @@ public MockClientBuilder prepareSearch(String index, String type, int from, int * Creates a {@link SearchResponse} with a {@link SearchHit} for each element of {@code docs} * @param indexName Index being searched * @param docs Returned in the SearchResponse - * @return + * @return this */ @SuppressWarnings("unchecked") public MockClientBuilder prepareSearch(String indexName, List docs) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 05b6bc7209b87..37bd367c28e2c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -8,17 +8,20 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.Version; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.update.UpdateAction; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; -import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; @@ -29,14 +32,15 @@ import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; -import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; +import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.After; import org.junit.Before; +import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import java.time.Duration; @@ -50,6 +54,7 @@ import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; @@ -67,7 +72,7 @@ public class AutoDetectResultProcessorTests extends ESTestCase { - private static final String JOB_ID = "_id"; + private static final String JOB_ID = "valid_id"; private static final long BUCKET_SPAN_MS = 1000; private ThreadPool threadPool; @@ -90,6 +95,8 @@ public void setUpMocks() { when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); renormalizer = mock(Renormalizer.class); persister = mock(JobResultsPersister.class); + when(persister.persistModelSnapshot(any(), any())) + .thenReturn(new IndexResponse(new ShardId("ml", "uid", 0), "doc", "1", 0L, 0L, 0L, true)); jobResultsProvider = mock(JobResultsProvider.class); flushListener = mock(FlushListener.class); processorUnderTest = new AutoDetectResultProcessor(client, auditor, JOB_ID, renormalizer, persister, jobResultsProvider, @@ -411,11 +418,13 @@ public void testProcessResult_modelSnapshot() { processorUnderTest.processResult(context, result); verify(persister, times(1)).persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE); - UpdateJobAction.Request expectedJobUpdateRequest = UpdateJobAction.Request.internal(JOB_ID, - new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").setModelSnapshotMinVersion(Version.CURRENT).build()); - verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any()); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(UpdateRequest.class); + verify(client).execute(same(UpdateAction.INSTANCE), requestCaptor.capture(), any()); verifyNoMoreInteractions(persister); + + UpdateRequest capturedRequest = requestCaptor.getValue(); + assertThat(capturedRequest.doc().sourceAsMap().keySet(), contains(Job.MODEL_SNAPSHOT_ID.getPreferredName())); } public void testProcessResult_quantiles_givenRenormalizationIsEnabled() {