diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java index 3151c99286d..7e3f3ccace6 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java @@ -17,9 +17,6 @@ package org.apache.seatunnel.common.exception; -import java.io.PrintWriter; -import java.io.StringWriter; - /** SeaTunnel global exception, used to tell user more clearly error messages */ public class SeaTunnelRuntimeException extends RuntimeException { private final SeaTunnelErrorCode seaTunnelErrorCode; @@ -36,17 +33,7 @@ public SeaTunnelRuntimeException( } public SeaTunnelRuntimeException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { - super(seaTunnelErrorCode.getErrorMessage() + " - " + getMessageFromThrowable(cause)); + super(seaTunnelErrorCode.getErrorMessage(), cause); this.seaTunnelErrorCode = seaTunnelErrorCode; } - - public static String getMessageFromThrowable(Throwable cause) { - if (cause == null) { - return ""; - } - StringWriter stringWriter = new StringWriter(); - PrintWriter printWriter = new PrintWriter(stringWriter); - cause.printStackTrace(printWriter); - return stringWriter.toString(); - } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java index ab007291171..afceddc59a0 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java @@ -58,6 +58,7 @@ public static JdbcConnectionConfig of(ReadonlyConfig config) { builder.xaDataSourceClassName(config.get(JdbcOptions.XA_DATA_SOURCE_CLASS_NAME)); builder.maxCommitAttempts(config.get(JdbcOptions.MAX_COMMIT_ATTEMPTS)); builder.transactionTimeoutSec(config.get(JdbcOptions.TRANSACTION_TIMEOUT_SEC)); + builder.maxRetries(0); } config.getOptional(JdbcOptions.USER).ifPresent(builder::username); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java index 9d3690af0d4..60861891b57 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java @@ -41,6 +41,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Throwables; + import javax.transaction.xa.Xid; import java.io.IOException; @@ -137,11 +139,22 @@ public void write(SeaTunnelRow element) { @Override public Optional<XidInfo> prepareCommit() throws IOException { tryOpen(); - prepareCurrentTx(); + + boolean emptyXaTransaction = false; + try { + prepareCurrentTx(); + } catch (Exception e) { + if (Throwables.getRootCause(e) instanceof XaFacade.EmptyXaTransactionException) { + emptyXaTransaction = true; + LOG.info("skip prepare empty xa transaction, xid={}", currentXid); + } else { + throw e; + } + } this.currentXid = null; beginTx(); checkState(prepareXid != null, "prepare xid must not be null"); - return Optional.of(new XidInfo(prepareXid, 0)); + return emptyXaTransaction ? Optional.empty() : Optional.of(new XidInfo(prepareXid, 0)); } @Override @@ -186,14 +199,22 @@ private void beginTx() throws IOException { private void prepareCurrentTx() throws IOException { checkState(currentXid != null, "no current xid"); outputFormat.flush(); + + Exception endAndPrepareException = null; try { xaFacade.endAndPrepare(currentXid); - prepareXid = currentXid; } catch (Exception e) { + endAndPrepareException = e; throw new JdbcConnectorException( JdbcConnectorErrorCode.XA_OPERATION_FAILED, "unable to prepare current xa transaction", e); + } finally { + if (endAndPrepareException == null + || Throwables.getRootCause(endAndPrepareException) + instanceof XaFacade.EmptyXaTransactionException) { + prepareXid = currentXid; + } } } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java index 4e049e14b57..c8e2c268b3b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java @@ -28,11 +28,14 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo; +import lombok.extern.slf4j.Slf4j; + import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; +@Slf4j public class JdbcSinkAggregatedCommitter implements SinkAggregatedCommitter<XidInfo, JdbcAggregatedCommitInfo> { @@ -67,6 +70,7 @@ public List<JdbcAggregatedCommitInfo> commit( return aggregatedCommitInfos.stream() .map( aggregatedCommitInfo -> { + log.info("commit xid: " + aggregatedCommitInfo.getXidInfoList()); GroupXaOperationResult<XidInfo> result = xaGroupOps.commit( new ArrayList<>(aggregatedCommitInfo.getXidInfoList()), diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 17860945e4a..fe1e5c5a894 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -171,18 +171,19 @@ public void received(Record<?> record) { serializeStates(writerStateSerializer.get(), states)); } if (containAggCommitter) { - lastCommitInfo.ifPresent( - commitInfoT -> - runningTask - .getExecutionContext() - .sendToMember( - new SinkPrepareCommitOperation( - barrier, - committerTaskLocation, - SerializationUtils.serialize( - commitInfoT)), - committerTaskAddress) - .join()); + CommitInfoT commitInfoT = null; + if (lastCommitInfo.isPresent()) { + commitInfoT = lastCommitInfo.get(); + } + runningTask + .getExecutionContext() + .sendToMember( + new SinkPrepareCommitOperation( + barrier, + committerTaskLocation, + SerializationUtils.serialize(commitInfoT)), + committerTaskAddress) + .join(); } } else { if (containAggCommitter) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java index 7e41b3d9c7a..06945a61b25 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java @@ -79,8 +79,10 @@ public void run() throws Exception { taskExecutionService .getExecutionContext(taskLocation.getTaskGroupLocation()) .getClassLoader(); - committerTask.receivedWriterCommitInfo( - barrier.getId(), SerializationUtils.deserialize(commitInfos, classLoader)); + if (commitInfos != null) { + committerTask.receivedWriterCommitInfo( + barrier.getId(), SerializationUtils.deserialize(commitInfos, classLoader)); + } committerTask.triggerBarrier(barrier); } }