Skip to content

Commit

Permalink
ARTEMIS-5002 AMQP producer not unblock if the disk space is freed
Browse files Browse the repository at this point in the history
  • Loading branch information
howardgao committed Sep 5, 2024
1 parent 7fb9aa5 commit ac182a7
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,7 @@ public boolean checkMemory(boolean runOnFailure, Runnable runWhenAvailableParame
if (isFull()) {
if (runOnFailure && runWhenAvailable != null) {
addToBlockList(runWhenAvailable, blockedCallback);
pagingManager.addBlockedStore(this);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,92 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(ParameterizedTestExtension.class)
public class GlobalDiskFullTest extends AmqpClientTestSupport {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@Parameter(index = 0)
public AddressFullMessagePolicy addressFullPolicy;

@Parameters(name = "addressFullPolicy={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{AddressFullMessagePolicy.FAIL}, {AddressFullMessagePolicy.DROP}, {AddressFullMessagePolicy.PAGE}
});
}

@Override
protected void configureAddressPolicy(ActiveMQServer server) {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(addressFullPolicy);
server.getConfiguration().addAddressSetting(getQueueName(), addressSettings);
}

@Override
protected void addConfiguration(ActiveMQServer server) {
Configuration serverConfig = server.getConfiguration();
serverConfig.setDiskScanPeriod(100);
}

@Test
@TestTemplate
public void testProducerOnDiskFull() throws Exception {
FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor().setMaxUsage(0.0);

FileStoreMonitor monitor = ((ActiveMQServerImpl)server).getMonitor();

final CountDownLatch latch = new CountDownLatch(1);
monitor.addCallback((usableSpace, totalSpace, ok, type) -> {
latch.countDown();

double usage = FileStoreMonitor.calculateUsage(usableSpace, totalSpace);

if (type == FileStoreMonitor.FileStoreMonitorType.MaxDiskUsage && ok && usage < monitor.getMaxUsage()) {
latch.countDown();
}
});

latch.await(2000, TimeUnit.MILLISECONDS);

//make it full
monitor.setMaxUsage(0.0);

final CountDownLatch latch1 = new CountDownLatch(1);
monitor.addCallback((usableSpace, totalSpace, ok, type) -> {

double usage = FileStoreMonitor.calculateUsage(usableSpace, totalSpace);

if (!ok && usage >= monitor.getMaxUsage()) {
latch1.countDown();
}
});

assertTrue(latch.await(1, TimeUnit.MINUTES));
latch1.await(2000, TimeUnit.MILLISECONDS);

AmqpClient client = createAmqpClient(new URI("tcp://localhost:" + AMQP_PORT));
AmqpConnection connection = addConnection(client.connect());
Expand All @@ -60,59 +114,62 @@ public void testProducerOnDiskFull() throws Exception {
AmqpSender sender = session.createSender(getQueueName());
byte[] payload = new byte[1000];


AmqpSender anonSender = session.createSender();

CountDownLatch sentWithName = new CountDownLatch(1);
CountDownLatch sentAnon = new CountDownLatch(1);

Thread threadWithName = new Thread(() -> {
ExecutorService pool = Executors.newCachedThreadPool();
runAfter(pool::shutdownNow);

pool.execute(() -> {
try {
final AmqpMessage message = new AmqpMessage();
message.setBytes(payload);
sender.setSendTimeout(-1);
sender.send(message);
} catch (Exception e) {
e.printStackTrace();
logger.warn("Caught exception while sending", e);
} finally {
sentWithName.countDown();
}
});

threadWithName.start();


Thread threadWithAnon = new Thread(() -> {
pool.execute(()-> {
try {
final AmqpMessage message = new AmqpMessage();
message.setBytes(payload);
anonSender.setSendTimeout(-1);
message.setAddress(getQueueName());
anonSender.send(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
sentAnon.countDown();
} catch (Exception e) {
logger.warn("Caught exception while sending", e);
}
});

threadWithAnon.start();

assertFalse(sentWithName.await(500, TimeUnit.MILLISECONDS), "Thread sender should be blocked");
assertFalse(sentAnon.await(500, TimeUnit.MILLISECONDS), "Thread sender anonymous should be blocked");

// unblock
monitor.setMaxUsage(100.0);

final CountDownLatch latch2 = new CountDownLatch(1);
monitor.addCallback((usableSpace, totalSpace, ok, type) -> {

double usage = FileStoreMonitor.calculateUsage(usableSpace, totalSpace);

if (ok && usage < monitor.getMaxUsage()) {
latch2.countDown();
}
});

latch2.await(2000, TimeUnit.MILLISECONDS);

assertTrue(sentWithName.await(30, TimeUnit.SECONDS), "Thread sender should be released");
assertTrue(sentAnon.await(30, TimeUnit.SECONDS), "Thread sender anonymous should be released");

threadWithName.join(TimeUnit.SECONDS.toMillis(30));
threadWithAnon.join(TimeUnit.SECONDS.toMillis(30));
assertFalse(threadWithName.isAlive());
assertFalse(threadWithAnon.isAlive());
} finally {
connection.close();
}
}

}

0 comments on commit ac182a7

Please sign in to comment.