Skip to content

Commit

Permalink
GH-1487: Countdown not active AsyncMProcConsumer
Browse files Browse the repository at this point in the history
Fixes #1487

* SimpleMessageListenerContainer.AsyncMessageProcessingConsumer countdown when the container is not active
* test for AsyncMessageProcessingConsumer countdown when the container is not active
* change TestExecutor for the static and final

**Cherry-pick to `2.4.x`**
  • Loading branch information
renyansongno1 authored and artembilan committed Aug 10, 2022
1 parent 3d3dfa5 commit 38e0803
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
* @author Artem Bilan
* @author Alex Panchenko
* @author Mat Jaggard
* @author Yansong Ren
*
* @since 1.0
*/
Expand Down Expand Up @@ -1192,6 +1193,7 @@ private FatalListenerStartupException getStartupException() throws InterruptedEx
@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count
if (!isActive()) {
this.start.countDown();
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -86,6 +86,7 @@
import org.springframework.amqp.utils.test.TestUtils;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
Expand All @@ -106,6 +107,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Mohammad Hewedy
* @author Yansong Ren
*/
public class SimpleMessageListenerContainerTests {

Expand Down Expand Up @@ -731,6 +733,30 @@ void filterMppNoDoubleAck() throws Exception {
verifyNoMoreInteractions(listener);
}

@Test
void testWithConsumerStartWhenNotActive() {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
Connection connection = mock(Connection.class);
Channel channel = mock(Channel.class);
given(connectionFactory.createConnection()).willReturn(connection);
given(connection.createChannel(false)).willReturn(channel);

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// overwrite task execute. shutdown container before task execute.
TestExecutor testExecutor = new TestExecutor(container);
container.setTaskExecutor(testExecutor);
container.start();

// then add queue for trigger container shutdown
container.addQueueNames("bar");

// valid the 'start' countdown is 0. lastTask is AsyncMessageProcessingConsumer
Runnable lastTask = testExecutor.getLastTask();
CountDownLatch start = TestUtils.getPropertyValue(lastTask, "start", CountDownLatch.class);

assertThat(start.getCount()).isEqualTo(0L);
}

private Answer<Object> messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container,
final boolean cancel, final CountDownLatch latch) {
return invocation -> {
Expand Down Expand Up @@ -784,4 +810,33 @@ protected void doRollback(DefaultTransactionStatus status) throws TransactionExc

}

@SuppressWarnings("serial")
private static final class TestExecutor extends SimpleAsyncTaskExecutor {

private final SimpleMessageListenerContainer simpleMessageListenerContainer;

private int shutdownCount = 0;

private Runnable lastTask = null;

private TestExecutor(SimpleMessageListenerContainer simpleMessageListenerContainer) {
this.simpleMessageListenerContainer = simpleMessageListenerContainer;
}

public Runnable getLastTask() {
return lastTask;
}

@Override
public void execute(Runnable task) {
// skip the first execution
if (++shutdownCount > 1) {
lastTask = task;
// before execute, shutdown the container for test
this.simpleMessageListenerContainer.shutdown();
}
super.execute(task);
}
}

}

0 comments on commit 38e0803

Please sign in to comment.