Skip to content

Commit

Permalink
Merge pull request #2 from seznam/pete/clean-flink-source-exit
Browse files Browse the repository at this point in the history
#! [euphoria-kafka] Properly quit read loop when interrupted
  • Loading branch information
vanekjar authored Jan 27, 2017
2 parents 0c7154b + cb0740e commit 227403c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
Expand Down Expand Up @@ -69,33 +70,26 @@ public void run(SourceContext<StreamingWindowedElement<Batch.BatchWindow, T>> ct
}

if (openReaders.size() == 1) {
// ~ execute the reader in a separate thread, such that
// we can safely quit without being blocked by the reader
// ~ here we specialize on the single reader scenario to
// avoid needlessly synchronizing on the ctx as a lock
Reader<T> reader = openReaders.get(0);
executor = createThreadPool();
Future<?> task = executor.submit(() -> {
try (Reader<T> reader = openReaders.get(0)) {
while (isRunning && reader.hasNext()) {
ctx.collect(toWindowedElement(reader.next()));
}
});
// ~ wait for the reader to finish or quit if request
try {
task.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
// start a new thread for each reader
executor = createThreadPool();
Deque<Future> tasks = new ArrayDeque<>();
for (Reader<T> reader : openReaders) {
tasks.add(executor.submit(() -> {
while (reader.hasNext()) {
synchronized (ctx) {
ctx.collect(toWindowedElement(reader.next()));
try {
while (reader.hasNext()) {
synchronized (ctx) {
ctx.collect(toWindowedElement(reader.next()));
}
}
return null;
} finally {
reader.close();
}
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ static final class ConsumerReader
@Override
protected Pair<byte[], byte[]> computeNext() {
while (next == null || !next.hasNext()) {
if (Thread.currentThread().isInterrupted()) {
LOG.info("Terminating polling on topic due to thread interruption");
endOfData();
return null;
}

commitIfNeeded();
ConsumerRecords<byte[], byte[]> polled = c.poll(500);
next = polled.iterator();
Expand Down

0 comments on commit 227403c

Please sign in to comment.