Skip to content

Commit

Permalink
[fix][client] Fix for early hit beforeConsume for MultiTopicConsumer (
Browse files Browse the repository at this point in the history
apache#23141)

(cherry picked from commit c07b158)
  • Loading branch information
coderzc committed Aug 14, 2024
1 parent 8ce330a commit d89f7c0
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.api;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -27,8 +29,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Sets;
import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
Expand Down Expand Up @@ -66,6 +66,12 @@ public Object[][] getReceiverQueueSize() {
return new Object[][] { { 0 }, { 1000 } };
}

@DataProvider(name = "topics")
public Object[][] getTopics() {
return new Object[][] {{Lists.newArrayList("persistent://my-property/my-ns/my-topic") },
{ Lists.newArrayList("persistent://my-property/my-ns/my-topic", "persistent://my-property/my-ns/my-topic1") }};
}

@Test
public void testProducerInterceptor() throws Exception {
Map<MessageId, List<String>> ackCallback = new HashMap<>();
Expand Down Expand Up @@ -390,9 +396,9 @@ public void close() {

@Override
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
MessageImpl<String> msg = (MessageImpl<String>) message;
MessageImpl<String> msg = ((MessageImpl<String>) ((TopicMessageImpl<String>) message).getMessage());
msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
return msg;
return message;
}

@Override
Expand Down Expand Up @@ -436,13 +442,19 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId

int keyCount = 0;
for (int i = 0; i < 2; i++) {
Message<String> received = consumer.receive();
Message<String> received;
if (i % 2 == 0) {
received = consumer.receive();
} else {
received = consumer.receiveAsync().join();
}
MessageImpl<String> msg = (MessageImpl<String>) ((TopicMessageImpl<String>) received).getMessage();
for (KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
if ("beforeConsumer".equals(keyValue.getKey())) {
keyCount++;
}
}
Assert.assertEquals(keyCount, i + 1);
consumer.acknowledge(received);
}
Assert.assertEquals(2, keyCount);
Expand All @@ -462,9 +474,9 @@ public void close() {

@Override
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
MessageImpl<String> msg = (MessageImpl<String>) message;
MessageImpl<String> msg = ((MessageImpl<String>) ((TopicMessageImpl<String>) message).getMessage());
msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
return msg;
return message;
}

@Override
Expand Down Expand Up @@ -599,8 +611,8 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
consumer.close();
}

@Test
public void testConsumerInterceptorForNegativeAcksSend() throws PulsarClientException, InterruptedException {
@Test(dataProvider = "topics")
public void testConsumerInterceptorForNegativeAcksSend(List<String> topics) throws PulsarClientException, InterruptedException {
final int totalNumOfMessages = 100;
CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);

Expand All @@ -627,6 +639,7 @@ public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId message

@Override
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
Assert.assertTrue(latch.getCount() > 0);
messageIds.forEach(messageId -> latch.countDown());
}

Expand All @@ -637,15 +650,15 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
};

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.topics(topics)
.subscriptionType(SubscriptionType.Failover)
.intercept(interceptor)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
.subscriptionName("my-subscription")
.subscribe();

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.topic(topics.get(0))
.create();

for (int i = 0; i < totalNumOfMessages; i++) {
Expand All @@ -669,8 +682,9 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
consumer.close();
}

@Test
public void testConsumerInterceptorForAckTimeoutSend() throws PulsarClientException, InterruptedException {
@Test(dataProvider = "topics")
public void testConsumerInterceptorForAckTimeoutSend(List<String> topics) throws PulsarClientException,
InterruptedException {
final int totalNumOfMessages = 100;
CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);

Expand Down Expand Up @@ -701,16 +715,17 @@ public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> message

@Override
public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
Assert.assertTrue(latch.getCount() > 0);
messageIds.forEach(messageId -> latch.countDown());
}
};

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.topic(topics.get(0))
.create();

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.topics(topics)
.subscriptionName("foo")
.intercept(interceptor)
.ackTimeout(2, TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.collect.Lists;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -104,6 +105,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {

private volatile BatchMessageIdImpl startMessageId = null;
private final long startMessageRollbackDurationInSec;
private final ConsumerInterceptors<T> internalConsumerInterceptors;
MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
Expand Down Expand Up @@ -133,6 +135,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
long startMessageRollbackDurationInSec) {
super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), executorProvider, subscribeFuture,
schema, interceptors);
if (interceptors != null) {
this.internalConsumerInterceptors = getInternalConsumerInterceptors(interceptors);
} else {
this.internalConsumerInterceptors = null;
}

checkArgument(conf.getReceiverQueueSize() > 0,
"Receiver queue size needs to be greater than 0 for Topics Consumer");
Expand Down Expand Up @@ -315,7 +322,8 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
if (receivedFuture != null) {
unAckedMessageTracker.add(topicMessage.getMessageId(), topicMessage.getRedeliveryCount());
completePendingReceive(receivedFuture, topicMessage);
final Message<T> interceptMessage = beforeConsume(topicMessage);
completePendingReceive(receivedFuture, interceptMessage);
} else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
Expand Down Expand Up @@ -366,7 +374,7 @@ protected Message<T> internalReceive() throws PulsarClientException {
}
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
return message;
return beforeConsume(message);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
Expand All @@ -393,6 +401,7 @@ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarC
}
}
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
message = beforeConsume(message);
}
resumeReceivingFromPausedConsumersIfNeeded();
return message;
Expand Down Expand Up @@ -463,7 +472,7 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
result.complete(message);
result.complete(beforeConsume(message));
}
});
return result;
Expand Down Expand Up @@ -1119,7 +1128,7 @@ private ConsumerImpl<T> createInternalConsumer(ConsumerConfigurationData<T> conf
return ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider(),
partitionIndex, true, listener != null, subFuture,
startMessageId, schema, interceptors,
startMessageId, schema, this.internalConsumerInterceptors,
createIfDoesNotExist, startMessageRollbackDurationInSec);
}

Expand Down Expand Up @@ -1503,4 +1512,45 @@ public void tryAcknowledgeMessage(Message<T> msg) {
acknowledgeCumulativeAsync(msg);
}
}

private ConsumerInterceptors<T> getInternalConsumerInterceptors(ConsumerInterceptors<T> multiTopicInterceptors) {
return new ConsumerInterceptors<T>(new ArrayList<>()) {

@Override
public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
return message;
}

@Override
public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception) {
multiTopicInterceptors.onAcknowledge(consumer, messageId, exception);
}

@Override
public void onAcknowledgeCumulative(Consumer<T> consumer,
MessageId messageId, Throwable exception) {
multiTopicInterceptors.onAcknowledgeCumulative(consumer, messageId, exception);
}

@Override
public void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> set) {
multiTopicInterceptors.onNegativeAcksSend(consumer, set);
}

@Override
public void onAckTimeoutSend(Consumer<T> consumer, Set<MessageId> set) {
multiTopicInterceptors.onAckTimeoutSend(consumer, set);
}

@Override
public void onPartitionsChange(String topicName, int partitions) {
multiTopicInterceptors.onPartitionsChange(topicName, partitions);
}

@Override
public void close() throws IOException {
multiTopicInterceptors.close();
}
};
}
}

0 comments on commit d89f7c0

Please sign in to comment.