Skip to content

Commit

Permalink
Remove references to the translation layer
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyao committed Oct 19, 2022
1 parent 9a7d593 commit 6062de0
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 54 deletions.
9 changes: 2 additions & 7 deletions seatunnel-connectors-v2/connector-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>connector-kafka</artifactId>

<properties>
<kafka.client.version>3.2.0</kafka.client.version>
</properties>
Expand All @@ -46,12 +46,7 @@
<artifactId>seatunnel-format-json</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-translation-base</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import org.apache.seatunnel.translation.source.ParallelEnumeratorContext;
import org.apache.seatunnel.translation.source.ParallelSource;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
Expand All @@ -41,6 +39,10 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand All @@ -50,16 +52,18 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
private static final String CLIENT_ID_PREFIX = "seatunnel";

private final ConsumerMetadata metadata;
private final ParallelEnumeratorContext<KafkaSourceSplit> context;
private final Context<KafkaSourceSplit> context;
private long discoveryIntervalMillis;
private AdminClient adminClient;

private Set<KafkaSourceSplit> pendingSplit;
private final Set<KafkaSourceSplit> assignedSplit;
private ScheduledExecutorService executor;
private ScheduledFuture scheduledFuture;

KafkaSourceSplitEnumerator(ConsumerMetadata metadata, Context<KafkaSourceSplit> context) {
this.metadata = metadata;
this.context = (ParallelEnumeratorContext) context;
this.context = context;
this.assignedSplit = new HashSet<>();
this.pendingSplit = new HashSet<>();
}
Expand All @@ -78,46 +82,36 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
@Override
public void open() {
this.adminClient = initAdminClient(this.metadata.getProperties());
ParallelSource parallelSource = context.getParallelSource();
if (discoveryIntervalMillis > 0) {
Thread discoveryThread = new Thread(() -> {
while (parallelSource.isRunning()) {
this.executor = Executors.newScheduledThreadPool(1);
this.scheduledFuture = executor.scheduleWithFixedDelay(
() -> {
try {
run();
discoverySplits();
} catch (Exception e) {
try {
parallelSource.close();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException("Kafka Partition Discovery Task Failed ID" + context.getSubtaskId(), e);
log.error("Dynamic discovery failure:", e);
}
try {
Thread.sleep(discoveryIntervalMillis);
} catch (InterruptedException e) {
break;
}
}
}, "Kafka Partition Discovery Task ID " + context.getSubtaskId());
discoveryThread.start();
}, discoveryIntervalMillis, discoveryIntervalMillis, TimeUnit.MILLISECONDS
);
}
}

@Override
public synchronized void run() throws ExecutionException, InterruptedException {
getTopicInfo().forEach(split -> {
if (!assignedSplit.contains(split)) {
pendingSplit.add(split);
}
});
assignSplit();
public void run() throws ExecutionException, InterruptedException {
discoverySplits();
}

@Override
public void close() throws IOException {
if (this.adminClient != null) {
adminClient.close();
}
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
if (executor != null) {
executor.shutdownNow();
}
}
}

@Override
Expand Down Expand Up @@ -204,20 +198,18 @@ private Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> getKafkaPar

private void assignSplit() {
Map<Integer, List<KafkaSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE);
readySplit.computeIfAbsent(context.getSubtaskId(), id -> new ArrayList<>());
for (int taskID = 0; taskID < context.currentParallelism(); taskID++) {
readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
}

pendingSplit.forEach(s -> {
if (!assignedSplit.contains(s)) {
int ownerId = getSplitOwner(s.getTopicPartition(), context.currentParallelism());
if (context.getSubtaskId() == ownerId) {
readySplit.get(ownerId).add(s);
}
readySplit.get(getSplitOwner(s.getTopicPartition(), context.currentParallelism()))
.add(s);
}
});

if (!readySplit.get(context.getSubtaskId()).isEmpty()) {
readySplit.forEach(context::assignSplit);
}
readySplit.forEach(context::assignSplit);

assignedSplit.addAll(pendingSplit);
pendingSplit.clear();
Expand All @@ -229,4 +221,13 @@ private static int getSplitOwner(TopicPartition tp, int numReaders) {
return (startIndex + tp.partition()) % numReaders;
}

private synchronized void discoverySplits() throws ExecutionException, InterruptedException {
getTopicInfo().forEach(split -> {
if (!assignedSplit.contains(split)) {
pendingSplit.add(split);
}
});
assignSplit();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,4 @@ public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
throw new RuntimeException("");
}

public ParallelSource<?, SplitT, ?> getParallelSource() {
return parallelSource;
}

public Integer getSubtaskId() {
return subtaskId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,4 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
reader.notifyCheckpointAborted(checkpointId);
}

public boolean isRunning() {
return running;
}
}

0 comments on commit 6062de0

Please sign in to comment.