Skip to content

Commit

Permalink
GH-1419: Fix Early Exit in NodeLocator
Browse files Browse the repository at this point in the history
If a node was returned by the REST call and the node was not in the map
of nodes to addreses, the loop exited early.

The incorrect variable was being tested (never null).

Also add a more sophisticated integration test - using 2 brokers, ensure that
the correct broker is located for the queue.
  • Loading branch information
garyrussell authored and artembilan committed Oct 12, 2022
1 parent 8b4dd86 commit 8045f2c
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 32 deletions.
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ ext {
springDataVersion = '2022.0.0-SNAPSHOT'
springVersion = project.hasProperty('springVersion') ? project.springVersion : '6.0.0-SNAPSHOT'
springRetryVersion = '2.0.0-SNAPSHOT'
testContainersVersion = '1.17.3'
zstdJniVersion = '1.5.0-2'
}

Expand Down Expand Up @@ -410,6 +411,7 @@ project('spring-rabbit') {
testImplementation 'io.micrometer:micrometer-tracing-bridge-brave'
testImplementation 'io.micrometer:micrometer-tracing-test'
testImplementation 'io.micrometer:micrometer-tracing-integration-test'
testImplementation "org.testcontainers:rabbitmq:$testContainersVersion"
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
testRuntimeOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
Expand Down Expand Up @@ -474,7 +476,7 @@ project('spring-rabbit-stream') {
testRuntimeOnly "org.xerial.snappy:snappy-java:$snappyVersion"
testRuntimeOnly "org.lz4:lz4-java:$lz4Version"
testRuntimeOnly "com.github.luben:zstd-jni:$zstdJniVersion"
testImplementation "org.testcontainers:rabbitmq:1.17.3"
testImplementation "org.testcontainers:rabbitmq:$testContainersVersion"
testImplementation "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
testImplementation 'org.springframework:spring-webflux'
}
Expand All @@ -495,6 +497,7 @@ project('spring-rabbit-junit') {
api 'org.springframework:spring-web'
api 'org.junit.jupiter:junit-jupiter-api'
api "org.assertj:assertj-core:$assertjVersion"
optionalApi "org.testcontainers:rabbitmq:$testContainersVersion"
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
optionalApi 'org.apache.logging.log4j:log4j-core'
compileOnly 'org.apiguardian:apiguardian-api:1.0.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,20 @@
* limitations under the License.
*/

package org.springframework.rabbit.stream.support;
package org.springframework.amqp.rabbit.junit;

import java.time.Duration;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.RabbitMQContainer;

/**
* @author Gary Russell
* @since 2.4
*
*/
public abstract class AbstractIntegrationTests {
public abstract class AbstractTestContainerTests {

static final GenericContainer<?> RABBITMQ;
protected static final RabbitMQContainer RABBITMQ;

static {
if (System.getProperty("spring.rabbit.use.local.server") == null
Expand All @@ -40,7 +39,7 @@ public abstract class AbstractIntegrationTests {
}
RABBITMQ = new RabbitMQContainer(image)
.withExposedPorts(5672, 15672, 5552)
.withPluginsEnabled("rabbitmq_stream", "rabbitmq_management")
.withPluginsEnabled("rabbitmq_stream")
.withStartupTimeout(Duration.ofMinutes(2));
RABBITMQ.start();
}
Expand All @@ -50,7 +49,7 @@ public abstract class AbstractIntegrationTests {
}

public static int amqpPort() {
return RABBITMQ != null ? RABBITMQ.getMappedPort(5672) : 5672;
return RABBITMQ != null ? RABBITMQ.getAmqpPort() : 5672;
}

public static int managementPort() {
Expand All @@ -61,4 +60,8 @@ public static int streamPort() {
return RABBITMQ != null ? RABBITMQ.getMappedPort(5552) : 5552;
}

public static String restUri() {
return RABBITMQ.getHttpUrl() + "/api/";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.rabbit.stream.support.AbstractIntegrationTests;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
Expand All @@ -40,7 +40,7 @@
*
*/
@SpringJUnitConfig
public class SuperStreamProvisioningTests extends AbstractIntegrationTests {
public class SuperStreamProvisioningTests extends AbstractTestContainerTests {

@Test
void provision(@Autowired Declarables declarables, @Autowired CachingConnectionFactory cf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.SmartLifecycle;
Expand All @@ -51,7 +52,6 @@
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean;
import org.springframework.rabbit.stream.support.AbstractIntegrationTests;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import org.springframework.test.annotation.DirtiesContext;
Expand All @@ -73,7 +73,7 @@
*/
@SpringJUnitConfig
@DirtiesContext
public class RabbitListenerTests extends AbstractIntegrationTests {
public class RabbitListenerTests extends AbstractTestContainerTests {

@Autowired
Config config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.rabbit.stream.config.SuperStream;
import org.springframework.rabbit.stream.support.AbstractIntegrationTests;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import com.rabbitmq.stream.Address;
Expand All @@ -49,7 +49,7 @@
*
*/
@SpringJUnitConfig
public class SuperStreamConcurrentSACTests extends AbstractIntegrationTests {
public class SuperStreamConcurrentSACTests extends AbstractTestContainerTests {

@Test
void concurrent(@Autowired StreamListenerContainer container, @Autowired RabbitTemplate template,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.rabbit.stream.config.SuperStream;
import org.springframework.rabbit.stream.support.AbstractIntegrationTests;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import com.rabbitmq.stream.Address;
Expand All @@ -57,7 +57,7 @@
*
*/
@SpringJUnitConfig
public class SuperStreamSACTests extends AbstractIntegrationTests {
public class SuperStreamSACTests extends AbstractTestContainerTests {

@Test
void superStream(@Autowired ApplicationContext context, @Autowired RabbitTemplate template,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ default ConnectionFactory locate(String[] adminUris, Map<String, String> nodeToA
String node = (String) queueInfo.get("node");
if (node != null) {
String nodeUri = nodeToAddress.get(node);
if (uri != null) {
if (nodeUri != null) {
close(client);
return factoryFunction.locate(queue, node, nodeUri);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,53 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.UUID;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests;
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriUtils;


/**
*
* @author Gary Russell
*/
@RabbitAvailable(management = true, queues = "local")
public class LocalizedQueueConnectionFactoryIntegrationTests {
public class LocalizedQueueConnectionFactoryIntegrationTests extends AbstractTestContainerTests {

private LocalizedQueueConnectionFactory lqcf;

private CachingConnectionFactory defaultConnectionFactory;

private CachingConnectionFactory testContainerFactory;

private RabbitAdmin defaultAdmin;

private RabbitAdmin testContainerAdmin;

@BeforeEach
public void setup() {
this.defaultConnectionFactory = new CachingConnectionFactory("localhost");
String[] addresses = new String[] { "localhost:9999", "localhost:5672" };
String[] adminUris = new String[] { "http://localhost:15672", "http://localhost:15672" };
String[] nodes = new String[] { "foo@bar", "rabbit@localhost" };
this.defaultAdmin = new RabbitAdmin(this.defaultConnectionFactory);
this.testContainerFactory = new CachingConnectionFactory("localhost", amqpPort());
this.testContainerAdmin = new RabbitAdmin(this.testContainerFactory);
String[] addresses = new String[] { "localhost:5672", "localhost:" + amqpPort() };
String[] adminUris = new String[] { "http://localhost:15672", "http://localhost:" + managementPort() };
String[] nodes = new String[] { "rabbit@localhost", findTcNode() };
String vhost = "/";
String username = "guest";
String password = "guest";
Expand All @@ -60,18 +77,24 @@ public void setup() {
public void tearDown() {
this.lqcf.destroy();
this.defaultConnectionFactory.destroy();
this.testContainerFactory.destroy();
}

@Test
public void testConnect() throws Exception {
RabbitAdmin admin = new RabbitAdmin(this.lqcf);
Queue queue = new Queue(UUID.randomUUID().toString(), false, false, true);
admin.declareQueue(queue);
ConnectionFactory targetConnectionFactory = this.lqcf.getTargetConnectionFactory("[" + queue.getName() + "]");
RabbitTemplate template = new RabbitTemplate(targetConnectionFactory);
template.convertAndSend("", queue.getName(), "foo");
assertThat(template.receiveAndConvert(queue.getName())).isEqualTo("foo");
admin.deleteQueue(queue.getName());
public void testFindCorrectConnection() throws Exception {
AnonymousQueue externalQueue = new AnonymousQueue();
AnonymousQueue tcQueue = new AnonymousQueue();
this.defaultAdmin.declareQueue(externalQueue);
this.testContainerAdmin.declareQueue(tcQueue);
ConnectionFactory cf = this.lqcf
.getTargetConnectionFactory("[" + externalQueue.getName() + "]");
assertThat(cf).isNotSameAs(this.defaultConnectionFactory);
assertThat(this.defaultAdmin.getQueueProperties(externalQueue.getName())).isNotNull();
cf = this.lqcf.getTargetConnectionFactory("[" + tcQueue.getName() + "]");
assertThat(cf).isNotSameAs(this.defaultConnectionFactory);
assertThat(this.testContainerAdmin.getQueueProperties(tcQueue.getName())).isNotNull();
this.defaultAdmin.deleteQueue(externalQueue.getName());
this.testContainerAdmin.deleteQueue(tcQueue.getName());
}

@Test
Expand All @@ -89,4 +112,32 @@ void findLocal() {
lqcf.destroy();
}

private String findTcNode() {
AnonymousQueue queue = new AnonymousQueue();
this.testContainerAdmin.declareQueue(queue);
URI uri;
try {
uri = new URI(restUri())
.resolve("/api/queues/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8) + "/"
+ queue.getName());
}
catch (URISyntaxException ex) {
throw new IllegalStateException(ex);
}
WebClient client = WebClient.builder()
.filter(ExchangeFilterFunctions.basicAuthentication(RABBITMQ.getAdminUsername(),
RABBITMQ.getAdminPassword()))
.build();
Map<String, Object> queueInfo = client.get()
.uri(uri)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {
})
.block(Duration.ofSeconds(10));
this.testContainerAdmin.deleteQueue(queue.getName());
return (String) queueInfo.get("node");
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.connection;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.net.URISyntaxException;
import java.util.Map;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import org.springframework.amqp.rabbit.connection.LocalizedQueueConnectionFactory.NodeLocator;
import org.springframework.lang.Nullable;

/**
* @author Gary Russell
* @since 3.0
*
*/
public class NodeLocatorTests {

@Test
@DisplayName("don't exit early when node to address missing")
void missingNode() throws URISyntaxException {

NodeLocator<Object> nodeLocator = spy(new NodeLocator<Object>() {

@Override
public Object createClient(String userName, String password) {
return null;
}

@Override
@Nullable
public Map<String, Object> restCall(Object client, String baseUri, String vhost, String queue) {
if (baseUri.contains("foo")) {
return Map.of("node", "c@d");
}
else {
return Map.of("node", "a@b");
}
}
});
ConnectionFactory factory = nodeLocator.locate(new String[] { "http://foo", "http://bar" },
Map.of("a@b", "baz"), null, "q", null, null, (q, n, u) -> {
return null;
});
verify(nodeLocator, times(2)).restCall(any(), any(), any(), any());
}

}

0 comments on commit 8045f2c

Please sign in to comment.