diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
index 352a5ab3139..bbdae9e10fb 100644
--- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
+++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
@@ -1050,8 +1050,9 @@ public static Table consumeToTable(
             }
         };
 
-        consume(kafkaProperties, topic, partitionFilter, partitionToInitialOffset, keySpec, valueSpec,
-                StreamConsumerRegistrarProvider.single(registrar));
+        consume(kafkaProperties, topic, partitionFilter,
+                new KafkaIngester.IntToLongLookupAdapter(partitionToInitialOffset), keySpec, valueSpec,
+                StreamConsumerRegistrarProvider.single(registrar), null);
 
         return resultHolder.getValue();
     }
@@ -1111,8 +1112,9 @@ public static PartitionedTable consumeToPartitionedTable(
             }
         };
 
-        consume(kafkaProperties, topic, partitionFilter, partitionToInitialOffset, keySpec, valueSpec,
-                StreamConsumerRegistrarProvider.perPartition(registrar));
+        consume(kafkaProperties, topic, partitionFilter,
+                new KafkaIngester.IntToLongLookupAdapter(partitionToInitialOffset), keySpec, valueSpec,
+                StreamConsumerRegistrarProvider.perPartition(registrar), null);
 
         return resultHolder.get();
     }
@@ -1249,7 +1251,7 @@ public Function visit(@NotNull final PerPar
     }
 
     /**
-     * Consume from Kafka to a {@link StreamConsumer} supplied by {@code streamConsumerRegistrar}.
+     * Consume from Kafka to {@link StreamConsumer stream consumers} supplied by {@code streamConsumerRegistrar}.
      *
      * @param kafkaProperties Properties to configure this table and also to be passed to create the KafkaConsumer
      * @param topic Kafka topic name
@@ -1265,15 +1267,17 @@ public Function visit(@NotNull final PerPar
      *        {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar)
      *        single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar)
      *        per-partition}.
+     * @param consumerLoopCallback optional callback to inject logic into the ingester's consumer loop; may be null
      */
     public static void consume(
             @NotNull final Properties kafkaProperties,
             @NotNull final String topic,
             @NotNull final IntPredicate partitionFilter,
-            @NotNull final IntToLongFunction partitionToInitialOffset,
+            @NotNull final KafkaIngester.InitialOffsetLookup partitionToInitialOffset,
             @NotNull final Consume.KeyOrValueSpec keySpec,
             @NotNull final Consume.KeyOrValueSpec valueSpec,
-            @NotNull final KafkaTools.StreamConsumerRegistrarProvider streamConsumerRegistrarProvider) {
+            @NotNull final KafkaTools.StreamConsumerRegistrarProvider streamConsumerRegistrarProvider,
+            @Nullable final KafkaIngester.ConsumerLoopCallback consumerLoopCallback) {
         final boolean ignoreKey = keySpec.dataFormat() == DataFormat.IGNORE;
         final boolean ignoreValue = valueSpec.dataFormat() == DataFormat.IGNORE;
         if (ignoreKey && ignoreValue) {
@@ -1349,7 +1353,8 @@ public static void consume(
                 topic,
                 partitionFilter,
                 kafkaRecordConsumerFactory,
-                partitionToInitialOffset);
+                partitionToInitialOffset,
+                consumerLoopCallback);
         kafkaIngesterHolder.setValue(ingester);
         ingester.start();
     }
diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java
index 8208b17a10f..8fccac314ad 100644
--- a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java
+++ b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java
@@ -15,6 +15,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 import java.text.DecimalFormat;
 import java.time.Duration;
@@ -50,6 +51,9 @@ public int getIntKey(@NotNull final TopicPartition topicPartition) {
         }
     });
 
+    @Nullable
+    private final ConsumerLoopCallback consumerLoopCallback;
+
     private long messagesProcessed = 0;
     private long bytesProcessed = 0;
     private long pollCalls = 0;
@@ -61,6 +65,28 @@ public int getIntKey(@NotNull final TopicPartition topicPartition) {
     private volatile boolean needsAssignment;
     private volatile boolean done;
 
+    /**
+     * A callback which is invoked from the consumer loop, enabling clients to inject logic to be run by the Kafka
+     * consumer thread before and after polling.
+     */
+    public interface ConsumerLoopCallback {
+        /**
+         * Called before the consumer is polled for records.
+         *
+         * @param consumer the KafkaConsumer that will be polled for records
+         */
+        void beforePoll(KafkaConsumer consumer);
+
+        /**
+         * Called after the consumer is polled for records and they have been published to the downstream
+         * KafkaRecordConsumer.
+         *
+         * @param consumer the KafkaConsumer that has been polled for records
+         * @param more true if more records should be read, false if the consumer should be shut down due to an error
+         */
+        void afterPoll(KafkaConsumer consumer, boolean more);
+    }
+
     /**
      * Constant predicate that returns true for all partitions. This is the default, each and every partition that
      * exists will be handled by the same ingester. Because Kafka consumers are inherently single threaded, to scale
@@ -176,6 +202,56 @@ public KafkaIngester(
         this(log, props, topic, ALL_PARTITIONS, partitionToStreamConsumer, partitionToInitialSeekOffset);
     }
 
+    /**
+     * Creates a Kafka ingester for the given topic.
+     *
+     * @param log A log for output
+     * @param props The properties used to create the {@link KafkaConsumer}
+     * @param topic The topic to replicate
+     * @param partitionFilter A predicate indicating which partitions we should replicate
+     * @param partitionToStreamConsumer A function implementing a mapping from partition to its consumer of records. The
+     *        function will be invoked once per partition at construction; implementations should internally defer
+     *        resource allocation until first call to {@link KafkaRecordConsumer#consume(List)} or
+     *        {@link KafkaRecordConsumer#acceptFailure(Throwable)} if appropriate.
+     * @param partitionToInitialSeekOffset A function implementing a mapping from partition to its initial seek offset,
+     *        or -1 if seek to beginning is intended.
+     */
+    public KafkaIngester(
+            @NotNull final Logger log,
+            @NotNull final Properties props,
+            @NotNull final String topic,
+            @NotNull final IntPredicate partitionFilter,
+            @NotNull final Function partitionToStreamConsumer,
+            @NotNull final IntToLongFunction partitionToInitialSeekOffset) {
+        this(log, props, topic, partitionFilter, partitionToStreamConsumer,
+                new IntToLongLookupAdapter(partitionToInitialSeekOffset), null);
+    }
+
+    /**
+     * Determines the initial offset to seek to for a given KafkaConsumer and TopicPartition.
+     */
+    @FunctionalInterface
+    public interface InitialOffsetLookup {
+        long getInitialOffset(KafkaConsumer consumer, TopicPartition topicPartition);
+    }
+
+    /**
+     * Adapts an {@link IntToLongFunction} to an {@link InitialOffsetLookup} using only the partition number.
+     */
+
+    public static class IntToLongLookupAdapter implements InitialOffsetLookup {
+        private final IntToLongFunction function;
+
+        public IntToLongLookupAdapter(IntToLongFunction function) {
+            this.function = function;
+        }
+
+        @Override
+        public long getInitialOffset(final KafkaConsumer consumer, final TopicPartition topicPartition) {
+            return function.applyAsLong(topicPartition.partition());
+        }
+    }
+
     public static long SEEK_TO_BEGINNING = -1;
     public static long DONT_SEEK = -2;
     public static long SEEK_TO_END = -3;
@@ -204,12 +280,14 @@ public KafkaIngester(
             @NotNull final String topic,
             @NotNull final IntPredicate partitionFilter,
             @NotNull final Function partitionToStreamConsumer,
-            @NotNull final IntToLongFunction partitionToInitialSeekOffset) {
+            @NotNull final KafkaIngester.InitialOffsetLookup partitionToInitialSeekOffset,
+            @Nullable final ConsumerLoopCallback consumerLoopCallback) {
         this.log = log;
         this.topic = topic;
         partitionDescription = partitionFilter.toString();
         logPrefix = KafkaIngester.class.getSimpleName() + "(" + topic + ", " + partitionDescription + "): ";
         kafkaConsumer = new KafkaConsumer(props);
+        this.consumerLoopCallback = consumerLoopCallback;
 
         kafkaConsumer.partitionsFor(topic).stream().filter(pi -> partitionFilter.test(pi.partition()))
                 .map(pi -> new TopicPartition(topic, pi.partition()))
@@ -220,7 +298,8 @@ public KafkaIngester(
         assign();
 
         for (final TopicPartition topicPartition : assignedPartitions) {
-            final long seekOffset = partitionToInitialSeekOffset.applyAsLong(topicPartition.partition());
+            final long seekOffset =
+                    partitionToInitialSeekOffset.getInitialOffset(kafkaConsumer, topicPartition);
             if (seekOffset == SEEK_TO_BEGINNING) {
                 log.info().append(logPrefix).append(topicPartition.toString()).append(" seeking to beginning.")
                         .append(seekOffset).endl();
@@ -281,7 +360,31 @@ private void consumerLoop() {
             }
             final long beforePoll = System.nanoTime();
             final long remainingNanos = beforePoll > nextReport ? 0 : (nextReport - beforePoll);
-            boolean more = pollOnce(Duration.ofNanos(remainingNanos));
+
+            boolean more = true;
+            if (consumerLoopCallback != null) {
+                try {
+                    consumerLoopCallback.beforePoll(kafkaConsumer);
+                } catch (Exception e) {
+                    log.error().append(logPrefix).append("Exception while executing beforePoll callback:").append(e)
+                            .append(", aborting.").endl();
+                    notifyAllConsumersOnFailure(e);
+                    more = false;
+                }
+            }
+            if (more) {
+                more = pollOnce(Duration.ofNanos(remainingNanos));
+                if (consumerLoopCallback != null) {
+                    try {
+                        consumerLoopCallback.afterPoll(kafkaConsumer, more);
+                    } catch (Exception e) {
+                        log.error().append(logPrefix).append("Exception while executing afterPoll callback:").append(e)
+                                .append(", aborting.").endl();
+                        notifyAllConsumersOnFailure(e);
+                        more = false;
+                    }
+                }
+            }
             if (!more) {
                 log.error().append(logPrefix)
                         .append("Stopping due to errors (").append(messagesWithErr)
@@ -331,13 +434,7 @@ private boolean pollOnce(final Duration timeout) {
         } catch (Exception ex) {
             log.error().append(logPrefix).append("Exception while polling for Kafka messages:").append(ex)
                     .append(", aborting.").endl();
-            final KafkaRecordConsumer[] allConsumers;
-            synchronized (streamConsumers) {
-                allConsumers = streamConsumers.valueCollection().toArray(KafkaRecordConsumer[]::new);
-            }
-            for (final KafkaRecordConsumer streamConsumer : allConsumers) {
-                streamConsumer.acceptFailure(ex);
-            }
+            notifyAllConsumersOnFailure(ex);
             return false;
         }
 
@@ -381,6 +478,16 @@ private boolean pollOnce(final Duration timeout) {
         return true;
     }
 
+    private void notifyAllConsumersOnFailure(Exception ex) {
+        final KafkaRecordConsumer[] allConsumers;
+        synchronized (streamConsumers) {
+            allConsumers = streamConsumers.valueCollection().toArray(KafkaRecordConsumer[]::new);
+        }
+        for (final KafkaRecordConsumer streamConsumer : allConsumers) {
+            streamConsumer.acceptFailure(ex);
+        }
+    }
+
     public void shutdown() {
         if (done) {
             return;
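
Usage sketch (illustrative only, not part of this patch): callers can implement the two new hooks and pass them to the new KafkaTools.consume overload as the partitionToInitialOffset and consumerLoopCallback arguments. The class and field names below are hypothetical, and the key/value specs and stream consumer registrar wiring are elided.

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import io.deephaven.kafka.ingest.KafkaIngester;

    // Hypothetical example of the two hook interfaces added by this change.
    public class IngesterHooksExample {

        // InitialOffsetLookup example: start partition 0 from the earliest record and every other
        // partition from the latest, using the seek constants defined on KafkaIngester.
        public static final KafkaIngester.InitialOffsetLookup EXAMPLE_OFFSETS =
                (consumer, topicPartition) -> topicPartition.partition() == 0
                        ? KafkaIngester.SEEK_TO_BEGINNING
                        : KafkaIngester.SEEK_TO_END;

        // ConsumerLoopCallback example: commit the consumer's position after each successful poll,
        // running on the ingester's Kafka consumer thread.
        public static final KafkaIngester.ConsumerLoopCallback COMMIT_AFTER_POLL =
                new KafkaIngester.ConsumerLoopCallback() {
                    @Override
                    public void beforePoll(final KafkaConsumer consumer) {
                        // No-op; invoked before the ingester polls for records.
                    }

                    @Override
                    public void afterPoll(final KafkaConsumer consumer, final boolean more) {
                        if (more) {
                            consumer.commitSync();
                        }
                    }
                };
    }

Passing null for consumerLoopCallback, as the updated consumeToTable and consumeToPartitionedTable call sites do, preserves the previous behavior.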