Skip to content

Commit

Permalink
Add Future<?> to CorrelationData
Browse files Browse the repository at this point in the history
As an alternative to using a template callback, add a `ListenableFuture<?>` to the
`CorrelationData` base class.

* Fix NPE

* Old RabbitMQ on travis

* Fix Returns when no confirms

Only set up for confirms if the CF is configured for them.

* Restore check for confirmCallback - needed for Async template.

* Increase test timeout.
  • Loading branch information
garyrussell authored and artembilan committed Jul 31, 2018
1 parent ea95261 commit 4afefc4
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ public class RabbitTemplate extends RabbitAccessor implements BeanFactoryAware,

private volatile Boolean confirmsOrReturnsCapable;

private volatile boolean publisherConfirms;

private volatile Expression mandatoryExpression = new ValueExpression<Boolean>(false);

private volatile String correlationKey = null;
Expand Down Expand Up @@ -1951,8 +1953,9 @@ public void waitForConfirmsOrDie(long timeout) {
}

public void determineConfirmsReturnsCapability(ConnectionFactory connectionFactory) {
this.publisherConfirms = connectionFactory.isPublisherConfirms();
this.confirmsOrReturnsCapable =
connectionFactory.isPublisherConfirms() || connectionFactory.isPublisherReturns();
this.publisherConfirms || connectionFactory.isPublisherReturns();
}

/**
Expand Down Expand Up @@ -2014,7 +2017,7 @@ protected void sendToRabbit(Channel channel, String exchange, String routingKey,
}

private void setupConfirm(Channel channel, Message message, CorrelationData correlationData) {
if (this.confirmCallback != null && channel instanceof PublisherCallbackChannel) {
if ((this.publisherConfirms || this.confirmCallback != null) && channel instanceof PublisherCallbackChannel) {
PublisherCallbackChannel publisherCallbackChannel = (PublisherCallbackChannel) channel;
correlationData = this.correlationDataPostProcessor != null
? this.correlationDataPostProcessor.postProcess(message, correlationData)
Expand Down Expand Up @@ -2144,11 +2147,6 @@ public void handleConfirm(PendingConfirm pendingConfirm, boolean ack) {
if (this.confirmCallback != null) {
this.confirmCallback.confirm(pendingConfirm.getCorrelationData(), ack, pendingConfirm.getCause());
}
else {
if (logger.isDebugEnabled()) {
logger.warn("Confirm received but no callback available");
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.springframework.amqp.rabbit.support;

import org.springframework.amqp.core.Correlation;
import org.springframework.lang.Nullable;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* Base class for correlating publisher confirms to sent messages.
Expand All @@ -30,6 +32,8 @@
*/
public class CorrelationData implements Correlation {

private final SettableListenableFuture<Confirm> future = new SettableListenableFuture<>();

private volatile String id;

/**
Expand Down Expand Up @@ -66,9 +70,48 @@ public void setId(String id) {
this.id = id;
}

public SettableListenableFuture<Confirm> getFuture() {
return this.future;
}

@Override
public String toString() {
return "CorrelationData [id=" + this.id + "]";
}

/**
* Represents a publisher confirmation. When the ack field is
* true, the publish was successful; otherwise failed with a possible
* reason (may be null, meaning unknown).
*
* @since 2.1
*/
public static class Confirm {

private final boolean ack;

private final String reason;

public Confirm(boolean ack, @Nullable String reason) {
this.ack = ack;
this.reason = reason;
}

public boolean isAck() {
return this.ack;
}

public String getReason() {
return this.reason;
}

@Override
public String toString() {
return "Confirm [ack=" + this.ack
+ (this.reason != null ? ", reason=" + this.reason : "")
+ "]";
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.rabbit.support.CorrelationData.Confirm;
import org.springframework.util.Assert;

import com.rabbitmq.client.AMQP;
Expand Down Expand Up @@ -888,7 +889,7 @@ public void handleAck(long seq, boolean multiple)
if (this.logger.isDebugEnabled()) {
this.logger.debug(this.toString() + " PC:Ack:" + seq + ":" + multiple);
}
this.processAck(seq, true, multiple, true);
processAck(seq, true, multiple, true);
}

@Override
Expand All @@ -897,7 +898,7 @@ public void handleNack(long seq, boolean multiple)
if (this.logger.isDebugEnabled()) {
this.logger.debug(this.toString() + " PC:Nack:" + seq + ":" + multiple);
}
this.processAck(seq, false, multiple, true);
processAck(seq, false, multiple, true);
}

private synchronized void processAck(long seq, boolean ack, boolean multiple, boolean remove) {
Expand Down Expand Up @@ -931,6 +932,10 @@ private void doProcessAck(long seq, boolean ack, boolean multiple, boolean remov
while (iterator.hasNext()) {
Entry<Long, PendingConfirm> entry = iterator.next();
PendingConfirm value = entry.getValue();
CorrelationData correlationData = value.getCorrelationData();
if (correlationData != null) {
correlationData.getFuture().set(new Confirm(ack, value.getCause()));
}
iterator.remove();
doHandleConfirm(ack, involvedListener, value);
}
Expand All @@ -953,6 +958,10 @@ private void doProcessAck(long seq, boolean ack, boolean multiple, boolean remov
pendingConfirm = confirmsForListener.get(seq);
}
if (pendingConfirm != null) {
CorrelationData correlationData = pendingConfirm.getCorrelationData();
if (correlationData != null) {
correlationData.getFuture().set(new Confirm(ack, pendingConfirm.getCause()));
}
doHandleConfirm(ack, listener, pendingConfirm);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ChannelProxy;
import org.springframework.amqp.rabbit.junit.BrokerRunning;
Expand Down Expand Up @@ -98,6 +100,9 @@ public class RabbitTemplatePublisherCallbacksIntegrationTests {

private static final String ROUTE = "test.queue";

@Rule
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(ROUTE);

private CachingConnectionFactory connectionFactory;

private CachingConnectionFactory connectionFactoryWithConfirmsEnabled;
Expand All @@ -116,8 +121,6 @@ public void create() {
connectionFactory.setPort(BrokerTestUtils.getPort());
connectionFactoryWithConfirmsEnabled = new CachingConnectionFactory();
connectionFactoryWithConfirmsEnabled.setHost("localhost");
// When using publisher confirms, the cache size needs to be large enough
// otherwise channels can be closed before confirms are received.
connectionFactoryWithConfirmsEnabled.setChannelCacheSize(100);
connectionFactoryWithConfirmsEnabled.setPort(BrokerTestUtils.getPort());
connectionFactoryWithConfirmsEnabled.setPublisherConfirms(true);
Expand All @@ -140,9 +143,6 @@ public void cleanUp() {
this.brokerIsRunning.removeTestQueues();
}

@Rule
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueues(ROUTE);

@Test
public void testPublisherConfirmReceived() throws Exception {
final CountDownLatch latch = new CountDownLatch(10000);
Expand Down Expand Up @@ -697,7 +697,7 @@ public void testPublisherConfirmGetUnconfirmedConcurrency() throws Exception {
sentAll.set(true);
});
long t1 = System.currentTimeMillis();
while (!sentAll.get() && System.currentTimeMillis() < t1 + 20000) {
while (!sentAll.get() && System.currentTimeMillis() < t1 + 60_000) {
template.getUnconfirmed(-1);
}
assertTrue(sentAll.get());
Expand Down Expand Up @@ -808,4 +808,23 @@ public void testPublisherCallbackChannelImplCloseWithPending() throws Exception

}

@Test
public void testWithFuture() throws Exception {
RabbitAdmin admin = new RabbitAdmin(this.connectionFactory);
Queue queue = QueueBuilder.nonDurable()
.autoDelete()
.withArgument("x-max-length", 1)
.withArgument("x-overflow", "reject-publish")
.build();
admin.declareQueue(queue);
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
CorrelationData cd2 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("", queue.getName(), "bar", cd2);
// TODO: Uncomment when travis updates to rabbitmq 3.7
// assertFalse(cd2.getFuture().get(10, TimeUnit.SECONDS).isAck());
admin.deleteQueue(queue.getName());
}

}
15 changes: 15 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,21 @@ Some other process could check out the channel and perform some operation that c
This could cause the confirmation to be lost; _version 2.1_ and later no longer return the channel to the cache while confirmations are outstanding.
Since the `RabbitTemplate` performs a logical `close()` on the channel after each operation; in general, this means that only one confirm will be outstanding on a channel at a time.

Starting with version 2.1, the `CorrelationData` object has a `ListenableFuture` which can be used to get the result, instead of using a `ConfirmCallback` on the template.

====
[source, java]
----
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
----
====

Since it's a `ListenableFuture<Confirm>`, you can either `get()` the result when ready, or add listeners for an async callback.
The `Confirm` object is a simple bean with 2 properties `ack` and `reason` (for nacks).
The reason is not populated for broker-generated nacks; it is populated for nacks generated by the framework (e.g. closing the connection while acks are outstanding).

See also <<scoped-operations>> for a simpler mechanism for waiting for publisher confirms.

[[scoped-operations]]
Expand Down
5 changes: 4 additions & 1 deletion src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ See <<collection-declaration>> for more information.
The `RabbitTemplate` now can be configured with the `noLocalReplyConsumer` option to control a `noLocal` flag for reply consumers in the `sendAndReceive()` operations.
See <<request-reply>> for more information.

===== Message Converts
`CorrelationData` for publisher confirms now has a `ListenableFuture` which can be used to get the acknowledgment instead of using a callback.
See <<template-confirms>> for more information.

===== Message Convertion

A new `Jackson2XmlMessageConverter` is introduced to support converting messages from/to XML format.
See <<Jackson2XmlMessageConverter>> for more information.
Expand Down

0 comments on commit 4afefc4

Please sign in to comment.