Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Fix for early hit beforeConsume for MultiTopicConsumer #23141

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -29,8 +30,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.Sets;
import lombok.Cleanup;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
Expand Down Expand Up @@ -79,6 +78,12 @@ public Object[][] getTopicPartition() {
return new Object[][] {{ 0 }, { 3 }};
}

@DataProvider(name = "topics")
public Object[][] getTopics() {
return new Object[][] {{ List.of("persistent://my-property/my-ns/my-topic") },
{ List.of("persistent://my-property/my-ns/my-topic", "persistent://my-property/my-ns/my-topic1") }};
}

@Test
public void testProducerInterceptor() throws Exception {
Map<MessageId, List<String>> ackCallback = new HashMap<>();
Expand Down Expand Up @@ -403,9 +408,9 @@ public void close() {

@Override
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
MessageImpl<String> msg = (MessageImpl<String>) message;
MessageImpl<String> msg = ((MessageImpl<String>) ((TopicMessageImpl<String>) message).getMessage());
msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
return msg;
return message;
}

@Override
Expand Down Expand Up @@ -449,13 +454,19 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId

int keyCount = 0;
for (int i = 0; i < 2; i++) {
Message<String> received = consumer.receive();
Message<String> received;
if (i % 2 == 0) {
received = consumer.receive();
} else {
received = consumer.receiveAsync().join();
}
MessageImpl<String> msg = (MessageImpl<String>) ((TopicMessageImpl<String>) received).getMessage();
for (KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
if ("beforeConsumer".equals(keyValue.getKey())) {
keyCount++;
}
}
Assert.assertEquals(keyCount, i + 1);
consumer.acknowledge(received);
}
Assert.assertEquals(2, keyCount);
Expand All @@ -475,9 +486,9 @@ public void close() {

@Override
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
MessageImpl<String> msg = (MessageImpl<String>) message;
MessageImpl<String> msg = ((MessageImpl<String>) ((TopicMessageImpl<String>) message).getMessage());
msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1");
return msg;
return message;
}

@Override
Expand Down Expand Up @@ -612,8 +623,8 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
consumer.close();
}

@Test
public void testConsumerInterceptorForNegativeAcksSend() throws PulsarClientException, InterruptedException {
@Test(dataProvider = "topics")
public void testConsumerInterceptorForNegativeAcksSend(List<String> topics) throws PulsarClientException, InterruptedException {
final int totalNumOfMessages = 100;
CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);

Expand All @@ -640,6 +651,7 @@ public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId message

@Override
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
Assert.assertTrue(latch.getCount() > 0);
messageIds.forEach(messageId -> latch.countDown());
}

Expand All @@ -650,15 +662,15 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
};

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.topics(topics)
.subscriptionType(SubscriptionType.Failover)
.intercept(interceptor)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
.subscriptionName("my-subscription")
.subscribe();

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.topic(topics.get(0))
.create();

for (int i = 0; i < totalNumOfMessages; i++) {
Expand All @@ -682,8 +694,9 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
consumer.close();
}

@Test
public void testConsumerInterceptorForAckTimeoutSend() throws PulsarClientException, InterruptedException {
@Test(dataProvider = "topics")
public void testConsumerInterceptorForAckTimeoutSend(List<String> topics) throws PulsarClientException,
InterruptedException {
final int totalNumOfMessages = 100;
CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2);

Expand Down Expand Up @@ -714,16 +727,17 @@ public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> message

@Override
public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
Assert.assertTrue(latch.getCount() > 0);
messageIds.forEach(messageId -> latch.countDown());
}
};

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.topic(topics.get(0))
.create();

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.topics(topics)
.subscriptionName("foo")
.intercept(interceptor)
.ackTimeout(2, TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.collect.Lists;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -108,6 +109,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
private final MessageIdAdv startMessageId;
private volatile boolean duringSeek = false;
private final long startMessageRollbackDurationInSec;
private final ConsumerInterceptors<T> internalConsumerInterceptors;
MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
Expand Down Expand Up @@ -137,6 +139,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
long startMessageRollbackDurationInSec) {
super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), executorProvider, subscribeFuture,
schema, interceptors);
if (interceptors != null) {
this.internalConsumerInterceptors = getInternalConsumerInterceptors(interceptors);
} else {
this.internalConsumerInterceptors = null;
}

checkArgument(conf.getReceiverQueueSize() > 0,
"Receiver queue size needs to be greater than 0 for Topics Consumer");
Expand Down Expand Up @@ -316,7 +323,8 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
if (receivedFuture != null) {
unAckedMessageTracker.add(topicMessage.getMessageId(), topicMessage.getRedeliveryCount());
completePendingReceive(receivedFuture, topicMessage);
final Message<T> interceptMessage = beforeConsume(topicMessage);
completePendingReceive(receivedFuture, interceptMessage);
} else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
Expand Down Expand Up @@ -369,7 +377,7 @@ protected Message<T> internalReceive() throws PulsarClientException {
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
return message;
return beforeConsume(message);
} catch (Exception e) {
ExceptionHandler.handleInterruptedException(e);
throw PulsarClientException.unwrap(e);
Expand All @@ -388,6 +396,7 @@ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarC
decreaseIncomingMessageSize(message);
checkArgument(message instanceof TopicMessageImpl);
trackUnAckedMsgIfNoListener(message.getMessageId(), message.getRedeliveryCount());
message = beforeConsume(message);
}
resumeReceivingFromPausedConsumersIfNeeded();
return message;
Expand Down Expand Up @@ -447,7 +456,7 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
result.complete(message);
result.complete(beforeConsume(message));
}
});
return result;
Expand Down Expand Up @@ -1185,7 +1194,7 @@ private ConsumerImpl<T> createInternalConsumer(ConsumerConfigurationData<T> conf
return ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider(),
partitionIndex, true, listener != null, subFuture,
startMessageId, schema, interceptors,
startMessageId, schema, this.internalConsumerInterceptors,
createIfDoesNotExist, startMessageRollbackDurationInSec);
}

Expand Down Expand Up @@ -1595,4 +1604,45 @@ private CompletableFuture<List<Integer>> getExistsPartitions(String topic) {
return list;
});
}

private ConsumerInterceptors<T> getInternalConsumerInterceptors(ConsumerInterceptors<T> multiTopicInterceptors) {
return new ConsumerInterceptors<T>(new ArrayList<>()) {

@Override
public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
return message;
}

@Override
public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception) {
multiTopicInterceptors.onAcknowledge(consumer, messageId, exception);
}

@Override
public void onAcknowledgeCumulative(Consumer<T> consumer,
MessageId messageId, Throwable exception) {
multiTopicInterceptors.onAcknowledgeCumulative(consumer, messageId, exception);
}

@Override
public void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> set) {
multiTopicInterceptors.onNegativeAcksSend(consumer, set);
}

@Override
public void onAckTimeoutSend(Consumer<T> consumer, Set<MessageId> set) {
multiTopicInterceptors.onAckTimeoutSend(consumer, set);
}

@Override
public void onPartitionsChange(String topicName, int partitions) {
multiTopicInterceptors.onPartitionsChange(topicName, partitions);
}

@Override
public void close() throws IOException {
multiTopicInterceptors.close();
}
};
}
}
Loading