Skip to content

Commit

Permalink
GH-1561: Alwways run stop callback
Browse files Browse the repository at this point in the history
Fixes #1561

* GH-1561 run callback when container is stopping for abort too
* GH-1561 add author information

**Cherry-pick to `2.4.x`**
  • Loading branch information
timbq authored Feb 7, 2023
1 parent b3a4b25 commit 931896d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -79,6 +79,7 @@
* @author Alex Panchenko
* @author Mat Jaggard
* @author Yansong Ren
* @author Tim Bourquin
*
* @since 1.0
*/
Expand Down Expand Up @@ -622,6 +623,7 @@ private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
Thread thread = this.containerStoppingForAbort.get();
if (thread != null && !thread.equals(Thread.currentThread())) {
logger.info("Shutdown ignored - container is stopping due to an aborted consumer");
runCallbackIfNotNull(callback);
return;
}

Expand All @@ -641,6 +643,7 @@ private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
}
else {
logger.info("Shutdown ignored - container is already stopped");
runCallbackIfNotNull(callback);
return;
}
}
Expand Down Expand Up @@ -674,9 +677,7 @@ private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
this.cancellationLock.deactivate();
}

if (callback != null) {
callback.run();
}
runCallbackIfNotNull(callback);
};
if (callback == null) {
awaitShutdown.run();
Expand All @@ -686,6 +687,12 @@ private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
}
}

private void runCallbackIfNotNull(@Nullable Runnable callback) {
if (callback != null) {
callback.run();
}
}

private boolean isActive(BlockingQueueConsumer consumer) {
boolean consumerActive;
synchronized (this.consumersMonitor) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -108,6 +108,7 @@
* @author Artem Bilan
* @author Mohammad Hewedy
* @author Yansong Ren
* @author Tim Bourquin
*/
public class SimpleMessageListenerContainerTests {

Expand Down Expand Up @@ -431,6 +432,29 @@ protected void setUpMockCancel(Channel channel, final List<Consumer> consumers)
}).given(channel).basicCancel(anyString());
}

@Test
public void testCallbackIsRunOnStopAlsoWhenNoConsumerIsActive() throws InterruptedException {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

final CountDownLatch countDownLatch = new CountDownLatch(1);
container.stop(countDownLatch::countDown);
assertThat(countDownLatch.await(100, TimeUnit.MILLISECONDS)).isTrue();
}

@Test
public void testCallbackIsRunOnStopAlsoWhenContainerIsStoppingForAbort() throws InterruptedException {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
ReflectionTestUtils.setField(container, "containerStoppingForAbort", new AtomicReference<>(new Thread()));

final CountDownLatch countDownLatch = new CountDownLatch(1);
container.stop(countDownLatch::countDown);
assertThat(countDownLatch.await(100, TimeUnit.MILLISECONDS)).isTrue();
}

@Test
public void testWithConnectionPerListenerThread() throws Exception {
com.rabbitmq.client.ConnectionFactory mockConnectionFactory =
Expand Down

0 comments on commit 931896d

Please sign in to comment.