Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] fix incomingMessageSize and client memory usage is negative #23624

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4252,6 +4252,62 @@ public void testIncomingMessageSize(boolean isPartitioned) throws Exception {
});
}

@Test(timeOut = 100000)
public void testNegativeIncomingMessageSize() throws Exception {
final String topicName = "persistent://my-property/my-ns/testIncomingMessageSize-" +
UUID.randomUUID().toString();
final String subName = "my-sub";

admin.topics().createPartitionedTopic(topicName, 3);

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.create();

final int messages = 1000;
List<CompletableFuture<MessageId>> messageIds = new ArrayList<>(messages);
for (int i = 0; i < messages; i++) {
messageIds.add(producer.newMessage().key(i + "").value(("Message-" + i).getBytes()).sendAsync());
}
FutureUtil.waitForAll(messageIds).get();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();


Awaitility.await().untilAsserted(() -> {
long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
log.info("Check the incoming message size should greater that 0, current size is {}", size);
Assert.assertTrue(size > 0);
});


for (int i = 0; i < messages; i++) {
consumer.receive();
}


Awaitility.await().untilAsserted(() -> {
long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
log.info("Check the incoming message size should be 0, current size is {}", size);
Assert.assertEquals(size, 0);
});


MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer;
List<ConsumerImpl<byte[]>> list = multiTopicsConsumer.getConsumers();
for (ConsumerImpl<byte[]> subConsumer : list) {
long size = subConsumer.getIncomingMessageSize();
log.info("Check the sub consumer incoming message size should be 0, current size is {}", size);
Assert.assertEquals(size, 0);
}
}

@Data
@EqualsAndHashCode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,22 @@


import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -257,4 +265,58 @@ public void testMultiConsumerImplBatchReceive() throws PulsarClientException, Pu
Awaitility.await().until(() -> consumer.getCurrentReceiverQueueSize() == currentSize * 2);
log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());
}

@Test
public void testNegativeClientMemory() throws Exception {
final String topicName = "persistent://public/default/testMemory-" +
UUID.randomUUID().toString();
final String subName = "my-sub";

admin.topics().createPartitionedTopic(topicName, 3);

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.create();

final int messages = 1000;
List<CompletableFuture<MessageId>> messageIds = new ArrayList<>(messages);
for (int i = 0; i < messages; i++) {
messageIds.add(producer.newMessage().key(i + "").value(("Message-" + i).getBytes()).sendAsync());
}
FutureUtil.waitForAll(messageIds).get();


@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.autoScaledReceiverQueueSizeEnabled(true)
.subscribe();


Awaitility.await().untilAsserted(() -> {
long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
log.info("Check the incoming message size should greater that 0, current size is {}", size);
Assert.assertTrue(size > 0);
});


for (int i = 0; i < messages; i++) {
consumer.receive();
}

Awaitility.await().untilAsserted(() -> {
long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize();
log.info("Check the incoming message size should be 0, current size is {}", size);
Assert.assertEquals(size, 0);
});


MemoryLimitController controller = ((PulsarClientImpl)pulsarClient).getMemoryLimitController();
Assert.assertEquals(controller.currentUsage(), 0);
Assert.assertEquals(controller.currentUsagePercent(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1231,6 +1231,11 @@ protected void decreaseIncomingMessageSize(final Message<?> message) {
getMemoryLimitController().ifPresent(limiter -> limiter.releaseMemory(message.size()));
}

protected void increaseIncomingMessageSize(final Message<?> message) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size());
getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(message.size()));
}

public long getIncomingMessageSize() {
return INCOMING_MESSAGES_SIZE_UPDATER.get(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1667,6 +1667,8 @@ void notifyPendingReceivedCallback(final Message<T> message, Exception exception
return;
}

// increase incomingMessageSize here because the size would be decreased in messageProcessed() next step
increaseIncomingMessageSize(message);
// increase permits for available message-queue
messageProcessed(message);
// call interceptor and complete received callback
Expand Down
Loading