Skip to content

Commit

Permalink
[fix][client]Duplicate messages when use MultiTopicsConsumerImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Sep 2, 2022
1 parent 1bf9a26 commit bcfde76
Showing 1 changed file with 55 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.AllArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -85,6 +86,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
// shared incoming queue was full
private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;

private final ConcurrentLinkedQueue<CompletableFuture<Void>> receiveMessageFutures;

private final ConcurrentLinkedQueue<PendingReceiveFromConsumer> pendingReceivesFromConsumer;

// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number.
AtomicInteger allTopicPartitionsNumber;

Expand All @@ -100,6 +105,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {

private volatile BatchMessageIdImpl startMessageId = null;
private final long startMessageRollbackDurationInSec;

MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
Expand Down Expand Up @@ -132,10 +138,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {

checkArgument(conf.getReceiverQueueSize() > 0,
"Receiver queue size needs to be greater than 0 for Topics Consumer");

this.partitionedTopics = new ConcurrentHashMap<>();
this.consumers = new ConcurrentHashMap<>();
this.pausedConsumers = new ConcurrentLinkedQueue<>();
this.receiveMessageFutures = new ConcurrentLinkedQueue<>();
this.pendingReceivesFromConsumer = new ConcurrentLinkedQueue<>();
this.allTopicPartitionsNumber = new AtomicInteger(0);
this.startMessageId = startMessageId != null
? new BatchMessageIdImpl(MessageIdImpl.convertToMessageIdImpl(startMessageId))
Expand Down Expand Up @@ -245,13 +252,16 @@ private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
}

private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchReceive) {
if (paused){
pendingReceivesFromConsumer.add(new PendingReceiveFromConsumer(consumer, batchReceive));
}
CompletableFuture<List<Message<T>>> messagesFuture;
if (batchReceive) {
messagesFuture = consumer.batchReceiveAsync().thenApply(msgs -> ((MessagesImpl<T>) msgs).getMessageList());
} else {
messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList);
}
messagesFuture.thenAcceptAsync(messages -> {
CompletableFuture receiveMessageFuture = messagesFuture.thenAcceptAsync(messages -> {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Receive message from sub consumer:{}",
topic, subscription, consumer.getTopic());
Expand Down Expand Up @@ -288,6 +298,10 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR
.schedule(() -> receiveMessageFromConsumer(consumer, true), 10, TimeUnit.SECONDS);
return null;
});
receiveMessageFuture.whenComplete((ignore, ex) -> {
receiveMessageFutures.remove(receiveMessageFutures);
});
receiveMessageFutures.add(receiveMessageFuture);
}

// Must be called from the internalPinnedExecutor thread
Expand Down Expand Up @@ -771,14 +785,15 @@ public void seek(Function<String, Object> function) throws PulsarClientException
}
}

@Override
public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(function)));
private void cleanAfterSeek(){
unAckedMessageTracker.clear();
incomingMessages.clear();
resetIncomingMessageSize();
return FutureUtil.waitForAll(futures);
}

@Override
public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
return internalSeekAsync(consumer -> consumer.seekAsync(function));
}

@Override
Expand All @@ -789,20 +804,31 @@ public CompletableFuture<Void> seekAsync(MessageId messageId) {
new PulsarClientException("Illegal messageId, messageId can only be earliest/latest")
);
}
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
consumers.values().forEach(consumerImpl -> futures.add(consumerImpl.seekAsync(targetMessageId)));

unAckedMessageTracker.clear();
clearIncomingMessages();

return FutureUtil.waitForAll(futures);
return internalSeekAsync(consumer -> consumer.seekAsync(messageId));
}

@Override
public CompletableFuture<Void> seekAsync(long timestamp) {
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp)));
return FutureUtil.waitForAll(futures);
return internalSeekAsync(consumer -> consumer.seekAsync(timestamp));
}

private CompletableFuture<Void> internalSeekAsync(Function<Consumer, CompletableFuture<Void>> childSeekFunction) {
pause();
CompletableFuture<Void> res = FutureUtil.waitForAll(receiveMessageFutures).thenCompose(__ -> {
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
consumers.values().forEach(consumer -> futures.add(childSeekFunction.apply(consumer)));
return FutureUtil.waitForAll(futures).thenAccept(ignore -> {
cleanAfterSeek();
resume();
});
});
res.whenComplete((ignore, ex) -> {
log.error("[{}] [{}] seek fail", topic, subscription, ex);
if (ex != null){
resume();
}
});
return res;
}

@Override
Expand Down Expand Up @@ -1310,6 +1336,12 @@ public void resume() {
synchronized (pauseMutex) {
paused = false;
consumers.forEach((name, consumer) -> consumer.resume());
internalPinnedExecutor.execute(() -> {
while (!pendingReceivesFromConsumer.isEmpty()){
PendingReceiveFromConsumer pendingReceive = pendingReceivesFromConsumer.poll();
receiveMessageFromConsumer((ConsumerImpl<T>) pendingReceive.consumer, pendingReceive.batchReceive);
}
});
}
}

Expand Down Expand Up @@ -1528,4 +1560,10 @@ protected void setCurrentReceiverQueueSize(int newSize) {
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, newSize);
resumeReceivingFromPausedConsumersIfNeeded();
}

@AllArgsConstructor
private static class PendingReceiveFromConsumer {
private Consumer consumer;
private boolean batchReceive;
}
}

0 comments on commit bcfde76

Please sign in to comment.