From dd07bd6dd23b10da6f9099f6fca0f22fdaa6da82 Mon Sep 17 00:00:00 2001 From: ken <1647023764@qq.com> Date: Fri, 22 Nov 2024 09:51:02 +0800 Subject: [PATCH] [fix][client] fix incomingMessageSize and client memory usage is negative (#23624) Co-authored-by: fanjianye (cherry picked from commit 708c5cc0c5f86d6c6bbdb438067122074f4de994) --- .../api/SimpleProducerConsumerTest.java | 56 +++++++++++++++++ .../impl/AutoScaledReceiverQueueSizeTest.java | 62 +++++++++++++++++++ .../pulsar/client/impl/ConsumerBase.java | 5 ++ .../pulsar/client/impl/ConsumerImpl.java | 2 + 4 files changed, 125 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 6edfb634eef52..d91f2fefda059 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -4227,6 +4227,62 @@ public void testIncomingMessageSize(boolean isPartitioned) throws Exception { }); } + @Test(timeOut = 100000) + public void testNegativeIncomingMessageSize() throws Exception { + final String topicName = "persistent://my-property/my-ns/testIncomingMessageSize-" + + UUID.randomUUID().toString(); + final String subName = "my-sub"; + + admin.topics().createPartitionedTopic(topicName, 3); + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .create(); + + final int messages = 1000; + List> messageIds = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + messageIds.add(producer.newMessage().key(i + "").value(("Message-" + i).getBytes()).sendAsync()); + } + FutureUtil.waitForAll(messageIds).get(); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + + Awaitility.await().untilAsserted(() -> { + long size = ((ConsumerBase) consumer).getIncomingMessageSize(); + log.info("Check the incoming message size should greater that 0, current size is {}", size); + Assert.assertTrue(size > 0); + }); + + + for (int i = 0; i < messages; i++) { + consumer.receive(); + } + + + Awaitility.await().untilAsserted(() -> { + long size = ((ConsumerBase) consumer).getIncomingMessageSize(); + log.info("Check the incoming message size should be 0, current size is {}", size); + Assert.assertEquals(size, 0); + }); + + + MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer; + List> list = multiTopicsConsumer.getConsumers(); + for (ConsumerImpl subConsumer : list) { + long size = subConsumer.getIncomingMessageSize(); + log.info("Check the sub consumer incoming message size should be 0, current size is {}", size); + Assert.assertEquals(size, 0); + } + } @Data @EqualsAndHashCode diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java index 858e43e84656f..5359158bf7214 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java @@ -20,14 +20,22 @@ import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.BatchReceivePolicy; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -257,4 +265,58 @@ public void testMultiConsumerImplBatchReceive() throws PulsarClientException, Pu Awaitility.await().until(() -> consumer.getCurrentReceiverQueueSize() == currentSize * 2); log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize()); } + + @Test + public void testNegativeClientMemory() throws Exception { + final String topicName = "persistent://public/default/testMemory-" + + UUID.randomUUID().toString(); + final String subName = "my-sub"; + + admin.topics().createPartitionedTopic(topicName, 3); + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .create(); + + final int messages = 1000; + List> messageIds = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + messageIds.add(producer.newMessage().key(i + "").value(("Message-" + i).getBytes()).sendAsync()); + } + FutureUtil.waitForAll(messageIds).get(); + + + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .autoScaledReceiverQueueSizeEnabled(true) + .subscribe(); + + + Awaitility.await().untilAsserted(() -> { + long size = ((ConsumerBase) consumer).getIncomingMessageSize(); + log.info("Check the incoming message size should greater that 0, current size is {}", size); + Assert.assertTrue(size > 0); + }); + + + for (int i = 0; i < messages; i++) { + consumer.receive(); + } + + Awaitility.await().untilAsserted(() -> { + long size = ((ConsumerBase) consumer).getIncomingMessageSize(); + log.info("Check the incoming message size should be 0, current size is {}", size); + Assert.assertEquals(size, 0); + }); + + + MemoryLimitController controller = ((PulsarClientImpl)pulsarClient).getMemoryLimitController(); + Assert.assertEquals(controller.currentUsage(), 0); + Assert.assertEquals(controller.currentUsagePercent(), 0); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 4836fd9cd92d2..df8eb28b95508 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -1214,6 +1214,11 @@ protected void decreaseIncomingMessageSize(final Message message) { getMemoryLimitController().ifPresent(limiter -> limiter.releaseMemory(message.size())); } + protected void increaseIncomingMessageSize(final Message message) { + INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size()); + getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(message.size())); + } + public long getIncomingMessageSize() { return INCOMING_MESSAGES_SIZE_UPDATER.get(this); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index e527a92e4ac35..c857c8e12d0a8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1608,6 +1608,8 @@ void notifyPendingReceivedCallback(final Message message, Exception exception return; } + // increase incomingMessageSize here because the size would be decreased in messageProcessed() next step + increaseIncomingMessageSize(message); // increase permits for available message-queue messageProcessed(message); // call interceptor and complete received callback