From 765e0118b79170076bf9de40e570b50001361399 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 25 Jul 2022 10:46:32 -0400 Subject: [PATCH] GH-1474: Fix BatchingStrategy Propagation See https://github.com/spring-projects/spring-amqp/issues/1474 (Does not resolve because 2 issues are reported there). `BatchingStrategy` was not set by container factory. **cherry-pick to 2.4.x** --- .../AbstractRabbitListenerContainerFactory.java | 5 +++-- .../listener/AbstractRabbitListenerEndpoint.java | 3 ++- .../rabbit/listener/RabbitListenerEndpoint.java | 14 +++++++++++--- .../RabbitListenerContainerFactoryTests.java | 12 +++++++++++- 4 files changed, 27 insertions(+), 7 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java index 69d6874173..bb1f6b1ffb 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java @@ -369,7 +369,8 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) { .acceptIfNotNull(this.phase, instance::setPhase) .acceptIfNotNull(this.afterReceivePostProcessors, instance::setAfterReceivePostProcessors) .acceptIfNotNull(this.deBatchingEnabled, instance::setDeBatchingEnabled) - .acceptIfNotNull(this.messageAckListener, instance::setMessageAckListener); + .acceptIfNotNull(this.messageAckListener, instance::setMessageAckListener) + .acceptIfNotNull(this.batchingStrategy, instance::setBatchingStrategy); if (this.batchListener && this.deBatchingEnabled == null) { // turn off container debatching by default for batch listeners instance.setDeBatchingEnabled(false); @@ -378,7 +379,7 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) { javaUtils .acceptIfNotNull(endpoint.getTaskExecutor(), instance::setTaskExecutor) .acceptIfNotNull(endpoint.getAckMode(), instance::setAcknowledgeMode) - .acceptIfNotNull(this.batchingStrategy, endpoint::setBatchingStrategy); + .acceptIfNotNull(endpoint.getBatchingStrategy(), instance::setBatchingStrategy); instance.setListenerId(endpoint.getId()); endpoint.setBatchListener(this.batchListener); } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractRabbitListenerEndpoint.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractRabbitListenerEndpoint.java index bcd7dffc4f..6aaaf7c700 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractRabbitListenerEndpoint.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractRabbitListenerEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2021 the original author or authors. + * Copyright 2014-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -308,6 +308,7 @@ public void setBatchListener(boolean batchListener) { this.batchListener = batchListener; } + @Override @Nullable public BatchingStrategy getBatchingStrategy() { return this.batchingStrategy; diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/RabbitListenerEndpoint.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/RabbitListenerEndpoint.java index 979dd9ee1c..787a9473d5 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/RabbitListenerEndpoint.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/RabbitListenerEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -114,7 +114,6 @@ default TaskExecutor getTaskExecutor() { * @since 2.2 */ default void setBatchListener(boolean batchListener) { - // NOSONAR empty } /** @@ -124,7 +123,16 @@ default void setBatchListener(boolean batchListener) { * @see #setBatchListener(boolean) */ default void setBatchingStrategy(BatchingStrategy batchingStrategy) { - // NOSONAR empty + } + + /** + * Return this endpoint's batching strategy, or null. + * @return the strategy. + * @since 2.4.7 + */ + @Nullable + default BatchingStrategy getBatchingStrategy() { + return null; } /** diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/RabbitListenerContainerFactoryTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/RabbitListenerContainerFactoryTests.java index 9707b14942..efcc268de8 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/RabbitListenerContainerFactoryTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/RabbitListenerContainerFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.rabbit.batch.BatchingStrategy; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer; @@ -35,6 +36,7 @@ import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; +import org.springframework.amqp.utils.test.TestUtils; import org.springframework.beans.DirectFieldAccessor; import org.springframework.scheduling.TaskScheduler; import org.springframework.transaction.PlatformTransactionManager; @@ -70,12 +72,17 @@ public void createSimpleContainer() { SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint(); endpoint.setMessageListener(this.messageListener); endpoint.setQueueNames("myQueue"); + BatchingStrategy bs1 = mock(BatchingStrategy.class); + this.factory.setBatchingStrategy(bs1); + BatchingStrategy bs2 = mock(BatchingStrategy.class); + endpoint.setBatchingStrategy(bs2); SimpleMessageListenerContainer container = this.factory.createListenerContainer(endpoint); assertBasicConfig(container); assertThat(container.getMessageListener()).isEqualTo(messageListener); assertThat(container.getQueueNames()[0]).isEqualTo("myQueue"); + assertThat(TestUtils.getPropertyValue(container, "batchingStrategy")).isSameAs(bs2); } @Test @@ -89,6 +96,8 @@ public void createContainerFullConfig() { this.factory.setTaskExecutor(executor); this.factory.setTransactionManager(transactionManager); this.factory.setBatchSize(10); + BatchingStrategy bs1 = mock(BatchingStrategy.class); + this.factory.setBatchingStrategy(bs1); this.factory.setConcurrentConsumers(2); this.factory.setMaxConcurrentConsumers(5); this.factory.setStartConsumerMinInterval(2000L); @@ -115,6 +124,7 @@ public void createContainerFullConfig() { SimpleMessageListenerContainer container = this.factory.createListenerContainer(endpoint); assertBasicConfig(container); + assertThat(TestUtils.getPropertyValue(container, "batchingStrategy")).isSameAs(bs1); DirectFieldAccessor fieldAccessor = new DirectFieldAccessor(container); assertThat(fieldAccessor.getPropertyValue("taskExecutor")).isSameAs(executor); assertThat(fieldAccessor.getPropertyValue("transactionManager")).isSameAs(transactionManager);