Skip to content

Commit

Permalink
GH-1201: Fix for ChannelAwareBatchMessageListener
Browse files Browse the repository at this point in the history
Resolves #1201

The previous commit did not work with `ChannelAwareBatchMessageListener`.

Also add a test to get `List<o.s.a.c.Message>` in a `@RabbitListener` (which
exposed this issue).

**cherry-pick to 2.2.x**
  • Loading branch information
garyrussell authored and artembilan committed May 19, 2020
1 parent 6abdd27 commit a2119b3
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.connection.RoutingConnectionFactory;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException;
import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException;
Expand Down Expand Up @@ -243,6 +244,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor

private volatile boolean lazyLoad;

private boolean isBatchListener;

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
Expand Down Expand Up @@ -424,6 +427,8 @@ public void setExposeListenerChannel(boolean exposeListenerChannel) {
*/
public void setMessageListener(MessageListener messageListener) {
this.messageListener = messageListener;
this.isBatchListener = messageListener instanceof BatchMessageListener
|| messageListener instanceof ChannelAwareBatchMessageListener;
}

/**
Expand Down Expand Up @@ -1923,8 +1928,8 @@ private void checkPossibleAuthenticationFailureFatalFromProperty() {

@Nullable
protected List<Message> debatch(Message message) {
if (isDeBatchingEnabled() && getBatchingStrategy().canDebatch(message.getMessageProperties())
&& getMessageListener() instanceof BatchMessageListener) {
if (this.isBatchListener && isDeBatchingEnabled()
&& getBatchingStrategy().canDebatch(message.getMessageProperties())) {
final List<Message> messageList = new ArrayList<>();
getBatchingStrategy().deBatch(message, fragment -> messageList.add(fragment));
return messageList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@
import org.springframework.amqp.rabbit.support.ListenerContainerAware;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.jmx.export.annotation.ManagedMetric;
import org.springframework.jmx.support.MetricType;
import org.springframework.lang.Nullable;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
Expand Down Expand Up @@ -833,8 +835,9 @@ this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefet
if (this.retryDeclarationInterval != null) {
consumer.setRetryDeclarationInterval(this.retryDeclarationInterval);
}
if (getConsumerTagStrategy() != null) {
consumer.setTagStrategy(getConsumerTagStrategy()); // NOSONAR never null here
ConsumerTagStrategy consumerTagStrategy = getConsumerTagStrategy();
if (consumerTagStrategy != null) {
consumer.setTagStrategy(consumerTagStrategy);
}
consumer.setBackOffExecution(getRecoveryBackOff().start());
consumer.setShutdownTimeout(getShutdownTimeout());
Expand Down Expand Up @@ -1098,7 +1101,7 @@ protected void handleStartupFailure(BackOffExecution backOffExecution) {
}

@Override
protected void publishConsumerFailedEvent(String reason, boolean fatal, Throwable t) {
protected void publishConsumerFailedEvent(String reason, boolean fatal, @Nullable Throwable t) {
if (!fatal || !isRunning()) {
super.publishConsumerFailedEvent(reason, fatal, t);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,6 +34,7 @@
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -50,7 +51,7 @@
*/
@SpringJUnitConfig
@DirtiesContext
@RabbitAvailable(queues = { "batch.1", "batch.2", "batch.3" })
@RabbitAvailable(queues = { "batch.1", "batch.2", "batch.3", "batch.4" })
public class EnableRabbitBatchIntegrationTests {

@Autowired
Expand Down Expand Up @@ -99,6 +100,20 @@ public void simpleListConsumerAndProducerBatching() throws InterruptedException
assertThat(this.listener.foosConsumerBatchToo.get(3).getBar()).isEqualTo("qux");
}

@Test
public void nativeMessageList() throws InterruptedException {
this.template.convertAndSend("batch.4", new Foo("foo"));
this.template.convertAndSend("batch.4", new Foo("bar"));
assertThat(this.listener.nativeMessagesLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.nativeMessages).hasSize(2);
Foo payload = (Foo) new SimpleMessageConverter().fromMessage(this.listener.nativeMessages.get(0));
assertThat(payload.getBar()).isEqualTo("foo");
assertThat(this.listener.nativeMessages.get(1).getMessageProperties()
.getHeaders()
.get(AmqpHeaders.BATCH_SIZE))
.isEqualTo(2);
}

@Configuration
@EnableRabbit
public static class Config {
Expand All @@ -116,6 +131,11 @@ public DirectRabbitListenerContainerFactory directListenerContainerFactory() {
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
factory.setContainerCustomizer(container -> {
if (container.getQueueNames()[0].equals("batch.4")) {
container.setDeBatchingEnabled(true);
}
});
return factory;
}

Expand Down Expand Up @@ -166,6 +186,10 @@ public static class Listener {

CountDownLatch fooConsumerBatchTooLatch = new CountDownLatch(1);

private List<org.springframework.amqp.core.Message> nativeMessages;

private final CountDownLatch nativeMessagesLatch = new CountDownLatch(1);

@RabbitListener(queues = "batch.1")
public void listen1(List<Foo> in) {
this.foos = in;
Expand All @@ -184,6 +208,12 @@ public void listen3(List<Foo> in) {
this.fooConsumerBatchTooLatch.countDown();
}

@RabbitListener(queues = "batch.4", containerFactory = "directListenerContainerFactory")
public void listen4(List<org.springframework.amqp.core.Message> in) {
this.nativeMessages = in;
this.nativeMessagesLatch.countDown();
}

}

@SuppressWarnings("serial")
Expand Down

0 comments on commit a2119b3

Please sign in to comment.