From cb0740e538c4c85d23e7174f2984bf93bc091325 Mon Sep 17 00:00:00 2001 From: "Novotnik, Petr" Date: Fri, 27 Jan 2017 13:37:06 +0100 Subject: [PATCH] #! [euphoria-kafka] Properly quit read loop when interrupted --- .../flink/streaming/io/DataSourceWrapper.java | 26 +++++++------------ .../cz/seznam/euphoria/kafka/KafkaSource.java | 6 +++++ 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSourceWrapper.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSourceWrapper.java index 06c7b62e2ddf3..9b888667d920b 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSourceWrapper.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/io/DataSourceWrapper.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; @@ -69,22 +70,10 @@ public void run(SourceContext> ct } if (openReaders.size() == 1) { - // ~ execute the reader in a separate thread, such that - // we can safely quit without being blocked by the reader - // ~ here we specialize on the single reader scenario to - // avoid needlessly synchronizing on the ctx as a lock - Reader reader = openReaders.get(0); - executor = createThreadPool(); - Future task = executor.submit(() -> { + try (Reader reader = openReaders.get(0)) { while (isRunning && reader.hasNext()) { ctx.collect(toWindowedElement(reader.next())); } - }); - // ~ wait for the reader to finish or quit if request - try { - task.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); } } else { // start a new thread for each reader @@ -92,10 +81,15 @@ public void run(SourceContext> ct Deque tasks = new ArrayDeque<>(); for (Reader reader : openReaders) { tasks.add(executor.submit(() -> { - while (reader.hasNext()) { - synchronized (ctx) { - ctx.collect(toWindowedElement(reader.next())); + try { + while (reader.hasNext()) { + synchronized (ctx) { + ctx.collect(toWindowedElement(reader.next())); + } } + return null; + } finally { + reader.close(); } })); } diff --git a/sdks/java/extensions/euphoria/euphoria-kafka/src/main/java/cz/seznam/euphoria/kafka/KafkaSource.java b/sdks/java/extensions/euphoria/euphoria-kafka/src/main/java/cz/seznam/euphoria/kafka/KafkaSource.java index a4a985223e571..4618e6e5a3c57 100644 --- a/sdks/java/extensions/euphoria/euphoria-kafka/src/main/java/cz/seznam/euphoria/kafka/KafkaSource.java +++ b/sdks/java/extensions/euphoria/euphoria-kafka/src/main/java/cz/seznam/euphoria/kafka/KafkaSource.java @@ -74,6 +74,12 @@ static final class ConsumerReader @Override protected Pair computeNext() { while (next == null || !next.hasNext()) { + if (Thread.currentThread().isInterrupted()) { + LOG.info("Terminating polling on topic due to thread interruption"); + endOfData(); + return null; + } + commitIfNeeded(); ConsumerRecords polled = c.poll(500); next = polled.iterator();