Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[hotfix][kafka] Fix the problem that the partition information cannot be obtained when kafka is restored #4764

Merged
merged 13 commits into from
Jun 6, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
Expand All @@ -57,18 +59,19 @@ public class KafkaSourceSplitEnumerator
private final ConsumerMetadata metadata;
private final Context<KafkaSourceSplit> context;
private long discoveryIntervalMillis;
private AdminClient adminClient;
private final AdminClient adminClient;

private Map<TopicPartition, KafkaSourceSplit> pendingSplit;
private final Map<TopicPartition, KafkaSourceSplit> pendingSplit;
private final Map<TopicPartition, KafkaSourceSplit> assignedSplit;
private ScheduledExecutorService executor;
private ScheduledFuture scheduledFuture;
private ScheduledFuture<?> scheduledFuture;

/**
 * Creates an enumerator with empty assigned and pending split maps and an admin client built
 * from the properties carried by {@code metadata}.
 *
 * @param metadata consumer metadata; its properties are passed to {@code initAdminClient}
 * @param context enumerator context stored for later use
 */
KafkaSourceSplitEnumerator(ConsumerMetadata metadata, Context<KafkaSourceSplit> context) {
this.metadata = metadata;
this.context = context;
this.assignedSplit = new HashMap<>();
this.pendingSplit = new HashMap<>();
// The admin client is created at construction time, so it is already set before open() runs.
// NOTE(review): initAdminClient is defined outside this view; confirm it has no side effects
// that must wait until open().
this.adminClient = initAdminClient(this.metadata.getProperties());
}

KafkaSourceSplitEnumerator(
Expand Down Expand Up @@ -97,7 +100,6 @@ public class KafkaSourceSplitEnumerator

@Override
public void open() {
this.adminClient = initAdminClient(this.metadata.getProperties());
if (discoveryIntervalMillis > 0) {
this.executor =
Executors.newScheduledThreadPool(
Expand Down Expand Up @@ -180,7 +182,6 @@ public void close() throws IOException {
public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
pendingSplit.putAll(convertToNextSplit(splits));
assignSplit();
ic4y marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -191,6 +192,7 @@ public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
listOffsets(
splits.stream()
.map(KafkaSourceSplit::getTopicPartition)
.filter(Objects::nonNull)
.collect(Collectors.toList()),
OffsetSpec.latest());
splits.forEach(
Expand All @@ -199,7 +201,7 @@ public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
split.setEndOffset(listOffsets.get(split.getTopicPartition()));
});
return splits.stream()
.collect(Collectors.toMap(split -> split.getTopicPartition(), split -> split));
.collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, split -> split));
} catch (Exception e) {
throw new KafkaConnectorException(
KafkaConnectorErrorCode.ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED, e);
Expand All @@ -225,7 +227,7 @@ public void registerReader(int subtaskId) {

/**
 * Builds the checkpoint state from the splits currently held in {@code assignedSplit}.
 *
 * <p>The {@code checkpointId} argument is not read in the body.
 *
 * @param checkpointId identifier of the checkpoint being taken (unused here)
 * @return a {@code KafkaSourceState} wrapping a set of the assigned splits
 */
@Override
public KafkaSourceState snapshotState(long checkpointId) throws Exception {
// NOTE(review): the two return lines below appear to be the before/after versions of the same
// statement in this diff view; both produce a Set copy of assignedSplit.values().
return new KafkaSourceState(assignedSplit.values().stream().collect(Collectors.toSet()));
return new KafkaSourceState(new HashSet<>(assignedSplit.values()));
}

@Override
Expand Down Expand Up @@ -291,18 +293,12 @@ private synchronized void assignSplit() {
readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
}

pendingSplit
.entrySet()
.forEach(
s -> {
if (!assignedSplit.containsKey(s.getKey())) {
readySplit
.get(
getSplitOwner(
s.getKey(), context.currentParallelism()))
.add(s.getValue());
}
});
pendingSplit.forEach(
(key, value) -> {
if (!assignedSplit.containsKey(key)) {
readySplit.get(getSplitOwner(key, context.currentParallelism())).add(value);
}
});

readySplit.forEach(
(id, split) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
import org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;

import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@Slf4j
public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit>
implements SourceSplitEnumerator.Context<SplitT> {

Expand Down Expand Up @@ -60,6 +63,10 @@ public Set<Integer> registeredReaders() {

@Override
public void assignSplit(int subtaskIndex, List<SplitT> splits) {
if (registeredReaders().isEmpty()) {
log.warn("No reader is obtained, skip this assign!");
return;
}
task.getExecutionContext()
.sendToMember(
new AssignSplitOperation<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private void register() {
enumeratorTaskAddress)
.get();
} catch (InterruptedException | ExecutionException e) {
log.warn("source register failed {}", e);
log.warn("source register failed.", e);
throw new RuntimeException(e);
}
}
Expand All @@ -177,7 +177,7 @@ public void requestSplit() {
enumeratorTaskAddress)
.get();
} catch (InterruptedException | ExecutionException e) {
log.warn("source request split failed [{}]", e);
log.warn("source request split failed.", e);
liugddx marked this conversation as resolved.
Show resolved Hide resolved
throw new RuntimeException(e);
}
}
Expand All @@ -192,7 +192,7 @@ public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
enumeratorTaskAddress)
.get();
} catch (InterruptedException | ExecutionException e) {
log.warn("source request split failed {}", e);
log.warn("source request split failed.", e);
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -258,7 +258,7 @@ public void restoreState(List<ActionSubtaskState> actionStateList) throws Except
enumeratorTaskAddress)
.get();
} catch (InterruptedException | ExecutionException e) {
log.warn("source request split failed {}", e);
log.warn("source request split failed.", e);
throw new RuntimeException(e);
}
}
Expand Down