Skip to content

Commit

Permalink
@RabbitListener async result polishing
Browse files Browse the repository at this point in the history
- add warning log if the container is not configured for MANUAL acks
- improve docs
  • Loading branch information
garyrussell authored and artembilan committed Feb 1, 2019
1 parent 70a5b0f commit ae06325
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,4 +27,14 @@ public interface MessageListener {

void onMessage(Message message);

/**
* Called by the container to inform the listener of its acknowledgement
* mode.
* @param mode the {@link AcknowledgeMode}.
* @since 2.1.4
*/
default void containerAckMode(AcknowledgeMode mode) {
// NOSONAR - empty
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor

private boolean exposeListenerChannel = true;

private volatile Object messageListener;
private volatile MessageListener messageListener;

private volatile AcknowledgeMode acknowledgeMode = AcknowledgeMode.AUTO;

Expand Down Expand Up @@ -409,7 +409,7 @@ public void setExposeListenerChannel(boolean exposeListenerChannel) {
@Deprecated
public void setMessageListener(Object messageListener) {
checkMessageListener(messageListener);
this.messageListener = messageListener;
setMessageListener((MessageListener) messageListener);
}

/**
Expand Down Expand Up @@ -1168,6 +1168,9 @@ public void initialize() {
logger.debug("The 'channelTransacted' is coerced to 'true', when 'transactionManager' is provided");
setChannelTransacted(true);
}
if (this.messageListener != null) {
this.messageListener.containerAckMode(this.acknowledgeMode);
}
this.initialized = true;
}
catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2018 the original author or authors.
* Copyright 2014-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
Expand Down Expand Up @@ -106,6 +107,8 @@ public abstract class AbstractAdaptableMessageListener implements ChannelAwareMe

private RecoveryCallback<?> recoveryCallback;

private boolean isManualAck;


/**
* Set the routing key to use when sending response messages.
Expand Down Expand Up @@ -252,6 +255,11 @@ protected MessageConverter getMessageConverter() {
return this.messageConverter;
}

@Override
public void containerAckMode(AcknowledgeMode mode) {
this.isManualAck = AcknowledgeMode.MANUAL.equals(mode);
}

/**
* Handle the given exception that arose during listener execution.
* The default implementation logs the exception at error level.
Expand Down Expand Up @@ -308,11 +316,19 @@ protected void handleResult(InvocationResult resultArg, Message request, Channel
protected void handleResult(InvocationResult resultArg, Message request, Channel channel, Object source) {
if (channel != null) {
if (resultArg.getReturnValue() instanceof ListenableFuture) {
if (!this.isManualAck) {
this.logger.warn("Container AcknowledgeMode must be MANUAL for a Future<?> return type; "
+ "otherwise the container will ack the message immediately");
}
((ListenableFuture<?>) resultArg.getReturnValue()).addCallback(
r -> asyncSuccess(resultArg, request, channel, source, r),
t -> asyncFailure(request, channel, t));
}
else if (monoPresent && MonoHandler.isMono(resultArg.getReturnValue())) {
if (!this.isManualAck) {
this.logger.warn("Container AcknowledgeMode must be MANUAL for a Mono<?> return type; "
+ "otherwise the container will ack the message immediately");
}
MonoHandler.subscribe(resultArg.getReturnValue(),
r -> asyncSuccess(resultArg, request, channel, source, r),
t -> asyncFailure(request, channel, t));
Expand All @@ -334,12 +350,12 @@ private void asyncSuccess(InvocationResult resultArg, Message request, Channel c
channel.basicAck(request.getMessageProperties().getDeliveryTag(), false);
}
catch (IOException e) {
this.logger.error("Failed to nack message", e);
this.logger.error("Failed to ack message", e);
}
}

private void asyncFailure(Message request, Channel channel, Throwable t) {
this.logger.error("Future was completed with an exception for " + request, t);
this.logger.error("Future or Mono was completed with an exception for " + request, t);
try {
channel.basicNack(request.getMessageProperties().getDeliveryTag(), false, true);
}
Expand Down
5 changes: 4 additions & 1 deletion src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2565,7 +2565,10 @@ IMPORTANT: Containers created this way are normal `@Bean` s and are not register

Starting with version 2.1, `@RabbitListener` (and `@RabbitHandler`) methods can be specified with asynchronous return types `ListenableFuture<?>` and `Mono<?>`, allowing the reply to be sent asynchronously.

IMPORTANT: The listener container factory must be configured with `AcknowledgeMode.MANUAL` so that the consumer thread will not ack the message; instead, the asynchronous completion will ack or nack (requeue) the message when the async operation completes.
IMPORTANT: The listener container factory must be configured with `AcknowledgeMode.MANUAL` so that the consumer thread will not ack the message; instead, the asynchronous completion will ack or nack the message when the async operation completes.
When the async result is completed with an error, whether the message is requeued or not depends on the exception type thrown, the container configuration, and the container error handler.
By default, the message will be requeued, unless the container's `defaultRequeueRejected` property is set to `false`.
If the async result is completed with an `AmqpRejectAndDontRequeueException`, the message will not be requeued.
If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be acknowledged or requeued.

[[threading]]
Expand Down

0 comments on commit ae06325

Please sign in to comment.