Add callbacks to KafkaIngester consumer loop. #4242

Merged · 12 commits · Aug 1, 2023
21 changes: 13 additions & 8 deletions extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
@@ -1050,8 +1050,9 @@ public static Table consumeToTable(
}
};

consume(kafkaProperties, topic, partitionFilter, partitionToInitialOffset, keySpec, valueSpec,
StreamConsumerRegistrarProvider.single(registrar));
consume(kafkaProperties, topic, partitionFilter,
new KafkaIngester.IntToLongLookupAdapter(partitionToInitialOffset), keySpec, valueSpec,
StreamConsumerRegistrarProvider.single(registrar), null);
return resultHolder.getValue();
}

@@ -1111,8 +1112,9 @@ public static PartitionedTable consumeToPartitionedTable(
}
};

consume(kafkaProperties, topic, partitionFilter, partitionToInitialOffset, keySpec, valueSpec,
StreamConsumerRegistrarProvider.perPartition(registrar));
consume(kafkaProperties, topic, partitionFilter,
new KafkaIngester.IntToLongLookupAdapter(partitionToInitialOffset), keySpec, valueSpec,
StreamConsumerRegistrarProvider.perPartition(registrar), null);
return resultHolder.get();
}

@@ -1249,7 +1251,7 @@ public Function<TopicPartition, KafkaRecordConsumer> visit(@NotNull final PerPar
}

/**
* Consume from Kafka to a {@link StreamConsumer} supplied by {@code streamConsumerRegistrar}.
* Consume from Kafka to {@link StreamConsumer stream consumers} supplied by {@code streamConsumerRegistrar}.
*
* @param kafkaProperties Properties to configure this table and also to be passed to create the KafkaConsumer
* @param topic Kafka topic name
@@ -1265,15 +1267,17 @@ public Function<TopicPartition, KafkaRecordConsumer> visit(@NotNull final PerPar
* {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar)
* single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar)
* per-partition}.
* @param consumerLoopCallback callback to inject logic into the ingester's consumer loop, or null for none
*/
public static void consume(
@NotNull final Properties kafkaProperties,
@NotNull final String topic,
@NotNull final IntPredicate partitionFilter,
@NotNull final IntToLongFunction partitionToInitialOffset,
@NotNull final KafkaIngester.InitialOffsetLookup partitionToInitialOffset,
@NotNull final Consume.KeyOrValueSpec keySpec,
@NotNull final Consume.KeyOrValueSpec valueSpec,
@NotNull final KafkaTools.StreamConsumerRegistrarProvider streamConsumerRegistrarProvider) {
@NotNull final KafkaTools.StreamConsumerRegistrarProvider streamConsumerRegistrarProvider,
@Nullable final KafkaIngester.ConsumerLoopCallback consumerLoopCallback) {
final boolean ignoreKey = keySpec.dataFormat() == DataFormat.IGNORE;
final boolean ignoreValue = valueSpec.dataFormat() == DataFormat.IGNORE;
if (ignoreKey && ignoreValue) {
@@ -1349,7 +1353,8 @@ public static void consume(
topic,
partitionFilter,
kafkaRecordConsumerFactory,
partitionToInitialOffset);
partitionToInitialOffset,
consumerLoopCallback);
kafkaIngesterHolder.setValue(ingester);
ingester.start();
}
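
For orientation, here is a hedged sketch of what a direct call to the extended overload might look like. This call is not part of the diff; `kafkaProperties`, `keySpec`, `valueSpec`, and `registrar` are assumed to exist in the caller, and the topic name is a placeholder.

```java
// Hypothetical wiring of the new consume() overload; the trailing null opts
// out of the new consumer-loop callback, matching the pre-PR behavior.
KafkaTools.consume(
        kafkaProperties,
        "my-topic",
        KafkaIngester.ALL_PARTITIONS,
        new KafkaIngester.IntToLongLookupAdapter(partition -> KafkaIngester.SEEK_TO_BEGINNING),
        keySpec,
        valueSpec,
        StreamConsumerRegistrarProvider.single(registrar),
        null /* consumerLoopCallback */);
```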
KafkaIngester.java
@@ -15,6 +15,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.text.DecimalFormat;
import java.time.Duration;
@@ -50,6 +51,9 @@ public int getIntKey(@NotNull final TopicPartition topicPartition) {
}
});

@Nullable
private final ConsumerLoopCallback consumerLoopCallback;

private long messagesProcessed = 0;
private long bytesProcessed = 0;
private long pollCalls = 0;
@@ -61,6 +65,28 @@ public int getIntKey(@NotNull final TopicPartition topicPartition) {
private volatile boolean needsAssignment;
private volatile boolean done;

/**
* A callback invoked from the consumer loop, enabling clients to inject logic to be executed by the Kafka
* consumer thread before and after each poll.
*/
public interface ConsumerLoopCallback {
/**
* Called before the consumer is polled for records.
*
* @param consumer the KafkaConsumer that will be polled for records
*/
void beforePoll(KafkaConsumer<?, ?> consumer);

/**
* Called after the consumer is polled for records and they have been published to the downstream
* KafkaRecordConsumer.
*
* @param consumer the KafkaConsumer that has been polled for records
* @param more true if more records should be read, false if the consumer should be shut down due to error
*/
void afterPoll(KafkaConsumer<?, ?> consumer, boolean more);
}
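
As an illustration of how this hook might be used, here is a minimal sketch; the class name and the timing logic are hypothetical, not part of this PR. Both methods run on the single Kafka consumer thread, so the start timestamp can live in a plain field, while the result is published through an AtomicLong for readers on other threads.

```java
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical example: measure how long each poll/publish cycle takes,
// e.g. for test synchronization or metrics.
public class PollTimingCallback implements KafkaIngester.ConsumerLoopCallback {
    private final AtomicLong lastCycleNanos = new AtomicLong();
    private long startNanos; // only touched by the consumer thread

    @Override
    public void beforePoll(final KafkaConsumer<?, ?> consumer) {
        startNanos = System.nanoTime();
    }

    @Override
    public void afterPoll(final KafkaConsumer<?, ?> consumer, final boolean more) {
        lastCycleNanos.set(System.nanoTime() - startNanos);
    }

    public long lastCycleNanos() {
        return lastCycleNanos.get();
    }
}
```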

/**
* Constant predicate that returns true for all partitions. This is the default, each and every partition that
* exists will be handled by the same ingester. Because Kafka consumers are inherently single threaded, to scale
@@ -176,6 +202,56 @@ public KafkaIngester(
this(log, props, topic, ALL_PARTITIONS, partitionToStreamConsumer, partitionToInitialSeekOffset);
}

/**
* Creates a Kafka ingester for the given topic.
*
* @param log A log for output
* @param props The properties used to create the {@link KafkaConsumer}
* @param topic The topic to replicate
* @param partitionFilter A predicate indicating which partitions we should replicate
* @param partitionToStreamConsumer A function implementing a mapping from partition to its consumer of records. The
* function will be invoked once per partition at construction; implementations should internally defer
* resource allocation until the first call to {@link KafkaRecordConsumer#consume(List)} or
* {@link KafkaRecordConsumer#acceptFailure(Throwable)} if appropriate.
* @param partitionToInitialSeekOffset A function implementing a mapping from partition to its initial seek offset,
* or {@link #SEEK_TO_BEGINNING} (-1) if seeking to the beginning is intended.
*/
public KafkaIngester(
@NotNull final Logger log,
@NotNull final Properties props,
@NotNull final String topic,
@NotNull final IntPredicate partitionFilter,
@NotNull final Function<TopicPartition, KafkaRecordConsumer> partitionToStreamConsumer,
@NotNull final IntToLongFunction partitionToInitialSeekOffset) {
this(log, props, topic, partitionFilter, partitionToStreamConsumer,
new IntToLongLookupAdapter(partitionToInitialSeekOffset), null);
}

/**
* Determines the initial offset to seek to for a given KafkaConsumer and TopicPartition.
*/
@FunctionalInterface
public interface InitialOffsetLookup {
long getInitialOffset(KafkaConsumer<?, ?> consumer, TopicPartition topicPartition);
}
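
Passing the live consumer to the lookup is what enables offset resolution against the broker. For example (a hypothetical sketch, not part of this change; assumed to live in the same package as KafkaIngester), an implementation could seek each partition to the first record at or after a wall-clock timestamp via KafkaConsumer.offsetsForTimes:

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// Hypothetical example: resolve the initial offset from a timestamp.
public class TimestampOffsetLookup implements KafkaIngester.InitialOffsetLookup {
    private final long timestampMillis;

    public TimestampOffsetLookup(final long timestampMillis) {
        this.timestampMillis = timestampMillis;
    }

    @Override
    public long getInitialOffset(final KafkaConsumer<?, ?> consumer, final TopicPartition topicPartition) {
        final OffsetAndTimestamp found =
                consumer.offsetsForTimes(Map.of(topicPartition, timestampMillis)).get(topicPartition);
        // No record at or after the timestamp: fall back to the ingester's
        // "seek to beginning" sentinel.
        return found == null ? KafkaIngester.SEEK_TO_BEGINNING : found.offset();
    }
}
```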

/**
* Adapts an {@link IntToLongFunction} to an {@link InitialOffsetLookup} by ignoring the consumer and using only
* the partition number of the given TopicPartition.
*/
public static class IntToLongLookupAdapter implements InitialOffsetLookup {
private final IntToLongFunction function;

public IntToLongLookupAdapter(IntToLongFunction function) {
this.function = function;
}

@Override
public long getInitialOffset(final KafkaConsumer<?, ?> consumer, final TopicPartition topicPartition) {
return function.applyAsLong(topicPartition.partition());
}
}

public static long SEEK_TO_BEGINNING = -1;
public static long DONT_SEEK = -2;
public static long SEEK_TO_END = -3;
@@ -204,12 +280,14 @@ public KafkaIngester(
@NotNull final String topic,
@NotNull final IntPredicate partitionFilter,
@NotNull final Function<TopicPartition, KafkaRecordConsumer> partitionToStreamConsumer,
@NotNull final IntToLongFunction partitionToInitialSeekOffset) {
@NotNull final KafkaIngester.InitialOffsetLookup partitionToInitialSeekOffset,
@Nullable final ConsumerLoopCallback consumerLoopCallback) {
this.log = log;
this.topic = topic;
partitionDescription = partitionFilter.toString();
logPrefix = KafkaIngester.class.getSimpleName() + "(" + topic + ", " + partitionDescription + "): ";
kafkaConsumer = new KafkaConsumer(props);
this.consumerLoopCallback = consumerLoopCallback;

kafkaConsumer.partitionsFor(topic).stream().filter(pi -> partitionFilter.test(pi.partition()))
.map(pi -> new TopicPartition(topic, pi.partition()))
@@ -220,7 +298,8 @@ public KafkaIngester(
assign();

for (final TopicPartition topicPartition : assignedPartitions) {
final long seekOffset = partitionToInitialSeekOffset.applyAsLong(topicPartition.partition());
final long seekOffset =
partitionToInitialSeekOffset.getInitialOffset(kafkaConsumer, topicPartition);
if (seekOffset == SEEK_TO_BEGINNING) {
log.info().append(logPrefix).append(topicPartition.toString()).append(" seeking to beginning.")
.append(seekOffset).endl();
@@ -281,7 +360,31 @@ private void consumerLoop() {
}
final long beforePoll = System.nanoTime();
final long remainingNanos = beforePoll > nextReport ? 0 : (nextReport - beforePoll);
boolean more = pollOnce(Duration.ofNanos(remainingNanos));

boolean more = true;
if (consumerLoopCallback != null) {
try {
consumerLoopCallback.beforePoll(kafkaConsumer);
} catch (Exception e) {
log.error().append(logPrefix).append("Exception while executing beforePoll callback:").append(e)
.append(", aborting.").endl();
notifyAllConsumersOnFailure(e);
more = false;
}
}
if (more) {
more = pollOnce(Duration.ofNanos(remainingNanos));
if (consumerLoopCallback != null) {
try {
consumerLoopCallback.afterPoll(kafkaConsumer, more);
} catch (Exception e) {
log.error().append(logPrefix).append("Exception while executing afterPoll callback:").append(e)
.append(", aborting.").endl();
notifyAllConsumersOnFailure(e);
more = false;
}
}
}
if (!more) {
log.error().append(logPrefix)
.append("Stopping due to errors (").append(messagesWithErr)
@@ -331,13 +434,7 @@ private boolean pollOnce(final Duration timeout) {
} catch (Exception ex) {
log.error().append(logPrefix).append("Exception while polling for Kafka messages:").append(ex)
.append(", aborting.").endl();
final KafkaRecordConsumer[] allConsumers;
synchronized (streamConsumers) {
allConsumers = streamConsumers.valueCollection().toArray(KafkaRecordConsumer[]::new);
}
for (final KafkaRecordConsumer streamConsumer : allConsumers) {
streamConsumer.acceptFailure(ex);
}
notifyAllConsumersOnFailure(ex);
return false;
}

@@ -381,6 +478,16 @@ private boolean pollOnce(final Duration timeout) {
return true;
}

private void notifyAllConsumersOnFailure(Exception ex) {
final KafkaRecordConsumer[] allConsumers;
synchronized (streamConsumers) {
allConsumers = streamConsumers.valueCollection().toArray(KafkaRecordConsumer[]::new);
}
for (final KafkaRecordConsumer streamConsumer : allConsumers) {
streamConsumer.acceptFailure(ex);
}
}

public void shutdown() {
if (done) {
return;