diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java index 5b05a99f27..c605f15176 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java @@ -56,6 +56,7 @@ import org.springframework.amqp.rabbit.connection.RabbitResourceHolder; import org.springframework.amqp.rabbit.connection.RabbitUtils; import org.springframework.amqp.rabbit.connection.RoutingConnectionFactory; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException; import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException; @@ -243,6 +244,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor private volatile boolean lazyLoad; + private boolean isBatchListener; + @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.applicationEventPublisher = applicationEventPublisher; @@ -424,6 +427,8 @@ public void setExposeListenerChannel(boolean exposeListenerChannel) { */ public void setMessageListener(MessageListener messageListener) { this.messageListener = messageListener; + this.isBatchListener = messageListener instanceof BatchMessageListener + || messageListener instanceof ChannelAwareBatchMessageListener; } /** @@ -1923,8 +1928,8 @@ private void checkPossibleAuthenticationFailureFatalFromProperty() { @Nullable protected List debatch(Message message) { - if (isDeBatchingEnabled() && getBatchingStrategy().canDebatch(message.getMessageProperties()) - && getMessageListener() instanceof BatchMessageListener) { + if (this.isBatchListener && isDeBatchingEnabled() + && getBatchingStrategy().canDebatch(message.getMessageProperties())) { final List messageList = new ArrayList<>(); getBatchingStrategy().deBatch(message, fragment -> messageList.add(fragment)); return messageList; diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java index 68c67e364c..07d368c331 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java @@ -56,8 +56,10 @@ import org.springframework.amqp.rabbit.support.ListenerContainerAware; import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator; +import org.springframework.amqp.support.ConsumerTagStrategy; import org.springframework.jmx.export.annotation.ManagedMetric; import org.springframework.jmx.support.MetricType; +import org.springframework.lang.Nullable; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionTemplate; @@ -833,8 +835,9 @@ this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefet if (this.retryDeclarationInterval != null) { consumer.setRetryDeclarationInterval(this.retryDeclarationInterval); } - if (getConsumerTagStrategy() != null) { - consumer.setTagStrategy(getConsumerTagStrategy()); // NOSONAR never null here + ConsumerTagStrategy consumerTagStrategy = getConsumerTagStrategy(); + if (consumerTagStrategy != null) { + consumer.setTagStrategy(consumerTagStrategy); } consumer.setBackOffExecution(getRecoveryBackOff().start()); consumer.setShutdownTimeout(getShutdownTimeout()); @@ -1098,7 +1101,7 @@ protected void handleStartupFailure(BackOffExecution backOffExecution) { } @Override - protected void publishConsumerFailedEvent(String reason, boolean fatal, Throwable t) { + protected void publishConsumerFailedEvent(String reason, boolean fatal, @Nullable Throwable t) { if (!fatal || !isRunning()) { super.publishConsumerFailedEvent(reason, fatal, t); } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitBatchIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitBatchIntegrationTests.java index ee3da2da7b..2c37e3b203 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitBatchIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitBatchIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,6 +34,7 @@ import org.springframework.amqp.rabbit.junit.RabbitAvailable; import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition; import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -50,7 +51,7 @@ */ @SpringJUnitConfig @DirtiesContext -@RabbitAvailable(queues = { "batch.1", "batch.2", "batch.3" }) +@RabbitAvailable(queues = { "batch.1", "batch.2", "batch.3", "batch.4" }) public class EnableRabbitBatchIntegrationTests { @Autowired @@ -99,6 +100,20 @@ public void simpleListConsumerAndProducerBatching() throws InterruptedException assertThat(this.listener.foosConsumerBatchToo.get(3).getBar()).isEqualTo("qux"); } + @Test + public void nativeMessageList() throws InterruptedException { + this.template.convertAndSend("batch.4", new Foo("foo")); + this.template.convertAndSend("batch.4", new Foo("bar")); + assertThat(this.listener.nativeMessagesLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.listener.nativeMessages).hasSize(2); + Foo payload = (Foo) new SimpleMessageConverter().fromMessage(this.listener.nativeMessages.get(0)); + assertThat(payload.getBar()).isEqualTo("foo"); + assertThat(this.listener.nativeMessages.get(1).getMessageProperties() + .getHeaders() + .get(AmqpHeaders.BATCH_SIZE)) + .isEqualTo(2); + } + @Configuration @EnableRabbit public static class Config { @@ -116,6 +131,11 @@ public DirectRabbitListenerContainerFactory directListenerContainerFactory() { DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setBatchListener(true); + factory.setContainerCustomizer(container -> { + if (container.getQueueNames()[0].equals("batch.4")) { + container.setDeBatchingEnabled(true); + } + }); return factory; } @@ -166,6 +186,10 @@ public static class Listener { CountDownLatch fooConsumerBatchTooLatch = new CountDownLatch(1); + private List nativeMessages; + + private final CountDownLatch nativeMessagesLatch = new CountDownLatch(1); + @RabbitListener(queues = "batch.1") public void listen1(List in) { this.foos = in; @@ -184,6 +208,12 @@ public void listen3(List in) { this.fooConsumerBatchTooLatch.countDown(); } + @RabbitListener(queues = "batch.4", containerFactory = "directListenerContainerFactory") + public void listen4(List in) { + this.nativeMessages = in; + this.nativeMessagesLatch.countDown(); + } + } @SuppressWarnings("serial")