Skip to content

Commit

Permalink
Fix Kafka failure propagation on poll error (deephaven#4247)
Browse files Browse the repository at this point in the history
  • Loading branch information
rcaudy committed Jul 31, 2023
1 parent f9e0169 commit 779787b
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public KafkaIngester(

/**
* Creates a Kafka ingester for the given topic.
*
*
* @param log A log for output
* @param props The properties used to create the {@link KafkaConsumer}
* @param topic The topic to replicate
Expand Down Expand Up @@ -331,6 +331,13 @@ private boolean pollOnce(final Duration timeout) {
} catch (Exception ex) {
log.error().append(logPrefix).append("Exception while polling for Kafka messages:").append(ex)
.append(", aborting.").endl();
final KafkaRecordConsumer[] allConsumers;
synchronized (streamConsumers) {
allConsumers = streamConsumers.valueCollection().toArray(KafkaRecordConsumer[]::new);
}
for (final KafkaRecordConsumer streamConsumer : allConsumers) {
streamConsumer.acceptFailure(ex);
}
return false;
}

Expand Down

0 comments on commit 779787b

Please sign in to comment.