diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 0ae36b4ca9045..876fa98bce414 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -146,6 +146,9 @@ public void testNegativeAcks(boolean batching, boolean usePartitions, Subscripti consumer.negativeAcknowledge(msg); } + assertTrue(consumer instanceof ConsumerBase); + assertEquals(((ConsumerBase) consumer).getUnAckedMessageTracker().size(), 0); + Set receivedMessages = new HashSet<>(); // All the messages should be received again diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index e933005f2d6ea..fec428824c205 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -193,6 +193,10 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat initReceiverQueueSize(); } + protected UnAckedMessageTracker getUnAckedMessageTracker() { + return unAckedMessageTracker; + } + protected void triggerBatchReceiveTimeoutTask() { if (!hasBatchReceiveTimeout() && batchReceivePolicy.getTimeoutMs() > 0) { batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b0d7d3a0f8b3a..a929fe9aa6bb2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -385,6 +385,7 @@ public ConnectionHandler getConnectionHandler() { return connectionHandler; } + @Override public UnAckedMessageTracker getUnAckedMessageTracker() { return unAckedMessageTracker; } @@ -756,7 +757,7 @@ public void negativeAcknowledge(Message message) { negativeAcksTracker.add(message); // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" - unAckedMessageTracker.remove(message.getMessageId()); + unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId())); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index fb7be3c5a5ea2..8a515a9f9b8d7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -546,6 +546,7 @@ public void negativeAcknowledge(MessageId messageId) { checkArgument(messageId instanceof TopicMessageId); ConsumerImpl consumer = consumers.get(((TopicMessageId) messageId).getOwnerTopic()); consumer.negativeAcknowledge(messageId); + unAckedMessageTracker.remove(messageId); } @Override @@ -554,6 +555,7 @@ public void negativeAcknowledge(Message message) { checkArgument(messageId instanceof TopicMessageId); ConsumerImpl consumer = consumers.get(((TopicMessageId) messageId).getOwnerTopic()); consumer.negativeAcknowledge(message); + unAckedMessageTracker.remove(messageId); } @Override @@ -852,6 +854,7 @@ public synchronized ConsumerStats getStats() { return stats; } + @Override public UnAckedMessageTracker getUnAckedMessageTracker() { return unAckedMessageTracker; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java index 37f58a0218091..d6b86e3593dc2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java @@ -32,8 +32,11 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.RedeliveryBackoff; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class NegativeAcksTracker implements Closeable { + private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class); private HashMap nackedMessages = null; @@ -79,9 +82,12 @@ private synchronized void triggerRedelivery(Timeout t) { } }); - messagesToRedeliver.forEach(nackedMessages::remove); - consumer.onNegativeAcksSend(messagesToRedeliver); - consumer.redeliverUnacknowledgedMessages(messagesToRedeliver); + if (!messagesToRedeliver.isEmpty()) { + messagesToRedeliver.forEach(nackedMessages::remove); + consumer.onNegativeAcksSend(messagesToRedeliver); + log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size()); + consumer.redeliverUnacknowledgedMessages(messagesToRedeliver); + } this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java index 534f33350267d..69f86a1a89f2c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java @@ -189,6 +189,10 @@ public void clear() { } public boolean add(MessageId messageId) { + if (messageId == null) { + return false; + } + writeLock.lock(); try { HashSet partition = timePartitions.peekLast(); @@ -217,6 +221,10 @@ boolean isEmpty() { } public boolean remove(MessageId messageId) { + if (messageId == null) { + return false; + } + writeLock.lock(); try { boolean removed = false;