From 999db2c7f5a8a3c09e2055fb47f9e3cb65e7d1d2 Mon Sep 17 00:00:00 2001 From: feynmanlin <315157973@qq.com> Date: Tue, 14 Jan 2025 10:41:59 +0800 Subject: [PATCH] [fix] [broker] Fix acknowledgeCumulativeAsync block when ackReceipt is enabled (#23841) --- .../pulsar/client/impl/ConsumerAckTest.java | 34 +++++++++++++++++++ ...sistentAcknowledgmentsGroupingTracker.java | 7 +++- 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java index a83283bc267b5..6d9025fd966b3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java @@ -116,6 +116,40 @@ public void testAckResponse() throws PulsarClientException, InterruptedException Assert.assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException); } } + @Test(timeOut = 30000) + public void testAckReceipt() throws Exception { + String topic = "testAckReceipt"; + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(false) + .create(); + @Cleanup + ConsumerImpl consumer = (ConsumerImpl) pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .isAckReceiptEnabled(true) + .subscribe(); + for (int i = 0; i < 10; i++) { + producer.send(i); + } + Message message = consumer.receive(); + MessageId messageId = message.getMessageId(); + consumer.acknowledgeCumulativeAsync(messageId).get(); + consumer.acknowledgeCumulativeAsync(messageId).get(); + consumer.close(); + @Cleanup + ConsumerImpl consumer2 = (ConsumerImpl) pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .isAckReceiptEnabled(true) + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscribe(); + message = consumer2.receive(); + messageId = message.getMessageId(); + consumer2.acknowledgeCumulativeAsync(messageId).get(); + consumer2.acknowledgeCumulativeAsync(messageId).get(); + } @Test public void testIndividualAck() throws Exception { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index c0ee13b346a0b..d30c3de0fd720 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -312,7 +312,12 @@ private CompletableFuture doCumulativeAck(MessageIdAdv messageId, Map readLock = acquireReadLock(); try { doCumulativeAckAsync(messageId, bitSet); - return readLock.map(__ -> currentCumulativeAckFuture).orElse(CompletableFuture.completedFuture(null)); + return readLock.map(__ -> { + if (consumer.isAckReceiptEnabled() && lastCumulativeAck.compareTo(messageId) == 0) { + return CompletableFuture.completedFuture(null); + } + return currentCumulativeAckFuture; + }).orElse(CompletableFuture.completedFuture(null)); } finally { readLock.ifPresent(Lock::unlock); }