Skip to content

Commit

Permalink
fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Dec 4, 2023
1 parent 5bcbb92 commit d4183ad
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
Expand Down Expand Up @@ -378,7 +379,7 @@ public void testParseQuantiles_GivenRenormalizationIsEnabled() throws Exception
Optional<Quantiles> persistedQuantiles = getQuantiles();
assertTrue(persistedQuantiles.isPresent());
assertEquals(quantiles, persistedQuantiles.get());
verify(renormalizer).renormalize(eq(quantiles), any(Runnable.class));
verify(renormalizer).renormalize(eq(quantiles), any(Runnable.class), ActionListener.noop());
}

public void testParseQuantiles_GivenRenormalizationIsDisabled() throws Exception {
Expand All @@ -395,7 +396,7 @@ public void testParseQuantiles_GivenRenormalizationIsDisabled() throws Exception
Optional<Quantiles> persistedQuantiles = getQuantiles();
assertTrue(persistedQuantiles.isPresent());
assertEquals(quantiles, persistedQuantiles.get());
verify(renormalizer, never()).renormalize(any(), any());
verify(renormalizer, never()).renormalize(any(), any(), ActionListener.noop());
}

public void testDeleteInterimResults() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -795,13 +795,7 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet
ExecutorService autodetectWorkerExecutor;
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
autodetectWorkerExecutor = createAutodetectExecutorService(autodetectExecutorService);
autodetectExecutorService.submit(() -> {
try {
processor.process();
} finally {
jobRenormalizedResultsPersister.close();
}
});
autodetectExecutorService.submit(() -> { processor.process(ActionListener.releasing(jobRenormalizedResultsPersister)); });
} catch (EsRejectedExecutionException e) {
// If submitting the operation to read the results from the process fails we need to close
// the process too, so that other submitted operations to threadpool are stopped.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@

/**
* A runnable class that reads the autodetect process output in the
* {@link #process()} method and persists parsed
* {@link #process(ActionListener<Void>)} method and persists parsed
* results via the {@linkplain JobResultsPersister} passed in the constructor.
* <p>
* Has methods to register and remove alert observers.
Expand Down Expand Up @@ -167,13 +167,13 @@ public AutodetectResultProcessor(
this.runningForecasts = new ConcurrentHashMap<>();
}

public void process() {
public void process(ActionListener<Void> listener) {

// If a function call in this throws for some reason we don't want it
// to kill the results reader thread as autodetect will be blocked
// trying to write its output.
try {
readResults();
readResults(listener);

try {
if (processKilled == false) {
Expand Down Expand Up @@ -211,14 +211,14 @@ public void process() {
}
}

private void readResults() {
private void readResults(ActionListener<Void> listener) {
currentRunBucketCount = 0;
try {
Iterator<AutodetectResult> iterator = process.readAutodetectResults();
while (iterator.hasNext()) {
try {
AutodetectResult result = iterator.next();
processResult(result);
processResult(result, listener);
if (result.getBucket() != null) {
logger.trace("[{}] Bucket number {} parsed from output", jobId, currentRunBucketCount);
}
Expand Down Expand Up @@ -268,7 +268,7 @@ void handleOpenForecasts() {
}
}

void processResult(AutodetectResult result) {
void processResult(AutodetectResult result, ActionListener<Void> listener) {
if (processKilled) {
return;
}
Expand Down Expand Up @@ -382,7 +382,7 @@ void processResult(AutodetectResult result) {
// quantiles are superseded before they're used.
bulkResultsPersister.executeRequest();
persister.commitWrites(jobId, JobResultsPersister.CommitType.RESULTS);
});
}, listener);
}
}
FlushAcknowledgement flushAcknowledgement = result.getFlushAcknowledgement();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.ml.job.process.normalizer;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;

public interface Renormalizer {
Expand All @@ -20,7 +21,7 @@ public interface Renormalizer {
* Update the anomaly score field on all previously persisted buckets
* and all contained records
*/
void renormalize(Quantiles quantiles, Runnable setupStep);
void renormalize(Quantiles quantiles, Runnable setupStep, ActionListener<Void> listener);

/**
* Blocks until the renormalizer is idle and no further quantiles updates are pending.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;

Expand Down Expand Up @@ -54,7 +55,7 @@ public boolean isEnabled() {
}

@Override
public void renormalize(Quantiles quantiles, Runnable setupStep) {
public void renormalize(Quantiles quantiles, Runnable setupStep, ActionListener<Void> listener) {
if (isEnabled() == false) {
return;
}
Expand All @@ -69,7 +70,7 @@ public void renormalize(Quantiles quantiles, Runnable setupStep) {
latestQuantilesHolder.getEvictedTimestamp(),
latestQuantilesHolder.getLatch()
);
tryStartWork();
tryStartWork(listener);
}
}

Expand Down Expand Up @@ -128,16 +129,17 @@ private synchronized AugmentedQuantiles getLatestAugmentedQuantilesAndClear() {
return latest;
}

private synchronized boolean tryStartWork() {
private synchronized boolean tryStartWork(ActionListener<Void> listener) {
if (latestQuantilesHolder == null) {
listener.onResponse(null);
return false;
}
// Don't start a thread if another normalization thread is still working. The existing thread will
// do this normalization when it finishes its current one. This means we serialise normalizations
// without hogging threads or queuing up many large quantiles documents.
if (semaphore.tryAcquire()) {
try {
latestTask = executorService.submit(this::doRenormalizations);
latestTask = executorService.submit(() -> this.doRenormalizations(listener));
} catch (RejectedExecutionException e) {
latestQuantilesHolder.getLatch().countDown();
latestQuantilesHolder = null;
Expand All @@ -148,6 +150,7 @@ private synchronized boolean tryStartWork() {
}
return true;
}
listener.onResponse(null);
return false;
}

Expand All @@ -161,31 +164,38 @@ private synchronized boolean tryFinishWork() {
return true;
}

private void doRenormalizations() {
do {
AugmentedQuantiles latestAugmentedQuantiles = getLatestAugmentedQuantilesAndClear();
assert latestAugmentedQuantiles != null;
if (latestAugmentedQuantiles != null) { // TODO: remove this if the assert doesn't trip in CI over the next year or so
latestAugmentedQuantiles.runSetupStep();
Quantiles latestQuantiles = latestAugmentedQuantiles.getQuantiles();
CountDownLatch latch = latestAugmentedQuantiles.getLatch();
try {
scoresUpdater.update(
latestQuantiles.getQuantileState(),
latestQuantiles.getTimestamp().getTime(),
latestAugmentedQuantiles.getWindowExtensionMs()
);
} catch (Exception e) {
logger.error("[" + jobId + "] Normalization failed", e);
} finally {
latch.countDown();
private void doRenormalizations(ActionListener<Void> listener) {
try {
do {
AugmentedQuantiles latestAugmentedQuantiles = getLatestAugmentedQuantilesAndClear();
assert latestAugmentedQuantiles != null;
if (latestAugmentedQuantiles != null) { // TODO: remove this if the assert doesn't trip in CI over the next year or so
latestAugmentedQuantiles.runSetupStep();
Quantiles latestQuantiles = latestAugmentedQuantiles.getQuantiles();
CountDownLatch latch = latestAugmentedQuantiles.getLatch();
try {
scoresUpdater.update(
latestQuantiles.getQuantileState(),
latestQuantiles.getTimestamp().getTime(),
latestAugmentedQuantiles.getWindowExtensionMs()
);
} catch (Exception e) {
logger.error("[" + jobId + "] Normalization failed", e);
} finally {
latch.countDown();
}
} else {
logger.warn("[{}] request to normalize null quantiles", jobId);
}
} else {
logger.warn("[{}] request to normalize null quantiles", jobId);
}
// Loop if more work has become available while we were working, because the
// tasks originally submitted to do that work will have exited early.
} while (tryFinishWork() == false);
// Loop if more work has become available while we were working, because the
// tasks originally submitted to do that work will have exited early.
} while (tryFinishWork() == false);
} catch (Exception e) {
listener.onFailure(e);
throw e;
} finally {
listener.onResponse(null);
}
}

/**
Expand Down
Loading

0 comments on commit d4183ad

Please sign in to comment.