Skip to content

Commit

Permalink
[ML] Fix thread leak when waiting for job flush (elastic#32196)
Browse files Browse the repository at this point in the history
  • Loading branch information
benwtrent committed Aug 1, 2018
1 parent 0cae19c commit aa9a566
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public void persistJob(BiConsumer<Void, Exception> handler) {
}

@Nullable
FlushAcknowledgement waitFlushToCompletion(String flushId) {
FlushAcknowledgement waitFlushToCompletion(String flushId) throws InterruptedException {
LOGGER.debug("[{}] waiting for flush", job.getId());

FlushAcknowledgement flushAcknowledgement;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ public void awaitCompletion() throws TimeoutException {
* @return The {@link FlushAcknowledgement} if the flush has completed or the parsing finished; {@code null} if the timeout expired
*/
@Nullable
public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) {
public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) throws InterruptedException {
return failed ? null : flushListener.waitForFlush(flushId, timeout);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,14 @@ class FlushListener {
final AtomicBoolean cleared = new AtomicBoolean(false);

@Nullable
FlushAcknowledgement waitForFlush(String flushId, Duration timeout) {
FlushAcknowledgement waitForFlush(String flushId, Duration timeout) throws InterruptedException {
if (cleared.get()) {
return null;
}

FlushAcknowledgementHolder holder = awaitingFlushed.computeIfAbsent(flushId, (key) -> new FlushAcknowledgementHolder(flushId));
try {
if (holder.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
return holder.flushAcknowledgement;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (holder.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
return holder.flushAcknowledgement;
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void testWriteUpdateProcessMessage() throws IOException {
verifyNoMoreInteractions(process);
}

public void testFlushJob() throws IOException {
public void testFlushJob() throws IOException, InterruptedException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(true);
AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class);
Expand All @@ -123,7 +123,7 @@ public void testFlushJob() throws IOException {
}
}

public void testWaitForFlushReturnsIfParserFails() throws IOException {
public void testWaitForFlushReturnsIfParserFails() throws IOException, InterruptedException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(true);
AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class);
Expand All @@ -144,7 +144,7 @@ public void testFlushJob_throwsIfProcessIsDead() throws IOException {
assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", holder[0].getMessage());
}

public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException {
public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException, InterruptedException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(true);
AutoDetectResultProcessor autoDetectResultProcessor = Mockito.mock(AutoDetectResultProcessor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ public void testPersisterThrowingDoesntBlockProcessing() {
verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE));
}

public void testParsingErrorSetsFailed() {
public void testParsingErrorSetsFailed() throws InterruptedException {
@SuppressWarnings("unchecked")
Iterator<AutodetectResult> iterator = mock(Iterator.class);
when(iterator.hasNext()).thenThrow(new ElasticsearchParseException("this test throws"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ public void testAcknowledgeFlush() throws Exception {
FlushListener listener = new FlushListener();
AtomicReference<FlushAcknowledgement> flushAcknowledgementHolder = new AtomicReference<>();
new Thread(() -> {
FlushAcknowledgement flushAcknowledgement = listener.waitForFlush("_id", Duration.ofMillis(10000));
flushAcknowledgementHolder.set(flushAcknowledgement);
try {
FlushAcknowledgement flushAcknowledgement = listener.waitForFlush("_id", Duration.ofMillis(10000));
flushAcknowledgementHolder.set(flushAcknowledgement);
} catch (InterruptedException _ex) {
Thread.currentThread().interrupt();
}
}).start();
assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id")));
assertNull(flushAcknowledgementHolder.get());
Expand All @@ -46,8 +50,12 @@ public void testClear() throws Exception {
AtomicReference<FlushAcknowledgement> flushAcknowledgementHolder = new AtomicReference<>();
flushAcknowledgementHolders.add(flushAcknowledgementHolder);
new Thread(() -> {
FlushAcknowledgement flushAcknowledgement = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000));
flushAcknowledgementHolder.set(flushAcknowledgement);
try {
FlushAcknowledgement flushAcknowledgement = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000));
flushAcknowledgementHolder.set(flushAcknowledgement);
} catch (InterruptedException _ex) {
Thread.currentThread().interrupt();
}
}).start();
}
assertBusy(() -> assertEquals(numWaits, listener.awaitingFlushed.size()));
Expand Down

0 comments on commit aa9a566

Please sign in to comment.