From e4f8449dd856ad43d67b6d118b65b2969bddf51e Mon Sep 17 00:00:00 2001 From: Cristian Ferretti Date: Tue, 21 Sep 2021 14:00:32 -0400 Subject: [PATCH 1/2] Avoid leaving behind a failed consumer. --- .../src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java b/Kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java index c04c08ea4bf..cd3e4441797 100644 --- a/Kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java +++ b/Kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java @@ -320,7 +320,7 @@ private boolean pollOnce(final Duration timeout) { } catch (Exception ex) { log.error().append(logPrefix).append("Exception while polling for Kafka messages:").append(ex) .append(", aborting."); - return false; + return true; } for (final TopicPartition topicPartition : records.partitions()) { From ff30a0677cd845834e3f6c8c4bdf2a8723b7725f Mon Sep 17 00:00:00 2001 From: Cristian Ferretti Date: Tue, 21 Sep 2021 14:19:07 -0400 Subject: [PATCH 2/2] Improved readability. --- .../io/deephaven/kafka/ingest/KafkaIngester.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java b/Kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java index cd3e4441797..f8bff785a42 100644 --- a/Kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java +++ b/Kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java @@ -279,8 +279,8 @@ private void consumerLoop() { final long beforePoll = System.nanoTime(); final long nextReport = lastReportNanos + reportIntervalNanos; final long remainingNanos = beforePoll > nextReport ? 0 : (nextReport - beforePoll); - boolean noMore = pollOnce(Duration.ofNanos(remainingNanos)); - if (noMore) { + boolean more = pollOnce(Duration.ofNanos(remainingNanos)); + if (!more) { log.error().append(logPrefix) .append("Stopping due to errors (").append(messagesWithErr) .append(" messages with error out of ").append(messagesProcessed).append(" messages processed)") @@ -308,19 +308,19 @@ private void consumerLoop() { /** * * @param timeout - * @return true if we should abort the consumer thread. + * @return True if we should continue processing messages; false if we should abort the consumer thread. */ private boolean pollOnce(final Duration timeout) { final ConsumerRecords records; try { records = consumer.poll(timeout); } catch (WakeupException we) { - // we interpret this as a signal to stop. - return false; + // we interpret a wakeup as a signal to stop /this/ poll. + return true; } catch (Exception ex) { log.error().append(logPrefix).append("Exception while polling for Kafka messages:").append(ex) .append(", aborting."); - return true; + return false; } for (final TopicPartition topicPartition : records.partitions()) { @@ -346,12 +346,12 @@ private boolean pollOnce(final Duration timeout) { consumer.acceptFailure(ex); log.error().append(logPrefix) .append("Max number of errors exceeded, aborting " + this + " consumer thread."); - return true; + return false; } continue; } messagesProcessed += partitionRecords.size(); } - return false; + return true; } }