Skip to content

Commit

Permalink
GH-1474: Fix BatchingStrategy Propagation
Browse files Browse the repository at this point in the history
See #1474
(Does not resolve because 2 issues are reported there).

`BatchingStrategy` was not set by container factory.

**cherry-pick to 2.4.x**
  • Loading branch information
garyrussell authored and artembilan committed Jul 25, 2022
1 parent 535ec9c commit 765e011
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,8 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
.acceptIfNotNull(this.phase, instance::setPhase)
.acceptIfNotNull(this.afterReceivePostProcessors, instance::setAfterReceivePostProcessors)
.acceptIfNotNull(this.deBatchingEnabled, instance::setDeBatchingEnabled)
.acceptIfNotNull(this.messageAckListener, instance::setMessageAckListener);
.acceptIfNotNull(this.messageAckListener, instance::setMessageAckListener)
.acceptIfNotNull(this.batchingStrategy, instance::setBatchingStrategy);
if (this.batchListener && this.deBatchingEnabled == null) {
// turn off container debatching by default for batch listeners
instance.setDeBatchingEnabled(false);
Expand All @@ -378,7 +379,7 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
javaUtils
.acceptIfNotNull(endpoint.getTaskExecutor(), instance::setTaskExecutor)
.acceptIfNotNull(endpoint.getAckMode(), instance::setAcknowledgeMode)
.acceptIfNotNull(this.batchingStrategy, endpoint::setBatchingStrategy);
.acceptIfNotNull(endpoint.getBatchingStrategy(), instance::setBatchingStrategy);
instance.setListenerId(endpoint.getId());
endpoint.setBatchListener(this.batchListener);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2021 the original author or authors.
* Copyright 2014-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -308,6 +308,7 @@ public void setBatchListener(boolean batchListener) {
this.batchListener = batchListener;
}

@Override
@Nullable
public BatchingStrategy getBatchingStrategy() {
return this.batchingStrategy;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -114,7 +114,6 @@ default TaskExecutor getTaskExecutor() {
* @since 2.2
*/
default void setBatchListener(boolean batchListener) {
// NOSONAR empty
}

/**
Expand All @@ -124,7 +123,16 @@ default void setBatchListener(boolean batchListener) {
* @see #setBatchListener(boolean)
*/
default void setBatchingStrategy(BatchingStrategy batchingStrategy) {
// NOSONAR empty
}

/**
* Return this endpoint's batching strategy, or null.
* @return the strategy.
* @since 2.4.7
*/
@Nullable
default BatchingStrategy getBatchingStrategy() {
return null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,13 +28,15 @@

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.utils.test.TestUtils;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
Expand Down Expand Up @@ -70,12 +72,17 @@ public void createSimpleContainer() {
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setMessageListener(this.messageListener);
endpoint.setQueueNames("myQueue");
BatchingStrategy bs1 = mock(BatchingStrategy.class);
this.factory.setBatchingStrategy(bs1);
BatchingStrategy bs2 = mock(BatchingStrategy.class);
endpoint.setBatchingStrategy(bs2);

SimpleMessageListenerContainer container = this.factory.createListenerContainer(endpoint);

assertBasicConfig(container);
assertThat(container.getMessageListener()).isEqualTo(messageListener);
assertThat(container.getQueueNames()[0]).isEqualTo("myQueue");
assertThat(TestUtils.getPropertyValue(container, "batchingStrategy")).isSameAs(bs2);
}

@Test
Expand All @@ -89,6 +96,8 @@ public void createContainerFullConfig() {
this.factory.setTaskExecutor(executor);
this.factory.setTransactionManager(transactionManager);
this.factory.setBatchSize(10);
BatchingStrategy bs1 = mock(BatchingStrategy.class);
this.factory.setBatchingStrategy(bs1);
this.factory.setConcurrentConsumers(2);
this.factory.setMaxConcurrentConsumers(5);
this.factory.setStartConsumerMinInterval(2000L);
Expand All @@ -115,6 +124,7 @@ public void createContainerFullConfig() {
SimpleMessageListenerContainer container = this.factory.createListenerContainer(endpoint);

assertBasicConfig(container);
assertThat(TestUtils.getPropertyValue(container, "batchingStrategy")).isSameAs(bs1);
DirectFieldAccessor fieldAccessor = new DirectFieldAccessor(container);
assertThat(fieldAccessor.getPropertyValue("taskExecutor")).isSameAs(executor);
assertThat(fieldAccessor.getPropertyValue("transactionManager")).isSameAs(transactionManager);
Expand Down

0 comments on commit 765e011

Please sign in to comment.