Skip to content

Commit

Permalink
[fix][client] fix negative message re-delivery twice issue (apache#20750
Browse files Browse the repository at this point in the history
)

(cherry picked from commit ecd16d6)
  • Loading branch information
aloyszhang authored and nodece committed Feb 23, 2024
1 parent 221553e commit 90ef8f9
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ public void testNegativeAcks(boolean batching, boolean usePartitions, Subscripti
consumer.negativeAcknowledge(msg);
}

assertTrue(consumer instanceof ConsumerBase<String>);
assertEquals(((ConsumerBase<String>) consumer).getUnAckedMessageTracker().size(), 0);

Set<String> receivedMessages = new HashSet<>();

// All the messages should be received again
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def build_extension(self, ext):
extras_require["functions"] = sorted(
{
"protobuf>=3.6.1,<=3.20.*",
"grpcio<1.28,>=1.8.2",
"grpcio>=1.60",
"apache-bookkeeper-client>=4.9.2",
"prometheus_client",
"ratelimit"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat

}

protected UnAckedMessageTracker getUnAckedMessageTracker() {
return unAckedMessageTracker;
}

protected void triggerBatchReceiveTimeoutTask() {
if (!hasBatchReceiveTimeout() && batchReceivePolicy.getTimeoutMs() > 0) {
batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ public ConnectionHandler getConnectionHandler() {
return connectionHandler;
}

@Override
public UnAckedMessageTracker getUnAckedMessageTracker() {
return unAckedMessageTracker;
}
Expand Down Expand Up @@ -769,7 +770,7 @@ public void negativeAcknowledge(Message<?> message) {
negativeAcksTracker.add(message);

// Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
unAckedMessageTracker.remove(message.getMessageId());
unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ public void negativeAcknowledge(MessageId messageId) {

ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName());
consumer.negativeAcknowledge(topicMessageId.getInnerMessageId());
unAckedMessageTracker.remove(messageId);
}

@Override
Expand All @@ -577,6 +578,7 @@ public void negativeAcknowledge(Message<?> message) {

ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName());
consumer.negativeAcknowledge(message);
unAckedMessageTracker.remove(messageId);
}

@Override
Expand Down Expand Up @@ -848,6 +850,7 @@ public synchronized ConsumerStats getStats() {
return stats;
}

@Override
public UnAckedMessageTracker getUnAckedMessageTracker() {
return unAckedMessageTracker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class NegativeAcksTracker implements Closeable {
private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class);

private HashMap<MessageId, Long> nackedMessages = null;

Expand Down Expand Up @@ -77,9 +80,12 @@ private synchronized void triggerRedelivery(Timeout t) {
}
});

messagesToRedeliver.forEach(nackedMessages::remove);
consumer.onNegativeAcksSend(messagesToRedeliver);
consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
if (!messagesToRedeliver.isEmpty()) {
messagesToRedeliver.forEach(nackedMessages::remove);
consumer.onNegativeAcksSend(messagesToRedeliver);
log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size());
consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
}

this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ public void clear() {
}

public boolean add(MessageId messageId) {
if (messageId == null) {
return false;
}

writeLock.lock();
try {
HashSet<MessageId> partition = timePartitions.peekLast();
Expand Down Expand Up @@ -217,6 +221,10 @@ boolean isEmpty() {
}

public boolean remove(MessageId messageId) {
if (messageId == null) {
return false;
}

writeLock.lock();
try {
boolean removed = false;
Expand Down

0 comments on commit 90ef8f9

Please sign in to comment.