From 8045f2c8661e2abdcd0955266543de33fa18ab36 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 12 Oct 2022 14:53:00 -0400 Subject: [PATCH] GH-1419: Fix Early Exit in NodeLocator If a node was returned by the REST call and the node was not in the map of nodes to addreses, the loop exited early. The incorrect variable was being tested (never null). Also add a more sophisticated integration test - using 2 brokers, ensure that the correct broker is located for the queue. --- build.gradle | 5 +- .../junit/AbstractTestContainerTests.java | 15 ++-- .../config/SuperStreamProvisioningTests.java | 4 +- .../stream/listener/RabbitListenerTests.java | 4 +- .../SuperStreamConcurrentSACTests.java | 4 +- .../stream/listener/SuperStreamSACTests.java | 4 +- .../LocalizedQueueConnectionFactory.java | 2 +- ...ueueConnectionFactoryIntegrationTests.java | 83 +++++++++++++++---- .../rabbit/connection/NodeLocatorTests.java | 69 +++++++++++++++ 9 files changed, 158 insertions(+), 32 deletions(-) rename spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/support/AbstractIntegrationTests.java => spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/AbstractTestContainerTests.java (81%) create mode 100644 spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/NodeLocatorTests.java diff --git a/build.gradle b/build.gradle index 2684b30b7c..e42d3f4964 100644 --- a/build.gradle +++ b/build.gradle @@ -67,6 +67,7 @@ ext { springDataVersion = '2022.0.0-SNAPSHOT' springVersion = project.hasProperty('springVersion') ? project.springVersion : '6.0.0-SNAPSHOT' springRetryVersion = '2.0.0-SNAPSHOT' + testContainersVersion = '1.17.3' zstdJniVersion = '1.5.0-2' } @@ -410,6 +411,7 @@ project('spring-rabbit') { testImplementation 'io.micrometer:micrometer-tracing-bridge-brave' testImplementation 'io.micrometer:micrometer-tracing-test' testImplementation 'io.micrometer:micrometer-tracing-integration-test' + testImplementation "org.testcontainers:rabbitmq:$testContainersVersion" testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core' testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind' testRuntimeOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' @@ -474,7 +476,7 @@ project('spring-rabbit-stream') { testRuntimeOnly "org.xerial.snappy:snappy-java:$snappyVersion" testRuntimeOnly "org.lz4:lz4-java:$lz4Version" testRuntimeOnly "com.github.luben:zstd-jni:$zstdJniVersion" - testImplementation "org.testcontainers:rabbitmq:1.17.3" + testImplementation "org.testcontainers:rabbitmq:$testContainersVersion" testImplementation "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion" testImplementation 'org.springframework:spring-webflux' } @@ -495,6 +497,7 @@ project('spring-rabbit-junit') { api 'org.springframework:spring-web' api 'org.junit.jupiter:junit-jupiter-api' api "org.assertj:assertj-core:$assertjVersion" + optionalApi "org.testcontainers:rabbitmq:$testContainersVersion" optionalApi "ch.qos.logback:logback-classic:$logbackVersion" optionalApi 'org.apache.logging.log4j:log4j-core' compileOnly 'org.apiguardian:apiguardian-api:1.0.0' diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/support/AbstractIntegrationTests.java b/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/AbstractTestContainerTests.java similarity index 81% rename from spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/support/AbstractIntegrationTests.java rename to spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/AbstractTestContainerTests.java index 9b6f1575d5..7504a46595 100644 --- a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/support/AbstractIntegrationTests.java +++ b/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/AbstractTestContainerTests.java @@ -14,11 +14,10 @@ * limitations under the License. */ -package org.springframework.rabbit.stream.support; +package org.springframework.amqp.rabbit.junit; import java.time.Duration; -import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.RabbitMQContainer; /** @@ -26,9 +25,9 @@ * @since 2.4 * */ -public abstract class AbstractIntegrationTests { +public abstract class AbstractTestContainerTests { - static final GenericContainer RABBITMQ; + protected static final RabbitMQContainer RABBITMQ; static { if (System.getProperty("spring.rabbit.use.local.server") == null @@ -40,7 +39,7 @@ public abstract class AbstractIntegrationTests { } RABBITMQ = new RabbitMQContainer(image) .withExposedPorts(5672, 15672, 5552) - .withPluginsEnabled("rabbitmq_stream", "rabbitmq_management") + .withPluginsEnabled("rabbitmq_stream") .withStartupTimeout(Duration.ofMinutes(2)); RABBITMQ.start(); } @@ -50,7 +49,7 @@ public abstract class AbstractIntegrationTests { } public static int amqpPort() { - return RABBITMQ != null ? RABBITMQ.getMappedPort(5672) : 5672; + return RABBITMQ != null ? RABBITMQ.getAmqpPort() : 5672; } public static int managementPort() { @@ -61,4 +60,8 @@ public static int streamPort() { return RABBITMQ != null ? RABBITMQ.getMappedPort(5552) : 5552; } + public static String restUri() { + return RABBITMQ.getHttpUrl() + "/api/"; + } + } diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamProvisioningTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamProvisioningTests.java index b96eb88125..b87daf8db8 100644 --- a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamProvisioningTests.java +++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamProvisioningTests.java @@ -28,10 +28,10 @@ import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.rabbit.stream.support.AbstractIntegrationTests; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; /** @@ -40,7 +40,7 @@ * */ @SpringJUnitConfig -public class SuperStreamProvisioningTests extends AbstractIntegrationTests { +public class SuperStreamProvisioningTests extends AbstractTestContainerTests { @Test void provision(@Autowired Declarables declarables, @Autowired CachingConnectionFactory cf, diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java index ba4bd3e534..0ea7fc50ea 100644 --- a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java +++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java @@ -40,6 +40,7 @@ import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.SmartLifecycle; @@ -51,7 +52,6 @@ import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory; import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean; -import org.springframework.rabbit.stream.support.AbstractIntegrationTests; import org.springframework.rabbit.stream.support.StreamMessageProperties; import org.springframework.retry.interceptor.RetryOperationsInterceptor; import org.springframework.test.annotation.DirtiesContext; @@ -73,7 +73,7 @@ */ @SpringJUnitConfig @DirtiesContext -public class RabbitListenerTests extends AbstractIntegrationTests { +public class RabbitListenerTests extends AbstractTestContainerTests { @Autowired Config config; diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamConcurrentSACTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamConcurrentSACTests.java index f089a2ca8b..40e7fdbada 100644 --- a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamConcurrentSACTests.java +++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamConcurrentSACTests.java @@ -32,11 +32,11 @@ import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.rabbit.stream.config.SuperStream; -import org.springframework.rabbit.stream.support.AbstractIntegrationTests; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import com.rabbitmq.stream.Address; @@ -49,7 +49,7 @@ * */ @SpringJUnitConfig -public class SuperStreamConcurrentSACTests extends AbstractIntegrationTests { +public class SuperStreamConcurrentSACTests extends AbstractTestContainerTests { @Test void concurrent(@Autowired StreamListenerContainer container, @Autowired RabbitTemplate template, diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamSACTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamSACTests.java index 2dd1a4614e..595e18cb5b 100644 --- a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamSACTests.java +++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamSACTests.java @@ -37,6 +37,7 @@ import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.ApplicationContext; @@ -44,7 +45,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; import org.springframework.rabbit.stream.config.SuperStream; -import org.springframework.rabbit.stream.support.AbstractIntegrationTests; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import com.rabbitmq.stream.Address; @@ -57,7 +57,7 @@ * */ @SpringJUnitConfig -public class SuperStreamSACTests extends AbstractIntegrationTests { +public class SuperStreamSACTests extends AbstractTestContainerTests { @Test void superStream(@Autowired ApplicationContext context, @Autowired RabbitTemplate template, diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java index aa25c7a5ff..ab96518bee 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java @@ -398,7 +398,7 @@ default ConnectionFactory locate(String[] adminUris, Map nodeToA String node = (String) queueInfo.get("node"); if (node != null) { String nodeUri = nodeToAddress.get(node); - if (uri != null) { + if (nodeUri != null) { close(client); return factoryFunction.locate(queue, node, nodeUri); } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryIntegrationTests.java index 9c2a2dd7d2..b641bb69e2 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryIntegrationTests.java @@ -19,17 +19,25 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Map; -import java.util.UUID; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.AnonymousQueue; import org.springframework.amqp.rabbit.core.RabbitAdmin; -import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.junit.AbstractTestContainerTests; import org.springframework.amqp.rabbit.junit.RabbitAvailable; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.client.ExchangeFilterFunctions; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.util.UriUtils; /** @@ -37,18 +45,27 @@ * @author Gary Russell */ @RabbitAvailable(management = true, queues = "local") -public class LocalizedQueueConnectionFactoryIntegrationTests { +public class LocalizedQueueConnectionFactoryIntegrationTests extends AbstractTestContainerTests { private LocalizedQueueConnectionFactory lqcf; private CachingConnectionFactory defaultConnectionFactory; + private CachingConnectionFactory testContainerFactory; + + private RabbitAdmin defaultAdmin; + + private RabbitAdmin testContainerAdmin; + @BeforeEach public void setup() { this.defaultConnectionFactory = new CachingConnectionFactory("localhost"); - String[] addresses = new String[] { "localhost:9999", "localhost:5672" }; - String[] adminUris = new String[] { "http://localhost:15672", "http://localhost:15672" }; - String[] nodes = new String[] { "foo@bar", "rabbit@localhost" }; + this.defaultAdmin = new RabbitAdmin(this.defaultConnectionFactory); + this.testContainerFactory = new CachingConnectionFactory("localhost", amqpPort()); + this.testContainerAdmin = new RabbitAdmin(this.testContainerFactory); + String[] addresses = new String[] { "localhost:5672", "localhost:" + amqpPort() }; + String[] adminUris = new String[] { "http://localhost:15672", "http://localhost:" + managementPort() }; + String[] nodes = new String[] { "rabbit@localhost", findTcNode() }; String vhost = "/"; String username = "guest"; String password = "guest"; @@ -60,18 +77,24 @@ public void setup() { public void tearDown() { this.lqcf.destroy(); this.defaultConnectionFactory.destroy(); + this.testContainerFactory.destroy(); } @Test - public void testConnect() throws Exception { - RabbitAdmin admin = new RabbitAdmin(this.lqcf); - Queue queue = new Queue(UUID.randomUUID().toString(), false, false, true); - admin.declareQueue(queue); - ConnectionFactory targetConnectionFactory = this.lqcf.getTargetConnectionFactory("[" + queue.getName() + "]"); - RabbitTemplate template = new RabbitTemplate(targetConnectionFactory); - template.convertAndSend("", queue.getName(), "foo"); - assertThat(template.receiveAndConvert(queue.getName())).isEqualTo("foo"); - admin.deleteQueue(queue.getName()); + public void testFindCorrectConnection() throws Exception { + AnonymousQueue externalQueue = new AnonymousQueue(); + AnonymousQueue tcQueue = new AnonymousQueue(); + this.defaultAdmin.declareQueue(externalQueue); + this.testContainerAdmin.declareQueue(tcQueue); + ConnectionFactory cf = this.lqcf + .getTargetConnectionFactory("[" + externalQueue.getName() + "]"); + assertThat(cf).isNotSameAs(this.defaultConnectionFactory); + assertThat(this.defaultAdmin.getQueueProperties(externalQueue.getName())).isNotNull(); + cf = this.lqcf.getTargetConnectionFactory("[" + tcQueue.getName() + "]"); + assertThat(cf).isNotSameAs(this.defaultConnectionFactory); + assertThat(this.testContainerAdmin.getQueueProperties(tcQueue.getName())).isNotNull(); + this.defaultAdmin.deleteQueue(externalQueue.getName()); + this.testContainerAdmin.deleteQueue(tcQueue.getName()); } @Test @@ -89,4 +112,32 @@ void findLocal() { lqcf.destroy(); } + private String findTcNode() { + AnonymousQueue queue = new AnonymousQueue(); + this.testContainerAdmin.declareQueue(queue); + URI uri; + try { + uri = new URI(restUri()) + .resolve("/api/queues/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8) + "/" + + queue.getName()); + } + catch (URISyntaxException ex) { + throw new IllegalStateException(ex); + } + WebClient client = WebClient.builder() + .filter(ExchangeFilterFunctions.basicAuthentication(RABBITMQ.getAdminUsername(), + RABBITMQ.getAdminPassword())) + .build(); + Map queueInfo = client.get() + .uri(uri) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .bodyToMono(new ParameterizedTypeReference>() { + }) + .block(Duration.ofSeconds(10)); + this.testContainerAdmin.deleteQueue(queue.getName()); + return (String) queueInfo.get("node"); + } + + } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/NodeLocatorTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/NodeLocatorTests.java new file mode 100644 index 0000000000..b05efe28fe --- /dev/null +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/NodeLocatorTests.java @@ -0,0 +1,69 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.rabbit.connection; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.net.URISyntaxException; +import java.util.Map; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import org.springframework.amqp.rabbit.connection.LocalizedQueueConnectionFactory.NodeLocator; +import org.springframework.lang.Nullable; + +/** + * @author Gary Russell + * @since 3.0 + * + */ +public class NodeLocatorTests { + + @Test + @DisplayName("don't exit early when node to address missing") + void missingNode() throws URISyntaxException { + + NodeLocator nodeLocator = spy(new NodeLocator() { + + @Override + public Object createClient(String userName, String password) { + return null; + } + + @Override + @Nullable + public Map restCall(Object client, String baseUri, String vhost, String queue) { + if (baseUri.contains("foo")) { + return Map.of("node", "c@d"); + } + else { + return Map.of("node", "a@b"); + } + } + }); + ConnectionFactory factory = nodeLocator.locate(new String[] { "http://foo", "http://bar" }, + Map.of("a@b", "baz"), null, "q", null, null, (q, n, u) -> { + return null; + }); + verify(nodeLocator, times(2)).restCall(any(), any(), any(), any()); + } + +}