[Improve][Connector-V2-kafka] Support for dynamic discover topic & partition in streaming mode
zhouyao committed Nov 9, 2022
1 parent ea1ec28 commit adc31df
Showing 2 changed files with 113 additions and 31 deletions.
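The heart of the change, condensed: when a discovery interval is configured, the split enumerator starts a single daemon thread that periodically re-lists topics and partitions through the Kafka AdminClient, fetches their latest offsets, and hands any newly found partitions to readers as new splits. The sketch below is a standalone illustration of that loop, not the actual SeaTunnel classes; the class name PartitionDiscoverySketch, the listTopics() stand-in, and the println are made up for the example, while the AdminClient calls and the thread setup mirror the diff that follows.

// Standalone sketch of the discovery loop this commit introduces (illustrative names,
// not the actual SeaTunnel classes); only the AdminClient calls and the thread setup
// mirror the diff below.
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class PartitionDiscoverySketch {

    private final AdminClient adminClient;
    private final long discoveryIntervalMillis; // <= 0 disables periodic discovery
    private ScheduledExecutorService executor;
    private ScheduledFuture<?> scheduledFuture;

    PartitionDiscoverySketch(Properties adminProps, long discoveryIntervalMillis) {
        this.adminClient = AdminClient.create(adminProps);
        this.discoveryIntervalMillis = discoveryIntervalMillis;
    }

    // Start a single daemon thread that re-runs discovery at a fixed delay.
    public void open() {
        if (discoveryIntervalMillis > 0) {
            executor = Executors.newScheduledThreadPool(1, runnable -> {
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                thread.setName("kafka-partition-dynamic-discovery");
                return thread;
            });
            scheduledFuture = executor.scheduleWithFixedDelay(
                    this::discoverOnce, discoveryIntervalMillis, discoveryIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    // One discovery pass: list partitions and fetch their latest offsets.
    // Listing every topic here is a stand-in for the connector's configured topic lookup.
    private void discoverOnce() {
        try {
            Collection<String> topics = adminClient.listTopics().names().get();
            Collection<TopicPartition> partitions = adminClient.describeTopics(topics).all().get()
                    .values().stream()
                    .flatMap(t -> t.partitions().stream().map(p -> new TopicPartition(t.name(), p.partition())))
                    .collect(Collectors.toSet());
            Map<TopicPartition, Long> latestOffsets = adminClient
                    .listOffsets(partitions.stream().collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest())))
                    .all().get().entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
            // A real enumerator would compare this against already-assigned splits and hand
            // any new partitions to readers; see assignSplit() in the diff below.
            System.out.println("Discovered partitions: " + latestOffsets.keySet());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Cancel the periodic task and release the admin client.
    public void close() {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
            executor.shutdownNow();
        }
        adminClient.close();
    }
}

In the diff itself, the periodic task drives the enumerator's discovery and assignment path (fetchPendingPartitionSplit() followed by assignSplit()), and start offsets for newly discovered splits are resolved from the configured start mode in setPartitionStartOffset().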
@@ -170,7 +170,7 @@ public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> createEnumerato

@Override
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext, KafkaSourceState checkpointState) throws Exception {
return new KafkaSourceSplitEnumerator(this.metadata, enumeratorContext, checkpointState);
return new KafkaSourceSplitEnumerator(this.metadata, enumeratorContext, checkpointState, discoveryIntervalMillis);
}

@Override
@@ -23,7 +23,7 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
@@ -33,7 +33,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -58,6 +57,8 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo

private Map<TopicPartition, KafkaSourceSplit> pendingSplit;
private final Map<TopicPartition, KafkaSourceSplit> assignedSplit;
private ScheduledExecutorService executor;
private ScheduledFuture scheduledFuture;

KafkaSourceSplitEnumerator(ConsumerMetadata metadata, Context<KafkaSourceSplit> context) {
this.metadata = metadata;
@@ -77,11 +78,22 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
this.discoveryIntervalMillis = discoveryIntervalMillis;
}

KafkaSourceSplitEnumerator(ConsumerMetadata metadata, Context<KafkaSourceSplit> context,
KafkaSourceState sourceState, long discoveryIntervalMillis) {
this(metadata, context, sourceState);
this.discoveryIntervalMillis = discoveryIntervalMillis;
}

@Override
public void open() {
this.adminClient = initAdminClient(this.metadata.getProperties());
if (discoveryIntervalMillis > 0) {
this.executor = Executors.newScheduledThreadPool(1);
this.executor = Executors.newScheduledThreadPool(1, runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
thread.setName("kafka-partition-dynamic-discovery");
return thread;
});
this.scheduledFuture = executor.scheduleWithFixedDelay(
() -> {
try {
@@ -96,7 +108,38 @@ public void open() {

@Override
public void run() throws ExecutionException, InterruptedException {
discoverySplits();
fetchPendingPartitionSplit();
setPartitionStartOffset();
assignSplit();
}

private void setPartitionStartOffset() throws ExecutionException, InterruptedException {
Collection<TopicPartition> topicPartitions = pendingSplit.keySet();
Map<TopicPartition, Long> topicPartitionOffsets = null;
switch (metadata.getStartMode()) {
case EARLIEST:
topicPartitionOffsets = listOffsets(topicPartitions, OffsetSpec.earliest());
break;
case GROUP_OFFSETS:
topicPartitionOffsets = listConsumerGroupOffsets(topicPartitions);
break;
case LATEST:
topicPartitionOffsets = listOffsets(topicPartitions, OffsetSpec.latest());
break;
case TIMESTAMP:
topicPartitionOffsets = listOffsets(topicPartitions, OffsetSpec.forTimestamp(metadata.getStartOffsetsTimestamp()));
break;
case SPECIFIC_OFFSETS:
topicPartitionOffsets = metadata.getSpecificStartOffsets();
break;
default:
break;
}
topicPartitionOffsets.entrySet().forEach(entry -> {
if (pendingSplit.containsKey(entry.getKey())) {
pendingSplit.get(entry.getKey()).setStartOffset(entry.getValue());
}
});
}

@Override
@@ -115,20 +158,20 @@ public void close() throws IOException {
@Override
public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
pendingSplit.addAll(convertToNextSplit(splits));
pendingSplit.putAll(convertToNextSplit(splits));
assignSplit();
}
}

private Collection<? extends KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
private Map<TopicPartition, ? extends KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
try {
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> listOffsets =
getKafkaPartitionLatestOffset(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()));
Map<TopicPartition, Long> listOffsets =
listOffsets(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()), OffsetSpec.latest());
splits.forEach(split -> {
split.setStartOffset(split.getEndOffset() + 1);
split.setEndOffset(listOffsets.get(split.getTopicPartition()).offset());
split.setEndOffset(listOffsets.get(split.getTopicPartition()));
});
return splits;
return splits.stream().collect(Collectors.toMap(split -> split.getTopicPartition(), split -> split));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -153,7 +196,7 @@ public void registerReader(int subtaskId) {

@Override
public KafkaSourceState snapshotState(long checkpointId) throws Exception {
return new KafkaSourceState(assignedSplit);
return new KafkaSourceState(assignedSplit.values().stream().collect(Collectors.toSet()));
}

@Override
@@ -182,34 +225,30 @@ private Set<KafkaSourceSplit> getTopicInfo() throws ExecutionException, Interrup
Collection<TopicPartition> partitions =
adminClient.describeTopics(topics).all().get().values().stream().flatMap(t -> t.partitions().stream()
.map(p -> new TopicPartition(t.name(), p.partition()))).collect(Collectors.toSet());
return getKafkaPartitionLatestOffset(partitions).entrySet().stream().map(partition -> {
KafkaSourceSplit split = new KafkaSourceSplit(partition.getKey());
split.setEndOffset(partition.getValue().offset());
Map<TopicPartition, Long> latestOffsets = listOffsets(partitions, OffsetSpec.latest());
return partitions.stream().map(partition -> {
KafkaSourceSplit split = new KafkaSourceSplit(partition);
split.setEndOffset(latestOffsets.get(split.getTopicPartition()));
return split;
}).collect(Collectors.toSet());
}

private Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> getKafkaPartitionLatestOffset(Collection<TopicPartition> partitions) throws InterruptedException, ExecutionException {
return adminClient.listOffsets(partitions.stream().collect(Collectors.toMap(p -> p, p -> OffsetSpec.latest())))
.all().get();
}

private void assignSplit() {
private synchronized void assignSplit() {
Map<Integer, List<KafkaSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE);
for (int taskID = 0; taskID < context.currentParallelism(); taskID++) {
readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
}

pendingSplit.forEach(s -> {
if (!assignedSplit.contains(s)) {
readySplit.get(getSplitOwner(s.getTopicPartition(), context.currentParallelism()))
.add(s);
pendingSplit.entrySet().forEach(s -> {
if (!assignedSplit.containsKey(s.getKey())) {
readySplit.get(getSplitOwner(s.getKey(), context.currentParallelism()))
.add(s.getValue());
}
});

readySplit.forEach(context::assignSplit);

assignedSplit.addAll(pendingSplit);
assignedSplit.putAll(pendingSplit);
pendingSplit.clear();
}

@@ -219,13 +258,56 @@ private static int getSplitOwner(TopicPartition tp, int numReaders) {
return (startIndex + tp.partition()) % numReaders;
}

private synchronized void discoverySplits() throws ExecutionException, InterruptedException {
private Map<TopicPartition, Long> listOffsets(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) throws ExecutionException, InterruptedException {
Map<TopicPartition, OffsetSpec> topicPartitionOffsets = partitions.stream().collect(Collectors.toMap(partition -> partition, __ -> offsetSpec));

return adminClient.listOffsets(topicPartitionOffsets).all()
.thenApply(
result -> {
Map<TopicPartition, Long>
offsets = new HashMap<>();
result.forEach(
(tp, offsetsResultInfo) -> {
if (offsetsResultInfo != null) {
offsets.put(tp, offsetsResultInfo.offset());
}
});
return offsets;
})
.get();
}

public Map<TopicPartition, Long> listConsumerGroupOffsets(Collection<TopicPartition> partitions) throws ExecutionException, InterruptedException {
ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions().topicPartitions(new ArrayList<>(partitions));
return adminClient
.listConsumerGroupOffsets(metadata.getConsumerGroup(), options)
.partitionsToOffsetAndMetadata()
.thenApply(
result -> {
Map<TopicPartition, Long> offsets = new HashMap<>();
result.forEach(
(tp, oam) -> {
if (oam != null) {
offsets.put(tp, oam.offset());
}
});
return offsets;
})
.get();
}

private void discoverySplits() throws ExecutionException, InterruptedException {
fetchPendingPartitionSplit();
assignSplit();
}

private void fetchPendingPartitionSplit() throws ExecutionException, InterruptedException {
getTopicInfo().forEach(split -> {
if (!assignedSplit.contains(split)) {
pendingSplit.add(split);
if (!assignedSplit.containsKey(split.getTopicPartition())) {
if (!pendingSplit.containsKey(split.getTopicPartition())) {
pendingSplit.put(split.getTopicPartition(), split);
}
}
});
assignSplit();
}

}
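
For reference, a minimal standalone sketch of the two AdminClient lookups that the new listOffsets(...) and listConsumerGroupOffsets(...) helpers above are built on. The broker address, topic name, partition list, and the consumer group "my-group" are assumptions for illustration only.

// Minimal sketch of the two offset lookups used above; all connection details are assumptions.
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class OffsetLookupSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        List<TopicPartition> partitions = Arrays.asList(
                new TopicPartition("demo-topic", 0), new TopicPartition("demo-topic", 1));

        try (AdminClient admin = AdminClient.create(props)) {
            // EARLIEST / LATEST / TIMESTAMP start modes all go through listOffsets,
            // differing only in the OffsetSpec requested per partition.
            Map<TopicPartition, Long> latest = admin
                    .listOffsets(partitions.stream().collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest())))
                    .all().get().entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));

            // GROUP_OFFSETS instead reads the offsets committed by a consumer group.
            ListConsumerGroupOffsetsOptions options =
                    new ListConsumerGroupOffsetsOptions().topicPartitions(partitions);
            Map<TopicPartition, Long> committed = new HashMap<>();
            admin.listConsumerGroupOffsets("my-group", options)
                    .partitionsToOffsetAndMetadata().get()
                    .forEach((tp, oam) -> {
                        if (oam != null) {
                            committed.put(tp, oam.offset());
                        }
                    });

            System.out.println("latest=" + latest + ", committed=" + committed);
        }
    }
}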
