From ac182a72543e3331978c863fe31bce1c16c25b8a Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Sun, 18 Aug 2024 22:24:46 +0800 Subject: [PATCH] ARTEMIS-5002 AMQP producer not unblock if the disk space is freed --- .../core/paging/impl/PagingStoreImpl.java | 1 + .../integration/amqp/GlobalDiskFullTest.java | 103 ++++++++++++++---- 2 files changed, 81 insertions(+), 23 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 5bf29c32c53e..982c6e388053 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -1096,6 +1096,7 @@ public boolean checkMemory(boolean runOnFailure, Runnable runWhenAvailableParame if (isFull()) { if (runOnFailure && runWhenAvailable != null) { addToBlockList(runWhenAvailable, blockedCallback); + pagingManager.addBlockedStore(this); } return false; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java index bf260770e488..3c359c0d4569 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/GlobalDiskFullTest.java @@ -19,22 +19,54 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.lang.invoke.MethodHandles; import java.net.URI; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter; +import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension; +import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +@ExtendWith(ParameterizedTestExtension.class) public class GlobalDiskFullTest extends AmqpClientTestSupport { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Parameter(index = 0) + public AddressFullMessagePolicy addressFullPolicy; + + @Parameters(name = "addressFullPolicy={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {AddressFullMessagePolicy.FAIL}, {AddressFullMessagePolicy.DROP}, {AddressFullMessagePolicy.PAGE} + }); + } + + @Override + protected void configureAddressPolicy(ActiveMQServer server) { + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAddressFullMessagePolicy(addressFullPolicy); + server.getConfiguration().addAddressSetting(getQueueName(), addressSettings); + } @Override protected void addConfiguration(ActiveMQServer server) { @@ -42,15 +74,37 @@ protected void addConfiguration(ActiveMQServer server) { serverConfig.setDiskScanPeriod(100); } - @Test + @TestTemplate public void testProducerOnDiskFull() throws Exception { - FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0); + + FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor(); + final CountDownLatch latch = new CountDownLatch(1); monitor.addCallback((usableSpace, totalSpace, ok, type) -> { - latch.countDown(); + + double usage = FileStoreMonitor.calculateUsage(usableSpace, totalSpace); + + if (type == FileStoreMonitor.FileStoreMonitorType.MaxDiskUsage && ok && usage < monitor.getMaxUsage()) { + latch.countDown(); + } + }); + + latch.await(2000, TimeUnit.MILLISECONDS); + + //make it full + monitor.setMaxUsage(0.0); + + final CountDownLatch latch1 = new CountDownLatch(1); + monitor.addCallback((usableSpace, totalSpace, ok, type) -> { + + double usage = FileStoreMonitor.calculateUsage(usableSpace, totalSpace); + + if (!ok && usage >= monitor.getMaxUsage()) { + latch1.countDown(); + } }); - assertTrue(latch.await(1, TimeUnit.MINUTES)); + latch1.await(2000, TimeUnit.MILLISECONDS); AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT)); AmqpConnection connection = addConnection(client.connect()); @@ -60,59 +114,62 @@ public void testProducerOnDiskFull() throws Exception { AmqpSender sender = session.createSender(getQueueName()); byte[] payload = new byte[1000]; - AmqpSender anonSender = session.createSender(); CountDownLatch sentWithName = new CountDownLatch(1); CountDownLatch sentAnon = new CountDownLatch(1); - Thread threadWithName = new Thread(() -> { + ExecutorService pool = Executors.newCachedThreadPool(); + runAfter(pool::shutdownNow); + pool.execute(() -> { try { final AmqpMessage message = new AmqpMessage(); message.setBytes(payload); sender.setSendTimeout(-1); sender.send(message); } catch (Exception e) { - e.printStackTrace(); + logger.warn("Caught exception while sending", e); } finally { sentWithName.countDown(); } }); - - threadWithName.start(); - - - Thread threadWithAnon = new Thread(() -> { + pool.execute(()-> { try { final AmqpMessage message = new AmqpMessage(); message.setBytes(payload); anonSender.setSendTimeout(-1); message.setAddress(getQueueName()); anonSender.send(message); - } catch (Exception e) { - e.printStackTrace(); - } finally { sentAnon.countDown(); + } catch (Exception e) { + logger.warn("Caught exception while sending", e); } }); - threadWithAnon.start(); - assertFalse(sentWithName.await(500, TimeUnit.MILLISECONDS), "Thread sender should be blocked"); assertFalse(sentAnon.await(500, TimeUnit.MILLISECONDS), "Thread sender anonymous should be blocked"); + // unblock monitor.setMaxUsage(100.0); + final CountDownLatch latch2 = new CountDownLatch(1); + monitor.addCallback((usableSpace, totalSpace, ok, type) -> { + + double usage = FileStoreMonitor.calculateUsage(usableSpace, totalSpace); + + if (ok && usage < monitor.getMaxUsage()) { + latch2.countDown(); + } + }); + + latch2.await(2000, TimeUnit.MILLISECONDS); + assertTrue(sentWithName.await(30, TimeUnit.SECONDS), "Thread sender should be released"); assertTrue(sentAnon.await(30, TimeUnit.SECONDS), "Thread sender anonymous should be released"); - - threadWithName.join(TimeUnit.SECONDS.toMillis(30)); - threadWithAnon.join(TimeUnit.SECONDS.toMillis(30)); - assertFalse(threadWithName.isAlive()); - assertFalse(threadWithAnon.isAlive()); } finally { connection.close(); } } + }