Skip to content

Commit

Permalink
feat: add retry service
Browse files Browse the repository at this point in the history
Closes #32
  • Loading branch information
jnodorp-jaconi committed Sep 23, 2024
1 parent ae693d2 commit 5a85263
Show file tree
Hide file tree
Showing 10 changed files with 274 additions and 129 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@

### Features

* github actions ([0f3f860](https://github.com/jaconi-io/spring-rabbitmq-retry/commit/0f3f86048fa893b60f8cb8962a15b9c9a545627a))
* GitHub actions ([0f3f860](https://github.com/jaconi-io/spring-rabbitmq-retry/commit/0f3f86048fa893b60f8cb8962a15b9c9a545627a))
* initial commit ([7b42aec](https://github.com/jaconi-io/spring-rabbitmq-retry/commit/7b42aec084fd4fd21904a935324d40d0b476916d))
* maven publish ([840108a](https://github.com/jaconi-io/spring-rabbitmq-retry/commit/840108ad21c1b904fcea555a70cbf3adbd33c351))
* retry per queue ([21ce36b](https://github.com/jaconi-io/spring-rabbitmq-retry/commit/21ce36b750a732239af36e18e0e20f264d434e1b))
41 changes: 37 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ class MyListener {
}
```

By default, any exception thrown in the listener will immediately lead to the message being requeued. In many cases
exceptions are caused by either malformed messages or unavailable backends. In both cases, requeuing the message will
By default, any exception thrown in the listener will immediately lead to the message being re-queued. In many cases
exceptions are caused by either malformed messages or unavailable backends. In both cases, re-queuing the message will
not help. If a backend is unavailable due to being overloaded, this behavior is harmful.

A solution to this problem is exponential backoff. A message will not be retried immediately, but after some delay. For
Expand All @@ -37,7 +37,7 @@ class MyListener {
public void handle(Message msg) {
try {
processMessage(msg);
} catch (BackendTimeoutExceptoin e) {
} catch (BackendTimeoutException e) {
// Backend will probably come back. Retry.
throw new RetryMessagesException(msg);
} catch (MalformedMessageException e) {
Expand Down Expand Up @@ -88,12 +88,45 @@ jaconi:
If you set `create-resources = true` you need to ensure that the RabbitMQ user that your application is using has the
required permissions to declare (configure) the required queues.

## Manual Acknowledgement

When dealing with situations where the retry error handler cannot be used (for example, when dealing with manual `ack`
and `nack`), the `RetryService` can be used directly:

```java
class MyListener {
@Autowired
private RetryService retryService;
@RabbitListener(queues = "foo")
public void handle(Message msg, Channel ch) {
try {
processMessage(msg);
// Acknowledge successfully processed messages.
ch.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (BackendTimeoutException e) {
// Backend will probably come back. Retry.
retryService.retry(msg);
// Acknowledge messages scheduled for retry.
ch.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (MalformedMessageException e) {
// The message will not be fixed by retrying...
// Log and discard.
ch.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
```

## Releasing

Spring RabbitMQ Retry is published to the central maven repository.

Usually, publishing happens automatically via GitHub Actions. However, if you are an employee of jaconi, you can also
publish releases manually. To publish a release, you will need to configure the GPG private signing key and the keys
publish releases manually. To publish a release, you will need to configure the GPG private signing key and the key's
passphrase:

```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
@AutoConfiguration(after = RabbitAutoConfiguration.class)
@ConditionalOnProperty(value = "jaconi.rabbitmq.listener.retry.enabled", havingValue = "true")
@EnableConfigurationProperties(RetryProperties.class)
@Import({RetryErrorHandler.class, RetryResourceConfiguration.class})
@Import({RetryService.class, RetryErrorHandler.class, RetryResourceConfiguration.class})
@RequiredArgsConstructor
public class RetryAutoConfiguration {
private static final String NOOP_LOGGER = "io.jaconi.spring.rabbitmq.retry.noop";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
package io.jaconi.spring.rabbitmq.retry;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
* Custom error handling for RabbitMQ. By default, Spring AMQP retries most exceptions by immediately requeuing the
* Custom error handling for RabbitMQ. By default, Spring AMQP retries most exceptions by immediately re-queuing the
* causing message. This error handler rejects any messages causing an exception in the listener. If messages should be
* retried, the listener code can throw a {@link RetryMessagesException}. The messages in the
* {@link RetryMessagesException} are retried as configured in the {@link RetryProperties}.
*/
@Slf4j
@Component("retryErrorHandler")
public class RetryErrorHandler extends ConditionalRejectingErrorHandler {
private final AmqpTemplate amqpTemplate;
private final RetryService retryService;

public RetryErrorHandler(AmqpTemplate amqpTemplate) {
public RetryErrorHandler(RetryService retryService) {
super(new DefaultExceptionStrategy() {
@Override
protected boolean isUserCauseFatal(@NonNull Throwable cause) {
Expand All @@ -30,54 +25,16 @@ protected boolean isUserCauseFatal(@NonNull Throwable cause) {
}
});

this.amqpTemplate = amqpTemplate;
this.retryService = retryService;
}

@Override
public void handleError(@NonNull Throwable t) {
if (t instanceof ListenerExecutionFailedException lefe && lefe.getCause() instanceof RetryMessagesException rme) {
rme.getMessages().forEach(this::retryMessage);
rme.getMessages().forEach(retryService::retryMessage);
throw new ImmediateAcknowledgeAmqpException("acknowledge messages as they were scheduled for retry", t);
} else {
super.handleError(t);
}
}

private void retryMessage(Message<?> message) {
var retry = getRetry(message);
log.info("retrying message (attempt {}): {}", retry, message);

var routingKey = message.getHeaders().get(AmqpHeaders.RECEIVED_ROUTING_KEY, String.class);
amqpTemplate.convertAndSend(getRetryExchange(message), routingKey, message.getPayload(), m -> {
m.getMessageProperties().setHeader(RetryProperties.RETRY_HEADER, retry);
TechnicalHeadersFilter.filterHeaders(message.getHeaders())
.forEach(h -> m.getMessageProperties().setHeader(h, message.getHeaders().get(h)));
return m;
});
}

/**
* Determine the retry attempt for the {@link Message}.
*
* @param message the {@link Message}
* @return {@literal 1L} for the first retry, {@literal 2L} for the second, and so on
*/
private long getRetry(Message<?> message) {
Long previousRetryAttempt = message.getHeaders().get(RetryProperties.RETRY_HEADER, Long.class);
if (previousRetryAttempt == null) {
previousRetryAttempt = 0L;
}

return previousRetryAttempt + 1;
}

/**
* Determine the retry exchange for a {@link Message}.
*
* @param message the {@link Message}
* @return the retry exchange
*/
private String getRetryExchange(Message<?> message) {
return RetryProperties.RETRY_EXCHANGE_PATTERN.formatted(message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE, String.class));
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package io.jaconi.spring.rabbitmq.retry;

import lombok.Getter;
import org.springframework.amqp.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

@Getter
public class RetryMessagesException extends RuntimeException {
private static final MessagingMessageConverter CONVERTER = new MessagingMessageConverter();
private final Collection<Message<?>> messages;
Expand Down Expand Up @@ -70,8 +72,4 @@ public <T> RetryMessagesException(String message, Throwable cause, Collection<Me
this.messages = new ArrayList<>();
this.messages.addAll(messages);
}

public Collection<Message<?>> getMessages() {
return messages;
}
}
75 changes: 75 additions & 0 deletions src/main/java/io/jaconi/spring/rabbitmq/retry/RetryService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.jaconi.spring.rabbitmq.retry;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
* Retry AMQP messages as configured in the {@link RetryProperties}.
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RetryService {
private static final MessagingMessageConverter CONVERTER = new MessagingMessageConverter();

private final AmqpTemplate amqpTemplate;

/**
* Retry a {@link org.springframework.amqp.core.Message} by increasing the retry attempt in the message header and
* sending the message to the retry exchange.
*
* @param message the {@link org.springframework.amqp.core.Message}
*/
public void retryMessage(org.springframework.amqp.core.Message message) {
retryMessage((Message<?>) CONVERTER.fromMessage(message));
}

/**
* Retry a {@link Message} by increasing the retry attempt in the message header and sending the message to the
* retry exchange.
*
* @param message the {@link Message}
*/
public void retryMessage(Message<?> message) {
var retry = getRetry(message);
log.info("retrying message (attempt {}): {}", retry, message);

var routingKey = message.getHeaders().get(AmqpHeaders.RECEIVED_ROUTING_KEY, String.class);
amqpTemplate.convertAndSend(getRetryExchange(message), routingKey, message.getPayload(), m -> {
m.getMessageProperties().setHeader(RetryProperties.RETRY_HEADER, retry);
TechnicalHeadersFilter.filterHeaders(message.getHeaders())
.forEach(h -> m.getMessageProperties().setHeader(h, message.getHeaders().get(h)));
return m;
});
}

/**
* Determine the retry attempt for the {@link Message}.
*
* @param message the {@link Message}
* @return {@literal 1L} for the first retry, {@literal 2L} for the second, and so on
*/
private long getRetry(Message<?> message) {
Long previousRetryAttempt = message.getHeaders().get(RetryProperties.RETRY_HEADER, Long.class);
if (previousRetryAttempt == null) {
previousRetryAttempt = 0L;
}

return previousRetryAttempt + 1;
}

/**
* Determine the retry exchange for a {@link Message}.
*
* @param message the {@link Message}
* @return the retry exchange
*/
private String getRetryExchange(Message<?> message) {
return RetryProperties.RETRY_EXCHANGE_PATTERN.formatted(message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE, String.class));
}
}
71 changes: 71 additions & 0 deletions src/test/java/io/jaconi/spring/rabbitmq/retry/RabbitMQTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.jaconi.spring.rabbitmq.retry;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.RabbitMQContainer;

@SpringBootTest(properties = {
"jaconi.rabbitmq.listener.retry.enabled=true",
"jaconi.rabbitmq.listener.retry.create-resources=true"
})
abstract class RabbitMQTest {
protected static final String EXCHANGE = "test-exchange";
protected static final String QUEUE = "test-queue";
protected static final String ROUTING_KEY = "foo";

static final RabbitMQContainer rabbit = new RabbitMQContainer("rabbitmq:3.13-management-alpine");

@Autowired
@SuppressWarnings("unused")
protected AmqpTemplate amqpTemplate;

@BeforeAll
static void beforeAll() {
rabbit.start();
}

@AfterAll
static void afterAll() {
rabbit.stop();
}

@DynamicPropertySource
@SuppressWarnings("unused")
static void configureProperties(DynamicPropertyRegistry registry) {
registry.add("spring.rabbitmq.addresses", rabbit::getAmqpUrl);

registry.add("jaconi.rabbitmq.listener.retry.queues.%s.max-attempts".formatted(QUEUE), () -> 2);
registry.add("jaconi.rabbitmq.listener.retry.queues.%s.durations[0]".formatted(QUEUE), () -> "5s");
registry.add("jaconi.rabbitmq.listener.retry.queues.%s.durations[1]".formatted(QUEUE), () -> "10s");
}

@SpringBootApplication
protected abstract static class RabbitMQTestApplication {

@Bean(QUEUE)
@SuppressWarnings("unused")
Queue queue() {
return new Queue(QUEUE);
}

@Bean(EXCHANGE)
@SuppressWarnings("unused")
DirectExchange exchange() {
return new DirectExchange(EXCHANGE);
}

@Bean
@SuppressWarnings("unused")
Binding binding(@Qualifier(QUEUE) Queue queue, @Qualifier(EXCHANGE) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.jaconi.spring.rabbitmq.retry;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
Expand Down Expand Up @@ -38,10 +37,6 @@ public void testAutoConfiguration_enabled() {
.run(context -> {
assertThat(context).hasNotFailed();
assertThat(context).hasBean("retryContainerCustomizer");

context.getBean(ContainerCustomizer.class);


});
}
}
Loading

0 comments on commit 5a85263

Please sign in to comment.