[Improve][Connector-V2-kafka] Support for dynamic discover topic & partition in streaming mode (#3125)

Co-authored-by: zhouyao <yao.zhou@marketingforce.com>
Co-authored-by: Eric <gaojun2048@gmail.com>
3 people authored Nov 22, 2022
1 parent 68dc142 commit 999cfd6
Showing 6 changed files with 119 additions and 37 deletions.
36 changes: 21 additions & 15 deletions docs/en/connector-v2/source/kafka.md
@@ -17,20 +17,21 @@ Source connector for Apache Kafka.

## Options

| name | type | required | default value |
|----------------------|---------| -------- |--------------------------|
| topic | String | yes | - |
| bootstrap.servers | String | yes | - |
| pattern | Boolean | no | false |
| consumer.group | String | no | SeaTunnel-Consumer-Group |
| commit_on_checkpoint | Boolean | no | true |
| kafka.* | String | no | - |
| common-options | config | no | - |
| schema | | no | - |
| format | String | no | json |
| start_mode | String | no | group_offsets |
| start_mode.offsets | | no | |
| start_mode.timestamp | Long | no | |
| name | type | required | default value |
|-------------------------------------|---------| -------- |--------------------------|
| topic | String | yes | - |
| bootstrap.servers | String | yes | - |
| pattern | Boolean | no | false |
| consumer.group | String | no | SeaTunnel-Consumer-Group |
| commit_on_checkpoint | Boolean | no | true |
| kafka.* | String | no | - |
| common-options | config | no | - |
| schema | | no | - |
| format | String | no | json |
| start_mode | String | no | group_offsets |
| start_mode.offsets | | no | |
| start_mode.timestamp | Long | no | |
| partition-discovery.interval-millis | Long    | no       | -1                       |

### topic [string]

@@ -52,6 +53,10 @@ If `pattern` is set to `true`, the regular expression for a pattern of topic name

If true, the consumer's offset will be periodically committed in the background.

### partition-discovery.interval-millis [long]

The interval, in milliseconds, at which new topics and partitions are dynamically discovered in streaming mode. Discovery runs only when the value is greater than 0; the default of -1 disables it.
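
A minimal streaming example that pairs this option with `pattern`-based topic matching might look like the following sketch (broker address, topic regex, and interval are placeholders, not recommendations):

```hocon
source {
  Kafka {
    bootstrap.servers = "localhost:9092"
    topic = ".*seatunnel.*"
    pattern = "true"
    consumer.group = "seatunnel_group"
    partition-discovery.interval-millis = 60000
  }
}
```

With this configuration the connector re-reads cluster metadata every 60 seconds and starts consuming any newly created topic or partition that matches the regex.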

### kafka.* [string]

In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs).
@@ -147,4 +152,5 @@ source {

### Next Version

- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/incubator-seatunnel/pull/3157))
- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/incubator-seatunnel/pull/3125))
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -122,4 +122,12 @@ public class Config {
.noDefaultValue()
.withDescription("The offset required for consumption mode to be specific_offsets.");

/**
* Configuration key to define the consumer's partition discovery interval, in milliseconds.
*/
public static final Option<Long> KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = Options.key("partition-discovery.interval-millis")
.longType()
.defaultValue(-1L)
.withDescription("The interval for dynamically discovering topics and partitions.");

}
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -24,6 +24,7 @@
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
@@ -72,6 +73,7 @@ public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
private SeaTunnelRowType typeInfo;
private JobContext jobContext;
private long discoveryIntervalMillis = KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.defaultValue();

@Override
public Boundedness getBoundedness() {
@@ -140,6 +142,10 @@ public void prepare(Config config) throws PrepareFailException {
}
}

if (config.hasPath(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key())) {
this.discoveryIntervalMillis = config.getLong(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
}

TypesafeConfigUtils.extractSubConfig(config, "kafka.", false).entrySet().forEach(e -> {
this.metadata.getProperties().put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
});
@@ -159,12 +165,12 @@ public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(SourceReader.Co

@Override
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> createEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext) throws Exception {
return new KafkaSourceSplitEnumerator(this.metadata, enumeratorContext);
return new KafkaSourceSplitEnumerator(this.metadata, enumeratorContext, discoveryIntervalMillis);
}

@Override
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext, KafkaSourceState checkpointState) throws Exception {
return new KafkaSourceSplitEnumerator(this.metadata, enumeratorContext, checkpointState);
return new KafkaSourceSplitEnumerator(this.metadata, enumeratorContext, checkpointState, discoveryIntervalMillis);
}

@Override
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
@@ -37,10 +37,11 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
.optional(Config.PATTERN, Config.CONSUMER_GROUP, Config.COMMIT_ON_CHECKPOINT, Config.KAFKA_CONFIG_PREFIX, Config.SCHEMA, Config.FORMAT)
.conditional(Condition.of(Config.START_MODE, StartMode.TIMESTAMP), Config.START_MODE_TIMESTAMP)
.conditional(Condition.of(Config.START_MODE, StartMode.SPECIFIC_OFFSETS), Config.START_MODE_OFFSETS)
.build();
.required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
.optional(Config.PATTERN, Config.CONSUMER_GROUP, Config.COMMIT_ON_CHECKPOINT, Config.KAFKA_CONFIG_PREFIX, Config.SCHEMA,
Config.FORMAT, Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
.conditional(Condition.of(Config.START_MODE, StartMode.TIMESTAMP), Config.START_MODE_TIMESTAMP)
.conditional(Condition.of(Config.START_MODE, StartMode.SPECIFIC_OFFSETS), Config.START_MODE_OFFSETS)
.build();
}
}
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -41,9 +41,9 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

@Slf4j
@@ -56,11 +56,14 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
private final ConsumerMetadata metadata;
private final Set<KafkaSourceSplit> sourceSplits;
private final Map<Long, Map<TopicPartition, Long>> checkpointOffsetMap;
private final ConcurrentMap<TopicPartition, KafkaSourceSplit> sourceSplitMap;
private final Map<TopicPartition, KafkaConsumerThread> consumerThreadMap;
private final ExecutorService executorService;
private final DeserializationSchema<SeaTunnelRow> deserializationSchema;

private final LinkedBlockingQueue<KafkaSourceSplit> pendingPartitionsQueue;

private volatile boolean running = false;

KafkaSourceReader(ConsumerMetadata metadata,
DeserializationSchema<SeaTunnelRow> deserializationSchema,
SourceReader.Context context) {
@@ -69,10 +72,10 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
this.sourceSplits = new HashSet<>();
this.deserializationSchema = deserializationSchema;
this.consumerThreadMap = new ConcurrentHashMap<>();
this.sourceSplitMap = new ConcurrentHashMap<>();
this.checkpointOffsetMap = new ConcurrentHashMap<>();
this.executorService = Executors.newCachedThreadPool(
r -> new Thread(r, "Kafka Source Data Consumer"));
pendingPartitionsQueue = new LinkedBlockingQueue<>();
}

@Override
@@ -88,10 +91,14 @@ public void close() throws IOException {

@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
if (sourceSplitMap.isEmpty()) {
if (!running) {
Thread.sleep(THREAD_WAIT_TIME);
return;
}

// Drain splits that the enumerator has assigned since the last poll.
while (!pendingPartitionsQueue.isEmpty()) {
sourceSplits.add(pendingPartitionsQueue.poll());
}
sourceSplits.forEach(sourceSplit -> consumerThreadMap.computeIfAbsent(sourceSplit.getTopicPartition(), s -> {
KafkaConsumerThread thread = new KafkaConsumerThread(metadata);
executorService.submit(thread);
@@ -157,9 +164,13 @@ public List<KafkaSourceSplit> snapshotState(long checkpointId) {

@Override
public void addSplits(List<KafkaSourceSplit> splits) {
sourceSplits.addAll(splits);
sourceSplits.forEach(split -> {
sourceSplitMap.put(split.getTopicPartition(), split);
running = true;
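// Mark the reader as started so pollNext() stops sleeping and drains the queue.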
splits.forEach(s -> {
try {
pendingPartitionsQueue.put(s);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}

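The reader now hands splits from the enumerator callback to the polling thread through a blocking queue instead of a precomputed split map. The following condensed sketch shows the same handoff pattern in isolation; `Split`, `SplitHandoffSketch`, and the field names are illustrative, not the connector's actual API:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative stand-in for KafkaSourceSplit: one topic-partition to consume.
record Split(String topicPartition) {}

public class SplitHandoffSketch {
    private final LinkedBlockingQueue<Split> pendingSplits = new LinkedBlockingQueue<>();
    private final Set<Split> activeSplits = new HashSet<>();
    private final ExecutorService consumers = Executors.newCachedThreadPool();
    private volatile boolean running = false;

    // Called from the enumerator side each time discovery assigns new splits.
    public void addSplits(List<Split> splits) {
        running = true;                        // wake the polling loop
        splits.forEach(pendingSplits::offer);  // thread-safe, non-blocking enqueue
    }

    // Called repeatedly on the reader's polling thread.
    public void pollNext() throws InterruptedException {
        if (!running) {
            TimeUnit.MILLISECONDS.sleep(100);  // nothing assigned yet
            return;
        }
        Split split;
        while ((split = pendingSplits.poll()) != null) {
            // Start exactly one consumer task per newly assigned split.
            if (activeSplits.add(split)) {
                consumers.submit(() -> System.out.println("consuming " + split));
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SplitHandoffSketch reader = new SplitHandoffSketch();
        reader.addSplits(List.of(new Split("my-topic-0")));
        reader.pollNext();                     // picks up my-topic-0
        reader.consumers.shutdown();
    }
}
```

Because `LinkedBlockingQueue` is thread-safe, `addSplits` can run on the enumerator's thread while `pollNext` drains on the reader thread without any extra locking.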
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -38,6 +38,10 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

@@ -48,10 +52,13 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo

private final ConsumerMetadata metadata;
private final Context<KafkaSourceSplit> context;
private long discoveryIntervalMillis;
private AdminClient adminClient;

private Map<TopicPartition, KafkaSourceSplit> pendingSplit;
private final Map<TopicPartition, KafkaSourceSplit> assignedSplit;
private ScheduledExecutorService executor;
private ScheduledFuture<?> scheduledFuture;

KafkaSourceSplitEnumerator(ConsumerMetadata metadata, Context<KafkaSourceSplit> context) {
this.metadata = metadata;
@@ -65,20 +72,43 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
this(metadata, context);
}

KafkaSourceSplitEnumerator(ConsumerMetadata metadata, Context<KafkaSourceSplit> context,
long discoveryIntervalMillis) {
this(metadata, context);
this.discoveryIntervalMillis = discoveryIntervalMillis;
}

KafkaSourceSplitEnumerator(ConsumerMetadata metadata, Context<KafkaSourceSplit> context,
KafkaSourceState sourceState, long discoveryIntervalMillis) {
this(metadata, context, sourceState);
this.discoveryIntervalMillis = discoveryIntervalMillis;
}

@Override
public void open() {
this.adminClient = initAdminClient(this.metadata.getProperties());
if (discoveryIntervalMillis > 0) {
this.executor = Executors.newScheduledThreadPool(1, runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
thread.setName("kafka-partition-dynamic-discovery");
return thread;
});
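// scheduleWithFixedDelay, unlike scheduleAtFixedRate, waits for the previous
// discovery pass to finish, so a slow metadata fetch cannot cause overlapping runs.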
this.scheduledFuture = executor.scheduleWithFixedDelay(
() -> {
try {
discoverySplits();
} catch (Exception e) {
log.error("Dynamic discovery failure:", e);
}
}, discoveryIntervalMillis, discoveryIntervalMillis, TimeUnit.MILLISECONDS
);
}
}

@Override
public void run() throws ExecutionException, InterruptedException {
getTopicInfo().forEach(split -> {
if (!assignedSplit.containsKey(split.getTopicPartition())) {
if (!pendingSplit.containsKey(split.getTopicPartition())) {
pendingSplit.put(split.getTopicPartition(), split);
}
}
});
fetchPendingPartitionSplit();
setPartitionStartOffset();
assignSplit();
}
@@ -117,6 +147,12 @@ public void close() throws IOException {
if (this.adminClient != null) {
adminClient.close();
}
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
if (executor != null) {
executor.shutdownNow();
}
}
}

@Override
@@ -197,7 +233,7 @@ private Set<KafkaSourceSplit> getTopicInfo() throws ExecutionException, Interrup
}).collect(Collectors.toSet());
}

private void assignSplit() {
private synchronized void assignSplit() {
Map<Integer, List<KafkaSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE);
for (int taskID = 0; taskID < context.currentParallelism(); taskID++) {
readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
@@ -260,4 +296,18 @@ public Map<TopicPartition, Long> listConsumerGroupOffsets(Collection<TopicPartit
.get();
}

private void discoverySplits() throws ExecutionException, InterruptedException {
fetchPendingPartitionSplit();
assignSplit();
}

private void fetchPendingPartitionSplit() throws ExecutionException, InterruptedException {
getTopicInfo().forEach(split -> {
if (!assignedSplit.containsKey(split.getTopicPartition())) {
if (!pendingSplit.containsKey(split.getTopicPartition())) {
pendingSplit.put(split.getTopicPartition(), split);
}
}
});
}
}
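
The scheduled task above delegates to `getTopicInfo()`, whose body is outside this diff. Below is a self-contained sketch of how topic-and-partition discovery can be done with Kafka's `AdminClient`; the broker address and regex are placeholders, and the connector's real implementation may differ in detail:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;

public class PartitionDiscoverySketch {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        Pattern topicPattern = Pattern.compile(".*seatunnel.*");

        try (AdminClient admin = AdminClient.create(props)) {
            // 1. List every topic in the cluster and keep those matching the pattern.
            Set<String> topics = new HashSet<>();
            for (String name : admin.listTopics().names().get()) {
                if (topicPattern.matcher(name).matches()) {
                    topics.add(name);
                }
            }

            // 2. Expand each matched topic into its current partitions.
            Set<TopicPartition> partitions = new HashSet<>();
            Map<String, TopicDescription> descriptions = admin.describeTopics(topics).all().get();
            descriptions.forEach((topic, description) -> description.partitions()
                    .forEach(info -> partitions.add(new TopicPartition(topic, info.partition()))));

            System.out.println("Discovered partitions: " + partitions);
        }
    }
}
```

Filtering that result against the `assignedSplit` and `pendingSplit` maps, as `fetchPendingPartitionSplit()` does above, leaves exactly the partitions that still need to be handed to readers.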
