From d4183ad10186b2f41a12b5a1ace64b3a3abfbfad Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 4 Dec 2023 13:17:24 -0600 Subject: [PATCH] fixing tests --- .../AutodetectResultProcessorIT.java | 5 +- .../autodetect/AutodetectProcessManager.java | 8 +-- .../output/AutodetectResultProcessor.java | 14 ++-- .../job/process/normalizer/Renormalizer.java | 3 +- .../ShortCircuitingRenormalizer.java | 66 +++++++++++-------- .../AutodetectResultProcessorTests.java | 61 ++++++++--------- .../ShortCircuitingRenormalizerTests.java | 5 +- .../BlobStoreCacheMaintenanceService.java | 3 +- .../watcher/history/HistoryStoreTests.java | 37 +++++------ 9 files changed, 102 insertions(+), 100 deletions(-) diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index c24c1c1becb18..d2b3c32993bd6 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; @@ -378,7 +379,7 @@ public void testParseQuantiles_GivenRenormalizationIsEnabled() throws Exception Optional persistedQuantiles = getQuantiles(); assertTrue(persistedQuantiles.isPresent()); assertEquals(quantiles, persistedQuantiles.get()); - verify(renormalizer).renormalize(eq(quantiles), any(Runnable.class)); + verify(renormalizer).renormalize(eq(quantiles), any(Runnable.class), ActionListener.noop()); } public void testParseQuantiles_GivenRenormalizationIsDisabled() throws Exception { @@ -395,7 +396,7 @@ public void testParseQuantiles_GivenRenormalizationIsDisabled() throws Exception Optional persistedQuantiles = getQuantiles(); assertTrue(persistedQuantiles.isPresent()); assertEquals(quantiles, persistedQuantiles.get()); - verify(renormalizer, never()).renormalize(any(), any()); + verify(renormalizer, never()).renormalize(any(), any(), ActionListener.noop()); } public void testDeleteInterimResults() throws Exception { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 439427a83eef6..5dc5d56ce6818 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -795,13 +795,7 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet ExecutorService autodetectWorkerExecutor; try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { autodetectWorkerExecutor = createAutodetectExecutorService(autodetectExecutorService); - autodetectExecutorService.submit(() -> { - try { - processor.process(); - } finally { - jobRenormalizedResultsPersister.close(); - } - }); + autodetectExecutorService.submit(() -> { processor.process(ActionListener.releasing(jobRenormalizedResultsPersister)); }); } catch (EsRejectedExecutionException e) { // If submitting the operation to read the results from the process fails we need to close // the process too, so that other submitted operations to threadpool are stopped. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java index aceb993a53a2b..9c6c8dcfe0994 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java @@ -64,7 +64,7 @@ /** * A runnable class that reads the autodetect process output in the - * {@link #process()} method and persists parsed + * {@link #process(ActionListener)} method and persists parsed * results via the {@linkplain JobResultsPersister} passed in the constructor. *

* Has methods to register and remove alert observers. @@ -167,13 +167,13 @@ public AutodetectResultProcessor( this.runningForecasts = new ConcurrentHashMap<>(); } - public void process() { + public void process(ActionListener listener) { // If a function call in this throws for some reason we don't want it // to kill the results reader thread as autodetect will be blocked // trying to write its output. try { - readResults(); + readResults(listener); try { if (processKilled == false) { @@ -211,14 +211,14 @@ public void process() { } } - private void readResults() { + private void readResults(ActionListener listener) { currentRunBucketCount = 0; try { Iterator iterator = process.readAutodetectResults(); while (iterator.hasNext()) { try { AutodetectResult result = iterator.next(); - processResult(result); + processResult(result, listener); if (result.getBucket() != null) { logger.trace("[{}] Bucket number {} parsed from output", jobId, currentRunBucketCount); } @@ -268,7 +268,7 @@ void handleOpenForecasts() { } } - void processResult(AutodetectResult result) { + void processResult(AutodetectResult result, ActionListener listener) { if (processKilled) { return; } @@ -382,7 +382,7 @@ void processResult(AutodetectResult result) { // quantiles are superseded before they're used. bulkResultsPersister.executeRequest(); persister.commitWrites(jobId, JobResultsPersister.CommitType.RESULTS); - }); + }, listener); } } FlushAcknowledgement flushAcknowledgement = result.getFlushAcknowledgement(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Renormalizer.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Renormalizer.java index 62d3a7d58524d..f0dd62fbed198 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Renormalizer.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Renormalizer.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.ml.job.process.normalizer; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; public interface Renormalizer { @@ -20,7 +21,7 @@ public interface Renormalizer { * Update the anomaly score field on all previously persisted buckets * and all contained records */ - void renormalize(Quantiles quantiles, Runnable setupStep); + void renormalize(Quantiles quantiles, Runnable setupStep, ActionListener listener); /** * Blocks until the renormalizer is idle and no further quantiles updates are pending. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizer.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizer.java index 1371221cb16b7..2e61c0d196112 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizer.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizer.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; @@ -54,7 +55,7 @@ public boolean isEnabled() { } @Override - public void renormalize(Quantiles quantiles, Runnable setupStep) { + public void renormalize(Quantiles quantiles, Runnable setupStep, ActionListener listener) { if (isEnabled() == false) { return; } @@ -69,7 +70,7 @@ public void renormalize(Quantiles quantiles, Runnable setupStep) { latestQuantilesHolder.getEvictedTimestamp(), latestQuantilesHolder.getLatch() ); - tryStartWork(); + tryStartWork(listener); } } @@ -128,8 +129,9 @@ private synchronized AugmentedQuantiles getLatestAugmentedQuantilesAndClear() { return latest; } - private synchronized boolean tryStartWork() { + private synchronized boolean tryStartWork(ActionListener listener) { if (latestQuantilesHolder == null) { + listener.onResponse(null); return false; } // Don't start a thread if another normalization thread is still working. The existing thread will @@ -137,7 +139,7 @@ private synchronized boolean tryStartWork() { // without hogging threads or queuing up many large quantiles documents. if (semaphore.tryAcquire()) { try { - latestTask = executorService.submit(this::doRenormalizations); + latestTask = executorService.submit(() -> this.doRenormalizations(listener)); } catch (RejectedExecutionException e) { latestQuantilesHolder.getLatch().countDown(); latestQuantilesHolder = null; @@ -148,6 +150,7 @@ private synchronized boolean tryStartWork() { } return true; } + listener.onResponse(null); return false; } @@ -161,31 +164,38 @@ private synchronized boolean tryFinishWork() { return true; } - private void doRenormalizations() { - do { - AugmentedQuantiles latestAugmentedQuantiles = getLatestAugmentedQuantilesAndClear(); - assert latestAugmentedQuantiles != null; - if (latestAugmentedQuantiles != null) { // TODO: remove this if the assert doesn't trip in CI over the next year or so - latestAugmentedQuantiles.runSetupStep(); - Quantiles latestQuantiles = latestAugmentedQuantiles.getQuantiles(); - CountDownLatch latch = latestAugmentedQuantiles.getLatch(); - try { - scoresUpdater.update( - latestQuantiles.getQuantileState(), - latestQuantiles.getTimestamp().getTime(), - latestAugmentedQuantiles.getWindowExtensionMs() - ); - } catch (Exception e) { - logger.error("[" + jobId + "] Normalization failed", e); - } finally { - latch.countDown(); + private void doRenormalizations(ActionListener listener) { + try { + do { + AugmentedQuantiles latestAugmentedQuantiles = getLatestAugmentedQuantilesAndClear(); + assert latestAugmentedQuantiles != null; + if (latestAugmentedQuantiles != null) { // TODO: remove this if the assert doesn't trip in CI over the next year or so + latestAugmentedQuantiles.runSetupStep(); + Quantiles latestQuantiles = latestAugmentedQuantiles.getQuantiles(); + CountDownLatch latch = latestAugmentedQuantiles.getLatch(); + try { + scoresUpdater.update( + latestQuantiles.getQuantileState(), + latestQuantiles.getTimestamp().getTime(), + latestAugmentedQuantiles.getWindowExtensionMs() + ); + } catch (Exception e) { + logger.error("[" + jobId + "] Normalization failed", e); + } finally { + latch.countDown(); + } + } else { + logger.warn("[{}] request to normalize null quantiles", jobId); } - } else { - logger.warn("[{}] request to normalize null quantiles", jobId); - } - // Loop if more work has become available while we were working, because the - // tasks originally submitted to do that work will have exited early. - } while (tryFinishWork() == false); + // Loop if more work has become available while we were working, because the + // tasks originally submitted to do that work will have exited early. + } while (tryFinishWork() == false); + } catch (Exception e) { + listener.onFailure(e); + throw e; + } finally { + listener.onResponse(null); + } } /** diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java index ed050a99cd16d..c5c0b8d52b3c2 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java @@ -8,6 +8,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkResponse; @@ -140,7 +141,7 @@ public void testProcess() throws Exception { AutodetectResult autodetectResult = mock(AutodetectResult.class); when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator()); - processorUnderTest.process(); + processorUnderTest.process(ActionListener.noop()); processorUnderTest.awaitCompletion(); assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L))); @@ -157,7 +158,7 @@ public void testProcessResult_bucket() { when(result.getBucket()).thenReturn(bucket); processorUnderTest.setDeleteInterimRequired(false); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(bulkResultsPersister).persistTimingStats(any(TimingStats.class)); @@ -173,7 +174,7 @@ public void testProcessResult_bucket_deleteInterimRequired() { Bucket bucket = new Bucket(JOB_ID, new Date(), BUCKET_SPAN_MS); when(result.getBucket()).thenReturn(bucket); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); assertEquals(1L, processorUnderTest.getCurrentRunBucketCount()); assertFalse(processorUnderTest.isDeleteInterimRequired()); @@ -193,7 +194,7 @@ public void testProcessResult_bucket_isInterim() { when(result.getBucket()).thenReturn(bucket); processorUnderTest.setDeleteInterimRequired(false); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); assertEquals(0L, processorUnderTest.getCurrentRunBucketCount()); verify(bulkResultsPersister, never()).persistTimingStats(any(TimingStats.class)); @@ -211,7 +212,7 @@ public void testProcessResult_records() { when(result.getRecords()).thenReturn(records); processorUnderTest.setDeleteInterimRequired(false); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); verify(bulkResultsPersister).persistRecords(records); verify(bulkResultsPersister, never()).executeRequest(); @@ -227,7 +228,7 @@ public void testProcessResult_influencers() { when(result.getInfluencers()).thenReturn(influencers); processorUnderTest.setDeleteInterimRequired(false); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); verify(bulkResultsPersister).persistInfluencers(influencers); verify(bulkResultsPersister, never()).executeRequest(); @@ -241,7 +242,7 @@ public void testProcessResult_categoryDefinition() { when(result.getCategoryDefinition()).thenReturn(categoryDefinition); processorUnderTest.setDeleteInterimRequired(false); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(bulkResultsPersister).persistCategoryDefinition(eq(categoryDefinition)); @@ -256,7 +257,7 @@ public void testProcessResult_flushAcknowledgementWithRefresh() { when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement); processorUnderTest.setDeleteInterimRequired(false); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); assertTrue(processorUnderTest.isDeleteInterimRequired()); verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); @@ -275,7 +276,7 @@ public void testProcessResult_flushAcknowledgement() { when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement); processorUnderTest.setDeleteInterimRequired(false); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); assertTrue(processorUnderTest.isDeleteInterimRequired()); verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); @@ -298,7 +299,7 @@ public void testProcessResult_flushAcknowledgementMustBeProcessedLast() { when(result.getCategoryDefinition()).thenReturn(categoryDefinition); processorUnderTest.setDeleteInterimRequired(false); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); assertTrue(processorUnderTest.isDeleteInterimRequired()); InOrder inOrder = inOrder(persister, bulkResultsPersister, flushListener); @@ -318,7 +319,7 @@ public void testProcessResult_modelPlot() { when(result.getModelPlot()).thenReturn(modelPlot); processorUnderTest.setDeleteInterimRequired(false); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(bulkResultsPersister).persistModelPlot(modelPlot); @@ -329,7 +330,7 @@ public void testProcessResult_annotation() { Annotation annotation = AnnotationTests.randomAnnotation(JOB_ID); when(result.getAnnotation()).thenReturn(annotation); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(bulkAnnotationsPersister).persistAnnotation(annotation); @@ -344,7 +345,7 @@ public void testProcessResult_modelSizeStats() { when(result.getModelSizeStats()).thenReturn(modelSizeStats); processorUnderTest.setDeleteInterimRequired(false); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); assertThat(processorUnderTest.modelSizeStats(), is(equalTo(modelSizeStats))); verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); @@ -358,12 +359,12 @@ public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() { // First one with soft_limit ModelSizeStats modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setMemoryStatus(ModelSizeStats.MemoryStatus.SOFT_LIMIT).build(); when(result.getModelSizeStats()).thenReturn(modelSizeStats); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); // Another with soft_limit modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setMemoryStatus(ModelSizeStats.MemoryStatus.SOFT_LIMIT).build(); when(result.getModelSizeStats()).thenReturn(modelSizeStats); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); // Now with hard_limit modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setMemoryStatus(ModelSizeStats.MemoryStatus.HARD_LIMIT) @@ -371,12 +372,12 @@ public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() { .setModelBytesExceeded(ByteSizeValue.ofKb(1).getBytes()) .build(); when(result.getModelSizeStats()).thenReturn(modelSizeStats); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); // And another with hard_limit modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setMemoryStatus(ModelSizeStats.MemoryStatus.HARD_LIMIT).build(); when(result.getModelSizeStats()).thenReturn(modelSizeStats); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(bulkResultsPersister, times(4)).persistModelSizeStats(any(ModelSizeStats.class)); @@ -400,7 +401,7 @@ public void testProcessResult_categorizationStatusChangeAnnotationCausesNotifica .setPartitionFieldValue("foo") .build(); when(result.getAnnotation()).thenReturn(annotation); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(bulkAnnotationsPersister).persistAnnotation(annotation); @@ -422,7 +423,7 @@ public void testProcessResult_modelSnapshot() { ); processorUnderTest.setDeleteInterimRequired(false); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); Annotation expectedAnnotation = new Annotation.Builder().setAnnotation("Job model snapshot with id [a_snapshot_id] stored") .setCreateTime(Date.from(CURRENT_TIME)) @@ -453,12 +454,12 @@ public void testProcessResult_quantiles_givenRenormalizationIsEnabled() { when(renormalizer.isEnabled()).thenReturn(true); processorUnderTest.setDeleteInterimRequired(false); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(persister).persistQuantiles(eq(quantiles), any()); verify(renormalizer).isEnabled(); - verify(renormalizer).renormalize(eq(quantiles), any(Runnable.class)); + verify(renormalizer).renormalize(eq(quantiles), any(Runnable.class), ActionListener.noop()); } public void testProcessResult_quantiles_givenRenormalizationIsDisabled() { @@ -468,7 +469,7 @@ public void testProcessResult_quantiles_givenRenormalizationIsDisabled() { when(renormalizer.isEnabled()).thenReturn(false); processorUnderTest.setDeleteInterimRequired(false); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(persister).persistQuantiles(eq(quantiles), any()); @@ -480,7 +481,7 @@ public void testAwaitCompletion() throws Exception { AutodetectResult autodetectResult = mock(AutodetectResult.class); when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator()); - processorUnderTest.process(); + processorUnderTest.process(ActionListener.noop()); processorUnderTest.awaitCompletion(); assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L))); assertThat(processorUnderTest.updateModelSnapshotSemaphore.availablePermits(), is(equalTo(1))); @@ -501,7 +502,7 @@ public void testPersisterThrowingDoesntBlockProcessing() { doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSnapshot(any(), any(), any()); - processorUnderTest.process(); + processorUnderTest.process(ActionListener.noop()); verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE), any()); @@ -514,7 +515,7 @@ public void testParsingErrorSetsFailed() throws Exception { when(process.readAutodetectResults()).thenReturn(iterator); assertFalse(processorUnderTest.isFailed()); - processorUnderTest.process(); + processorUnderTest.process(ActionListener.noop()); assertTrue(processorUnderTest.isFailed()); // Wait for flush should return immediately @@ -532,14 +533,14 @@ public void testKill() throws Exception { when(process.readAutodetectResults()).thenReturn(Collections.singletonList(autodetectResult).iterator()); processorUnderTest.setProcessKilled(); - processorUnderTest.process(); + processorUnderTest.process(ActionListener.noop()); processorUnderTest.awaitCompletion(); assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L))); assertThat(processorUnderTest.updateModelSnapshotSemaphore.availablePermits(), is(equalTo(1))); verify(persister).bulkPersisterBuilder(eq(JOB_ID), any()); verify(persister).commitWrites(JOB_ID, EnumSet.allOf(JobResultsPersister.CommitType.class)); - verify(renormalizer, never()).renormalize(any(), any()); + verify(renormalizer, never()).renormalize(any(), any(), ActionListener.noop()); verify(renormalizer).shutdown(); verify(renormalizer).waitUntilIdle(); verify(flushListener).clear(); @@ -555,7 +556,7 @@ public void testProcessingOpenedForecasts() { ArgumentCaptor argument = ArgumentCaptor.forClass(ForecastRequestStats.class); processorUnderTest.setDeleteInterimRequired(false); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); processorUnderTest.handleOpenForecasts(); @@ -580,14 +581,14 @@ public void testProcessingForecasts() { ArgumentCaptor argument = ArgumentCaptor.forClass(ForecastRequestStats.class); processorUnderTest.setDeleteInterimRequired(false); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); result = mock(AutodetectResult.class); forecastRequestStats = new ForecastRequestStats("foo", "forecast"); forecastRequestStats.setStatus(ForecastRequestStats.ForecastRequestStatus.FINISHED); when(result.getForecastRequestStats()).thenReturn(forecastRequestStats); - processorUnderTest.processResult(result); + processorUnderTest.processResult(result, ActionListener.noop()); // There shouldn't be any opened forecasts. This call should do nothing processorUnderTest.handleOpenForecasts(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizerTests.java index ae34a675d15bc..f22e14c3cc1ca 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizerTests.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.ml.job.process.normalizer; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; import org.junit.Before; @@ -47,12 +48,12 @@ public void testNormalize() throws InterruptedException { // Blast through many sets of quantiles in quick succession, faster than the normalizer can process them for (int i = 1; i < TEST_SIZE / 2; ++i) { Quantiles quantiles = new Quantiles(JOB_ID, new Date(), Integer.toString(i)); - renormalizer.renormalize(quantiles, () -> {}); + renormalizer.renormalize(quantiles, () -> {}, ActionListener.noop()); } renormalizer.waitUntilIdle(); for (int i = TEST_SIZE / 2; i <= TEST_SIZE; ++i) { Quantiles quantiles = new Quantiles(JOB_ID, new Date(), Integer.toString(i)); - renormalizer.renormalize(quantiles, () -> {}); + renormalizer.renormalize(quantiles, () -> {}, ActionListener.noop()); } renormalizer.waitUntilIdle(); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index fc7574db47801..0048718aaa4fa 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -522,7 +522,6 @@ public void onFailure(Exception e) { assert knownRepositories != null; final Instant expirationTimeCopy = this.expirationTime; assert expirationTimeCopy != null; - for (SearchHit searchHit : searchHits) { lastSortValues = searchHit.getSortValues(); assert searchHit.getId() != null; @@ -566,11 +565,11 @@ public void onFailure(Exception e) { ); } } - assert lastSortValues != null; if (bulkRequest.numberOfActions() == 0) { this.searchResponse = null; this.searchAfter = lastSortValues; + bulkRequest.close(); executeNext(this); return; } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java index bba6de279dd73..6657590f26df6 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java @@ -39,7 +39,6 @@ import org.elasticsearch.xpack.watcher.notification.jira.JiraAccount; import org.elasticsearch.xpack.watcher.notification.jira.JiraIssue; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; -import org.junit.After; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -61,9 +60,7 @@ public class HistoryStoreTests extends ESTestCase { - private HistoryStore historyStore; private Client client; - private BulkProcessor2 bulkProcessor; @Before public void init() { @@ -73,16 +70,6 @@ public void init() { when(client.threadPool()).thenReturn(threadPool); when(client.settings()).thenReturn(settings); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings)); - BulkProcessor2.Listener listener = mock(BulkProcessor2.Listener.class); - bulkProcessor = BulkProcessor2.builder(client::bulk, listener, threadPool).setBulkActions(1).build(); - historyStore = new HistoryStore(bulkProcessor); - } - - @After - public void cleanup() { - if (bulkProcessor != null) { - bulkProcessor.close(); - } } public void testPut() throws Exception { @@ -94,9 +81,9 @@ public void testPut() throws Exception { IndexResponse indexResponse = mock(IndexResponse.class); doAnswer(invocation -> { - BulkRequest request = (BulkRequest) invocation.getArguments()[1]; + BulkRequest request = (BulkRequest) invocation.getArguments()[0]; @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocation.getArguments()[2]; + ActionListener listener = (ActionListener) invocation.getArguments()[1]; IndexRequest indexRequest = (IndexRequest) request.requests().get(0); if (indexRequest.id().equals(wid.value()) @@ -111,7 +98,11 @@ public void testPut() throws Exception { return null; }).when(client).bulk(any(), any()); - historyStore.put(watchRecord); + BulkProcessor2.Listener listener = mock(BulkProcessor2.Listener.class); + try (BulkProcessor2 bulkProcessor = BulkProcessor2.builder(client::bulk, listener, client.threadPool()).setBulkActions(1).build()) { + HistoryStore historyStore = new HistoryStore(bulkProcessor); + historyStore.put(watchRecord); + } verify(client).bulk(any(), any()); } @@ -159,17 +150,21 @@ public void testStoreWithHideSecrets() throws Exception { ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(BulkRequest.class); doAnswer(invocation -> { @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocation.getArguments()[2]; + ActionListener listener = (ActionListener) invocation.getArguments()[1]; IndexResponse indexResponse = mock(IndexResponse.class); listener.onResponse(new BulkResponse(new BulkItemResponse[] { BulkItemResponse.success(1, OpType.CREATE, indexResponse) }, 1)); return null; }).when(client).bulk(requestCaptor.capture(), any()); - if (randomBoolean()) { - historyStore.put(watchRecord); - } else { - historyStore.forcePut(watchRecord); + BulkProcessor2.Listener listener = mock(BulkProcessor2.Listener.class); + try (BulkProcessor2 bulkProcessor = BulkProcessor2.builder(client::bulk, listener, client.threadPool()).setBulkActions(1).build()) { + HistoryStore historyStore = new HistoryStore(bulkProcessor); + if (randomBoolean()) { + historyStore.put(watchRecord); + } else { + historyStore.forcePut(watchRecord); + } } assertThat(requestCaptor.getAllValues(), hasSize(1));