Skip to content

Commit

Permalink
GH-1362: Enable Capture of Tx Synchronization Fail
Browse files Browse the repository at this point in the history
Resolves #1362

**cherry-pick to 2.3.x**

* Add missing doc changes.

* Fix test to reset flag.
  • Loading branch information
garyrussell authored and artembilan committed Jul 16, 2021
1 parent 10929b7 commit 032f8ec
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.connection;

import org.springframework.amqp.AmqpException;

/**
* Represents a failure to commit or rollback when performing afterCompletion
* after the primary transaction completes.
*
* @author Gary Russell
* @since 2.4
*/
public class AfterCompletionFailedException extends AmqpException {

private static final long serialVersionUID = 1L;

private final int syncStatus;

/**
* Construct an instance with the provided properties.
* @param syncStatus the synchronization status.
* @param cause the cause.
*/
public AfterCompletionFailedException(int syncStatus, Throwable cause) {
super(cause);
this.syncStatus = syncStatus;
}

/**
* Return the synchronization status.
* @return the status.
*/
public int getSyncStatus() {
return this.syncStatus;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.amqp.rabbit.connection;

import java.io.IOException;
import java.util.function.Consumer;

import org.springframework.amqp.AmqpIOException;
import org.springframework.lang.Nullable;
Expand All @@ -42,6 +43,10 @@
*/
public final class ConnectionFactoryUtils {

private static final ThreadLocal<AfterCompletionFailedException> failures = new ThreadLocal<>();

private static boolean captureAfterCompletionExceptions;

private ConnectionFactoryUtils() {
}

Expand Down Expand Up @@ -181,11 +186,42 @@ public static RabbitResourceHolder bindResourceToTransaction(RabbitResourceHolde
resourceHolder.setSynchronizedWithTransaction(true);
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new RabbitResourceSynchronization(resourceHolder,
connectionFactory));
connectionFactory, ConnectionFactoryUtils::completionFailed));
}
return resourceHolder;
}

private static void completionFailed(AfterCompletionFailedException ex) {
if (captureAfterCompletionExceptions) {
failures.set(ex);
}
}

/**
* Call this method to enable capturing {@link AfterCompletionFailedException}s
* when using transaction synchronization. Exceptions are stored in a {@link ThreadLocal}
* which must be cleared by calling {@link #checkAfterCompletion()} after the transaction
* has completed.
* @param enable true to enable capture.
*/
public static void enableAfterCompletionFailureCapture(boolean enable) {
captureAfterCompletionExceptions = enable;
}

/**
* When using transaction synchronization, call this method after the transaction commits to
* verify that the RabbitMQ transaction committed.
* @throws AfterCompletionFailedException if synchronization failed.
* @since 2.3.10
*/
public static void checkAfterCompletion() {
AfterCompletionFailedException ex = failures.get();
if (ex != null) {
failures.remove();
throw ex;
}
}

public static void registerDeliveryTag(ConnectionFactory connectionFactory, Channel channel, Long tag) {

Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
Expand Down Expand Up @@ -318,9 +354,14 @@ private static final class RabbitResourceSynchronization extends

private final RabbitResourceHolder resourceHolder;

RabbitResourceSynchronization(RabbitResourceHolder resourceHolder, Object resourceKey) {
private final Consumer<AfterCompletionFailedException> afterCompletionCallback;

RabbitResourceSynchronization(RabbitResourceHolder resourceHolder, Object resourceKey,
Consumer<AfterCompletionFailedException> afterCompletionCallback) {

super(resourceHolder, resourceKey);
this.resourceHolder = resourceHolder;
this.afterCompletionCallback = afterCompletionCallback;
}

@Override
Expand All @@ -338,6 +379,9 @@ public void afterCompletion(int status) {
this.resourceHolder.rollbackAll();
}
}
catch (RuntimeException ex) {
this.afterCompletionCallback.accept(new AfterCompletionFailedException(status, ex));
}
finally {
if (this.resourceHolder.isReleaseAfterCompletion()) {
this.resourceHolder.setSynchronizedWithTransaction(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.amqp.rabbit.core;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
Expand Down Expand Up @@ -62,8 +63,10 @@
import org.springframework.amqp.core.ReceiveAndReplyCallback;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory;
import org.springframework.amqp.rabbit.connection.AfterCompletionFailedException;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ChannelProxy;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.connection.PublisherCallbackChannel;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory;
Expand Down Expand Up @@ -616,6 +619,7 @@ void resourcesClearedAfterTxFails() throws IOException, TimeoutException {

@Test
void resourcesClearedAfterTxFailsWithSync() throws IOException, TimeoutException {
ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true);
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Connection mockConnection = mock(Connection.class);
Channel mockChannel = mock(Channel.class);
Expand All @@ -638,6 +642,9 @@ void resourcesClearedAfterTxFailsWithSync() throws IOException, TimeoutException
assertThatIllegalStateException()
.isThrownBy(() -> (TransactionSynchronizationManager.getSynchronizations()).isEmpty())
.withMessage("Transaction synchronization is not active");
assertThatExceptionOfType(AfterCompletionFailedException.class)
.isThrownBy(() -> ConnectionFactoryUtils.checkAfterCompletion());
ConnectionFactoryUtils.enableAfterCompletionFailureCapture(false);
}

@SuppressWarnings("serial")
Expand Down
11 changes: 11 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5673,6 +5673,17 @@ If you prefer XML configuration, you can declare the following bean in your XML
----
====

[[tx-sync]]
===== Transaction Synchronization

Synchronizing a RabbitMQ transaction with some other (e.g. DBMS) transaction provides "Best Effort One Phase Commit" semantics.
It is possible that the RabbitMQ transaction fails to commit during the after completion phase of transaction synchronization.
This is logged by the `spring-tx` infrastructure as an error, but no exception is thrown to the calling code.
Starting with version 2.3.10, you can call `ConnectionUtils.checkAfterCompletion()` after the transaction has committed on the same thread that processed the transaction.
It will simply return if no exception occurred; otherwise it will throw an `AfterCompletionFailedException` which will have a property representing the synchronization status of the completion.

Enable this feature by calling `ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true)`; this is a global flag and applies to all threads.

[[containerAttributes]]
==== Message Listener Container Configuration

Expand Down

0 comments on commit 032f8ec

Please sign in to comment.