Skip to content

Commit

Permalink
GH-1127: Add MessageBatchRecoverer
Browse files Browse the repository at this point in the history
Resolves #1127
  • Loading branch information
garyrussell authored and artembilan committed Nov 27, 2019
1 parent e6e659a commit a9dc7e9
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@

package org.springframework.amqp.rabbit.config;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.retry.MessageBatchRecoverer;
import org.springframework.amqp.rabbit.retry.MessageKeyGenerator;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.NewMessageIdentifier;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.interceptor.MethodArgumentsKeyGenerator;
import org.springframework.retry.interceptor.MethodInvocationRecoverer;
import org.springframework.retry.interceptor.NewMethodArgumentsIdentifier;
import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;
import org.springframework.retry.support.RetryTemplate;

Expand Down Expand Up @@ -71,34 +77,49 @@ public StatefulRetryOperationsInterceptor getObject() {
retryTemplate = new RetryTemplate();
}
retryInterceptor.setRetryOperations(retryTemplate);
retryInterceptor.setNewItemIdentifier(createNewItemIdentifier());
retryInterceptor.setRecoverer(createRecoverer());
retryInterceptor.setKeyGenerator(createKeyGenerator());
return retryInterceptor;

}

retryInterceptor.setNewItemIdentifier(args -> {
Message message = (Message) args[1];
private NewMethodArgumentsIdentifier createNewItemIdentifier() {
return args -> {
Message message = argToMessage(args);
if (StatefulRetryOperationsInterceptorFactoryBean.this.newMessageIdentifier == null) {
return !message.getMessageProperties().isRedelivered();
}
else {
return StatefulRetryOperationsInterceptorFactoryBean.this.newMessageIdentifier.isNew(message);
}
});
};
}

final MessageRecoverer messageRecoverer = getMessageRecoverer();
retryInterceptor.setRecoverer((args, cause) -> {
Message message = (Message) args[1];
@SuppressWarnings("unchecked")
private MethodInvocationRecoverer<?> createRecoverer() {
return (args, cause) -> {
MessageRecoverer messageRecoverer = getMessageRecoverer();
Object arg = args[1];
if (messageRecoverer == null) {
logger.warn("Message dropped on recovery: " + message, cause);
logger.warn("Message(s) dropped on recovery: " + arg, cause);
}
else {
messageRecoverer.recover(message, cause);
else if (arg instanceof Message) {
messageRecoverer.recover((Message) arg, cause);
}
else if (arg instanceof List && messageRecoverer instanceof MessageBatchRecoverer) {
((MessageBatchRecoverer) messageRecoverer).recover((List<Message>) arg, cause);
}
// This is actually a normal outcome. It means the recovery was successful, but we don't want to consume
// any more messages until the acks and commits are sent for this (problematic) message...
throw new ImmediateAcknowledgeAmqpException("Recovered message forces ack (if ack mode requires it): "
+ message, cause);
});
+ arg, cause);
};
}

retryInterceptor.setKeyGenerator(args -> {
Message message = (Message) args[1];
private MethodArgumentsKeyGenerator createKeyGenerator() {
return args -> {
Message message = argToMessage(args);
if (StatefulRetryOperationsInterceptorFactoryBean.this.messageKeyGenerator == null) {
String messageId = message.getMessageProperties().getMessageId();
if (messageId == null && message.getMessageProperties().isRedelivered()) {
Expand All @@ -109,10 +130,20 @@ public StatefulRetryOperationsInterceptor getObject() {
else {
return StatefulRetryOperationsInterceptorFactoryBean.this.messageKeyGenerator.getKey(message);
}
});

return retryInterceptor;
};
}

@SuppressWarnings("unchecked")
private Message argToMessage(Object[] args) {
Object arg = args[1];
Message message = null;
if (arg instanceof Message) {
message = (Message) arg;
}
else if (arg instanceof List) {
message = ((List<Message>) arg).get(0);
}
return message;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

package org.springframework.amqp.rabbit.config;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.retry.MessageBatchRecoverer;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.interceptor.MethodInvocationRecoverer;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import org.springframework.retry.support.RetryTemplate;

Expand Down Expand Up @@ -53,21 +57,27 @@ public RetryOperationsInterceptor getObject() {
retryTemplate = new RetryTemplate();
}
retryInterceptor.setRetryOperations(retryTemplate);
retryInterceptor.setRecoverer(createRecoverer());
return retryInterceptor;

}

final MessageRecoverer messageRecoverer = getMessageRecoverer();
retryInterceptor.setRecoverer((args, cause) -> {
Message message = (Message) args[1];
@SuppressWarnings("unchecked")
private MethodInvocationRecoverer<?> createRecoverer() {
return (args, cause) -> {
MessageRecoverer messageRecoverer = getMessageRecoverer();
Object arg = args[1];
if (messageRecoverer == null) {
logger.warn("Message dropped on recovery: " + message, cause);
logger.warn("Message(s) dropped on recovery: " + arg, cause);
}
else {
messageRecoverer.recover(message, cause);
else if (arg instanceof Message) {
messageRecoverer.recover((Message) arg, cause);
}
else if (arg instanceof List && messageRecoverer instanceof MessageBatchRecoverer) {
((MessageBatchRecoverer) messageRecoverer).recover((List<Message>) arg, cause);
}
return null;
});

return retryInterceptor;

};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.retry;

import java.util.List;

import org.springframework.amqp.core.Message;

/**
* A retry recoverer for use with a batch listener. Users should consider throwing an
* exception containing the index within the batch where the exception occurred, allowing
* the recoverer to properly recover the remaining records.
*
* @author Gary Russell
* @since 2.2
*
*/
@FunctionalInterface
public interface MessageBatchRecoverer extends MessageRecoverer {

@Override
default void recover(Message message, Throwable cause) {
throw new IllegalStateException("MessageBatchRecoverer configured with a non-batch listener");
}

/**
* Callback for message batch that was consumed but failed all retry attempts.
*
* @param messages the messages to recover
* @param cause the cause of the error
*/
void recover(List<Message> messages, Throwable cause);

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.BatchMessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.AbstractRetryOperationsInterceptorFactoryBean;
import org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean;
Expand All @@ -41,9 +42,9 @@
import org.springframework.amqp.rabbit.junit.LogLevels;
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.retry.MessageBatchRecoverer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.retry.policy.MapRetryContextCache;
import org.springframework.retry.support.RetryTemplate;
Expand Down Expand Up @@ -90,7 +91,43 @@ private RabbitTemplate createTemplate(int concurrentConsumers) {
}

@Test
public void testStatefulRetryWithAllMessagesFailing() throws Exception {
void testStatelessRetryWithBatchListener() throws Exception {
doTestRetryWithBatchListener(false);
}

@Test
void testStatefulRetryWithBatchListener() throws Exception {
doTestRetryWithBatchListener(true);
}

private void doTestRetryWithBatchListener(boolean stateful) throws Exception {
RabbitTemplate template = createTemplate(1);
template.convertAndSend(queue.getName(), "foo");
template.convertAndSend(queue.getName(), "bar");

final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(
template.getConnectionFactory());
container.setMessageListener((BatchMessageListener) messages -> {
throw new RuntimeException("intended");
});
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setConsumerBatchEnabled(true);
container.setBatchSize(2);

final CountDownLatch latch = new CountDownLatch(1);
container.setAdviceChain(new Advice[] { createRetryInterceptor(latch, stateful, true) });

container.setQueueNames(queue.getName());
container.setReceiveTimeout(50);
container.afterPropertiesSet();
container.start();

assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
container.stop();
}

@Test
void testStatefulRetryWithAllMessagesFailing() throws Exception {

int messageCount = 10;
int txSize = 1;
Expand All @@ -101,7 +138,7 @@ public void testStatefulRetryWithAllMessagesFailing() throws Exception {
}

@Test
public void testStatelessRetryWithAllMessagesFailing() throws Exception {
void testStatelessRetryWithAllMessagesFailing() throws Exception {

int messageCount = 10;
int txSize = 1;
Expand All @@ -112,7 +149,7 @@ public void testStatelessRetryWithAllMessagesFailing() throws Exception {
}

@Test
public void testStatefulRetryWithNoMessageIds() {
void testStatefulRetryWithNoMessageIds() {

int messageCount = 2;
int txSize = 1;
Expand All @@ -135,7 +172,7 @@ public void testStatefulRetryWithNoMessageIds() {
}

@RepeatedTest(10)
public void testStatefulRetryWithTxSizeAndIntermittentFailure() throws Exception {
void testStatefulRetryWithTxSizeAndIntermittentFailure() throws Exception {

int messageCount = 10;
int txSize = 4;
Expand All @@ -146,7 +183,7 @@ public void testStatefulRetryWithTxSizeAndIntermittentFailure() throws Exception
}

@Test
public void testStatefulRetryWithMoreMessages() throws Exception {
void testStatefulRetryWithMoreMessages() throws Exception {

int messageCount = 200;
int txSize = 10;
Expand All @@ -157,18 +194,29 @@ public void testStatefulRetryWithMoreMessages() throws Exception {
}

private Advice createRetryInterceptor(final CountDownLatch latch, boolean stateful) throws Exception {
return createRetryInterceptor(latch, stateful, false);
}

private Advice createRetryInterceptor(final CountDownLatch latch, boolean stateful, boolean listRecoverer)
throws Exception {

AbstractRetryOperationsInterceptorFactoryBean factory;
if (stateful) {
factory = new StatefulRetryOperationsInterceptorFactoryBean();
}
else {
factory = new StatelessRetryOperationsInterceptorFactoryBean();
}
factory.setMessageRecoverer((message, cause) -> {
logger.warn("Recovered: [" + SerializationUtils.deserialize(message.getBody()).toString() +
"], message: " + message);
latch.countDown();
});
if (listRecoverer) {
factory.setMessageRecoverer((MessageBatchRecoverer) (messages, cause) -> {
latch.countDown();
});
}
else {
factory.setMessageRecoverer((message, cause) -> {
latch.countDown();
});
}
if (retryTemplate == null) {
retryTemplate = new RetryTemplate();
}
Expand Down
11 changes: 11 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5898,6 +5898,17 @@ Only a subset of retry capabilities can be configured this way.
More advanced features would need the configuration of a `RetryTemplate` as a Spring bean.
See the https://docs.spring.io/spring-retry/docs/api/current/[Spring Retry Javadoc] for complete information about available policies and their configuration.

[[batch-retry]]
===== Retry with Batch Listeners

It is not recommended to configure retry with a batch listener, unless the batch was created by the producer, in a single record.
See <<de-batching>> for information about consumer and producer-created batches.
With a consumer-created batch, the framework has no knowledge about which message in the batch caused the failure so recovery after the retries are exhausted is not possible.
With producer-created batches, since there is only one message that actually failed, the whole message can be recovered.
Applications may want to inform a custom recoverer where in the batch the failure occurred, perhaps by setting an index property of the thrown exception.

A retry recoverer for a batch listener must implement `MessageBatchRecoverer`.

[[async-listeners]]
===== Message Listeners and the Asynchronous Case

Expand Down
3 changes: 3 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,6 @@ See <<builder-api>> for more information.

Outbound headers with values of type `Class<?>` are now mapped using `getName()` instead of `toString()`.
See <<message-properties-converters>> for more information.

Recovery of failed producer-created batches is now supported.
See <<batch-retry>> for more information.

0 comments on commit a9dc7e9

Please sign in to comment.