diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
index 515610e9dd02..33d2caeb9332 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
@@ -55,11 +55,17 @@ public void initTransactions() {
 
     @Override
     public void beginTransaction() throws ProducerFencedException {
+        if (log.isDebugEnabled()) {
+            log.debug("KafkaInternalProducer.beginTransaction. " + this.transactionalId);
+        }
         super.beginTransaction();
     }
 
     @Override
     public void commitTransaction() throws ProducerFencedException {
+        if (log.isDebugEnabled()) {
+            log.debug("KafkaInternalProducer.commitTransaction." + this.transactionalId);
+        }
         super.commitTransaction();
     }
 
@@ -69,7 +75,18 @@ public void abortTransaction() throws ProducerFencedException {
     }
 
     public void setTransactionalId(String transactionalId) {
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "KafkaInternalProducer.setTransactionalId. Target transactionalId="
+                            + transactionalId);
+        }
         if (!transactionalId.equals(this.transactionalId)) {
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "KafkaInternalProducer.setTransactionalId. Current transactionalId={} not match target transactionalId={}",
+                        this.transactionalId,
+                        transactionalId);
+            }
             Object transactionManager = getTransactionManager();
             synchronized (transactionManager) {
                 ReflectionUtils.setField(transactionManager, "transactionalId", transactionalId);
@@ -97,7 +114,7 @@ public long getProducerId() {
         return (long) ReflectionUtils.getField(producerIdAndEpoch, "producerId").get();
     }
 
-    public void resumeTransaction(long producerId, short epoch) {
+    public void resumeTransaction(long producerId, short epoch, boolean txnStarted) {
 
         log.info(
                 "Attempting to resume transaction {} with producerId {} and epoch {}",
@@ -125,10 +142,15 @@ public void resumeTransaction(long producerId, short epoch) {
 
             transitionTransactionManagerStateTo(transactionManager, "READY");
             transitionTransactionManagerStateTo(transactionManager, "IN_TRANSACTION");
-            ReflectionUtils.setField(transactionManager, "transactionStarted", true);
+            ReflectionUtils.setField(transactionManager, "transactionStarted", txnStarted);
         }
     }
 
+    public boolean isTxnStarted() {
+        Object transactionManager = getTransactionManager();
+        return (boolean) ReflectionUtils.getField(transactionManager, "transactionStarted").get();
+    }
+
     private static Object createProducerIdAndEpoch(long producerId, short epoch) {
         try {
             Field field =
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
index ed4e28080912..4be9fba709bb 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
@@ -48,7 +48,7 @@ public List<KafkaCommitInfo> commit(List<KafkaCommitInfo> commitInfos) {
         for (KafkaCommitInfo commitInfo : commitInfos) {
             String transactionId = commitInfo.getTransactionId();
             if (log.isDebugEnabled()) {
-                log.debug("Committing transaction {}", transactionId);
+                log.debug("Committing transaction {}, commitInfo {}", transactionId, commitInfo);
             }
             KafkaProducer<?, ?> producer = getProducer(commitInfo);
             producer.commitTransaction();
@@ -87,7 +87,8 @@ public void abort(List<KafkaCommitInfo> commitInfos) {
                     new KafkaInternalProducer<>(
                             commitInfo.getKafkaProperties(), commitInfo.getTransactionId());
         }
-        kafkaProducer.resumeTransaction(commitInfo.getProducerId(), commitInfo.getEpoch());
+        kafkaProducer.resumeTransaction(
+                commitInfo.getProducerId(), commitInfo.getEpoch(), commitInfo.isTxnStarted());
         return kafkaProducer;
     }
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
index bfb2685595fc..b9503c2ec803 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
@@ -26,6 +26,7 @@
 import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -45,6 +46,7 @@ public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
     private String transactionId;
     private final String transactionPrefix;
     private final Properties kafkaProperties;
+    private int recordNumInTransaction = 0;
 
     public KafkaTransactionSender(String transactionPrefix, Properties kafkaProperties) {
         this.transactionPrefix = transactionPrefix;
@@ -61,6 +63,7 @@ public void beginTransaction(String transactionId) {
         this.transactionId = transactionId;
         this.kafkaProducer = getTransactionProducer(kafkaProperties, transactionId);
         kafkaProducer.beginTransaction();
+        recordNumInTransaction = 0;
     }
 
     @Override
@@ -70,7 +73,8 @@ public Optional<KafkaCommitInfo> prepareCommit() {
                         transactionId,
                         kafkaProperties,
                         this.kafkaProducer.getProducerId(),
-                        this.kafkaProducer.getEpoch());
+                        this.kafkaProducer.getEpoch(),
+                        this.kafkaProducer.isTxnStarted());
         return Optional.of(kafkaCommitInfo);
     }
 
@@ -107,6 +111,10 @@ public void abortTransaction(long checkpointId) {
 
     @Override
     public List<KafkaSinkState> snapshotState(long checkpointId) {
+        if (recordNumInTransaction == 0) {
+            // KafkaSinkCommitter does not support emptyTransaction, so we commit here.
+            kafkaProducer.commitTransaction();
+        }
         return Lists.newArrayList(
                 new KafkaSinkState(
                         transactionId, transactionPrefix, checkpointId, kafkaProperties));
@@ -116,7 +124,9 @@ public List<KafkaSinkState> snapshotState(long checkpointId) {
 
     public void close() {
         if (kafkaProducer != null) {
             kafkaProducer.flush();
-            kafkaProducer.close();
+            // kafkaProducer will abort the transaction if you call close() without a duration arg
+            // which will cause an exception when Committer commit the transaction later.
+            kafkaProducer.close(Duration.ZERO);
         }
     }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
index 99cc3aaf3c4e..82ef8af4d3ca 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
@@ -31,4 +31,5 @@ public class KafkaCommitInfo implements Serializable {
     private final Properties kafkaProperties;
     private final long producerId;
     private final short epoch;
+    private final boolean txnStarted;
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index ffc97f4dd33f..4f418cba4dbc 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -769,6 +769,14 @@ public void testKafkaProtobufToAssert(TestContainer container)
                 });
     }
 
+    @TestTemplate
+    public void testKafkaExactlyOnce(TestContainer container) throws Exception {
+        container.executeJob("/kafka/fake_to_kafka_exactly_once.conf");
+        String topicName = "kafka_topic_exactly_once";
+        Map<String, String> data = getKafkaConsumerData(topicName);
+        Assertions.assertEquals(4, data.size());
+    }
+
     public static String getTestConfigFile(String configFile)
             throws FileNotFoundException, URISyntaxException {
         URL resource = KafkaIT.class.getResource(configFile);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/fake_to_kafka_exactly_once.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/fake_to_kafka_exactly_once.conf
new file mode 100644
index 000000000000..e57aa89565c3
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/fake_to_kafka_exactly_once.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    parallelism = 1
+    result_table_name = "fake"
+    split.read-interval = 30000
+    split.num = 2
+    row.num = 4
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {}
+
+
+sink{
+  kafka {
+    format = JSON
+    topic = "kafka_topic_exactly_once"
+    bootstrap.servers = "kafkaCluster:9092"
+    semantics = EXACTLY_ONCE
+  }
+}