Skip to content

Commit

Permalink
fixing ML tests
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Dec 5, 2023
1 parent ce71b8f commit 77e55da
Show file tree
Hide file tree
Showing 14 changed files with 169 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public void testProcessResults() throws Exception {
resultsBuilder.addQuantiles(quantiles);
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());

resultProcessor.process(ActionListener.noop());
resultProcessor.process();
resultProcessor.awaitCompletion();

BucketsQueryBuilder bucketsQuery = new BucketsQueryBuilder().includeInterim(true);
Expand Down Expand Up @@ -302,7 +302,7 @@ public void testProcessResults_ModelSnapshot() throws Exception {
ResultsBuilder resultsBuilder = new ResultsBuilder().addModelSnapshot(modelSnapshot);
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());

resultProcessor.process(ActionListener.noop());
resultProcessor.process();
resultProcessor.awaitCompletion();

QueryPage<ModelSnapshot> persistedModelSnapshot = getModelSnapshots();
Expand Down Expand Up @@ -335,7 +335,7 @@ public void testProcessResults_TimingStats() throws Exception {
.addBucket(createBucket(false, 1000));
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());

resultProcessor.process(ActionListener.noop());
resultProcessor.process();
resultProcessor.awaitCompletion();

TimingStats timingStats = resultProcessor.timingStats();
Expand All @@ -354,7 +354,7 @@ public void testProcessResults_InterimResultsDoNotChangeTimingStats() throws Exc
.addBucket(createBucket(false, 10000));
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());

resultProcessor.process(ActionListener.noop());
resultProcessor.process();
resultProcessor.awaitCompletion();

TimingStats timingStats = resultProcessor.timingStats();
Expand All @@ -373,13 +373,13 @@ public void testParseQuantiles_GivenRenormalizationIsEnabled() throws Exception
resultsBuilder.addQuantiles(quantiles);
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());

resultProcessor.process(ActionListener.noop());
resultProcessor.process();
resultProcessor.awaitCompletion();

Optional<Quantiles> persistedQuantiles = getQuantiles();
assertTrue(persistedQuantiles.isPresent());
assertEquals(quantiles, persistedQuantiles.get());
verify(renormalizer).renormalize(eq(quantiles), any(Runnable.class), ActionListener.noop());
verify(renormalizer).renormalize(eq(quantiles), any(Runnable.class));
}

public void testParseQuantiles_GivenRenormalizationIsDisabled() throws Exception {
Expand All @@ -390,13 +390,13 @@ public void testParseQuantiles_GivenRenormalizationIsDisabled() throws Exception
resultsBuilder.addQuantiles(quantiles);
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());

resultProcessor.process(ActionListener.noop());
resultProcessor.process();
resultProcessor.awaitCompletion();

Optional<Quantiles> persistedQuantiles = getQuantiles();
assertTrue(persistedQuantiles.isPresent());
assertEquals(quantiles, persistedQuantiles.get());
verify(renormalizer, never()).renormalize(any(), any(), ActionListener.noop());
verify(renormalizer, never()).renormalize(any(), any());
}

public void testDeleteInterimResults() throws Exception {
Expand All @@ -410,7 +410,7 @@ public void testDeleteInterimResults() throws Exception {
.addBucket(nonInterimBucket); // and this will delete the interim results
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());

resultProcessor.process(ActionListener.noop());
resultProcessor.process();
resultProcessor.awaitCompletion();

QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true));
Expand Down Expand Up @@ -442,7 +442,7 @@ public void testMultipleFlushesBetweenPersisting() throws Exception {
.addBucket(finalBucket); // this deletes the previous interim and persists final bucket & records
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());

resultProcessor.process(ActionListener.noop());
resultProcessor.process();
resultProcessor.awaitCompletion();

QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true));
Expand All @@ -466,7 +466,7 @@ public void testEndOfStreamTriggersPersisting() throws Exception {
.addRecords(secondSetOfRecords);
when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());

resultProcessor.process(ActionListener.noop());
resultProcessor.process();
resultProcessor.awaitCompletion();

QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public void testFailOverBasics() throws Exception {
ensureStableCluster(2);
awaitJobOpenedAndAssigned(job.getId(), null);
assertRecentLastTaskStateChangeTime(MlTasks.jobTaskId(job.getId()), Duration.of(10, ChronoUnit.SECONDS), null);

closeJobs(job.getId());
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/82591")
Expand Down Expand Up @@ -166,6 +168,8 @@ public void testFailOverBasics_withDataFeeder() throws Exception {
assertEquals(1, statsResponse.getResponse().results().size());
assertEquals(DatafeedState.STARTED, statsResponse.getResponse().results().get(0).getDatafeedState());
});

closeJobs(job.getId());
}

public void testJobAutoClose() throws Exception {
Expand Down Expand Up @@ -205,6 +209,8 @@ public void testJobAutoClose() throws Exception {
assertEquals(3L, jobStats.getDataCounts().getProcessedRecordCount());
assertEquals(JobState.CLOSED, jobStats.getState());
});

closeJobs(job.getId());
}

public void testDedicatedMlNode() throws Exception {
Expand Down Expand Up @@ -259,6 +265,8 @@ public void testDedicatedMlNode() throws Exception {
// job should be re-opened:
assertJobTask(jobId, JobState.OPENED, true);
});

closeJobs(job.getId());
}

public void testMaxConcurrentJobAllocations() throws Exception {
Expand Down Expand Up @@ -304,13 +312,15 @@ public void testMaxConcurrentJobAllocations() throws Exception {

ensureYellow(); // at least the primary shards of the indices a job uses should be started
int numJobs = numMlNodes * 10;
String[] jobIds = new String[numJobs];
for (int i = 0; i < numJobs; i++) {
Job.Builder job = createJob(Integer.toString(i), ByteSizeValue.ofMb(2));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();

OpenJobAction.Request openJobRequest = new OpenJobAction.Request(job.getId());
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
jobIds[i] = job.getId();
}

assertBusy(checkAllJobsAreAssignedAndOpened(numJobs));
Expand Down Expand Up @@ -342,6 +352,8 @@ public void testMaxConcurrentJobAllocations() throws Exception {
assertBusy(checkAllJobsAreAssignedAndOpened(numJobs), 30, TimeUnit.SECONDS);

assertEquals("Expected no violations, but got [" + violations + "]", 0, violations.size());

closeJobs(jobIds);
}

// This test is designed to check that a job will not open when the .ml-state
Expand Down Expand Up @@ -444,6 +456,8 @@ public void testMlStateAndResultsIndicesNotAvailable() throws Exception {
ensureYellow(); // at least the primary shards of the indices a job uses should be started
client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet();
assertBusy(() -> assertJobTask(jobId, JobState.OPENED, true));

closeJobs(job.getId());
}

public void testCloseUnassignedLazyJobAndDatafeed() {
Expand Down Expand Up @@ -534,4 +548,14 @@ private CheckedRunnable<Exception> checkAllJobsAreAssignedAndOpened(int numJobs)
}
};
}

/*
* Utility method to close any jobs that remain open at the end of a test so that we don't leave unclosed resources out there.
*/
private void closeJobs(String... jobIds) {
for (String jobId : jobIds) {
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId);
client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ public void testCrudOnTwoJobsInSharedIndex() throws Exception {
.indices().length,
equalTo(1)
);

// We need to close the remaining open jobs to free up resources (otherwise the test will fail)
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId);
client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet();
closeJobRequest = new CloseJobAction.Request(jobId2);
client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet();
}

public void testForceCloseDoesNotCreateState() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
Expand Down Expand Up @@ -216,6 +217,10 @@ public void testDeleteDedicatedJobWithDataInShared() throws Exception {
.getTotalHits().value,
equalTo(0L)
);

// We need to close the remaining open job to free up resources (otherwise the test will fail)
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobIdShared);
client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet();
}

private void createBuckets(String jobId, int from, int count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,8 @@ public void testJobRelocationIsMemoryAware() throws Exception {
assertEquals(1L, bigJobNodes.stream().distinct().count());
assertNotEquals(smallJobNodes, bigJobNodes);
});

closeJobs("small1", "small2", "small3", "big1");
}

public void testClusterWithTwoMlNodes_RunsDatafeed_GivenOriginalNodeGoesDown() throws Exception {
Expand Down Expand Up @@ -540,6 +542,8 @@ public void testClusterWithTwoMlNodes_RunsDatafeed_GivenOriginalNodeGoesDown() t
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
});

closeJobs(job.getId());
}

public void testClusterWithTwoMlNodes_StopsDatafeed_GivenJobFailsOnReassign() throws Exception {
Expand Down Expand Up @@ -747,6 +751,8 @@ private void run(String jobId, CheckedRunnable<Exception> disrupt) throws Except
long now2 = System.currentTimeMillis();
indexDocs(logger, "data", numDocs2, now2 + 5000, now2 + 6000);
waitForJobToHaveProcessedExactly(jobId, numDocs1 + numDocs2);

closeJobs(jobId);
}

// Get datacounts from index instead of via job stats api,
Expand Down Expand Up @@ -812,4 +818,14 @@ private void indexModelSnapshotFromCurrentJobStats(String jobId) throws IOExcept
JobUpdate jobUpdate = new JobUpdate.Builder(jobId).setModelSnapshotId(modelSnapshot.getSnapshotId()).build();
client().execute(UpdateJobAction.INSTANCE, new UpdateJobAction.Request(jobId, jobUpdate)).actionGet();
}

/*
* Utility method to close any jobs that remain open at the end of a test so that we don't leave unclosed resources out there.
*/
private void closeJobs(String... jobIds) {
for (String jobId : jobIds) {
CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId);
client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -66,6 +67,11 @@ public class AutodetectCommunicator implements Closeable {
private final ExecutorService autodetectWorkerExecutor;
private final NamedXContentRegistry xContentRegistry;
private final boolean includeTokensField;
/*
* This is called in order to release any resources when this object dies, whether closed or killed. It is kept as a RunOnce in order
* to deal with race conditions where the object is killed and closed at the same time on different threads.
*/
private final RunOnce resourceReleaser;
private volatile CategorizationAnalyzer categorizationAnalyzer;
private volatile boolean processKilled;

Expand All @@ -77,7 +83,8 @@ public class AutodetectCommunicator implements Closeable {
AutodetectResultProcessor autodetectResultProcessor,
BiConsumer<Exception, Boolean> onFinishHandler,
NamedXContentRegistry xContentRegistry,
ExecutorService autodetectWorkerExecutor
ExecutorService autodetectWorkerExecutor,
Runnable resourceReleaser
) {
this.job = job;
this.autodetectProcess = process;
Expand All @@ -88,6 +95,7 @@ public class AutodetectCommunicator implements Closeable {
this.xContentRegistry = xContentRegistry;
this.autodetectWorkerExecutor = autodetectWorkerExecutor;
this.includeTokensField = job.getAnalysisConfig().getCategorizationFieldName() != null;
this.resourceReleaser = new RunOnce(resourceReleaser);
}

public void restoreState(ModelSnapshot modelSnapshot) {
Expand Down Expand Up @@ -191,7 +199,7 @@ public void close() {
throw FutureUtils.rethrowExecutionException(e);
}
} finally {
destroyCategorizationAnalyzer();
releaseResources();
}
}

Expand All @@ -214,10 +222,13 @@ public void killProcess(boolean awaitCompletion, boolean finish, boolean finaliz
}
}
} finally {
if (finish) {
onFinishHandler.accept(null, finalizeJob);
try {
if (finish) {
onFinishHandler.accept(null, finalizeJob);
}
} finally {
releaseResources();
}
destroyCategorizationAnalyzer();
}
}

Expand Down Expand Up @@ -360,12 +371,18 @@ public DataCounts getDataCounts() {
}

/**
* This method runs the resource-releasing runnable passed in to the constructor, as well as closes the categoryAnalyzer.
* Care must be taken to ensure this method is not called while data is being posted.
* The methods in this class that call it wait for all processing to complete first.
* The expectation is that external calls are only made when cleaning up after a fatal
* error.
*/
void destroyCategorizationAnalyzer() {
void releaseResources() {
resourceReleaser.run();
destroyCategorizationAnalyzer();
}

private void destroyCategorizationAnalyzer() {
if (categorizationAnalyzer != null) {
categorizationAnalyzer.close();
categorizationAnalyzer = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ public boolean isNodeDying() {
* Jobs that are already closing continue to close.
*/
public synchronized void vacateOpenJobsOnThisNode() {

for (ProcessContext processContext : processByAllocation.values()) {

// We ignore jobs that either don't have a running process yet or already closing.
Expand Down Expand Up @@ -795,7 +794,7 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet
ExecutorService autodetectWorkerExecutor;
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
autodetectWorkerExecutor = createAutodetectExecutorService(autodetectExecutorService);
autodetectExecutorService.submit(() -> { processor.process(ActionListener.releasing(jobRenormalizedResultsPersister)); });
autodetectExecutorService.submit(() -> processor.process());
} catch (EsRejectedExecutionException e) {
// If submitting the operation to read the results from the process fails we need to close
// the process too, so that other submitted operations to threadpool are stopped.
Expand All @@ -814,7 +813,8 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet
processor,
handler,
xContentRegistry,
autodetectWorkerExecutor
autodetectWorkerExecutor,
jobRenormalizedResultsPersister::close
);
}

Expand Down Expand Up @@ -852,7 +852,7 @@ private Consumer<String> onProcessCrash(JobTask jobTask) {
if (processContext != null) {
AutodetectCommunicator communicator = processContext.getAutodetectCommunicator();
if (communicator != null) {
communicator.destroyCategorizationAnalyzer();
communicator.releaseResources();
}
}
setJobState(jobTask, JobState.FAILED, reason);
Expand Down
Loading

0 comments on commit 77e55da

Please sign in to comment.