Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid leaving behind a failed consumer. #1326

Merged
merged 2 commits into from
Sep 21, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions Kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ private void consumerLoop() {
final long beforePoll = System.nanoTime();
final long nextReport = lastReportNanos + reportIntervalNanos;
final long remainingNanos = beforePoll > nextReport ? 0 : (nextReport - beforePoll);
boolean noMore = pollOnce(Duration.ofNanos(remainingNanos));
if (noMore) {
boolean more = pollOnce(Duration.ofNanos(remainingNanos));
if (!more) {
log.error().append(logPrefix)
.append("Stopping due to errors (").append(messagesWithErr)
.append(" messages with error out of ").append(messagesProcessed).append(" messages processed)")
Expand Down Expand Up @@ -308,15 +308,15 @@ private void consumerLoop() {
/**
*
* @param timeout
* @return true if we should abort the consumer thread.
* @return True if we should continue processing messages; false if we should abort the consumer thread.
*/
private boolean pollOnce(final Duration timeout) {
final ConsumerRecords<?, ?> records;
try {
records = consumer.poll(timeout);
} catch (WakeupException we) {
// we interpret this as a signal to stop.
return false;
// we interpret a wakeup as a signal to stop /this/ poll.
return true;
} catch (Exception ex) {
log.error().append(logPrefix).append("Exception while polling for Kafka messages:").append(ex)
.append(", aborting.");
Expand Down Expand Up @@ -346,12 +346,12 @@ private boolean pollOnce(final Duration timeout) {
consumer.acceptFailure(ex);
log.error().append(logPrefix)
.append("Max number of errors exceeded, aborting " + this + " consumer thread.");
return true;
return false;
}
continue;
}
messagesProcessed += partitionRecords.size();
}
return false;
return true;
}
}