Skip to content

Commit

Permalink
@RabbitListener property improvements
Browse files Browse the repository at this point in the history
For bean name properties that can have an expression, allow
the expression to evaluate to an instance of the desired
type instead of a bean name.

* - consistent use of `@` in the test case
  • Loading branch information
garyrussell authored Dec 23, 2020
1 parent 2d0763e commit d89f10d
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,15 @@
String id() default "";

/**
* The bean name of the {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
* to use to create the message listener container responsible to serve this endpoint.
* <p>If not specified, the default container factory is used, if any.
* @return the {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
* The bean name of the
* {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory} to
* use to create the message listener container responsible to serve this endpoint.
* <p>
* If not specified, the default container factory is used, if any. If a SpEL
* expression is provided ({@code #{...}}), the expression can either evaluate to a
* container factory instance or a bean name.
* @return the
* {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}
* bean name.
*/
String containerFactory() default "";
Expand Down Expand Up @@ -166,12 +171,14 @@
String priority() default "";

/**
* Reference to a {@link org.springframework.amqp.rabbit.core.RabbitAdmin
* RabbitAdmin}. Required if the listener is using auto-delete
* queues and those queues are configured for conditional declaration. This
* is the admin that will (re)declare those queues when the container is
* (re)started. See the reference documentation for more information.
* @return the {@link org.springframework.amqp.rabbit.core.RabbitAdmin} bean name.
* Reference to a {@link org.springframework.amqp.core.AmqpAdmin AmqpAdmin}.
* Required if the listener is using auto-delete queues and those queues are
* configured for conditional declaration. This is the admin that will (re)declare
* those queues when the container is (re)started. See the reference documentation for
* more information. If a SpEL expression is provided ({@code #{...}}) the expression
* can evaluate to an {@link org.springframework.amqp.core.AmqpAdmin} instance
* or bean name.
* @return the {@link org.springframework.amqp.core.AmqpAdmin} bean name.
*/
String admin() default "";

Expand Down Expand Up @@ -246,8 +253,10 @@
String autoStartup() default "";

/**
* Set the task executor bean name to use for this listener's container; overrides
* any executor set on the container factory.
* Set the task executor bean name to use for this listener's container; overrides any
* executor set on the container factory. If a SpEL expression is provided
* ({@code #{...}}), the expression can either evaluate to a executor instance or a bean
* name.
* @return the executor bean name.
* @since 2.2
*/
Expand All @@ -266,7 +275,9 @@
/**
* The bean name of a
* {@link org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor} to post
* process a response before it is sent.
* process a response before it is sent. If a SpEL expression is provided
* ({@code #{...}}), the expression can either evaluate to a post processor instance
* or a bean name.
* @return the bean name.
* @since 2.2.5
* @see org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener#setReplyPostProcessor(org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor)
Expand All @@ -275,7 +286,9 @@

/**
* Override the container factory's message converter used for this listener.
* @return the message converter bean name.
* @return the message converter bean name. If a SpEL expression is provided
* ({@code #{...}}), the expression can either evaluate to a converter instance
* or a bean name.
* @since 2.3
*/
String messageConverter() default "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Base64UrlNamingStrategy;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
Expand Down Expand Up @@ -415,20 +416,7 @@ protected Collection<Declarable> processListener(MethodRabbitListenerEndpoint en
endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
endpoint.setBeanFactory(this.beanFactory);
endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
Object errorHandler = resolveExpression(rabbitListener.errorHandler());
if (errorHandler instanceof RabbitListenerErrorHandler) {
endpoint.setErrorHandler((RabbitListenerErrorHandler) errorHandler);
}
else if (errorHandler instanceof String) {
String errorHandlerBeanName = (String) errorHandler;
if (StringUtils.hasText(errorHandlerBeanName)) {
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
}
}
else {
throw new IllegalStateException("error handler mut be a bean name or RabbitListenerErrorHandler, not a "
+ errorHandler.getClass().toString());
}
resolveErrorHandler(endpoint, rabbitListener);
String group = rabbitListener.group();
if (StringUtils.hasText(group)) {
Object resolvedGroup = resolveExpression(group);
Expand Down Expand Up @@ -465,6 +453,20 @@ else if (errorHandler instanceof String) {
return declarables;
}

private void resolveErrorHandler(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener) {
Object errorHandler = resolveExpression(rabbitListener.errorHandler());
if (errorHandler instanceof RabbitListenerErrorHandler) {
endpoint.setErrorHandler((RabbitListenerErrorHandler) errorHandler);
}
else {
String errorHandlerBeanName = resolveExpressionAsString(rabbitListener.errorHandler(), "errorHandler");
if (StringUtils.hasText(errorHandlerBeanName)) {
endpoint.setErrorHandler(
this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
}
}
}

private void resolveAckMode(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener) {
String ackModeAttr = rabbitListener.ackMode();
if (StringUtils.hasText(ackModeAttr)) {
Expand All @@ -482,16 +484,22 @@ else if (ackMode instanceof AcknowledgeMode) {
}

private void resolveAdmin(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object adminTarget) {
String rabbitAdmin = resolveExpressionAsString(rabbitListener.admin(), "admin");
if (StringUtils.hasText(rabbitAdmin)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
try {
endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
rabbitAdmin + "' was found in the application context", ex);
Object resolved = resolveExpression(rabbitListener.admin());
if (resolved instanceof AmqpAdmin) {
endpoint.setAdmin((AmqpAdmin) resolved);
}
else {
String rabbitAdmin = resolveExpressionAsString(rabbitListener.admin(), "admin");
if (StringUtils.hasText(rabbitAdmin)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
try {
endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
rabbitAdmin + "' was found in the application context", ex);
}
}
}
}
Expand All @@ -501,6 +509,10 @@ private RabbitListenerContainerFactory<?> resolveContainerFactory(RabbitListener
Object factoryTarget, String beanName) {

RabbitListenerContainerFactory<?> factory = null;
Object resolved = resolveExpression(rabbitListener.containerFactory());
if (resolved instanceof RabbitListenerContainerFactory) {
return (RabbitListenerContainerFactory<?>) resolved;
}
String containerFactoryBeanName = resolveExpressionAsString(rabbitListener.containerFactory(),
"containerFactory");
if (StringUtils.hasText(containerFactoryBeanName)) {
Expand All @@ -520,47 +532,65 @@ private RabbitListenerContainerFactory<?> resolveContainerFactory(RabbitListener
private void resolveExecutor(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener,
Object execTarget, String beanName) {

String execBeanName = resolveExpressionAsString(rabbitListener.executor(), "executor");
if (StringUtils.hasText(execBeanName)) {
assertBeanFactory();
try {
endpoint.setTaskExecutor(this.beanFactory.getBean(execBeanName, TaskExecutor.class));
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException(
noBeanFoundMessage(execTarget, beanName, execBeanName, TaskExecutor.class), ex);
Object resolved = resolveExpression(rabbitListener.executor());
if (resolved instanceof TaskExecutor) {
endpoint.setTaskExecutor((TaskExecutor) resolved);
}
else {
String execBeanName = resolveExpressionAsString(rabbitListener.executor(), "executor");
if (StringUtils.hasText(execBeanName)) {
assertBeanFactory();
try {
endpoint.setTaskExecutor(this.beanFactory.getBean(execBeanName, TaskExecutor.class));
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException(
noBeanFoundMessage(execTarget, beanName, execBeanName, TaskExecutor.class), ex);
}
}
}
}

private void resolvePostProcessor(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener,
Object target, String beanName) {

String ppBeanName = resolveExpressionAsString(rabbitListener.replyPostProcessor(), "replyPostProcessor");
if (StringUtils.hasText(ppBeanName)) {
assertBeanFactory();
try {
endpoint.setReplyPostProcessor(this.beanFactory.getBean(ppBeanName, ReplyPostProcessor.class));
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException(
noBeanFoundMessage(target, beanName, ppBeanName, ReplyPostProcessor.class), ex);
Object resolved = resolveExpression(rabbitListener.replyPostProcessor());
if (resolved instanceof ReplyPostProcessor) {
endpoint.setReplyPostProcessor((ReplyPostProcessor) resolved);
}
else {
String ppBeanName = resolveExpressionAsString(rabbitListener.replyPostProcessor(), "replyPostProcessor");
if (StringUtils.hasText(ppBeanName)) {
assertBeanFactory();
try {
endpoint.setReplyPostProcessor(this.beanFactory.getBean(ppBeanName, ReplyPostProcessor.class));
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException(
noBeanFoundMessage(target, beanName, ppBeanName, ReplyPostProcessor.class), ex);
}
}
}
}

private void resolveMessageConverter(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener,
Object target, String beanName) {

String mcBeanName = resolveExpressionAsString(rabbitListener.messageConverter(), "messageConverter");
if (StringUtils.hasText(mcBeanName)) {
assertBeanFactory();
try {
endpoint.setMessageConverter(this.beanFactory.getBean(mcBeanName, MessageConverter.class));
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException(
noBeanFoundMessage(target, beanName, mcBeanName, MessageConverter.class), ex);
Object resolved = resolveExpression(rabbitListener.messageConverter());
if (resolved instanceof MessageConverter) {
endpoint.setMessageConverter((MessageConverter) resolved);
}
else {
String mcBeanName = resolveExpressionAsString(rabbitListener.messageConverter(), "messageConverter");
if (StringUtils.hasText(mcBeanName)) {
assertBeanFactory();
try {
endpoint.setMessageConverter(this.beanFactory.getBean(mcBeanName, MessageConverter.class));
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException(
noBeanFoundMessage(target, beanName, mcBeanName, MessageConverter.class), ex);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,20 @@
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
import org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor;
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

Expand Down Expand Up @@ -117,12 +121,35 @@ public CachingConnectionFactory cf() {
return new CachingConnectionFactory(RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
}

@Bean
public RabbitAdmin admin(CachingConnectionFactory cf) {
return new RabbitAdmin(cf);
}

@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}

@RabbitListener(queues = "EnableRabbitReturnTypesTests.1")
@Bean
public SimpleAsyncTaskExecutor exec() {
return new SimpleAsyncTaskExecutor();
}

@Bean
public ReplyPostProcessor rpp() {
return (in, out) -> out;
}

@Bean
public RabbitListenerErrorHandler rleh() {
return (amqpMessage, message, exception) -> null;
}

@RabbitListener(queues = "EnableRabbitReturnTypesTests.1", admin = "#{@admin}",
containerFactory = "#{@rabbitListenerContainerFactory}",
executor = "#{@exec}", replyPostProcessor = "#{@rpp}", messageConverter = "#{@converter}",
errorHandler = "#{@rleh}")
public One listen1(String in) {
if ("3".equals(in)) {
return new Three();
Expand Down

0 comments on commit d89f10d

Please sign in to comment.