Skip to content

Commit

Permalink
GH-1484: RabbitListener.batch() Override
Browse files Browse the repository at this point in the history
Resolves #1484
  • Loading branch information
garyrussell authored and artembilan committed Aug 4, 2022
1 parent 97a508e commit 515eb9a
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -332,4 +332,18 @@
*/
String converterWinsContentType() default "true";

/**
* Override the container factory's {@code batchListener} property. The listener
* method signature should receive a {@code List<?>}; refer to the reference
* documentation. This allows a single container factory to be used for both record
* and batch listeners; previously separate container factories were required.
* @return "true" for the annotated method to be a batch listener or "false" for a
* single message listener. If not set, the container factory setting is used. SpEL and
* property place holders are not supported because the listener type cannot be
* variable.
* @since 3.0
* @see Boolean#parseBoolean(String)
*/
String batch() default "";

}
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,9 @@ protected Collection<Declarable> processListener(MethodRabbitListenerEndpoint en
resolvePostProcessor(endpoint, rabbitListener, target, beanName);
resolveMessageConverter(endpoint, rabbitListener, target, beanName);
resolveReplyContentType(endpoint, rabbitListener);
if (StringUtils.hasText(rabbitListener.batch())) {
endpoint.setBatchListener(Boolean.parseBoolean(rabbitListener.batch()));
}
RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, target, beanName);

this.registrar.registerEndpoint(endpoint, factory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,9 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
.acceptIfNotNull(endpoint.getAckMode(), instance::setAcknowledgeMode)
.acceptIfNotNull(endpoint.getBatchingStrategy(), instance::setBatchingStrategy);
instance.setListenerId(endpoint.getId());
endpoint.setBatchListener(this.batchListener);
if (endpoint.getBatchListener() == null) {
endpoint.setBatchListener(this.batchListener);
}
}
applyCommonOverrides(endpoint, instance);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public abstract class AbstractRabbitListenerEndpoint implements RabbitListenerEn

private TaskExecutor taskExecutor;

private boolean batchListener;
private Boolean batchListener;

private BatchingStrategy batchingStrategy;

Expand Down Expand Up @@ -293,7 +293,21 @@ public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}

/**
* True if this endpoint is for a batch listener.
* @return true if batch.
*/
public boolean isBatchListener() {
return this.batchListener == null ? false : this.batchListener;
}

/**
* True if this endpoint is for a batch listener.
* @return {@link Boolean#TRUE} if batch.
* @since 3.0
*/
@Nullable
public Boolean getBatchListener() {
return this.batchListener;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2020 the original author or authors.
* Copyright 2014-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -130,7 +130,7 @@ public void setAdapterProvider(AdapterProvider adapterProvider) {
protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
Assert.state(this.messageHandlerMethodFactory != null,
"Could not create message listener - MessageHandlerMethodFactory not set");
MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
MessagingMessageListenerAdapter messageListener = createMessageListenerInstance(getBatchListener());
messageListener.setHandlerAdapter(configureListenerAdapter(messageListener));
String replyToAddress = getDefaultReplyToAddress();
if (replyToAddress != null) {
Expand Down Expand Up @@ -159,11 +159,12 @@ protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapte

/**
* Create an empty {@link MessagingMessageListenerAdapter} instance.
* @param batch whether this endpoint is for a batch listener.
* @return the {@link MessagingMessageListenerAdapter} instance.
*/
protected MessagingMessageListenerAdapter createMessageListenerInstance() {
return this.adapterProvider.getAdapter(isBatchListener(), this.bean, this.method, this.returnExceptions,
this.errorHandler, getBatchingStrategy());
protected MessagingMessageListenerAdapter createMessageListenerInstance(@Nullable Boolean batch) {
return this.adapterProvider.getAdapter(batch == null ? isBatchListener() : batch, this.bean, this.method,
this.returnExceptions, this.errorHandler, getBatchingStrategy());
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@ default TaskExecutor getTaskExecutor() {
default void setBatchListener(boolean batchListener) {
}

/**
* Whether this endpoint is for a batch listener.
* @return {@link Boolean#TRUE} if batch.
* @since 3.0
*/
@Nullable
Boolean getBatchListener();

/**
* Set a {@link BatchingStrategy} to use when debatching messages.
* @param batchingStrategy the batching strategy.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -143,7 +143,7 @@ public DirectRabbitListenerContainerFactory directListenerContainerFactory() {
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
factory.setBatchListener(false);
factory.setConsumerBatchEnabled(true);
factory.setBatchSize(2);
return factory;
Expand Down Expand Up @@ -202,7 +202,7 @@ public void listen2(List<Message<Foo>> in) {
this.fooMessagesLatch.countDown();
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory", batch = "true")
public void listen3(List<Foo> in) {
this.foosConsumerBatchToo = in;
this.fooConsumerBatchTooLatch.countDown();
Expand Down

0 comments on commit 515eb9a

Please sign in to comment.