From 69ddbd9dfba71366137a78ffd549de0988b56e8b Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Sat, 29 Jul 2023 06:46:14 -0400 Subject: [PATCH 01/11] Add callbacks to KafkaIngester consumer loop. --- .../java/io/deephaven/kafka/KafkaTools.java | 5 ++- .../deephaven/kafka/ingest/KafkaIngester.java | 44 ++++++++++++++++++- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index de00dfa1771..cc68806b8d6 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -1603,8 +1603,9 @@ public Function visit(@NotNull final PerPar * {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar) * single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar) * per-partition}. + * @return the newly created KafkaIngester */ - public static void consume( + public static KafkaIngester consume( @NotNull final Properties kafkaProperties, @NotNull final String topic, @NotNull final IntPredicate partitionFilter, @@ -1690,6 +1691,8 @@ public static void consume( partitionToInitialOffset); kafkaIngesterHolder.setValue(ingester); ingester.start(); + + return ingester; } private static KeyOrValueSerializer getAvroSerializer( diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java index f0d42c40a29..2f62db9a492 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java @@ -18,9 +18,11 @@ import java.text.DecimalFormat; import java.time.Duration; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Function; import java.util.function.IntPredicate; import java.util.function.IntToLongFunction; @@ -50,6 +52,8 @@ public int getIntKey(@NotNull final TopicPartition topicPartition) { } }); + private final Collection consumerLoopCallbacks = new CopyOnWriteArrayList<>(); + private long messagesProcessed = 0; private long bytesProcessed = 0; private long pollCalls = 0; @@ -61,6 +65,26 @@ public int getIntKey(@NotNull final TopicPartition topicPartition) { private volatile boolean needsAssignment; private volatile boolean done; + /** + * A callback which is invoked from the consumer loop, enabling clients to inject logic on the Kafka consumer. + */ + public interface ConsumerLoopCallback { + /** + * Called before the consumer is polled for records. + * + * @param consumer the KafkaConsumer that will be polled for records + */ + void beforePoll(KafkaConsumer consumer); + + /** + * Called after the consumer is polled for records and they have been published to the StreamConsumer. + * + * @param consumer the KafkaConsumer that has been polled for records + * @param more true if more records should be read, false if the consumer will be shut down due to error + */ + void afterPoll(KafkaConsumer consumer, boolean more); + } + /** * Constant predicate that returns true for all partitions. This is the default, each and every partition that * exists will be handled by the same ingester. 
Because Kafka consumers are inherently single threaded, to scale @@ -281,7 +305,9 @@ private void consumerLoop() { } final long beforePoll = System.nanoTime(); final long remainingNanos = beforePoll > nextReport ? 0 : (nextReport - beforePoll); - boolean more = pollOnce(Duration.ofNanos(remainingNanos)); + consumerLoopCallbacks.forEach(x -> x.beforePoll(kafkaConsumer)); + final boolean more = pollOnce(Duration.ofNanos(remainingNanos)); + consumerLoopCallbacks.forEach(x -> x.afterPoll(kafkaConsumer, more)); if (!more) { log.error().append(logPrefix) .append("Stopping due to errors (").append(messagesWithErr) @@ -404,4 +430,20 @@ public void shutdownPartition(final int partition) { } kafkaConsumer.wakeup(); } + + /** + * Add a callback for the consumer loop. + * @param callback a callback that allows subscribers to inject logic into the consumer loop. + */ + public void addConsumerLoopCallback(@NotNull final ConsumerLoopCallback callback) { + consumerLoopCallbacks.add(callback); + } + + /** + * Remove a callback from the consumer loop. + * @param callback a previously added callback. + */ + public void removeConsumerLoopCallback(@NotNull final ConsumerLoopCallback callback) { + consumerLoopCallbacks.remove(callback); + } } From 4167f3fba4aa2a3a0b26403b281c5a9732b3039f Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Sat, 29 Jul 2023 07:02:19 -0400 Subject: [PATCH 02/11] spotless --- .../src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java index 2f62db9a492..c59ac2c81ba 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java @@ -433,6 +433,7 @@ public void shutdownPartition(final int partition) { /** * Add a callback for the consumer loop. + * * @param callback a callback that allows subscribers to inject logic into the consumer loop. */ public void addConsumerLoopCallback(@NotNull final ConsumerLoopCallback callback) { @@ -441,6 +442,7 @@ public void addConsumerLoopCallback(@NotNull final ConsumerLoopCallback callback /** * Remove a callback from the consumer loop. + * * @param callback a previously added callback. */ public void removeConsumerLoopCallback(@NotNull final ConsumerLoopCallback callback) { From d030e5c6fb5118e73accb365bea1aefc2328bc40 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Sat, 29 Jul 2023 08:23:23 -0400 Subject: [PATCH 03/11] Expose consumer more directly, allow caller to control start of ingester. 
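
consume() now builds the ingester and returns it without starting it, so a caller can finish wiring — for example, attaching the consumer-loop callbacks introduced in PATCH 01 — before the first record is polled. The consumeToTable and consumeToPartitionedTable helpers keep their old behavior by calling start() on the returned ingester themselves. A minimal sketch of the call pattern this enables; the class and method here are hypothetical, the specs, registrar provider, and callback stand in for whatever the caller already has, and ALL_PARTITIONS / SEEK_TO_BEGINNING are the existing KafkaIngester convenience constants:

    import java.util.Properties;
    import io.deephaven.kafka.KafkaTools;
    import io.deephaven.kafka.KafkaTools.Consume;
    import io.deephaven.kafka.KafkaTools.StreamConsumerRegistrarProvider;
    import io.deephaven.kafka.ingest.KafkaIngester;

    public class StartLaterExample {
        static KafkaIngester ingest(final Properties props, final String topic,
                final Consume.KeyOrValueSpec keySpec, final Consume.KeyOrValueSpec valueSpec,
                final StreamConsumerRegistrarProvider provider,
                final KafkaIngester.ConsumerLoopCallback callback) {
            // consume() no longer starts the ingester; it only constructs it.
            final KafkaIngester ingester = KafkaTools.consume(
                    props, topic, KafkaIngester.ALL_PARTITIONS,
                    partition -> KafkaIngester.SEEK_TO_BEGINNING,
                    keySpec, valueSpec, provider);
            // Callbacks (PATCH 01) can be attached before any record is read...
            ingester.addConsumerLoopCallback(callback);
            // ...and nothing is consumed until the caller decides to start.
            ingester.start();
            return ingester;
        }
    }
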
--- .../kafka/src/main/java/io/deephaven/kafka/KafkaTools.java | 7 +++---- .../main/java/io/deephaven/kafka/ingest/KafkaIngester.java | 7 +++++++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index cc68806b8d6..edac665662c 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -1389,7 +1389,7 @@ public static Table consumeToTable( }; consume(kafkaProperties, topic, partitionFilter, partitionToInitialOffset, keySpec, valueSpec, - StreamConsumerRegistrarProvider.single(registrar)); + StreamConsumerRegistrarProvider.single(registrar)).start(); return resultHolder.getValue(); } @@ -1450,7 +1450,7 @@ public static PartitionedTable consumeToPartitionedTable( }; consume(kafkaProperties, topic, partitionFilter, partitionToInitialOffset, keySpec, valueSpec, - StreamConsumerRegistrarProvider.perPartition(registrar)); + StreamConsumerRegistrarProvider.perPartition(registrar)).start(); return resultHolder.get(); } @@ -1603,7 +1603,7 @@ public Function visit(@NotNull final PerPar * {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar) * single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar) * per-partition}. - * @return the newly created KafkaIngester + * @return the newly created KafkaIngester, the caller must call start on the returned object to begin processing messages */ public static KafkaIngester consume( @NotNull final Properties kafkaProperties, @@ -1690,7 +1690,6 @@ public static KafkaIngester consume( kafkaRecordConsumerFactory, partitionToInitialOffset); kafkaIngesterHolder.setValue(ingester); - ingester.start(); return ingester; } diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java index c59ac2c81ba..d69f68027bd 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java @@ -431,6 +431,13 @@ public void shutdownPartition(final int partition) { kafkaConsumer.wakeup(); } + /** + * Retrieve the underlying KafkaConsumer used for this ingester. + */ + public KafkaConsumer getKafkaConsumer() { + return kafkaConsumer; + } + /** * Add a callback for the consumer loop. * From 8ca9720fbd4ca9848b9ef57996f5e953262a64dd Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Sat, 29 Jul 2023 17:49:12 -0400 Subject: [PATCH 04/11] More directly get the consumer and topic partition while determining offsets for Kafka aware clients. --- .../java/io/deephaven/kafka/KafkaTools.java | 36 ++++++++++- .../deephaven/kafka/ingest/KafkaIngester.java | 61 ++++++++++++++++--- 2 files changed, 87 insertions(+), 10 deletions(-) diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index edac665662c..53dea39d197 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -1603,7 +1603,8 @@ public Function visit(@NotNull final PerPar * {@link TableDefinition}. 
See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar) * single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar) * per-partition}. - * @return the newly created KafkaIngester, the caller must call start on the returned object to begin processing messages + * @return the newly created KafkaIngester, the caller must call start on the returned object to begin processing + * messages */ public static KafkaIngester consume( @NotNull final Properties kafkaProperties, @@ -1613,6 +1614,39 @@ public static KafkaIngester consume( @NotNull final Consume.KeyOrValueSpec keySpec, @NotNull final Consume.KeyOrValueSpec valueSpec, @NotNull final KafkaTools.StreamConsumerRegistrarProvider streamConsumerRegistrarProvider) { + return consume(kafkaProperties, topic, partitionFilter, + new KafkaIngester.IntToLongFunctionAdapter(partitionToInitialOffset), keySpec, valueSpec, + streamConsumerRegistrarProvider); + } + + /** + * Consume from Kafka to a {@link StreamConsumer} supplied by {@code streamConsumerRegistrar}. + * + * @param kafkaProperties Properties to configure this table and also to be passed to create the KafkaConsumer + * @param topic Kafka topic name + * @param partitionFilter A predicate returning true for the partitions to consume. The convenience constant + * {@code ALL_PARTITIONS} is defined to facilitate requesting all partitions. + * @param partitionToInitialOffset A function specifying the desired initial offset for each partition consumed + * @param keySpec Conversion specification for Kafka record keys + * @param valueSpec Conversion specification for Kafka record values + * @param streamConsumerRegistrarProvider A provider for a function to + * {@link StreamPublisher#register(StreamConsumer) register} {@link StreamConsumer} instances. The registered + * stream consumers must accept {@link ChunkType chunk types} that correspond to + * {@link StreamChunkUtils#chunkTypeForColumnIndex(TableDefinition, int)} for the supplied + * {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar) + * single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar) + * per-partition}. 
+ * @return the newly created KafkaIngester, the caller must call start on the returned object to begin processing + * messages + */ + public static KafkaIngester consume( + @NotNull final Properties kafkaProperties, + @NotNull final String topic, + @NotNull final IntPredicate partitionFilter, + @NotNull final KafkaIngester.PartitionToInitialOffsetFunction partitionToInitialOffset, + @NotNull final Consume.KeyOrValueSpec keySpec, + @NotNull final Consume.KeyOrValueSpec valueSpec, + @NotNull final KafkaTools.StreamConsumerRegistrarProvider streamConsumerRegistrarProvider) { final boolean ignoreKey = keySpec.dataFormat() == DataFormat.IGNORE; final boolean ignoreValue = valueSpec.dataFormat() == DataFormat.IGNORE; if (ignoreKey && ignoreValue) { diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java index d69f68027bd..f51dcff01bd 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java @@ -200,6 +200,55 @@ public KafkaIngester( this(log, props, topic, ALL_PARTITIONS, partitionToStreamConsumer, partitionToInitialSeekOffset); } + /** + * Creates a Kafka ingester for the given topic. + * + * @param log A log for output + * @param props The properties used to create the {@link KafkaConsumer} + * @param topic The topic to replicate + * @param partitionFilter A predicate indicating which partitions we should replicate + * @param partitionToStreamConsumer A function implementing a mapping from partition to its consumer of records. The + * function will be invoked once per partition at construction; implementations should internally defer + * resource allocation until first call to {@link KafkaRecordConsumer#consume(List)} or + * {@link KafkaRecordConsumer#acceptFailure(Throwable)} if appropriate. + * @param partitionToInitialSeekOffset A function implementing a mapping from partition to its initial seek offset, + * or -1 if seek to beginning is intended. + */ + public KafkaIngester( + @NotNull final Logger log, + @NotNull final Properties props, + @NotNull final String topic, + @NotNull final IntPredicate partitionFilter, + @NotNull final Function partitionToStreamConsumer, + @NotNull final IntToLongFunction partitionToInitialSeekOffset) { + this(log, props, topic, partitionFilter, partitionToStreamConsumer, + new IntToLongFunctionAdapter(partitionToInitialSeekOffset)); + } + + /** + * Determines the initial offset to seek to for a given TopicPartition. + */ + @FunctionalInterface + public interface PartitionToInitialOffsetFunction { + long partitionToInitialOffset(KafkaConsumer consumer, TopicPartition topicPartition); + } + + /** + * Adapts an IntToLongFunction to a PartitionToInitialOffsetFunction by ignoring the topic and consumer parameters. 
+ */ + public static class IntToLongFunctionAdapter implements PartitionToInitialOffsetFunction { + final IntToLongFunction function; + + public IntToLongFunctionAdapter(IntToLongFunction function) { + this.function = function; + } + + @Override + public long partitionToInitialOffset(final KafkaConsumer consumer, final TopicPartition topicPartition) { + return function.applyAsLong(topicPartition.partition()); + } + } + public static long SEEK_TO_BEGINNING = -1; public static long DONT_SEEK = -2; public static long SEEK_TO_END = -3; @@ -228,7 +277,7 @@ public KafkaIngester( @NotNull final String topic, @NotNull final IntPredicate partitionFilter, @NotNull final Function partitionToStreamConsumer, - @NotNull final IntToLongFunction partitionToInitialSeekOffset) { + @NotNull final PartitionToInitialOffsetFunction partitionToInitialSeekOffset) { this.log = log; this.topic = topic; partitionDescription = partitionFilter.toString(); @@ -244,7 +293,8 @@ public KafkaIngester( assign(); for (final TopicPartition topicPartition : assignedPartitions) { - final long seekOffset = partitionToInitialSeekOffset.applyAsLong(topicPartition.partition()); + final long seekOffset = + partitionToInitialSeekOffset.partitionToInitialOffset(kafkaConsumer, topicPartition); if (seekOffset == SEEK_TO_BEGINNING) { log.info().append(logPrefix).append(topicPartition.toString()).append(" seeking to beginning.") .append(seekOffset).endl(); @@ -431,13 +481,6 @@ public void shutdownPartition(final int partition) { kafkaConsumer.wakeup(); } - /** - * Retrieve the underlying KafkaConsumer used for this ingester. - */ - public KafkaConsumer getKafkaConsumer() { - return kafkaConsumer; - } - /** * Add a callback for the consumer loop. * From 7e4361a1cda6d68326a22ae0f6673c00b47839ce Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Tue, 1 Aug 2023 12:03:48 -0400 Subject: [PATCH 05/11] Apply suggestions from code review Co-authored-by: Ryan Caudy --- .../src/main/java/io/deephaven/kafka/KafkaTools.java | 4 ++-- .../java/io/deephaven/kafka/ingest/KafkaIngester.java | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index 53dea39d197..004ba1f3801 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -1603,7 +1603,7 @@ public Function visit(@NotNull final PerPar * {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar) * single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar) * per-partition}. - * @return the newly created KafkaIngester, the caller must call start on the returned object to begin processing + * @return the newly created KafkaIngester; the caller must call {@link KafkaIngester#start() start} on the returned object to begin processing * messages */ public static KafkaIngester consume( @@ -1620,7 +1620,7 @@ public static KafkaIngester consume( } /** - * Consume from Kafka to a {@link StreamConsumer} supplied by {@code streamConsumerRegistrar}. + * Consume from Kafka to {@link StreamConsumer stream consumers} supplied by {@code streamConsumerRegistrar}. 
* * @param kafkaProperties Properties to configure this table and also to be passed to create the KafkaConsumer * @param topic Kafka topic name diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java index f51dcff01bd..b0a018ded60 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java @@ -66,7 +66,7 @@ public int getIntKey(@NotNull final TopicPartition topicPartition) { private volatile boolean done; /** - * A callback which is invoked from the consumer loop, enabling clients to inject logic on the Kafka consumer. + * A callback which is invoked from the consumer loop, enabling clients to inject logic to be invoked by the Kafka consumer thread. */ public interface ConsumerLoopCallback { /** @@ -77,10 +77,10 @@ public interface ConsumerLoopCallback { void beforePoll(KafkaConsumer consumer); /** - * Called after the consumer is polled for records and they have been published to the StreamConsumer. + * Called after the consumer is polled for records and they have been published to the downstream KafkaRecordConsumer. * * @param consumer the KafkaConsumer that has been polled for records - * @param more true if more records should be read, false if the consumer will be shut down due to error + * @param more true if more records should be read, false if the consumer should be shut down due to error */ void afterPoll(KafkaConsumer consumer, boolean more); } @@ -226,7 +226,7 @@ public KafkaIngester( } /** - * Determines the initial offset to seek to for a given TopicPartition. + * Determines the initial offset to seek to for a given KafkaConsumer and TopicPartition. */ @FunctionalInterface public interface PartitionToInitialOffsetFunction { @@ -237,7 +237,8 @@ public interface PartitionToInitialOffsetFunction { * Adapts an IntToLongFunction to a PartitionToInitialOffsetFunction by ignoring the topic and consumer parameters. */ public static class IntToLongFunctionAdapter implements PartitionToInitialOffsetFunction { - final IntToLongFunction function; + + private final IntToLongFunction function; public IntToLongFunctionAdapter(IntToLongFunction function) { this.function = function; From ca13d28a4f4cae05dc37c9de88e8676afc66f605 Mon Sep 17 00:00:00 2001 From: "Charles P. 
Wright" Date: Tue, 1 Aug 2023 12:18:27 -0400 Subject: [PATCH 06/11] be void --- .../java/io/deephaven/kafka/KafkaTools.java | 30 ++++++------ .../deephaven/kafka/ingest/KafkaIngester.java | 46 ++++++------------- 2 files changed, 30 insertions(+), 46 deletions(-) diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index 004ba1f3801..91340d0d718 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -1389,7 +1389,7 @@ public static Table consumeToTable( }; consume(kafkaProperties, topic, partitionFilter, partitionToInitialOffset, keySpec, valueSpec, - StreamConsumerRegistrarProvider.single(registrar)).start(); + StreamConsumerRegistrarProvider.single(registrar), null); return resultHolder.getValue(); } @@ -1450,7 +1450,7 @@ public static PartitionedTable consumeToPartitionedTable( }; consume(kafkaProperties, topic, partitionFilter, partitionToInitialOffset, keySpec, valueSpec, - StreamConsumerRegistrarProvider.perPartition(registrar)).start(); + StreamConsumerRegistrarProvider.perPartition(registrar), null); return resultHolder.get(); } @@ -1587,7 +1587,7 @@ public Function visit(@NotNull final PerPar } /** - * Consume from Kafka to a {@link StreamConsumer} supplied by {@code streamConsumerRegistrar}. + * Consume from Kafka to {@link StreamConsumer stream consumers} supplied by {@code streamConsumerRegistrar}. * * @param kafkaProperties Properties to configure this table and also to be passed to create the KafkaConsumer * @param topic Kafka topic name @@ -1606,17 +1606,18 @@ public Function visit(@NotNull final PerPar * @return the newly created KafkaIngester; the caller must call {@link KafkaIngester#start() start} on the returned object to begin processing * messages */ - public static KafkaIngester consume( + public static void consume( @NotNull final Properties kafkaProperties, @NotNull final String topic, @NotNull final IntPredicate partitionFilter, @NotNull final IntToLongFunction partitionToInitialOffset, @NotNull final Consume.KeyOrValueSpec keySpec, @NotNull final Consume.KeyOrValueSpec valueSpec, - @NotNull final KafkaTools.StreamConsumerRegistrarProvider streamConsumerRegistrarProvider) { - return consume(kafkaProperties, topic, partitionFilter, - new KafkaIngester.IntToLongFunctionAdapter(partitionToInitialOffset), keySpec, valueSpec, - streamConsumerRegistrarProvider); + @NotNull final KafkaTools.StreamConsumerRegistrarProvider streamConsumerRegistrarProvider, + @Nullable final KafkaIngester.ConsumerLoopCallback consumerLoopCallback) { + consume(kafkaProperties, topic, partitionFilter, + new KafkaIngester.IntToLongLookupAdapter(partitionToInitialOffset), keySpec, valueSpec, + streamConsumerRegistrarProvider, consumerLoopCallback); } /** @@ -1639,14 +1640,15 @@ public static KafkaIngester consume( * @return the newly created KafkaIngester, the caller must call start on the returned object to begin processing * messages */ - public static KafkaIngester consume( + public static void consume( @NotNull final Properties kafkaProperties, @NotNull final String topic, @NotNull final IntPredicate partitionFilter, - @NotNull final KafkaIngester.PartitionToInitialOffsetFunction partitionToInitialOffset, + @NotNull final KafkaIngester.InitialOffsetLookup partitionToInitialOffset, @NotNull final Consume.KeyOrValueSpec keySpec, @NotNull final Consume.KeyOrValueSpec valueSpec, - @NotNull final 
KafkaTools.StreamConsumerRegistrarProvider streamConsumerRegistrarProvider) { + @NotNull final KafkaTools.StreamConsumerRegistrarProvider streamConsumerRegistrarProvider, + @Nullable final KafkaIngester.ConsumerLoopCallback consumerLoopCallback) { final boolean ignoreKey = keySpec.dataFormat() == DataFormat.IGNORE; final boolean ignoreValue = valueSpec.dataFormat() == DataFormat.IGNORE; if (ignoreKey && ignoreValue) { @@ -1722,10 +1724,10 @@ public static KafkaIngester consume( topic, partitionFilter, kafkaRecordConsumerFactory, - partitionToInitialOffset); + partitionToInitialOffset, + consumerLoopCallback); kafkaIngesterHolder.setValue(ingester); - - return ingester; + ingester.start(); } private static KeyOrValueSerializer getAvroSerializer( diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java index b0a018ded60..0f34309c50d 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java @@ -15,14 +15,13 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.text.DecimalFormat; import java.time.Duration; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Function; import java.util.function.IntPredicate; import java.util.function.IntToLongFunction; @@ -52,7 +51,7 @@ public int getIntKey(@NotNull final TopicPartition topicPartition) { } }); - private final Collection consumerLoopCallbacks = new CopyOnWriteArrayList<>(); + private final ConsumerLoopCallback consumerLoopCallback; private long messagesProcessed = 0; private long bytesProcessed = 0; @@ -221,31 +220,30 @@ public KafkaIngester( @NotNull final IntPredicate partitionFilter, @NotNull final Function partitionToStreamConsumer, @NotNull final IntToLongFunction partitionToInitialSeekOffset) { - this(log, props, topic, partitionFilter, partitionToStreamConsumer, - new IntToLongFunctionAdapter(partitionToInitialSeekOffset)); + this(log, props, topic, partitionFilter, partitionToStreamConsumer, new IntToLongLookupAdapter(partitionToInitialSeekOffset), null); } /** * Determines the initial offset to seek to for a given KafkaConsumer and TopicPartition. */ @FunctionalInterface - public interface PartitionToInitialOffsetFunction { - long partitionToInitialOffset(KafkaConsumer consumer, TopicPartition topicPartition); + public interface InitialOffsetLookup { + long getInitialOffset(KafkaConsumer consumer, TopicPartition topicPartition); } /** * Adapts an IntToLongFunction to a PartitionToInitialOffsetFunction by ignoring the topic and consumer parameters. 
*/ - public static class IntToLongFunctionAdapter implements PartitionToInitialOffsetFunction { + public static class IntToLongLookupAdapter implements InitialOffsetLookup { private final IntToLongFunction function; - public IntToLongFunctionAdapter(IntToLongFunction function) { + public IntToLongLookupAdapter(IntToLongFunction function) { this.function = function; } @Override - public long partitionToInitialOffset(final KafkaConsumer consumer, final TopicPartition topicPartition) { + public long getInitialOffset(final KafkaConsumer consumer, final TopicPartition topicPartition) { return function.applyAsLong(topicPartition.partition()); } } @@ -278,12 +276,14 @@ public KafkaIngester( @NotNull final String topic, @NotNull final IntPredicate partitionFilter, @NotNull final Function partitionToStreamConsumer, - @NotNull final PartitionToInitialOffsetFunction partitionToInitialSeekOffset) { + @NotNull final KafkaIngester.InitialOffsetLookup partitionToInitialSeekOffset, + @Nullable final ConsumerLoopCallback consumerLoopCallback) { this.log = log; this.topic = topic; partitionDescription = partitionFilter.toString(); logPrefix = KafkaIngester.class.getSimpleName() + "(" + topic + ", " + partitionDescription + "): "; kafkaConsumer = new KafkaConsumer(props); + this.consumerLoopCallback = consumerLoopCallback; kafkaConsumer.partitionsFor(topic).stream().filter(pi -> partitionFilter.test(pi.partition())) .map(pi -> new TopicPartition(topic, pi.partition())) @@ -295,7 +295,7 @@ public KafkaIngester( for (final TopicPartition topicPartition : assignedPartitions) { final long seekOffset = - partitionToInitialSeekOffset.partitionToInitialOffset(kafkaConsumer, topicPartition); + partitionToInitialSeekOffset.getInitialOffset(kafkaConsumer, topicPartition); if (seekOffset == SEEK_TO_BEGINNING) { log.info().append(logPrefix).append(topicPartition.toString()).append(" seeking to beginning.") .append(seekOffset).endl(); @@ -356,9 +356,9 @@ private void consumerLoop() { } final long beforePoll = System.nanoTime(); final long remainingNanos = beforePoll > nextReport ? 0 : (nextReport - beforePoll); - consumerLoopCallbacks.forEach(x -> x.beforePoll(kafkaConsumer)); + consumerLoopCallback.beforePoll(kafkaConsumer); final boolean more = pollOnce(Duration.ofNanos(remainingNanos)); - consumerLoopCallbacks.forEach(x -> x.afterPoll(kafkaConsumer, more)); + consumerLoopCallback.afterPoll(kafkaConsumer, more); if (!more) { log.error().append(logPrefix) .append("Stopping due to errors (").append(messagesWithErr) @@ -481,22 +481,4 @@ public void shutdownPartition(final int partition) { } kafkaConsumer.wakeup(); } - - /** - * Add a callback for the consumer loop. - * - * @param callback a callback that allows subscribers to inject logic into the consumer loop. - */ - public void addConsumerLoopCallback(@NotNull final ConsumerLoopCallback callback) { - consumerLoopCallbacks.add(callback); - } - - /** - * Remove a callback from the consumer loop. - * - * @param callback a previously added callback. - */ - public void removeConsumerLoopCallback(@NotNull final ConsumerLoopCallback callback) { - consumerLoopCallbacks.remove(callback); - } } From fd814cdfc65bbcf6aca498ce63e960373ac471c6 Mon Sep 17 00:00:00 2001 From: "Charles P. 
Wright" Date: Tue, 1 Aug 2023 12:24:39 -0400 Subject: [PATCH 07/11] ex handling --- .../java/io/deephaven/kafka/KafkaTools.java | 4 +-- .../deephaven/kafka/ingest/KafkaIngester.java | 36 +++++++++++++++---- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index 91340d0d718..acdec275822 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -1603,8 +1603,8 @@ public Function visit(@NotNull final PerPar * {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar) * single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar) * per-partition}. - * @return the newly created KafkaIngester; the caller must call {@link KafkaIngester#start() start} on the returned object to begin processing - * messages + * @return the newly created KafkaIngester; the caller must call {@link KafkaIngester#start() start} on the returned + * object to begin processing messages */ public static void consume( @NotNull final Properties kafkaProperties, diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java index 0f34309c50d..9cd5f6e3d4b 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java @@ -51,6 +51,7 @@ public int getIntKey(@NotNull final TopicPartition topicPartition) { } }); + @Nullable private final ConsumerLoopCallback consumerLoopCallback; private long messagesProcessed = 0; @@ -65,7 +66,8 @@ public int getIntKey(@NotNull final TopicPartition topicPartition) { private volatile boolean done; /** - * A callback which is invoked from the consumer loop, enabling clients to inject logic to be invoked by the Kafka consumer thread. + * A callback which is invoked from the consumer loop, enabling clients to inject logic to be invoked by the Kafka + * consumer thread. */ public interface ConsumerLoopCallback { /** @@ -76,7 +78,8 @@ public interface ConsumerLoopCallback { void beforePoll(KafkaConsumer consumer); /** - * Called after the consumer is polled for records and they have been published to the downstream KafkaRecordConsumer. + * Called after the consumer is polled for records and they have been published to the downstream + * KafkaRecordConsumer. * * @param consumer the KafkaConsumer that has been polled for records * @param more true if more records should be read, false if the consumer should be shut down due to error @@ -220,7 +223,8 @@ public KafkaIngester( @NotNull final IntPredicate partitionFilter, @NotNull final Function partitionToStreamConsumer, @NotNull final IntToLongFunction partitionToInitialSeekOffset) { - this(log, props, topic, partitionFilter, partitionToStreamConsumer, new IntToLongLookupAdapter(partitionToInitialSeekOffset), null); + this(log, props, topic, partitionFilter, partitionToStreamConsumer, + new IntToLongLookupAdapter(partitionToInitialSeekOffset), null); } /** @@ -356,9 +360,29 @@ private void consumerLoop() { } final long beforePoll = System.nanoTime(); final long remainingNanos = beforePoll > nextReport ? 
0 : (nextReport - beforePoll); - consumerLoopCallback.beforePoll(kafkaConsumer); - final boolean more = pollOnce(Duration.ofNanos(remainingNanos)); - consumerLoopCallback.afterPoll(kafkaConsumer, more); + + boolean more = true; + if (consumerLoopCallback != null) { + try { + consumerLoopCallback.beforePoll(kafkaConsumer); + } catch (Exception e) { + log.error().append(logPrefix).append("Exception while executing beforePoll callback:").append(e) + .append(", aborting.").endl(); + more = false; + } + } + if (more) { + more = pollOnce(Duration.ofNanos(remainingNanos)); + if (consumerLoopCallback != null) { + try { + consumerLoopCallback.afterPoll(kafkaConsumer, more); + } catch (Exception e) { + log.error().append(logPrefix).append("Exception while executing afterPoll callback:").append(e) + .append(", aborting.").endl(); + more = false; + } + } + } if (!more) { log.error().append(logPrefix) .append("Stopping due to errors (").append(messagesWithErr) From 7614abece77c86d4954b9538eaec51f8f5d1e0bd Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Tue, 1 Aug 2023 14:26:53 -0400 Subject: [PATCH 08/11] remove return --- .../kafka/src/main/java/io/deephaven/kafka/KafkaTools.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index acdec275822..f1df81b0af3 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -1603,8 +1603,6 @@ public Function visit(@NotNull final PerPar * {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar) * single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar) * per-partition}. - * @return the newly created KafkaIngester; the caller must call {@link KafkaIngester#start() start} on the returned - * object to begin processing messages */ public static void consume( @@ -1637,8 +1635,6 @@ public static void consume( * {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar) * single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar) * per-partition}. - * @return the newly created KafkaIngester, the caller must call start on the returned object to begin processing - * messages */ public static void consume(
From c0d4b184347cb7dba810273a1ef31b4509475d01 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Tue, 1 Aug 2023 14:33:15 -0400 Subject: [PATCH 09/11] failure notification
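
A callback that throws in beforePoll or afterPoll now reaches the downstream KafkaRecordConsumers through acceptFailure(), exactly as pollOnce() already did for poll errors, so the failure surfaces in the consuming tables instead of only being logged. The previously inlined notification loop moves into notifyAllConsumersOnFailure(), which snapshots the registered consumers under the streamConsumers lock and calls acceptFailure() outside it. A generic restatement of that copy-then-notify idiom — illustrative code, not taken from this patch:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    final class FailureFanout {
        private final List<Consumer<Throwable>> listeners = new ArrayList<>();

        synchronized void register(final Consumer<Throwable> listener) {
            listeners.add(listener);
        }

        void publishFailure(final Throwable cause) {
            final List<Consumer<Throwable>> snapshot;
            synchronized (this) {
                // Copy under the lock so registration never races the iteration...
                snapshot = new ArrayList<>(listeners);
            }
            // ...then notify outside it, so a slow or re-entrant listener cannot
            // block other threads that need the lock.
            for (final Consumer<Throwable> listener : snapshot) {
                listener.accept(cause);
            }
        }
    }
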
Wright" Date: Tue, 1 Aug 2023 14:33:15 -0400 Subject: [PATCH 09/11] failure notification --- .../deephaven/kafka/ingest/KafkaIngester.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java index 1afb4414f63..8fccac314ad 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/ingest/KafkaIngester.java @@ -368,6 +368,7 @@ private void consumerLoop() { } catch (Exception e) { log.error().append(logPrefix).append("Exception while executing beforePoll callback:").append(e) .append(", aborting.").endl(); + notifyAllConsumersOnFailure(e); more = false; } } @@ -379,6 +380,7 @@ private void consumerLoop() { } catch (Exception e) { log.error().append(logPrefix).append("Exception while executing afterPoll callback:").append(e) .append(", aborting.").endl(); + notifyAllConsumersOnFailure(e); more = false; } } @@ -432,13 +434,7 @@ private boolean pollOnce(final Duration timeout) { } catch (Exception ex) { log.error().append(logPrefix).append("Exception while polling for Kafka messages:").append(ex) .append(", aborting.").endl(); - final KafkaRecordConsumer[] allConsumers; - synchronized (streamConsumers) { - allConsumers = streamConsumers.valueCollection().toArray(KafkaRecordConsumer[]::new); - } - for (final KafkaRecordConsumer streamConsumer : allConsumers) { - streamConsumer.acceptFailure(ex); - } + notifyAllConsumersOnFailure(ex); return false; } @@ -482,6 +478,16 @@ private boolean pollOnce(final Duration timeout) { return true; } + private void notifyAllConsumersOnFailure(Exception ex) { + final KafkaRecordConsumer[] allConsumers; + synchronized (streamConsumers) { + allConsumers = streamConsumers.valueCollection().toArray(KafkaRecordConsumer[]::new); + } + for (final KafkaRecordConsumer streamConsumer : allConsumers) { + streamConsumer.acceptFailure(ex); + } + } + public void shutdown() { if (done) { return; From 5675048ccf99081913dc01cf6fc6d773c6756ad4 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Tue, 1 Aug 2023 14:54:05 -0400 Subject: [PATCH 10/11] remove delegation --- .../java/io/deephaven/kafka/KafkaTools.java | 33 +------------------ 1 file changed, 1 insertion(+), 32 deletions(-) diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index 090dd58bac6..ef230b0965d 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -1265,38 +1265,7 @@ public Function visit(@NotNull final PerPar * {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar) * single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar) * per-partition}. 
- */ - public static void consume( - @NotNull final Properties kafkaProperties, - @NotNull final String topic, - @NotNull final IntPredicate partitionFilter, - @NotNull final IntToLongFunction partitionToInitialOffset, - @NotNull final Consume.KeyOrValueSpec keySpec, - @NotNull final Consume.KeyOrValueSpec valueSpec, - @NotNull final KafkaTools.StreamConsumerRegistrarProvider streamConsumerRegistrarProvider, - @Nullable final KafkaIngester.ConsumerLoopCallback consumerLoopCallback) { - consume(kafkaProperties, topic, partitionFilter, - new KafkaIngester.IntToLongLookupAdapter(partitionToInitialOffset), keySpec, valueSpec, - streamConsumerRegistrarProvider, consumerLoopCallback); - } - - /** - * Consume from Kafka to {@link StreamConsumer stream consumers} supplied by {@code streamConsumerRegistrar}. - * - * @param kafkaProperties Properties to configure this table and also to be passed to create the KafkaConsumer - * @param topic Kafka topic name - * @param partitionFilter A predicate returning true for the partitions to consume. The convenience constant - * {@code ALL_PARTITIONS} is defined to facilitate requesting all partitions. - * @param partitionToInitialOffset A function specifying the desired initial offset for each partition consumed - * @param keySpec Conversion specification for Kafka record keys - * @param valueSpec Conversion specification for Kafka record values - * @param streamConsumerRegistrarProvider A provider for a function to - * {@link StreamPublisher#register(StreamConsumer) register} {@link StreamConsumer} instances. The registered - * stream consumers must accept {@link ChunkType chunk types} that correspond to - * {@link StreamChunkUtils#chunkTypeForColumnIndex(TableDefinition, int)} for the supplied - * {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar) - * single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar) - * per-partition}. + * @param consumerLoopCallback callback to inject logic into the ingester's consumer loop */ public static void consume(
From 6953734897cec54bf866fe77e0c5e761afd25fc1 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Tue, 1 Aug 2023 14:56:36 -0400 Subject: [PATCH 11/11] adapt
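
consumeToTable and consumeToPartitionedTable still accept the public partition-to-offset IntToLongFunction, so with the delegating overload gone (PATCH 10) each call site wraps it in IntToLongLookupAdapter itself. External callers holding a partition-number-only policy can bridge to the new signature the same way; a sketch with a hypothetical helper class, using the SEEK_TO_BEGINNING constant defined on KafkaIngester:

    import java.util.function.IntToLongFunction;
    import io.deephaven.kafka.ingest.KafkaIngester;

    public class AdapterExample {
        // Bridge an old-style partition -> offset function to the
        // consumer-aware lookup that consume() now requires.
        static KafkaIngester.InitialOffsetLookup adapt(final IntToLongFunction byPartition) {
            return new KafkaIngester.IntToLongLookupAdapter(byPartition);
        }

        public static void main(final String[] args) {
            final KafkaIngester.InitialOffsetLookup lookup =
                    adapt(partition -> KafkaIngester.SEEK_TO_BEGINNING);
            System.out.println(lookup.getClass().getSimpleName()); // IntToLongLookupAdapter
        }
    }
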
Wright" Date: Tue, 1 Aug 2023 14:56:36 -0400 Subject: [PATCH 11/11] adapt --- .../kafka/src/main/java/io/deephaven/kafka/KafkaTools.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index ef230b0965d..bbdae9e10fb 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -1050,7 +1050,8 @@ public static Table consumeToTable( } }; - consume(kafkaProperties, topic, partitionFilter, partitionToInitialOffset, keySpec, valueSpec, + consume(kafkaProperties, topic, partitionFilter, + new KafkaIngester.IntToLongLookupAdapter(partitionToInitialOffset), keySpec, valueSpec, StreamConsumerRegistrarProvider.single(registrar), null); return resultHolder.getValue(); } @@ -1111,7 +1112,8 @@ public static PartitionedTable consumeToPartitionedTable( } }; - consume(kafkaProperties, topic, partitionFilter, partitionToInitialOffset, keySpec, valueSpec, + consume(kafkaProperties, topic, partitionFilter, + new KafkaIngester.IntToLongLookupAdapter(partitionToInitialOffset), keySpec, valueSpec, StreamConsumerRegistrarProvider.perPartition(registrar), null); return resultHolder.get(); }