diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index 0a2c6440bf0cc..ed9d715b5f78c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -93,7 +93,7 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, Consumer finishHandler) { executorServiceForJob.execute(() -> { ProcessContext processContext = new ProcessContext(config.getId()); - synchronized (this) { + synchronized (processContextByAllocation) { if (task.isStopping()) { // The task was requested to stop before we created the process context finishHandler.accept(null); @@ -295,14 +295,17 @@ private void closeProcess(DataFrameAnalyticsTask task) { processContext.process.close(); LOGGER.info("[{}] Closed process", configId); } catch (Exception e) { - String errorMsg = new ParameterizedMessage("[{}] Error closing data frame analyzer process [{}]" - , configId, e.getMessage()).getFormattedMessage(); + String errorMsg = new ParameterizedMessage( + "[{}] Error closing data frame analyzer process [{}]", configId, e.getMessage()).getFormattedMessage(); processContext.setFailureReason(errorMsg); } } - public synchronized void stop(DataFrameAnalyticsTask task) { - ProcessContext processContext = processContextByAllocation.get(task.getAllocationId()); + public void stop(DataFrameAnalyticsTask task) { + ProcessContext processContext; + synchronized (processContextByAllocation) { + processContext = processContextByAllocation.get(task.getAllocationId()); + } if (processContext != null) { LOGGER.debug("[{}] Stopping process", task.getParams().getId()); processContext.stop();