Skip to content

Commit

Permalink
GH-1289: Confirms and Returns with Routing CF
Browse files Browse the repository at this point in the history
Resolves #1289

`RoutingConnectionFactory` did not support correlated confirms or returns.

Target factories (and default) must have the same settings.

**cherry-pick to 2.2.x, 2.1.x**

GH-1289: Fix test for back port

- `CorrelationData` needs an id (`null` by default before 2.3).
  • Loading branch information
garyrussell committed Dec 23, 2020
1 parent 819587e commit ad612a8
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.amqp.AmqpException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

Expand All @@ -35,7 +36,8 @@
* @author Gary Russell
* @since 1.3
*/
public abstract class AbstractRoutingConnectionFactory implements ConnectionFactory, RoutingConnectionFactory {
public abstract class AbstractRoutingConnectionFactory implements ConnectionFactory, RoutingConnectionFactory,
InitializingBean {

private final Map<Object, ConnectionFactory> targetConnectionFactories =
new ConcurrentHashMap<Object, ConnectionFactory>();
Expand All @@ -46,6 +48,10 @@ public abstract class AbstractRoutingConnectionFactory implements ConnectionFact

private boolean lenientFallback = true;

private Boolean confirms;

private Boolean returns;

/**
* Specify the map of target ConnectionFactories, with the lookup key as key.
* <p>The key can be of arbitrary type; this class implements the
Expand All @@ -58,6 +64,7 @@ public void setTargetConnectionFactories(Map<Object, ConnectionFactory> targetCo
Assert.noNullElements(targetConnectionFactories.values().toArray(),
"'targetConnectionFactories' cannot have null values.");
this.targetConnectionFactories.putAll(targetConnectionFactories);
targetConnectionFactories.values().stream().forEach(cf -> checkConfirmsAndReturns(cf));
}

/**
Expand All @@ -69,6 +76,7 @@ public void setTargetConnectionFactories(Map<Object, ConnectionFactory> targetCo
*/
public void setDefaultTargetConnectionFactory(ConnectionFactory defaultTargetConnectionFactory) {
this.defaultTargetConnectionFactory = defaultTargetConnectionFactory;
checkConfirmsAndReturns(defaultTargetConnectionFactory);
}

/**
Expand All @@ -93,9 +101,37 @@ public boolean isLenientFallback() {
return this.lenientFallback;
}

@Override
public boolean isPublisherConfirms() {
return this.confirms;
}

@Override
public boolean isPublisherReturns() {
return this.returns;
}

@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(this.confirms, "At least one target factory (or default) is required");
}

private void checkConfirmsAndReturns(ConnectionFactory cf) {
if (this.confirms == null) {
this.confirms = cf.isPublisherConfirms();
}
if (this.returns == null) {
this.returns = cf.isPublisherReturns();
}
Assert.isTrue(this.confirms.booleanValue() == cf.isPublisherConfirms(),
"Target connection factories must have the same setting for publisher confirms");
Assert.isTrue(this.returns.booleanValue() == cf.isPublisherReturns(),
"Target connection factories must have the same setting for publisher returns");
}

@Override
public Connection createConnection() throws AmqpException {
return this.determineTargetConnectionFactory().createConnection();
return determineTargetConnectionFactory().createConnection();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
Expand All @@ -29,6 +30,8 @@
import org.junit.Test;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory;
import org.springframework.amqp.rabbit.junit.BrokerRunning;
import org.springframework.amqp.rabbit.junit.BrokerTestUtils;

Expand All @@ -45,6 +48,8 @@ public class RabbitTemplatePublisherCallbacksIntegrationTests2 {

private static final String ROUTE = "test.queue";

public static final String ROUTE2 = "test.queue.RabbitTemplatePublisherCallbacksIntegrationTests2.route";

private CachingConnectionFactory connectionFactoryWithConfirmsEnabled;

private RabbitTemplate templateWithConfirmsEnabled;
Expand All @@ -56,8 +61,6 @@ public class RabbitTemplatePublisherCallbacksIntegrationTests2 {
public void create() {
connectionFactoryWithConfirmsEnabled = new CachingConnectionFactory();
connectionFactoryWithConfirmsEnabled.setHost("localhost");
// When using publisher confirms, the cache size needs to be large enough
// otherwise channels can be closed before confirms are received.
connectionFactoryWithConfirmsEnabled.setChannelCacheSize(100);
connectionFactoryWithConfirmsEnabled.setPort(BrokerTestUtils.getPort());
connectionFactoryWithConfirmsEnabled.setPublisherConfirms(true);
Expand Down Expand Up @@ -94,6 +97,51 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
assertMessageCountEquals(0L);
}

@Test
public void routingWithConfirmsNoListener() throws Exception {
routingWithConfirms(false);
}

@Test
public void routingWithConfirmsListener() throws Exception {
routingWithConfirms(true);
}

private void routingWithConfirms(boolean listener) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
rcf.setDefaultTargetConnectionFactory(this.connectionFactoryWithConfirmsEnabled);
this.templateWithConfirmsEnabled.setConnectionFactory(rcf);
if (listener) {
this.templateWithConfirmsEnabled.setConfirmCallback((correlationData, ack, cause) -> {
latch.countDown();
});
}
this.templateWithConfirmsEnabled.setMandatory(true);
CorrelationData corr = new CorrelationData("foo");
this.templateWithConfirmsEnabled.convertAndSend("", ROUTE2, "foo", corr);
assertTrue(corr.getFuture().get(10, TimeUnit.SECONDS).isAck());
if (listener) {
assertTrue(latch.await(10, TimeUnit.SECONDS));
}
corr = new CorrelationData("bar");
this.templateWithConfirmsEnabled.convertAndSend("", "bad route", "foo", corr);
assertTrue(corr.getFuture().get(10, TimeUnit.SECONDS).isAck());
assertNotNull(corr.getReturnedMessage());
}

@Test
public void routingWithSimpleConfirms() throws Exception {
SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
rcf.setDefaultTargetConnectionFactory(this.connectionFactoryWithConfirmsEnabled);
this.templateWithConfirmsEnabled.setConnectionFactory(rcf);
assertTrue(this.templateWithConfirmsEnabled.<Boolean>invoke(template -> {
template.convertAndSend("", ROUTE2, "foo");
template.waitForConfirmsOrDie(10_000);
return true;
}));
}

private void assertMessageCountEquals(long wanted) throws InterruptedException {
long messageCount = determineMessageCount();
int n = 0;
Expand Down
7 changes: 5 additions & 2 deletions src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,9 @@ Doing so enables, for example, listening to queues with the same name but in a d

For example, with lookup key qualifier `thing1` and a container listening to queue `thing2`, the lookup key you could register the target connection factory with could be `thing1[thing2]`.

IMPORTANT: The target (and default, if provided) connection factories must have the same settings for publisher confirms and returns.
See <<cf-pub-conf-ret>>.

[[queue-affinity]]
===== Queue Affinity and the `LocalizedQueueConnectionFactory`

Expand Down Expand Up @@ -1115,8 +1118,8 @@ The `Confirm` object is a simple bean with 2 properties: `ack` and `reason` (for
The reason is not populated for broker-generated `nack` instances.
It is populated for `nack` instances generated by the framework (for example, closing the connection while `ack` instances are outstanding).

In addition, when both confirms and returns are enabled, the `CorrelationData` is populated with the returned message.
It is guaranteed that this occurs before the future is set with the `ack`.
In addition, when both confirms and returns are enabled, the `CorrelationData` is populated with the returned message, as long as the `CorrelationData` has a unique `id`; this is always the case, by default, starting with version 2.3.
It is guaranteed that the returned message is set before the future is set with the `ack`.

See also <<scoped-operations>> for a simpler mechanism for waiting for publisher confirms.

Expand Down

0 comments on commit ad612a8

Please sign in to comment.