Skip to content

Commit

Permalink
AMQP-825: Add Reply RetryTemplate
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-825

* Fix long line.

* Polishing - PR Comments

* More polishing


**cherry-pick to 2.0.x**
  • Loading branch information
garyrussell authored and artembilan committed Aug 15, 2018
1 parent d4c8377 commit dfcfda1
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 10 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ project('spring-amqp') {
compile ("org.springframework:spring-messaging:$springVersion", optional)
compile ("org.springframework:spring-oxm:$springVersion", optional)
compile ("org.springframework:spring-context:$springVersion", optional)
compile "org.springframework.retry:spring-retry:$springRetryVersion"

compile ("com.fasterxml.jackson.core:jackson-core:$jackson2Version", optional)
compile ("com.fasterxml.jackson.core:jackson-databind:$jackson2Version", optional)
compile ("com.fasterxml.jackson.dataformat:jackson-dataformat-xml:$jackson2Version", optional)
Expand Down Expand Up @@ -263,8 +265,6 @@ project('spring-rabbit') {
compile "org.springframework:spring-messaging:$springVersion"
compile "org.springframework:spring-tx:$springVersion"

compile "org.springframework.retry:spring-retry:$springRetryVersion"

compile ("ch.qos.logback:logback-classic:$logbackVersion", optional)

compile ("org.apache.logging.log4j:log4j-core:$log4jVersion", optional)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.support;

import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.Message;
import org.springframework.retry.RetryContext;

/**
* Type safe accessor for retried message sending.
*
* @author Gary Russell
* @since 2.0.6
*
*/
public final class SendRetryContextAccessor {

/**
* Key for the message we tried to send.
*/
public static final String MESSAGE = "message";

/**
* Key for the Address we tried to send to.
*/
public static final String ADDRESS = "address";

private SendRetryContextAccessor() {
super();
}

/**
* Retrieve the {@link Message} from the context.
* @param context the context.
* @return the message.
* @see #MESSAGE
*/
public static Message getMessage(RetryContext context) {
return (Message) context.getAttribute(MESSAGE);
}

/**
* Retrieve the {@link Address} from the context.
* @param context the context.
* @return the address.
* @see #ADDRESS
*/
public static Address getAddress(RetryContext context) {
return (Address) context.getAttribute(ADDRESS);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.ErrorHandler;
import org.springframework.util.backoff.BackOff;
Expand Down Expand Up @@ -106,6 +108,10 @@ public abstract class AbstractRabbitListenerContainerFactory<C extends AbstractM

private MessagePostProcessor[] beforeSendReplyPostProcessors;

private RetryTemplate retryTemplate;

private RecoveryCallback<?> recoveryCallback;

protected final AtomicInteger counter = new AtomicInteger();

/**
Expand Down Expand Up @@ -287,14 +293,41 @@ public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePo
}

/**
* Set post processors that will be applied before sending replies.
* Set post processors that will be applied before sending replies; added to each
* message listener adapter.
* @param beforeSendReplyPostProcessors the post processors.
* @since 2.0.3
* @see AbstractAdaptableMessageListener#setBeforeSendReplyPostProcessors(MessagePostProcessor...)
*/
public void setBeforeSendReplyPostProcessors(MessagePostProcessor... beforeSendReplyPostProcessors) {
this.beforeSendReplyPostProcessors = beforeSendReplyPostProcessors;
}

/**
* Set a {@link RetryTemplate} to use when sending replies; added to each message
* listener adapter.
* @param retryTemplate the template.
* @since 2.0.6
* @see #setReplyRecoveryCallback(RecoveryCallback)
* @see AbstractAdaptableMessageListener#setRetryTemplate(RetryTemplate)
*/
public void setRetryTemplate(RetryTemplate retryTemplate) {
this.retryTemplate = retryTemplate;
}

/**
* Set a {@link RecoveryCallback} to invoke when retries are exhausted. Added to each
* message listener adapter. Only used if a {@link #setRetryTemplate(RetryTemplate)
* retryTemplate} is provided.
* @param recoveryCallback the recovery callback.
* @since 2.0.6
* @see #setRetryTemplate(RetryTemplate)
* @see AbstractAdaptableMessageListener#setRecoveryCallback(RecoveryCallback)
*/
public void setReplyRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
this.recoveryCallback = recoveryCallback;
}

@Override
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
C instance = createContainerInstance();
Expand Down Expand Up @@ -370,10 +403,18 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {

endpoint.setupListenerContainer(instance);
}
if (this.beforeSendReplyPostProcessors != null
&& instance.getMessageListener() instanceof AbstractAdaptableMessageListener) {
((AbstractAdaptableMessageListener) instance.getMessageListener())
.setBeforeSendReplyPostProcessors(this.beforeSendReplyPostProcessors);
if (instance.getMessageListener() instanceof AbstractAdaptableMessageListener) {
AbstractAdaptableMessageListener messageListener = (AbstractAdaptableMessageListener) instance
.getMessageListener();
if (this.beforeSendReplyPostProcessors != null) {
messageListener.setBeforeSendReplyPostProcessors(this.beforeSendReplyPostProcessors);
}
if (this.retryTemplate != null) {
messageListener.setRetryTemplate(this.retryTemplate);
if (this.recoveryCallback != null) {
messageListener.setRecoveryCallback(this.recoveryCallback);
}
}
}
initializeContainer(instance, endpoint);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.amqp.rabbit.listener.adapter;

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Arrays;

Expand All @@ -32,6 +33,7 @@
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.amqp.support.SendRetryContextAccessor;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
Expand All @@ -43,6 +45,8 @@
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.expression.spel.support.StandardTypeConverter;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

import com.rabbitmq.client.Channel;
Expand Down Expand Up @@ -92,6 +96,11 @@ public abstract class AbstractAdaptableMessageListener implements ChannelAwareMe

private MessagePostProcessor[] beforeSendReplyPostProcessors;

private RetryTemplate retryTemplate;

private RecoveryCallback<?> recoveryCallback;


/**
* Set the routing key to use when sending response messages.
* This will be applied in case of a request message that
Expand Down Expand Up @@ -196,6 +205,26 @@ public void setBeforeSendReplyPostProcessors(MessagePostProcessor... beforeSendR
beforeSendReplyPostProcessors.length);
}

/**
* Set a {@link RetryTemplate} to use when sending replies.
* @param retryTemplate the template.
* @since 2.0.6
* @see #setRecoveryCallback(RecoveryCallback)
*/
public void setRetryTemplate(RetryTemplate retryTemplate) {
this.retryTemplate = retryTemplate;
}

/**
* Set a {@link RecoveryCallback} to invoke when retries are exhausted.
* @param recoveryCallback the recovery callback.
* @since 2.0.6
* @see #setRetryTemplate(RetryTemplate)
*/
public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
this.recoveryCallback = recoveryCallback;
}

/**
* Set a bean resolver for runtime SpEL expressions. Also configures the evaluation
* context with a standard type converter and map accessor.
Expand Down Expand Up @@ -414,15 +443,38 @@ protected void sendResponse(Channel channel, Address replyTo, Message messageIn)
try {
this.logger.debug("Publishing response to exchange = [" + replyTo.getExchangeName() + "], routingKey = ["
+ replyTo.getRoutingKey() + "]");
channel.basicPublish(replyTo.getExchangeName(), replyTo.getRoutingKey(), this.mandatoryPublish,
this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding),
message.getBody());
if (this.retryTemplate == null) {
doPublish(channel, replyTo, message);
}
else {
final Message messageToSend = message;
this.retryTemplate.execute(ctx -> {
doPublish(channel, replyTo, messageToSend);
return null;
}, ctx -> {
if (this.recoveryCallback != null) {
ctx.setAttribute(SendRetryContextAccessor.MESSAGE, messageToSend);
ctx.setAttribute(SendRetryContextAccessor.ADDRESS, replyTo);
this.recoveryCallback.recover(ctx);
return null;
}
else {
throw RabbitExceptionTranslator.convertRabbitAccessException(ctx.getLastThrowable());
}
});
}
}
catch (Exception ex) {
throw RabbitExceptionTranslator.convertRabbitAccessException(ex);
}
}

protected void doPublish(Channel channel, Address replyTo, Message message) throws IOException {
channel.basicPublish(replyTo.getExchangeName(), replyTo.getRoutingKey(), this.mandatoryPublish,
this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding),
message.getBody());
}

/**
* Post-process the given message before sending the response.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
Expand Down Expand Up @@ -249,6 +250,9 @@ public void autoSimpleDeclareAnonymousQueue() {
.getListenerContainer("anonymousQueue575");
assertThat(container.getQueueNames(), arrayWithSize(1));
assertEquals("viaAnonymous:foo", rabbitTemplate.convertSendAndReceive(container.getQueueNames()[0], "foo"));
Object messageListener = container.getMessageListener();
assertThat(TestUtils.getPropertyValue(messageListener, "retryTemplate"), notNullValue());
assertThat(TestUtils.getPropertyValue(messageListener, "recoveryCallback"), notNullValue());
}

@Test
Expand Down Expand Up @@ -1341,6 +1345,8 @@ public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
m.getMessageProperties().getHeaders().put("replyMPPApplied", true);
return m;
});
factory.setRetryTemplate(new RetryTemplate());
factory.setReplyRecoveryCallback(c -> null);
return factory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,36 @@

package org.springframework.amqp.rabbit.listener.adapter;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.mock;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Before;
import org.junit.Test;

import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.SendRetryContextAccessor;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import com.rabbitmq.client.Channel;

/**
* @author Dave Syer
Expand Down Expand Up @@ -131,6 +147,39 @@ public void testJdkProxyListener() throws Exception {
assertEquals("handle", this.simpleService.called);
}

@Test
public void testReplyRetry() throws Exception {
this.adapter.setDefaultListenerMethod("handle");
this.adapter.setDelegate(this.simpleService);
RetryPolicy retryPolicy = new SimpleRetryPolicy(2);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy);
this.adapter.setRetryTemplate(retryTemplate);
AtomicReference<Message> replyMessage = new AtomicReference<>();
AtomicReference<Address> replyAddress = new AtomicReference<>();
AtomicReference<Throwable> throwable = new AtomicReference<>();
this.adapter.setRecoveryCallback(ctx -> {
replyMessage.set(SendRetryContextAccessor.getMessage(ctx));
replyAddress.set(SendRetryContextAccessor.getAddress(ctx));
throwable.set(ctx.getLastThrowable());
return null;
});
this.messageProperties.setReplyTo("foo/bar");
Channel channel = mock(Channel.class);
RuntimeException ex = new RuntimeException();
willThrow(ex).given(channel)
.basicPublish(eq("foo"), eq("bar"), eq(Boolean.FALSE), any(), any());
Message message = new Message("foo".getBytes(), this.messageProperties);
this.adapter.onMessage(message, channel);
assertThat(this.simpleService.called, equalTo("handle"));
assertThat(replyMessage.get(), notNullValue());
assertThat(new String(replyMessage.get().getBody()), equalTo("processedfoo"));
assertThat(replyAddress.get(), notNullValue());
assertThat(replyAddress.get().getExchangeName(), equalTo("foo"));
assertThat(replyAddress.get().getRoutingKey(), equalTo("bar"));
assertThat(throwable.get(), sameInstance(ex));
}

public interface Service {

String handle(String input);
Expand Down
Loading

0 comments on commit dfcfda1

Please sign in to comment.