Skip to content

Commit

Permalink
Fix XA Transaction bug (#5020)
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 authored Jul 7, 2023
1 parent a2d2f2d commit 852fe10
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package org.apache.seatunnel.common.exception;

import java.io.PrintWriter;
import java.io.StringWriter;

/** SeaTunnel global exception, used to tell user more clearly error messages */
public class SeaTunnelRuntimeException extends RuntimeException {
private final SeaTunnelErrorCode seaTunnelErrorCode;
Expand All @@ -36,17 +33,7 @@ public SeaTunnelRuntimeException(
}

public SeaTunnelRuntimeException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
super(seaTunnelErrorCode.getErrorMessage() + " - " + getMessageFromThrowable(cause));
super(seaTunnelErrorCode.getErrorMessage(), cause);
this.seaTunnelErrorCode = seaTunnelErrorCode;
}

public static String getMessageFromThrowable(Throwable cause) {
if (cause == null) {
return "";
}
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter);
cause.printStackTrace(printWriter);
return stringWriter.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public static JdbcConnectionConfig of(ReadonlyConfig config) {
builder.xaDataSourceClassName(config.get(JdbcOptions.XA_DATA_SOURCE_CLASS_NAME));
builder.maxCommitAttempts(config.get(JdbcOptions.MAX_COMMIT_ATTEMPTS));
builder.transactionTimeoutSec(config.get(JdbcOptions.TRANSACTION_TIMEOUT_SEC));
builder.maxRetries(0);
}

config.getOptional(JdbcOptions.USER).ifPresent(builder::username);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Throwables;

import javax.transaction.xa.Xid;

import java.io.IOException;
Expand Down Expand Up @@ -137,11 +139,22 @@ public void write(SeaTunnelRow element) {
@Override
public Optional<XidInfo> prepareCommit() throws IOException {
tryOpen();
prepareCurrentTx();

boolean emptyXaTransaction = false;
try {
prepareCurrentTx();
} catch (Exception e) {
if (Throwables.getRootCause(e) instanceof XaFacade.EmptyXaTransactionException) {
emptyXaTransaction = true;
LOG.info("skip prepare empty xa transaction, xid={}", currentXid);
} else {
throw e;
}
}
this.currentXid = null;
beginTx();
checkState(prepareXid != null, "prepare xid must not be null");
return Optional.of(new XidInfo(prepareXid, 0));
return emptyXaTransaction ? Optional.empty() : Optional.of(new XidInfo(prepareXid, 0));
}

@Override
Expand Down Expand Up @@ -186,14 +199,22 @@ private void beginTx() throws IOException {
private void prepareCurrentTx() throws IOException {
checkState(currentXid != null, "no current xid");
outputFormat.flush();

Exception endAndPrepareException = null;
try {
xaFacade.endAndPrepare(currentXid);
prepareXid = currentXid;
} catch (Exception e) {
endAndPrepareException = e;
throw new JdbcConnectorException(
JdbcConnectorErrorCode.XA_OPERATION_FAILED,
"unable to prepare current xa transaction",
e);
} finally {
if (endAndPrepareException == null
|| Throwables.getRootCause(endAndPrepareException)
instanceof XaFacade.EmptyXaTransactionException) {
prepareXid = currentXid;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

@Slf4j
public class JdbcSinkAggregatedCommitter
implements SinkAggregatedCommitter<XidInfo, JdbcAggregatedCommitInfo> {

Expand Down Expand Up @@ -67,6 +70,7 @@ public List<JdbcAggregatedCommitInfo> commit(
return aggregatedCommitInfos.stream()
.map(
aggregatedCommitInfo -> {
log.info("commit xid: " + aggregatedCommitInfo.getXidInfoList());
GroupXaOperationResult<XidInfo> result =
xaGroupOps.commit(
new ArrayList<>(aggregatedCommitInfo.getXidInfoList()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,18 +171,19 @@ public void received(Record<?> record) {
serializeStates(writerStateSerializer.get(), states));
}
if (containAggCommitter) {
lastCommitInfo.ifPresent(
commitInfoT ->
runningTask
.getExecutionContext()
.sendToMember(
new SinkPrepareCommitOperation(
barrier,
committerTaskLocation,
SerializationUtils.serialize(
commitInfoT)),
committerTaskAddress)
.join());
CommitInfoT commitInfoT = null;
if (lastCommitInfo.isPresent()) {
commitInfoT = lastCommitInfo.get();
}
runningTask
.getExecutionContext()
.sendToMember(
new SinkPrepareCommitOperation(
barrier,
committerTaskLocation,
SerializationUtils.serialize(commitInfoT)),
committerTaskAddress)
.join();
}
} else {
if (containAggCommitter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@ public void run() throws Exception {
taskExecutionService
.getExecutionContext(taskLocation.getTaskGroupLocation())
.getClassLoader();
committerTask.receivedWriterCommitInfo(
barrier.getId(), SerializationUtils.deserialize(commitInfos, classLoader));
if (commitInfos != null) {
committerTask.receivedWriterCommitInfo(
barrier.getId(), SerializationUtils.deserialize(commitInfos, classLoader));
}
committerTask.triggerBarrier(barrier);
}
}

0 comments on commit 852fe10

Please sign in to comment.