Skip to content

Commit

Permalink
GH-1533: Template Receive with Consumer Args
Browse files Browse the repository at this point in the history
Resolves #1533

Allow setting consumer arguments when using non-zero receive
timeouts using the `RabbitTemplate`.

**cherry-pick to 2.4.x**

# Conflicts:
#	spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java
  • Loading branch information
garyrussell authored and artembilan committed Nov 15, 2022
1 parent 0ff3eb9 commit 11d4282
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count

private final AtomicInteger containerInstance = new AtomicInteger();

private final Map<String, Object> consumerArgs = new HashMap<>();


private String exchange = DEFAULT_EXCHANGE;

private String routingKey = DEFAULT_ROUTING_KEY;
Expand Down Expand Up @@ -936,6 +939,33 @@ public int getUnconfirmedCount() {
.sum();
}

/**
* When using receive methods with a non-zero timeout, a
* {@link com.rabbitmq.client.Consumer} is created to receive the message. Use this
* property to add arguments to the consumer (e.g. {@code x-priority}).
* @param arg the argument name to pass into the {@code basicConsume} operation.
* @param value the argument value to pass into the {@code basicConsume} operation.
* @since 2.4.8
* @see #removeConsumerArg(String)
*/
public void addConsumerArg(String arg, Object value) {
this.consumerArgs.put(arg, value);
}

/**
* When using receive methods with a non-zero timeout, a
* {@link com.rabbitmq.client.Consumer} is created to receive the message. Use this
* method to remove an argument from those passed into the {@code basicConsume}
* operation.
* @param arg the argument name.
* @return the previous value.
* @since 2.4.8
* @see #addConsumerArg(String, Object)
*/
public Object removeConsumerArg(String arg) {
return this.consumerArgs.remove(arg);
}

@Override
public void start() {
doStart();
Expand Down Expand Up @@ -2743,7 +2773,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
}

};
channel.basicConsume(queueName, consumer);
channel.basicConsume(queueName, false, this.consumerArgs, consumer);
if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
if (channel instanceof ChannelProxy) {
((ChannelProxy) channel).getTargetChannel().close();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -335,8 +335,10 @@ class MockChannel extends PublisherCallbackChannelImpl {
}

@Override
public String basicConsume(String queue, Consumer callback) throws IOException {
return super.basicConsume(queue, new MockConsumer(callback));
public String basicConsume(String queue, boolean autoAck, Map<String, Object> args, Consumer callback)
throws IOException {

return super.basicConsume(queue, autoAck, args, new MockConsumer(callback));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
Expand All @@ -49,6 +50,7 @@
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import org.springframework.amqp.AmqpAuthenticationException;
Expand Down Expand Up @@ -658,6 +660,37 @@ void resourcesClearedAfterTxFailsWithSync() throws IOException, TimeoutException
ConnectionFactoryUtils.enableAfterCompletionFailureCapture(false);
}

@Test
void consumerArgs() throws Exception {
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Connection mockConnection = mock(Connection.class);
Channel mockChannel = mock(Channel.class);

given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).willReturn(mockConnection);
given(mockConnection.isOpen()).willReturn(true);
given(mockConnection.createChannel()).willReturn(mockChannel);
willAnswer(inv -> {
Consumer consumer = inv.getArgument(3);
consumer.handleConsumeOk("tag");
return null;
}).given(mockChannel).basicConsume(any(), anyBoolean(), anyMap(), any());

SingleConnectionFactory connectionFactory = new SingleConnectionFactory(mockConnectionFactory);
connectionFactory.setExecutor(mock(ExecutorService.class));
RabbitTemplate template = new RabbitTemplate(connectionFactory);
assertThat(template.receive("foo", 1)).isNull();
@SuppressWarnings("unchecked")
ArgumentCaptor<Map<String, Object>> argsCaptor = ArgumentCaptor.forClass(Map.class);
verify(mockChannel).basicConsume(eq("foo"), eq(false), argsCaptor.capture(), any());
assertThat(argsCaptor.getValue()).isEmpty();
template.addConsumerArg("x-priority", 10);
assertThat(template.receive("foo", 1)).isNull();
assertThat(argsCaptor.getValue()).containsEntry("x-priority", 10);
assertThat(template.removeConsumerArg("x-priority")).isEqualTo(10);
assertThat(template.receive("foo", 1)).isNull();
assertThat(argsCaptor.getValue()).isEmpty();
}

@SuppressWarnings("serial")
private class TestTransactionManager extends AbstractPlatformTransactionManager {

Expand Down
5 changes: 4 additions & 1 deletion src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1885,11 +1885,14 @@ By default, if no message is available, `null` is returned immediately.
There is no blocking.
Starting with version 1.5, you can set a `receiveTimeout`, in milliseconds, and the receive methods block for up to that long, waiting for a message.
A value less than zero means block indefinitely (or at least until the connection to the broker is lost).
Version 1.6 introduced variants of the `receive` methods that let the timeout be passed in on each call.
Version 1.6 introduced variants of the `receive` methods that allows the timeout be passed in on each call.

CAUTION: Since the receive operation creates a new `QueueingConsumer` for each message, this technique is not really appropriate for high-volume environments.
Consider using an asynchronous consumer or a `receiveTimeout` of zero for those use cases.

Starting with version 2.4.8, when using a non-zero timeout, you can specify arguments passed into the `basicConsume` method used to associate the consumer with the channel.
For example: `template.addConsumerArg("x-priority", 10)`.

There are four simple `receive` methods available.
As with the `Exchange` on the sending side, there is a method that requires that a default queue property has been set
directly on the template itself, and there is a method that accepts a queue parameter at runtime.
Expand Down

0 comments on commit 11d4282

Please sign in to comment.