Skip to content

Commit

Permalink
GH-1434: Mixed CFs With/Without Confirms/Returns
Browse files Browse the repository at this point in the history
GH-1434 allowing to rabbit template have multiple connection factories with not same confirms and returns flags.

GH-1434 avoiding call obtainTargetConnectionFactory twice

GH-1434 test

GH-1434 javadoc + removing else

GH-1434 fixing checkstyle

GH-1434 using publisherConfirms from PooledChannelConnectionFactory

GH-1434 adapting AbstractRoutingConnectionFactory

GH-1434 javadoc & checkstyle & BeforeEach > BeforeAll

GH-1434 javadoc

GH-1434 doc

GH-1434 doc
  • Loading branch information
LeonardoFerreiraa authored and garyrussell committed Mar 30, 2022
1 parent 6eca017 commit eccdb47
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 20 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ _you should see branches on origin as well as upstream, including 'main' and 'ma

== A Day in the Life of a Contributor

* _Always_ work on topic branches (Typically use the HitHub (or JIRA) issue ID as the branch name).
* _Always_ work on topic branches (Typically use the GitHub (or JIRA) issue ID as the branch name).
- For example, to create and switch to a new branch for issue #123: `git checkout -b GH-123`
* You might be working on several different topic branches at any given time, but when at a stopping point for one of those branches, commit (a local operation).
* Please follow the "Commit Guidelines" described in https://git-scm.com/book/en/Distributed-Git-Contributing-to-a-Project[this chapter of Pro Git].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public abstract class AbstractRoutingConnectionFactory implements ConnectionFact

private Boolean returns;

private boolean consistentConfirmsReturns = true;

/**
* Specify the map of target ConnectionFactories, with the lookup key as key.
* <p>The key can be of arbitrary type; this class implements the
Expand Down Expand Up @@ -125,10 +127,13 @@ private void checkConfirmsAndReturns(ConnectionFactory cf) {
if (this.returns == null) {
this.returns = cf.isPublisherReturns();
}
Assert.isTrue(this.confirms.booleanValue() == cf.isPublisherConfirms(),
"Target connection factories must have the same setting for publisher confirms");
Assert.isTrue(this.returns.booleanValue() == cf.isPublisherReturns(),
"Target connection factories must have the same setting for publisher returns");

if (this.consistentConfirmsReturns) {
Assert.isTrue(this.confirms.booleanValue() == cf.isPublisherConfirms(),
"Target connection factories must have the same setting for publisher confirms");
Assert.isTrue(this.returns.booleanValue() == cf.isPublisherReturns(),
"Target connection factories must have the same setting for publisher returns");
}
}

@Override
Expand Down Expand Up @@ -230,6 +235,25 @@ public ConnectionFactory getTargetConnectionFactory(Object key) {
return this.targetConnectionFactories.get(key);
}

/**
* Specify whether to apply a validation enforcing all {@link ConnectionFactory#isPublisherConfirms()} and
* {@link ConnectionFactory#isPublisherReturns()} have a consistent value.
* <p>
* A consistent value means that all ConnectionFactories must have the same value between all
* {@link ConnectionFactory#isPublisherConfirms()} and the same value between all
* {@link ConnectionFactory#isPublisherReturns()}.
* </p>
* <p>
* Note that in any case the values between {@link ConnectionFactory#isPublisherConfirms()} and
* {@link ConnectionFactory#isPublisherReturns()} don't need to be equals between each other.
* </p>
* @param consistentConfirmsReturns true to validate, false to not validate.
* @since 2.4.4
*/
public void setConsistentConfirmsReturns(boolean consistentConfirmsReturns) {
this.consistentConfirmsReturns = consistentConfirmsReturns;
}

/**
* Adds the given {@link ConnectionFactory} and associates it with the given lookup key.
* @param key the lookup key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
* @author Artem Bilan
* @author Steve Powell
* @author Will Droste
* @author Leonardo Ferreira
*/
@ManagedResource
public class CachingConnectionFactory extends AbstractConnectionFactory
Expand Down Expand Up @@ -1133,6 +1134,9 @@ else if (methodName.equals("isTransactional")) {
else if (methodName.equals("isConfirmSelected")) {
return this.confirmSelected;
}
else if (methodName.equals("isPublisherConfirms")) {
return this.publisherConfirms;
}
try {
if (this.target == null || !this.target.isOpen()) {
if (this.target instanceof PublisherCallbackChannel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,12 @@ default boolean isConfirmSelected() {
return false;
}

/**
* Return true if publisher confirms are enabled.
* @return true if publisherConfirms.
*/
default boolean isPublisherConfirms() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ private Channel createProxy(Channel channel, boolean transacted) {
return channel.confirmSelect();
case "isConfirmSelected":
return confirmSelected.get();
case "isPublisherConfirms":
return false;
}
return null;
};
Expand All @@ -231,6 +233,7 @@ private Channel createProxy(Channel channel, boolean transacted) {
advisor.addMethodName("isTransactional");
advisor.addMethodName("confirmSelect");
advisor.addMethodName("isConfirmSelected");
advisor.addMethodName("isPublisherConfirms");
pf.addAdvisor(advisor);
pf.addInterface(ChannelProxy.class);
proxy.set((Channel) pf.getProxy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
* {@link #closeThreadChannel()}.
*
* @author Gary Russell
* @author Leonardo Ferreira
* @since 2.3
*
*/
Expand Down Expand Up @@ -288,6 +289,8 @@ private Channel createProxy(Channel channel, boolean transactional) {
return channel.confirmSelect();
case "isConfirmSelected":
return confirmSelected.get();
case "isPublisherConfirms":
return false;
}
return null;
};
Expand All @@ -297,6 +300,7 @@ private Channel createProxy(Channel channel, boolean transactional) {
advisor.addMethodName("isTransactional");
advisor.addMethodName("confirmSelect");
advisor.addMethodName("isConfirmSelected");
advisor.addMethodName("isPublisherConfirms");
pf.addAdvisor(advisor);
pf.addInterface(ChannelProxy.class);
return (Channel) pf.getProxy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
* @author Mark Norkin
* @author Mohammad Hewedy
* @author Alexey Platonov
* @author Leonardo Ferreira
*
* @since 1.0
*/
Expand Down Expand Up @@ -257,10 +258,6 @@ public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count

private ErrorHandler replyErrorHandler;

private volatile Boolean confirmsOrReturnsCapable;

private volatile boolean publisherConfirms;

private volatile boolean usingFastReplyTo;

private volatile boolean evaluatedFastReplyTo;
Expand Down Expand Up @@ -1263,7 +1260,7 @@ else if (isChannelTransacted()) {
}
return buildMessageFromDelivery(delivery);
}
});
}, obtainTargetConnectionFactory(this.receiveConnectionFactorySelectorExpression, null));
logReceived(message);
return message;
}
Expand Down Expand Up @@ -1960,7 +1957,7 @@ private Message doSendAndReceiveWithDirect(String exchange, String routingKey, M
boolean cancelConsumer = false;
try {
Channel channel = channelHolder.getChannel();
if (this.confirmsOrReturnsCapable) {
if (isPublisherConfirmsOrReturns(connectionFactory)) {
addListener(channel);
}
Message reply = doSendAndReceiveAsListener(exchange, routingKey, message, correlationData, channel,
Expand Down Expand Up @@ -2224,12 +2221,10 @@ private void cleanUpAfterAction(@Nullable Channel channel, boolean invokeScope,
private <T> T invokeAction(ChannelCallback<T> action, ConnectionFactory connectionFactory, Channel channel)
throws Exception { // NOSONAR see the callback

if (this.confirmsOrReturnsCapable == null) {
determineConfirmsReturnsCapability(connectionFactory);
}
if (this.confirmsOrReturnsCapable) {
if (isPublisherConfirmsOrReturns(connectionFactory)) {
addListener(channel);
}

if (logger.isDebugEnabled()) {
logger.debug(
"Executing callback " + action.getClass().getSimpleName() + " on RabbitMQ Channel: " + channel);
Expand Down Expand Up @@ -2351,10 +2346,8 @@ public void waitForConfirmsOrDie(long timeout) {
}
}

public void determineConfirmsReturnsCapability(ConnectionFactory connectionFactory) {
this.publisherConfirms = connectionFactory.isPublisherConfirms();
this.confirmsOrReturnsCapable =
this.publisherConfirms || connectionFactory.isPublisherReturns();
private boolean isPublisherConfirmsOrReturns(ConnectionFactory connectionFactory) {
return connectionFactory.isPublisherConfirms() || connectionFactory.isPublisherReturns();
}

/**
Expand Down Expand Up @@ -2435,8 +2428,11 @@ protected void sendToRabbit(Channel channel, String exchange, String routingKey,
}

private void setupConfirm(Channel channel, Message message, @Nullable CorrelationData correlationDataArg) {
if ((this.publisherConfirms || this.confirmCallback != null) && channel instanceof PublisherCallbackChannel) {
final boolean publisherConfirms = channel instanceof ChannelProxy
&& ((ChannelProxy) channel).isPublisherConfirms();

if ((publisherConfirms || this.confirmCallback != null)
&& channel instanceof PublisherCallbackChannel) {
long nextPublishSeqNo = channel.getNextPublishSeqNo();
if (nextPublishSeqNo > 0) {
PublisherCallbackChannel publisherCallbackChannel = (PublisherCallbackChannel) channel;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.core;

import static org.assertj.core.api.Assertions.assertThat;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory;
import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory;
import org.springframework.amqp.rabbit.junit.BrokerTestUtils;
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;

/**
* @author Leonardo Ferreira
* @since 2.4.4
*/
@RabbitAvailable(queues = RabbitTemplateRoutingConnectionFactoryIntegrationTests.ROUTE)
class RabbitTemplateRoutingConnectionFactoryIntegrationTests {

public static final String ROUTE = "test.queue.RabbitTemplateRoutingConnectionFactoryIntegrationTests";

private static RabbitTemplate rabbitTemplate;

@BeforeAll
static void create() {
final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(BrokerTestUtils.getPort());

CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);

cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);

Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
connectionFactoryMap.put("true", cachingConnectionFactory);
connectionFactoryMap.put("false", pooledChannelConnectionFactory);

final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
routingConnectionFactory.setConsistentConfirmsReturns(false);
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);

rabbitTemplate = new RabbitTemplate(routingConnectionFactory);

final Expression sendExpression = new SpelExpressionParser().parseExpression(
"messageProperties.headers['x-use-publisher-confirms'] ?: false");
rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}

@AfterAll
static void cleanUp() {
rabbitTemplate.destroy();
}

@Test
void sendWithoutConfirmsTest() {
final String payload = UUID.randomUUID().toString();
rabbitTemplate.convertAndSend(ROUTE, (Object) payload, new CorrelationData());
assertThat(rabbitTemplate.getUnconfirmedCount()).isZero();

final Message received = rabbitTemplate.receive(ROUTE, Duration.ofSeconds(3).toMillis());
assertThat(received).isNotNull();
final String receivedPayload = new String(received.getBody());

assertThat(receivedPayload).isEqualTo(payload);
}

@Test
void sendWithConfirmsTest() throws Exception {
final String payload = UUID.randomUUID().toString();
final Message message = MessageBuilder.withBody(payload.getBytes(StandardCharsets.UTF_8))
.setHeader("x-use-publisher-confirms", "true").build();

final CorrelationData correlationData = new CorrelationData();
rabbitTemplate.send(ROUTE, message, correlationData);
assertThat(rabbitTemplate.getUnconfirmedCount()).isEqualTo(1);

final CorrelationData.Confirm confirm = correlationData.getFuture().get(10, TimeUnit.SECONDS);

assertThat(confirm.isAck()).isTrue();

final Message received = rabbitTemplate.receive(ROUTE, Duration.ofSeconds(10).toMillis());
assertThat(received).isNotNull();
final String receivedPayload = new String(received.getBody());

assertThat(receivedPayload).isEqualTo(payload);
}

}
42 changes: 42 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,48 @@ For example, with lookup key qualifier `thing1` and a container listening to que
IMPORTANT: The target (and default, if provided) connection factories must have the same settings for publisher confirms and returns.
See <<cf-pub-conf-ret>>.

Starting with version 2.4.4, this validation can be disabled.
If you have a case that the values between confirms and returns need to be unequal, you can use `AbstractRoutingConnectionFactory#setConsistentConfirmsReturns` to turn of the validation.
Note that the first connection factory added to `AbstractRoutingConnectionFactory` will determine the general values of `confirms` and `returns`.

It may be useful if you have a case that certain messages you would to check confirms/returns and others you don't.
For example:

====
[source, java]
----
@Bean
public RabbitTemplate rabbitTemplate() {
final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);
final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
connectionFactoryMap.put("true", cachingConnectionFactory);
connectionFactoryMap.put("false", pooledChannelConnectionFactory);
final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
routingConnectionFactory.setConsistentConfirmsReturns(false);
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);
final Expression sendExpression = new SpelExpressionParser().parseExpression(
"messageProperties.headers['x-use-publisher-confirms'] ?: false");
rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}
----
====

This way messages with the header `x-use-publisher-confirms: true` will be sent through the caching connection and you can ensure the message delivery.
See <<cf-pub-conf-ret>> for more information about ensuring message delivery.

[[queue-affinity]]
===== Queue Affinity and the `LocalizedQueueConnectionFactory`

Expand Down

0 comments on commit eccdb47

Please sign in to comment.