Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Fix for early hit beforeConsume for MultiTopicConsumer #23141

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -29,8 +30,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.Sets;
import lombok.Cleanup;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
Expand Down Expand Up @@ -856,6 +855,91 @@ public void onPartitionsChange(String topicName, int partitions) {
Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS));
}

@Test
public void testConsumerInterceptorWithMultiTopics() throws PulsarClientException {
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved

AtomicInteger hitCount = new AtomicInteger();

ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
@Override
public void close() {

}

@Override
public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
hitCount.incrementAndGet();
log.info("beforeConsume consumer: {}, messageId: {}", consumer.getTopic(), message.getMessageId());
return message;
}

@Override
public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
log.info("onAcknowledge messageId: {}", messageId, cause);
}

@Override
public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
}

@Override
public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {

}

@Override
public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {

}
};

List<String> topics = Arrays.asList("persistent://my-property/my-ns/my-topic",
"persistent://my-property/my-ns/my-topic1", "persistent://my-property/my-ns/my-topic2");
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topics(topics)
.subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(100)
.intercept(interceptor)
.subscriptionName("my-subscription")
.subscribe();

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic")
.create();

for (int i = 0; i < 50; i++) {
producer.newMessage().value("Hello Pulsar!").send();
}

Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
.topic("persistent://my-property/my-ns/my-topic1")
.create();

for (int i = 0; i < 50; i++) {
producer2.newMessage().value("Hello Pulsar-2!").send();
}


for (int i = 0; i < 100; i++) {
Message<String> msg;
if (i % 2 == 0) {
msg = consumer.receive();
} else {
msg = consumer.receiveAsync().join();
}
Assert.assertEquals(hitCount.get(), i + 1);
log.info("Received message: {}, count: {}", msg.getMessageId(), hitCount.get());
consumer.acknowledge(msg);
}
producer.close();
producer2.close();
consumer.close();

Assert.assertEquals(100, hitCount.get());
}


private void produceAndConsume(int msgCount, Producer<byte[]> producer, Reader<byte[]> reader)
throws PulsarClientException {
for (int i = 0; i < msgCount; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.collect.Lists;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -108,6 +109,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
private final MessageIdAdv startMessageId;
private volatile boolean duringSeek = false;
private final long startMessageRollbackDurationInSec;
private final ConsumerInterceptors<T> internalConsumerInterceptors;
MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
Expand Down Expand Up @@ -137,6 +139,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
long startMessageRollbackDurationInSec) {
super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), executorProvider, subscribeFuture,
schema, interceptors);
this.internalConsumerInterceptors = getInternalConsumerInterceptors(interceptors);

checkArgument(conf.getReceiverQueueSize() > 0,
"Receiver queue size needs to be greater than 0 for Topics Consumer");
Expand Down Expand Up @@ -316,7 +319,8 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
if (receivedFuture != null) {
unAckedMessageTracker.add(topicMessage.getMessageId(), topicMessage.getRedeliveryCount());
completePendingReceive(receivedFuture, topicMessage);
final Message<T> interceptMessage = beforeConsume(topicMessage);
completePendingReceive(receivedFuture, interceptMessage);
} else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
Expand Down Expand Up @@ -369,7 +373,7 @@ protected Message<T> internalReceive() throws PulsarClientException {
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
return message;
return beforeConsume(message);
} catch (Exception e) {
ExceptionHandler.handleInterruptedException(e);
throw PulsarClientException.unwrap(e);
Expand All @@ -390,7 +394,7 @@ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarC
trackUnAckedMsgIfNoListener(message.getMessageId(), message.getRedeliveryCount());
}
resumeReceivingFromPausedConsumersIfNeeded();
return message;
return beforeConsume(message);
} catch (Exception e) {
ExceptionHandler.handleInterruptedException(e);
throw PulsarClientException.unwrap(e);
Expand Down Expand Up @@ -447,7 +451,7 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount());
resumeReceivingFromPausedConsumersIfNeeded();
result.complete(message);
result.complete(beforeConsume(message));
}
});
return result;
Expand Down Expand Up @@ -1185,7 +1189,7 @@ private ConsumerImpl<T> createInternalConsumer(ConsumerConfigurationData<T> conf
return ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider(),
partitionIndex, true, listener != null, subFuture,
startMessageId, schema, interceptors,
startMessageId, schema, this.internalConsumerInterceptors,
createIfDoesNotExist, startMessageRollbackDurationInSec);
}

Expand Down Expand Up @@ -1595,4 +1599,45 @@ private CompletableFuture<List<Integer>> getExistsPartitions(String topic) {
return list;
});
}

private ConsumerInterceptors<T> getInternalConsumerInterceptors(ConsumerInterceptors<T> interceptors) {
return new ConsumerInterceptors<T>(new ArrayList<>()) {

@Override
public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
return message;
}

@Override
public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception) {
interceptors.onAcknowledge(consumer, messageId, exception);
}

@Override
public void onAcknowledgeCumulative(Consumer<T> consumer,
MessageId messageId, Throwable exception) {
interceptors.onAcknowledgeCumulative(consumer, messageId, exception);
}

@Override
public void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> set) {
interceptors.onNegativeAcksSend(consumer, set);
}

@Override
public void onAckTimeoutSend(Consumer<T> consumer, Set<MessageId> set) {
interceptors.onAckTimeoutSend(consumer, set);
}

@Override
public void onPartitionsChange(String topicName, int partitions) {
interceptors.onPartitionsChange(topicName, partitions);
}

@Override
public void close() throws IOException {
interceptors.close();
}
};
}
}
Loading