diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 09a7f3c11040d..bdac41cd9b96d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.Environment; @@ -176,7 +177,7 @@ public void close(boolean restart, String reason) { // In this case the original exception is spurious and highly misleading throw ExceptionsHelper.conflictStatusException("Close job interrupted by kill request"); } else { - throw new ElasticsearchException(e); + throw FutureUtils.rethrowExecutionException(e); } } finally { destroyCategorizationAnalyzer(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 67eccb1caef41..da5e70112f045 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -14,6 +14,9 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; @@ -30,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; @@ -43,6 +47,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -71,6 +76,13 @@ public class AutoDetectResultProcessor { private static final Logger LOGGER = Loggers.getLogger(AutoDetectResultProcessor.class); + /** + * This is how far behind real-time we'll update the job with the latest established model memory. + * If more updates are received during the delay period then they'll take precedence. + * As a result there will be at most one update of established model memory per delay period. + */ + private static final TimeValue ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY = TimeValue.timeValueSeconds(5); + private final Client client; private final Auditor auditor; private final String jobId; @@ -90,8 +102,10 @@ public class AutoDetectResultProcessor { * New model size stats are read as the process is running */ private volatile ModelSizeStats latestModelSizeStats; + private volatile Date latestDateForEstablishedModelMemoryCalc; private volatile long latestEstablishedModelMemory; private volatile boolean haveNewLatestModelSizeStats; + private Future scheduledEstablishedModelMemoryUpdate; // only accessed in synchronized methods public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister, JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot) { @@ -148,6 +162,7 @@ public void process(AutodetectProcess process) { } LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount); + runEstablishedModelMemoryUpdate(true); } catch (Exception e) { failed = true; @@ -194,15 +209,15 @@ void processResult(Context context, AutodetectResult result) { // persist after deleting interim results in case the new // results are also interim context.bulkResultsPersister.persistBucket(bucket).executeRequest(); + latestDateForEstablishedModelMemoryCalc = bucket.getTimestamp(); ++bucketCount; // if we haven't previously set established model memory, consider trying again after - // a reasonable amount of time has elapsed since the last model size stats update + // a reasonable number of buckets have elapsed since the last model size stats update long minEstablishedTimespanMs = JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE * bucket.getBucketSpan() * 1000L; - if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0 - && bucket.getTimestamp().getTime() > latestModelSizeStats.getTimestamp().getTime() + minEstablishedTimespanMs) { - persister.commitResultWrites(context.jobId); - updateEstablishedModelMemoryOnJob(bucket.getTimestamp(), latestModelSizeStats); + if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0 && latestDateForEstablishedModelMemoryCalc.getTime() + > latestModelSizeStats.getTimestamp().getTime() + minEstablishedTimespanMs) { + scheduleEstablishedModelMemoryUpdate(ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY); haveNewLatestModelSizeStats = false; } } @@ -293,15 +308,14 @@ private void processModelSizeStats(Context context, ModelSizeStats modelSizeStat persister.persistModelSizeStats(modelSizeStats); notifyModelMemoryStatusChange(context, modelSizeStats); latestModelSizeStats = modelSizeStats; + latestDateForEstablishedModelMemoryCalc = modelSizeStats.getTimestamp(); haveNewLatestModelSizeStats = true; // This is a crude way to NOT refresh the index and NOT attempt to update established model memory during the first 20 buckets // because this is when the model size stats are likely to be least stable and lots of updates will be coming through, and // we'll NEVER consider memory usage to be established during this period if (restoredSnapshot || bucketCount >= JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) { - // We need to make all results written up to and including these stats available for the established memory calculation - persister.commitResultWrites(context.jobId); - updateEstablishedModelMemoryOnJob(modelSizeStats.getTimestamp(), modelSizeStats); + scheduleEstablishedModelMemoryUpdate(ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY); } } @@ -351,26 +365,91 @@ public void onFailure(Exception e) { }); } - private void updateEstablishedModelMemoryOnJob(Date latestBucketTimestamp, ModelSizeStats modelSizeStats) { - jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStats, establishedModelMemory -> { - JobUpdate update = new JobUpdate.Builder(jobId) - .setEstablishedModelMemory(establishedModelMemory).build(); - UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); - updateRequest.setWaitForAck(false); - - executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener() { - @Override - public void onResponse(PutJobAction.Response response) { - latestEstablishedModelMemory = establishedModelMemory; - LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory); - } + /** + * The purpose of this method is to avoid saturating the cluster state update thread + * when a lookback job is churning through buckets very fast and the memory usage of + * the job is changing regularly. The idea is to only update the established model + * memory associated with the job a few seconds after the new value has been received. + * If more updates are received during the delay period then they simply replace the + * value that originally caused the update to be scheduled. This rate limits cluster + * state updates due to established model memory changing to one per job per delay period. + * (In reality updates will only occur this rapidly during lookback. During real-time + * operation the limit of one model size stats document per bucket will mean there is a + * maximum of one cluster state update per job per bucket, and usually the bucket span + * is 5 minutes or more.) + * @param delay The delay before updating established model memory. + */ + synchronized void scheduleEstablishedModelMemoryUpdate(TimeValue delay) { - @Override - public void onFailure(Exception e) { - LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" + establishedModelMemory + "]", - e); + if (scheduledEstablishedModelMemoryUpdate == null) { + try { + scheduledEstablishedModelMemoryUpdate = client.threadPool().schedule(delay, MachineLearning.UTILITY_THREAD_POOL_NAME, + () -> runEstablishedModelMemoryUpdate(false)); + LOGGER.trace("[{}] Scheduled established model memory update to run in [{}]", jobId, delay); + } catch (EsRejectedExecutionException e) { + if (e.isExecutorShutdown()) { + LOGGER.debug("failed to schedule established model memory update; shutting down", e); + } else { + throw e; } - }); + } + } + } + + /** + * This method is called from two places: + * - From the {@link Future} used for delayed updates + * - When shutting down this result processor + * When shutting down the result processor it's only necessary to do anything + * if an update has been scheduled, but we want to do the update immediately. + * Despite cancelling the scheduled update in this case, it's possible that + * it's already started running, in which case this method will get called + * twice in quick succession. But the second call will do nothing, as + * scheduledEstablishedModelMemoryUpdate will have been reset + * to null by the first call. + */ + private synchronized void runEstablishedModelMemoryUpdate(boolean cancelExisting) { + + if (scheduledEstablishedModelMemoryUpdate != null) { + if (cancelExisting) { + LOGGER.debug("[{}] Bringing forward previously scheduled established model memory update", jobId); + FutureUtils.cancel(scheduledEstablishedModelMemoryUpdate); + } + scheduledEstablishedModelMemoryUpdate = null; + updateEstablishedModelMemoryOnJob(); + } + } + + private void updateEstablishedModelMemoryOnJob() { + + // Copy these before committing writes, so the calculation is done based on committed documents + Date latestBucketTimestamp = latestDateForEstablishedModelMemoryCalc; + ModelSizeStats modelSizeStatsForCalc = latestModelSizeStats; + + // We need to make all results written up to and including these stats available for the established memory calculation + persister.commitResultWrites(jobId); + + jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> { + if (latestEstablishedModelMemory != establishedModelMemory) { + JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build(); + UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); + updateRequest.setWaitForAck(false); + + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, + new ActionListener() { + @Override + public void onResponse(PutJobAction.Response response) { + latestEstablishedModelMemory = establishedModelMemory; + LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory); + } + + @Override + public void onFailure(Exception e) { + LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" + + establishedModelMemory + "]", e); + } + }); + } }, e -> LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 1221f85e61de8..8eb0317ba0dbe 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -34,6 +35,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.junit.After; import org.junit.Before; import org.mockito.InOrder; @@ -43,14 +45,16 @@ import java.util.Date; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isNull; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; @@ -64,7 +68,9 @@ public class AutoDetectResultProcessorTests extends ESTestCase { private static final String JOB_ID = "_id"; + private static final long BUCKET_SPAN_MS = 1000; + private ThreadPool threadPool; private Client client; private Auditor auditor; private Renormalizer renormalizer; @@ -72,12 +78,14 @@ public class AutoDetectResultProcessorTests extends ESTestCase { private JobProvider jobProvider; private FlushListener flushListener; private AutoDetectResultProcessor processorUnderTest; + private ScheduledThreadPoolExecutor executor; @Before public void setUpMocks() { + executor = new ScheduledThreadPoolExecutor(1); client = mock(Client.class); auditor = mock(Auditor.class); - ThreadPool threadPool = mock(ThreadPool.class); + threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); renormalizer = mock(Renormalizer.class); @@ -85,7 +93,12 @@ public void setUpMocks() { jobProvider = mock(JobProvider.class); flushListener = mock(FlushListener.class); processorUnderTest = new AutoDetectResultProcessor(client, auditor, JOB_ID, renormalizer, persister, jobProvider, - new ModelSizeStats.Builder(JOB_ID).build(), false, flushListener); + new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), false, flushListener); + } + + @After + public void cleanup() { + executor.shutdown(); } public void testProcess() throws TimeoutException { @@ -289,6 +302,8 @@ public void testProcessResult_modelSizeStats() { public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); + setupScheduleDelayTime(TimeValue.timeValueSeconds(5)); + AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); @@ -322,11 +337,14 @@ public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() { verifyNoMoreInteractions(auditor); } - public void testProcessResult_modelSizeStatsAfterManyBuckets() { + public void testProcessResult_modelSizeStatsAfterManyBuckets() throws Exception { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); + // To avoid slowing down the test this is using a delay of 1 nanosecond rather than the 5 seconds used in production + setupScheduleDelayTime(TimeValue.timeValueNanos(1)); + AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; for (int i = 0; i < JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE; ++i) { @@ -338,16 +356,64 @@ public void testProcessResult_modelSizeStatsAfterManyBuckets() { AutodetectResult result = mock(AutodetectResult.class); ModelSizeStats modelSizeStats = mock(ModelSizeStats.class); + Date timestamp = new Date(BUCKET_SPAN_MS); + when(modelSizeStats.getTimestamp()).thenReturn(timestamp); when(result.getModelSizeStats()).thenReturn(modelSizeStats); processorUnderTest.processResult(context, result); - verify(persister, times(1)).persistModelSizeStats(modelSizeStats); - verify(persister, times(1)).commitResultWrites(JOB_ID); - verifyNoMoreInteractions(persister); - verify(jobProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), isNull(Date.class), eq(modelSizeStats), + // Some calls will be made 1 nanosecond later in a different thread, hence the assertBusy() + assertBusy(() -> { + verify(persister, times(1)).persistModelSizeStats(modelSizeStats); + verify(persister, times(1)).commitResultWrites(JOB_ID); + verifyNoMoreInteractions(persister); + verify(jobProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(timestamp), eq(modelSizeStats), any(Consumer.class), + any(Consumer.class)); + verifyNoMoreInteractions(jobProvider); + assertEquals(modelSizeStats, processorUnderTest.modelSizeStats()); + }); + } + + public void testProcessResult_manyModelSizeStatsInQuickSuccession() throws Exception { + JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); + when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); + when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); + + setupScheduleDelayTime(TimeValue.timeValueSeconds(1)); + + AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + context.deleteInterimRequired = false; + ModelSizeStats modelSizeStats = null; + for (int i = 1; i <= JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE + 5; ++i) { + AutodetectResult result = mock(AutodetectResult.class); + Bucket bucket = mock(Bucket.class); + when(bucket.getTimestamp()).thenReturn(new Date(BUCKET_SPAN_MS * i)); + when(result.getBucket()).thenReturn(bucket); + processorUnderTest.processResult(context, result); + if (i > JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) { + result = mock(AutodetectResult.class); + modelSizeStats = mock(ModelSizeStats.class); + when(modelSizeStats.getTimestamp()).thenReturn(new Date(BUCKET_SPAN_MS * i)); + when(result.getModelSizeStats()).thenReturn(modelSizeStats); + processorUnderTest.processResult(context, result); + } + } + + ModelSizeStats lastModelSizeStats = modelSizeStats; + assertNotNull(lastModelSizeStats); + Date lastTimestamp = lastModelSizeStats.getTimestamp(); + + // Some calls will be made 1 second later in a different thread, hence the assertBusy() + assertBusy(() -> { + // All the model size stats should be persisted to the index... + verify(persister, times(5)).persistModelSizeStats(any(ModelSizeStats.class)); + // ...but only the last should trigger an established model memory update + verify(persister, times(1)).commitResultWrites(JOB_ID); + verifyNoMoreInteractions(persister); + verify(jobProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(lastTimestamp), eq(lastModelSizeStats), any(Consumer.class), any(Consumer.class)); - verifyNoMoreInteractions(jobProvider); - assertEquals(modelSizeStats, processorUnderTest.modelSizeStats()); + verifyNoMoreInteractions(jobProvider); + assertEquals(lastModelSizeStats, processorUnderTest.modelSizeStats()); + }); } public void testProcessResult_modelSnapshot() { @@ -487,4 +553,9 @@ public void testKill() throws TimeoutException { verify(renormalizer, times(1)).waitUntilIdle(); verify(flushListener, times(1)).clear(); } + + private void setupScheduleDelayTime(TimeValue delay) { + when(threadPool.schedule(any(TimeValue.class), anyString(), any(Runnable.class))) + .thenAnswer(i -> executor.schedule((Runnable) i.getArguments()[2], delay.nanos(), TimeUnit.NANOSECONDS)); + } }