Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client]Duplicate messages when use MultiTopicsConsumerImpl #17443

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.AllArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -85,6 +86,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
// shared incoming queue was full
private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;

private final ConcurrentLinkedQueue<CompletableFuture<Void>> receiveMessageFutures;

private final ConcurrentLinkedQueue<PendingReceiveFromConsumer> pendingReceivesFromConsumer;

// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number.
AtomicInteger allTopicPartitionsNumber;

Expand All @@ -100,6 +105,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {

private volatile BatchMessageIdImpl startMessageId = null;
private final long startMessageRollbackDurationInSec;

MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
Expand Down Expand Up @@ -132,10 +138,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {

checkArgument(conf.getReceiverQueueSize() > 0,
"Receiver queue size needs to be greater than 0 for Topics Consumer");

this.partitionedTopics = new ConcurrentHashMap<>();
this.consumers = new ConcurrentHashMap<>();
this.pausedConsumers = new ConcurrentLinkedQueue<>();
this.receiveMessageFutures = new ConcurrentLinkedQueue<>();
this.pendingReceivesFromConsumer = new ConcurrentLinkedQueue<>();
this.allTopicPartitionsNumber = new AtomicInteger(0);
this.startMessageId = startMessageId != null
? new BatchMessageIdImpl(MessageIdImpl.convertToMessageIdImpl(startMessageId))
Expand Down Expand Up @@ -245,13 +252,16 @@ private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
}

private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchReceive) {
if (paused){
pendingReceivesFromConsumer.add(new PendingReceiveFromConsumer(consumer, batchReceive));
}
CompletableFuture<List<Message<T>>> messagesFuture;
if (batchReceive) {
messagesFuture = consumer.batchReceiveAsync().thenApply(msgs -> ((MessagesImpl<T>) msgs).getMessageList());
} else {
messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList);
}
messagesFuture.thenAcceptAsync(messages -> {
CompletableFuture receiveMessageFuture = messagesFuture.thenAcceptAsync(messages -> {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Receive message from sub consumer:{}",
topic, subscription, consumer.getTopic());
Expand Down Expand Up @@ -288,6 +298,10 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR
.schedule(() -> receiveMessageFromConsumer(consumer, true), 10, TimeUnit.SECONDS);
return null;
});
receiveMessageFuture.whenComplete((ignore, ex) -> {
receiveMessageFutures.remove(receiveMessageFutures);
});
receiveMessageFutures.add(receiveMessageFuture);
}

// Must be called from the internalPinnedExecutor thread
Expand Down Expand Up @@ -771,14 +785,15 @@ public void seek(Function<String, Object> function) throws PulsarClientException
}
}

@Override
public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(function)));
private void cleanAfterSeek(){
unAckedMessageTracker.clear();
incomingMessages.clear();
resetIncomingMessageSize();
return FutureUtil.waitForAll(futures);
}

/**
 * Seeks every child consumer using the caller-supplied function.
 *
 * <p>This method only builds the per-child seek action; the actual
 * orchestration is delegated to {@code internalSeekAsync}.
 *
 * @param function passed unchanged to each child consumer's {@code seekAsync(Function)}
 * @return the future returned by {@code internalSeekAsync}
 */
@Override
public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
    final Function<Consumer, CompletableFuture<Void>> seekOnChild = child -> child.seekAsync(function);
    return internalSeekAsync(seekOnChild);
}

@Override
Expand All @@ -789,20 +804,31 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
new PulsarClientException("Illegal messageId, messageId can only be earliest/latest")
);
}
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
consumers.values().forEach(consumerImpl -> futures.add(consumerImpl.seekAsync(targetMessageId)));

unAckedMessageTracker.clear();
clearIncomingMessages();

return FutureUtil.waitForAll(futures);
return internalSeekAsync(consumer -> consumer.seekAsync(messageId));
}

@Override
public CompletableFuture<Void> seekAsync(long timestamp) {
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp)));
return FutureUtil.waitForAll(futures);
return internalSeekAsync(consumer -> consumer.seekAsync(timestamp));
}

private CompletableFuture<Void> internalSeekAsync(Function<Consumer, CompletableFuture<Void>> childSeekFunction) {
pause();
CompletableFuture<Void> res = FutureUtil.waitForAll(receiveMessageFutures).thenCompose(__ -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to wait for completion of these ongoing receives as the messages will be discarded anyway, can we just cancel them?

Copy link
Contributor Author

@poorbarcode poorbarcode Sep 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion, but we don't have the operability of cancel, and the waiting-event only happens on this scenario: call seek multi times quickly, along with read, this is very rare.

List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
consumers.values().forEach(consumer -> futures.add(childSeekFunction.apply(consumer)));
return FutureUtil.waitForAll(futures).thenAccept(ignore -> {
cleanAfterSeek();
resume();
});
});
res.whenComplete((ignore, ex) -> {
log.error("[{}] [{}] seek fail", topic, subscription, ex);
if (ex != null){
resume();
}
});
return res;
}

@Override
Expand Down Expand Up @@ -1310,6 +1336,12 @@ public void resume() {
synchronized (pauseMutex) {
paused = false;
consumers.forEach((name, consumer) -> consumer.resume());
internalPinnedExecutor.execute(() -> {
while (!pendingReceivesFromConsumer.isEmpty()){
PendingReceiveFromConsumer pendingReceive = pendingReceivesFromConsumer.poll();
receiveMessageFromConsumer((ConsumerImpl<T>) pendingReceive.consumer, pendingReceive.batchReceive);
}
});
}
}

Expand Down Expand Up @@ -1528,4 +1560,10 @@ protected void setCurrentReceiverQueueSize(int newSize) {
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, newSize);
resumeReceivingFromPausedConsumersIfNeeded();
}

/**
 * Immutable record of a receive request that was queued instead of being issued
 * immediately: which child consumer to receive from, and whether to use batch receive.
 */
@AllArgsConstructor
private static class PendingReceiveFromConsumer {
    /** Child consumer the deferred receive should be issued against. */
    private final Consumer<?> consumer;
    /** {@code true} to replay the request as a batch receive, {@code false} for a single receive. */
    private final boolean batchReceive;
}
}