Skip to content

Commit

Permalink
GH-1032: Consumer Batching Phase 2 @RabbitListener
Browse files Browse the repository at this point in the history
Resolves #1032

Add basic consumer batching support to the `@RabbitListener` infrastructure.

* * Polishing - simplify batch configuration in container factories
* Add a test to ensure producer debatching works for DMLC too

* * Add more tests

* * Fix javadoc; add one more test

* * Fix type in test name
  • Loading branch information
garyrussell authored and artembilan committed Jun 28, 2019
1 parent 845a7e0 commit d1ebe22
Show file tree
Hide file tree
Showing 14 changed files with 766 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ public void setCharset(Charset charset) {
this.charset = charset;
}

MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
return this.messageHandlerMethodFactory;
}

@Override
public void afterSingletonsInstantiated() {
this.registrar.setBeanFactory(this.beanFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ public abstract class AbstractRabbitListenerContainerFactory<C extends AbstractM

private BatchingStrategy batchingStrategy;

private Boolean deBatchingEnabled;

/**
* @param connectionFactory The connection factory.
* @see AbstractMessageListenerContainer#setConnectionFactory(ConnectionFactory)
Expand Down Expand Up @@ -375,6 +377,17 @@ public void setBatchingStrategy(BatchingStrategy batchingStrategy) {
this.batchingStrategy = batchingStrategy;
}

/**
* Determine whether or not the container should de-batch batched
* messages (true) or call the listener with the batch (false). Default: true.
* @param deBatchingEnabled whether or not to disable de-batching of messages.
* @since 2.2
* @see AbstractMessageListenerContainer#setDeBatchingEnabled(boolean)
*/
public void setDeBatchingEnabled(final Boolean deBatchingEnabled) {
this.deBatchingEnabled = deBatchingEnabled;
}

@Override
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
C instance = createContainerInstance();
Expand Down Expand Up @@ -404,8 +417,12 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
.acceptIfNotNull(this.autoStartup, instance::setAutoStartup)
.acceptIfNotNull(this.phase, instance::setPhase)
.acceptIfNotNull(this.afterReceivePostProcessors, instance::setAfterReceivePostProcessors);
instance.setDeBatchingEnabled(!this.batchListener);
.acceptIfNotNull(this.afterReceivePostProcessors, instance::setAfterReceivePostProcessors)
.acceptIfNotNull(this.deBatchingEnabled, instance::setDeBatchingEnabled);
if (this.batchListener && this.deBatchingEnabled == null) {
// turn off container debatching by default for batch listeners
instance.setDeBatchingEnabled(false);
}
if (endpoint != null) { // endpoint settings overriding default factory settings
javaUtils
.acceptIfNotNull(endpoint.getAutoStartup(), instance::setAutoStartup)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
public class SimpleRabbitListenerContainerFactory
extends AbstractRabbitListenerContainerFactory<SimpleMessageListenerContainer> {

private Integer txSize;
private Integer batchSize;

private Integer concurrentConsumers;

Expand All @@ -54,14 +54,25 @@ public class SimpleRabbitListenerContainerFactory

private Long receiveTimeout;

private Boolean deBatchingEnabled;
private Boolean consumerBatchEnabled;

/**
* @param txSize the transaction size.
* @see SimpleMessageListenerContainer#setTxSize
* @see SimpleMessageListenerContainer#setBatchSize
* @deprecated in favor of {@link #setBatchSize(Integer)}
*/
@Deprecated
public void setTxSize(Integer txSize) {
this.txSize = txSize;
setBatchSize(txSize);
}

/**
* @param batchSize the batch size.
* @since 2.2
* @see SimpleMessageListenerContainer#setBatchSize
*/
public void setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
}

/**
Expand Down Expand Up @@ -121,13 +132,14 @@ public void setReceiveTimeout(Long receiveTimeout) {
}

/**
* Determine whether or not the container should de-batch batched
* messages (true) or call the listener with the batch (false). Default: true.
* @param deBatchingEnabled whether or not to disable de-batching of messages.
* @see SimpleMessageListenerContainer#setDeBatchingEnabled(boolean)
* Set to true to present a list of messages based on the {@link #setBatchSize(Integer)},
* if the listener supports it.
* @param consumerBatchEnabled true to create message batches in the container.
* @since 2.2
* @see #setBatchSize(Integer)
*/
public void setDeBatchingEnabled(final Boolean deBatchingEnabled) {
this.deBatchingEnabled = deBatchingEnabled;
public void setConsumerBatchEnabled(boolean consumerBatchEnabled) {
this.consumerBatchEnabled = consumerBatchEnabled;
}

@Override
Expand All @@ -140,7 +152,7 @@ protected void initializeContainer(SimpleMessageListenerContainer instance, Rabb
super.initializeContainer(instance, endpoint);

JavaUtils javaUtils = JavaUtils.INSTANCE
.acceptIfNotNull(this.txSize, instance::setBatchSize);
.acceptIfNotNull(this.batchSize, instance::setBatchSize);
String concurrency = null;
if (endpoint != null) {
concurrency = endpoint.getConcurrency();
Expand All @@ -149,14 +161,22 @@ protected void initializeContainer(SimpleMessageListenerContainer instance, Rabb
javaUtils
.acceptIfCondition(concurrency == null && this.concurrentConsumers != null, this.concurrentConsumers,
instance::setConcurrentConsumers)
.acceptIfCondition((concurrency == null || !(concurrency.contains("-"))) && this.maxConcurrentConsumers != null,
.acceptIfCondition((concurrency == null || !(concurrency.contains("-")))
&& this.maxConcurrentConsumers != null,
this.maxConcurrentConsumers, instance::setMaxConcurrentConsumers)
.acceptIfNotNull(this.startConsumerMinInterval, instance::setStartConsumerMinInterval)
.acceptIfNotNull(this.stopConsumerMinInterval, instance::setStopConsumerMinInterval)
.acceptIfNotNull(this.consecutiveActiveTrigger, instance::setConsecutiveActiveTrigger)
.acceptIfNotNull(this.consecutiveIdleTrigger, instance::setConsecutiveIdleTrigger)
.acceptIfNotNull(this.receiveTimeout, instance::setReceiveTimeout)
.acceptIfNotNull(this.deBatchingEnabled, instance::setDeBatchingEnabled);
.acceptIfNotNull(this.receiveTimeout, instance::setReceiveTimeout);
if (Boolean.TRUE.equals(this.consumerBatchEnabled)) {
instance.setConsumerBatchEnabled(true);
/*
* 'batchListener=true' turns off container debatching by default, it must be
* true when consumer batching is enabled.
*/
instance.setDeBatchingEnabled(true);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1495,7 +1495,7 @@ protected void doInvokeListener(ChannelAwareMessageListener listener, Channel ch
}
}
catch (Exception e) {
throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
throw wrapToListenerExecutionFailedExceptionIfNeeded(e, data);
}
}
finally {
Expand Down Expand Up @@ -1551,7 +1551,7 @@ protected void doInvokeListener(MessageListener listener, Object data) {
}
}
catch (Exception e) {
throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
throw wrapToListenerExecutionFailedExceptionIfNeeded(e, data);
}
}

Expand Down Expand Up @@ -1590,16 +1590,23 @@ protected void handleListenerException(Throwable ex) {

/**
* @param e The Exception.
* @param message The failed message.
* @param data The failed message.
* @return If 'e' is of type {@link ListenerExecutionFailedException} - return 'e' as it is, otherwise wrap it to
* {@link ListenerExecutionFailedException} and return.
*/
@SuppressWarnings("unchecked")
protected ListenerExecutionFailedException wrapToListenerExecutionFailedExceptionIfNeeded(Exception e,
Message message) {
Object data) {

if (!(e instanceof ListenerExecutionFailedException)) {
// Wrap exception to ListenerExecutionFailedException.
return new ListenerExecutionFailedException("Listener threw exception", e, message);
if (data instanceof List) {
return new ListenerExecutionFailedException("Listener threw exception", e,
((List<Message>) data).toArray(new Message[0]));
}
else {
return new ListenerExecutionFailedException("Listener threw exception", e, (Message) data);
}
}
return (ListenerExecutionFailedException) e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,8 @@ protected void doInitialize() {
Assert.state(!this.consumerBatchEnabled || getMessageListener() instanceof BatchMessageListener
|| getMessageListener() instanceof ChannelAwareBatchMessagelistener,
"When setting 'consumerBatchEnabled' to true, the listener must support batching");
Assert.state(!this.consumerBatchEnabled || isDeBatchingEnabled(),
"When setting 'consumerBatchEnabled' to true, 'deBatchingEnabled' must also be true");
}

@ManagedMetric(metricType = MetricType.GAUGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,25 @@

import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessagelistener;
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;

import com.rabbitmq.client.Channel;

/**
* A listener adapter for batch listeners.
*
* @author Gary Russell
* @since 2.2
*
*/
public class BatchMessagingMessageListenerAdapter extends MessagingMessageListenerAdapter {
public class BatchMessagingMessageListenerAdapter extends MessagingMessageListenerAdapter
implements ChannelAwareBatchMessagelistener {

private final MessagingMessageConverterAdapter converterAdapter;

Expand All @@ -49,6 +54,36 @@ public BatchMessagingMessageListenerAdapter(Object bean, Method method, boolean
this.batchingStrategy = batchingStrategy == null ? new SimpleBatchingStrategy(0, 0, 0L) : batchingStrategy;
}

@Override
public void onMessageBatch(List<org.springframework.amqp.core.Message> messages, Channel channel) {
Message<?> converted;
if (this.converterAdapter.isAmqpMessageList()) {
converted = new GenericMessage<>(messages);
}
else {
List<Message<?>> messagingMessages = new ArrayList<>();
for (org.springframework.amqp.core.Message message : messages) {
messagingMessages.add(toMessagingMessage(message));
}
if (this.converterAdapter.isMessageList()) {
converted = new GenericMessage<>(messagingMessages);
}
else {
List<Object> payloads = new ArrayList<>();
for (Message<?> message : messagingMessages) {
payloads.add(message.getPayload());
}
converted = new GenericMessage<>(payloads);
}
}
try {
invokeHandlerAndProcessResult(null, channel, converted);
}
catch (Exception e) {
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
}
}

@Override
protected Message<?> toMessagingMessage(org.springframework.amqp.core.Message amqpMessage) {
if (this.batchingStrategy.canDebatch(amqpMessage.getMessageProperties())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.MessagingMessageConverter;
import org.springframework.core.MethodParameter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.handler.annotation.Payload;
Expand Down Expand Up @@ -128,6 +129,12 @@ public void setMessageConverter(MessageConverter messageConverter) {
@Override
public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel channel) throws Exception { // NOSONAR
Message<?> message = toMessagingMessage(amqpMessage);
invokeHandlerAndProcessResult(amqpMessage, channel, message);
}

protected void invokeHandlerAndProcessResult(@Nullable org.springframework.amqp.core.Message amqpMessage,
Channel channel, Message<?> message) throws Exception { // NOSONAR

if (logger.isDebugEnabled()) {
logger.debug("Processing [" + message + "]");
}
Expand Down Expand Up @@ -197,8 +204,9 @@ protected Message<?> toMessagingMessage(org.springframework.amqp.core.Message am
* @param message the messaging message.
* @return the result of invoking the handler.
*/
private InvocationResult invokeHandler(org.springframework.amqp.core.Message amqpMessage, Channel channel,
private InvocationResult invokeHandler(@Nullable org.springframework.amqp.core.Message amqpMessage, Channel channel,
Message<?> message) {

try {
return this.handlerAdapter.invoke(message, amqpMessage, channel);
}
Expand Down Expand Up @@ -267,6 +275,8 @@ protected final class MessagingMessageConverterAdapter extends MessagingMessageC

private boolean isMessageList;

private boolean isAmqpMessageList;

MessagingMessageConverterAdapter(Object bean, Method method, boolean batch) {
this.bean = bean;
this.method = method;
Expand All @@ -281,6 +291,14 @@ protected boolean isMessageList() {
return this.isMessageList;
}

protected boolean isAmqpMessageList() {
return this.isAmqpMessageList;
}

protected Method getMethod() {
return this.method;
}

@Override
protected Object extractPayload(org.springframework.amqp.core.Message message) {
MessageProperties messageProperties = message.getMessageProperties();
Expand Down Expand Up @@ -362,6 +380,7 @@ else if (parameterizedType.getRawType().equals(List.class)
boolean messageHasGeneric = paramType instanceof ParameterizedType
&& ((ParameterizedType) paramType).getRawType().equals(Message.class);
this.isMessageList = paramType.equals(Message.class) || messageHasGeneric;
this.isAmqpMessageList = paramType.equals(org.springframework.amqp.core.Message.class);
if (messageHasGeneric) {
genericParameterType = ((ParameterizedType) paramType).getActualTypeArguments()[0];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

package org.springframework.amqp.rabbit.support;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;

Expand All @@ -31,21 +37,25 @@
@SuppressWarnings("serial")
public class ListenerExecutionFailedException extends AmqpException {

private final Message failedMessage;
private final List<Message> failedMessages = new ArrayList<>();

/**
* Constructor for ListenerExecutionFailedException.
* @param msg the detail message
* @param cause the exception thrown by the listener method
* @param failedMessage the message that failed
* @param failedMessage the message(s) that failed
*/
public ListenerExecutionFailedException(String msg, Throwable cause, Message failedMessage) {
public ListenerExecutionFailedException(String msg, Throwable cause, Message... failedMessage) {
super(msg, cause);
this.failedMessage = failedMessage;
this.failedMessages.addAll(Arrays.asList(failedMessage));
}

public Message getFailedMessage() {
return this.failedMessage;
return this.failedMessages.get(0);
}

public Collection<Message> getFailedMessages() {
return Collections.unmodifiableList(this.failedMessages);
}

}
Loading

0 comments on commit d1ebe22

Please sign in to comment.