Skip to content

Commit

Permalink
DMLC: Publish event for connection failure
Browse files Browse the repository at this point in the history
The `DirectMessageListenerContainer` did not publish a listener
failed event for a connection failure.

**cherry-pick to all 2.x branches**

* Destroy beans in new test.

# Conflicts:
#	spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java
  • Loading branch information
garyrussell authored and artembilan committed Jun 11, 2019
1 parent bfd023f commit 68a6411
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ private void doConsumeFromQueue(String queue) {
connection = getConnectionFactory().createConnection();
}
catch (Exception e) {
publishConsumerFailedEvent(e.getMessage(), false, e);
addConsumerToRestart(new SimpleConsumer(null, null, queue));
throw e instanceof AmqpConnectException
? (AmqpConnectException) e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -151,6 +152,36 @@ public void testSimple() throws Exception {
assertEquals(0, TestUtils.getPropertyValue(container, "consumersByQueue", MultiValueMap.class).size());
template.stop();
cf.destroy();
executor.destroy();
}

@Test
public void testBadHost() throws InterruptedException {
CachingConnectionFactory cf = new CachingConnectionFactory("this.host.does.not.exist");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("client-");
executor.afterPropertiesSet();
cf.setExecutor(executor);
DirectMessageListenerContainer container = new DirectMessageListenerContainer(cf);
container.setQueueNames("dummy");
container.setConsumersPerQueue(2);
container.setMessageListener(in -> {
});
container.setBeanName("badHost");
container.setConsumerTagStrategy(new Tag());
CountDownLatch latch = new CountDownLatch(1);
container.setApplicationEventPublisher(ev -> {
if (ev instanceof ListenerContainerConsumerFailedEvent) {
latch.countDown();
}
});
container.setRecoveryInterval(100);
container.afterPropertiesSet();
container.start();
assertTrue(latch.await(10, TimeUnit.SECONDS));
container.stop();
cf.destroy();
executor.destroy();
}

@Test
Expand Down Expand Up @@ -222,6 +253,7 @@ public void testQueueManagement() throws Exception {
assertEquals(0, TestUtils.getPropertyValue(container, "consumersByQueue", MultiValueMap.class).size());
template.stop();
cf.destroy();
executor.destroy();
}

@Test
Expand Down Expand Up @@ -264,6 +296,7 @@ public void testQueueManagementQueueInstances() throws Exception {
assertEquals(0, TestUtils.getPropertyValue(container, "consumersByQueue", MultiValueMap.class).size());
template.stop();
cf.destroy();
executor.destroy();
}

@Test
Expand Down Expand Up @@ -306,6 +339,7 @@ public void testAddRemoveConsumers() throws Exception {
assertEquals(0, TestUtils.getPropertyValue(container, "consumersByQueue", MultiValueMap.class).size());
template.stop();
cf.destroy();
executor.destroy();
}

@Test
Expand Down Expand Up @@ -347,8 +381,8 @@ public void testErrorHandler() throws Exception {
.put("x-dead-letter-routing-key", DLQ1)
.get());
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
RabbitAdmin admin = new RabbitAdmin(cf);
admin.declareQueue(q1);
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf);
rabbitAdmin.declareQueue(q1);
DirectMessageListenerContainer container = new DirectMessageListenerContainer(cf);
container.setQueueNames(Q1);
container.setConsumersPerQueue(2);
Expand Down Expand Up @@ -457,13 +491,15 @@ public void testCancelConsumerBeforeConsumeOk() throws Exception {
container.start();
assertTrue(latch1.await(10, TimeUnit.SECONDS));
Consumer consumer = consumerCaptor.getValue();
Executors.newSingleThreadExecutor().execute(() -> {
ExecutorService exec = Executors.newSingleThreadExecutor();
exec.execute(() -> {
container.stop();
latch2.countDown();
});
assertTrue(latch2.await(10, TimeUnit.SECONDS));
verify(channel).basicCancel(tag); // canceled properly even without consumeOk
consumer.handleCancelOk(tag);
exec.shutdownNow();
}

@Test
Expand All @@ -482,9 +518,9 @@ private void testRecoverDeletedQueueGuts(boolean autoDeclare) throws Exception {
if (autoDeclare) {
GenericApplicationContext context = new GenericApplicationContext();
context.getBeanFactory().registerSingleton("foo", new Queue(Q1));
RabbitAdmin admin = new RabbitAdmin(cf);
admin.setApplicationContext(context);
context.getBeanFactory().registerSingleton("admin", admin);
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf);
rabbitAdmin.setApplicationContext(context);
context.getBeanFactory().registerSingleton("admin", rabbitAdmin);
context.refresh();
container.setApplicationContext(context);
}
Expand All @@ -505,10 +541,10 @@ private void testRecoverDeletedQueueGuts(boolean autoDeclare) throws Exception {
assertTrue(consumersOnQueue(Q2, 2));
assertTrue(activeConsumerCount(container, 2));
assertTrue(restartConsumerCount(container, 2));
RabbitAdmin admin = new RabbitAdmin(cf);
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf);
if (!autoDeclare) {
Thread.sleep(2000);
admin.declareQueue(new Queue(Q1));
rabbitAdmin.declareQueue(new Queue(Q1));
}
assertTrue(consumersOnQueue(Q1, 2));
assertTrue(consumersOnQueue(Q2, 2));
Expand Down

0 comments on commit 68a6411

Please sign in to comment.