[Hotfix][Connector-V2][kafka] fix kafka sink config exactly-once exception
fcb-xiaobo committed Oct 16, 2024
1 parent 69086e0 commit 95f2812
Showing 6 changed files with 96 additions and 6 deletions.

In short: the commit records whether a transaction has actually started (a new txnStarted flag carried in KafkaCommitInfo), resumes recovered transactions with that recorded state instead of assuming it, commits empty transactions at snapshot time because the committer cannot handle them, and closes the producer with close(Duration.ZERO) so an open transaction is not aborted before the committer can commit it.
KafkaInternalProducer.java
@@ -55,11 +55,17 @@ public void initTransactions() {

@Override
public void beginTransaction() throws ProducerFencedException {
if (log.isDebugEnabled()) {
log.debug("KafkaInternalProducer.beginTransaction. " + this.transactionalId);
}
super.beginTransaction();
}

@Override
public void commitTransaction() throws ProducerFencedException {
if (log.isDebugEnabled()) {
log.debug("KafkaInternalProducer.commitTransaction. " + this.transactionalId);
}
super.commitTransaction();
}

@@ -69,7 +75,18 @@ public void abortTransaction() throws ProducerFencedException {
}

public void setTransactionalId(String transactionalId) {
if (log.isDebugEnabled()) {
log.debug(
"KafkaInternalProducer.abortTransaction. Target transactionalId="
+ transactionalId);
}
if (!transactionalId.equals(this.transactionalId)) {
if (log.isDebugEnabled()) {
log.debug(
"KafkaInternalProducer.abortTransaction. Current transactionalId={} not match target transactionalId={}",
this.transactionalId,
transactionalId);
}
Object transactionManager = getTransactionManager();
synchronized (transactionManager) {
ReflectionUtils.setField(transactionManager, "transactionalId", transactionalId);
@@ -97,7 +114,7 @@ public long getProducerId() {
return (long) ReflectionUtils.getField(producerIdAndEpoch, "producerId").get();
}

public void resumeTransaction(long producerId, short epoch) {
public void resumeTransaction(long producerId, short epoch, boolean txnStarted) {

log.info(
"Attempting to resume transaction {} with producerId {} and epoch {}",
@@ -125,10 +142,15 @@ public void resumeTransaction(long producerId, short epoch) {
transitionTransactionManagerStateTo(transactionManager, "READY");

transitionTransactionManagerStateTo(transactionManager, "IN_TRANSACTION");
ReflectionUtils.setField(transactionManager, "transactionStarted", true);
ReflectionUtils.setField(transactionManager, "transactionStarted", txnStarted);
}
}

public boolean isTxnStarted() {
Object transactionManager = getTransactionManager();
return (boolean) ReflectionUtils.getField(transactionManager, "transactionStarted").get();
}

private static Object createProducerIdAndEpoch(long producerId, short epoch) {
try {
Field field =
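For context, a minimal sketch of the reflection access these methods rely on, assuming the Kafka client's internal field names ("transactionManager" on KafkaProducer, "transactionStarted" on its TransactionManager); these are client internals rather than public API and may change between versions:

import java.lang.reflect.Field;

import org.apache.kafka.clients.producer.KafkaProducer;

public final class TxnReflectionSketch {

    // Reads the private TransactionManager out of a KafkaProducer and then
    // its transactionStarted flag, as getTransactionManager()/isTxnStarted()
    // above do via ReflectionUtils.
    public static boolean isTxnStarted(KafkaProducer<?, ?> producer)
            throws ReflectiveOperationException {
        Field managerField = KafkaProducer.class.getDeclaredField("transactionManager");
        managerField.setAccessible(true);
        Object transactionManager = managerField.get(producer);

        Field startedField = transactionManager.getClass().getDeclaredField("transactionStarted");
        startedField.setAccessible(true);
        return (boolean) startedField.get(transactionManager);
    }
}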
KafkaSinkCommitter.java
@@ -48,7 +48,7 @@ public List<KafkaCommitInfo> commit(List<KafkaCommitInfo> commitInfos) {
for (KafkaCommitInfo commitInfo : commitInfos) {
String transactionId = commitInfo.getTransactionId();
if (log.isDebugEnabled()) {
log.debug("Committing transaction {}", transactionId);
log.debug("Committing transaction {}, commitInfo {}", transactionId, commitInfo);
}
KafkaProducer<?, ?> producer = getProducer(commitInfo);
producer.commitTransaction();
@@ -87,7 +87,8 @@ public void abort(List<KafkaCommitInfo> commitInfos) {
new KafkaInternalProducer<>(
commitInfo.getKafkaProperties(), commitInfo.getTransactionId());
}
kafkaProducer.resumeTransaction(commitInfo.getProducerId(), commitInfo.getEpoch());
kafkaProducer.resumeTransaction(
commitInfo.getProducerId(), commitInfo.getEpoch(), commitInfo.isTxnStarted());
return kafkaProducer;
}
}
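Why the flag matters on this path: when the committer rebuilds a producer for a checkpointed transaction, the old code forced transactionStarted to true, so a transaction that had never sent a record was presumably committed on recovery and failed with an exception. A hedged sketch of the recovery sequence, reusing this commit's KafkaInternalProducer and KafkaCommitInfo (connector imports omitted; recoverAndCommit is a hypothetical helper name):

static void recoverAndCommit(KafkaCommitInfo commitInfo) {
    KafkaInternalProducer<byte[], byte[]> producer =
            new KafkaInternalProducer<>(
                    commitInfo.getKafkaProperties(), commitInfo.getTransactionId());
    // Restore producerId/epoch plus the recorded transactionStarted state,
    // rather than unconditionally forcing it to true.
    producer.resumeTransaction(
            commitInfo.getProducerId(), commitInfo.getEpoch(), commitInfo.isTxnStarted());
    producer.commitTransaction();
}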
KafkaTransactionSender.java
@@ -26,6 +26,7 @@
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
@@ -45,6 +46,7 @@ public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
private String transactionId;
private final String transactionPrefix;
private final Properties kafkaProperties;
private int recordNumInTransaction = 0;

public KafkaTransactionSender(String transactionPrefix, Properties kafkaProperties) {
this.transactionPrefix = transactionPrefix;
@@ -61,6 +63,7 @@ public void beginTransaction(String transactionId) {
this.transactionId = transactionId;
this.kafkaProducer = getTransactionProducer(kafkaProperties, transactionId);
kafkaProducer.beginTransaction();
recordNumInTransaction = 0;
}

@Override
@@ -70,7 +73,8 @@ public Optional<KafkaCommitInfo> prepareCommit() {
transactionId,
kafkaProperties,
this.kafkaProducer.getProducerId(),
this.kafkaProducer.getEpoch());
this.kafkaProducer.getEpoch(),
this.kafkaProducer.isTxnStarted());
return Optional.of(kafkaCommitInfo);
}

@@ -107,6 +111,10 @@ public void abortTransaction(long checkpointId) {

@Override
public List<KafkaSinkState> snapshotState(long checkpointId) {
if (recordNumInTransaction == 0) {
// KafkaSinkCommitter cannot commit an empty transaction, so commit it here.
kafkaProducer.commitTransaction();
}
return Lists.newArrayList(
new KafkaSinkState(
transactionId, transactionPrefix, checkpointId, kafkaProperties));
@@ -116,7 +124,9 @@ public List<KafkaSinkState> snapshotState(long checkpointId) {
public void close() {
if (kafkaProducer != null) {
kafkaProducer.flush();
kafkaProducer.close();
// close() without a duration aborts any open transaction, which would make
// the committer's later commitTransaction() fail with an exception.
kafkaProducer.close(Duration.ZERO);
}
}

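On the close(Duration.ZERO) change: per the comment above, a plain close() aborts the producer's open transaction, while close(Duration.ZERO) shuts the producer down immediately and leaves the transaction pending for the committer. A minimal sketch of the pattern, assuming a separate committer finishes the transaction:

import java.time.Duration;

import org.apache.kafka.clients.producer.KafkaProducer;

final class ProducerCloseSketch {
    // Flush buffered records, then close without aborting the in-flight
    // transaction so a committer can commit it later.
    static void closeKeepingTransaction(KafkaProducer<?, ?> producer) {
        producer.flush();
        producer.close(Duration.ZERO);
    }
}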
KafkaCommitInfo.java
@@ -31,4 +31,5 @@ public class KafkaCommitInfo implements Serializable {
private final Properties kafkaProperties;
private final long producerId;
private final short epoch;
private final boolean txnStarted;
}
KafkaIT.java
@@ -769,6 +769,14 @@ public void testKafkaProtobufToAssert(TestContainer container)
});
}

@TestTemplate
public void testKafkaExactlyOnce(TestContainer container) throws Exception {
container.executeJob("/kafka/fake_to_kafka_exactly_once.conf");
String topicName = "kafka_topic_exactly_once";
Map<String, String> data = getKafkaConsumerData(topicName);
Assertions.assertEquals(4, data.size());
}
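The assertion only holds if uncommitted data stays invisible, so the verification consumer should read with read_committed isolation. A hedged sketch of roughly what getKafkaConsumerData is expected to do (topic and bootstrap address taken from the config below; the group id is made up):

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

final class ReadCommittedSketch {
    static Map<String, String> readCommitted(String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkaCluster:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "exactly-once-check"); // hypothetical group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Skip records from aborted or still-open transactions.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Map<String, String> data = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(10))) {
                data.put(record.key(), record.value());
            }
        }
        return data;
    }
}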

public static String getTestConfigFile(String configFile)
throws FileNotFoundException, URISyntaxException {
URL resource = KafkaIT.class.getResource(configFile);
fake_to_kafka_exactly_once.conf
@@ -0,0 +1,48 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
env {
execution.parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
parallelism = 1
result_table_name = "fake"
split.read-interval = 30000
split.num = 2
row.num = 4
schema = {
fields {
name = "string"
age = "int"
}
}
}
}

transform {}


sink {
kafka {
format = JSON
topic = "kafka_topic_exactly_once"
bootstrap.servers = "kafkaCluster:9092"
semantics = EXACTLY_ONCE
}
}
