diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 6f9f51ff25343..b7c9c9c4dbc3d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -48,6 +48,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import lombok.AllArgsConstructor; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; @@ -85,6 +86,10 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { // shared incoming queue was full private final ConcurrentLinkedQueue> pausedConsumers; + private final ConcurrentLinkedQueue> receiveMessageFutures; + + private final ConcurrentLinkedQueue pendingReceivesFromConsumer; + // sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. AtomicInteger allTopicPartitionsNumber; @@ -100,6 +105,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { private volatile BatchMessageIdImpl startMessageId = null; private final long startMessageRollbackDurationInSec; + MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, ExecutorProvider executorProvider, CompletableFuture> subscribeFuture, Schema schema, ConsumerInterceptors interceptors, boolean createTopicIfDoesNotExist) { @@ -132,10 +138,11 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { checkArgument(conf.getReceiverQueueSize() > 0, "Receiver queue size needs to be greater than 0 for Topics Consumer"); - this.partitionedTopics = new ConcurrentHashMap<>(); this.consumers = new ConcurrentHashMap<>(); this.pausedConsumers = new ConcurrentLinkedQueue<>(); + this.receiveMessageFutures = new ConcurrentLinkedQueue<>(); + this.pendingReceivesFromConsumer = new ConcurrentLinkedQueue<>(); this.allTopicPartitionsNumber = new AtomicInteger(0); this.startMessageId = startMessageId != null ? new BatchMessageIdImpl(MessageIdImpl.convertToMessageIdImpl(startMessageId)) @@ -245,13 +252,16 @@ private void startReceivingMessages(List> newConsumers) { } private void receiveMessageFromConsumer(ConsumerImpl consumer, boolean batchReceive) { + if (paused){ + pendingReceivesFromConsumer.add(new PendingReceiveFromConsumer(consumer, batchReceive)); + } CompletableFuture>> messagesFuture; if (batchReceive) { messagesFuture = consumer.batchReceiveAsync().thenApply(msgs -> ((MessagesImpl) msgs).getMessageList()); } else { messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList); } - messagesFuture.thenAcceptAsync(messages -> { + CompletableFuture receiveMessageFuture = messagesFuture.thenAcceptAsync(messages -> { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Receive message from sub consumer:{}", topic, subscription, consumer.getTopic()); @@ -288,6 +298,10 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer, boolean batchR .schedule(() -> receiveMessageFromConsumer(consumer, true), 10, TimeUnit.SECONDS); return null; }); + receiveMessageFuture.whenComplete((ignore, ex) -> { + receiveMessageFutures.remove(receiveMessageFutures); + }); + receiveMessageFutures.add(receiveMessageFuture); } // Must be called from the internalPinnedExecutor thread @@ -771,14 +785,15 @@ public void seek(Function function) throws PulsarClientException } } - @Override - public CompletableFuture seekAsync(Function function) { - List> futures = new ArrayList<>(consumers.size()); - consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(function))); + private void cleanAfterSeek(){ unAckedMessageTracker.clear(); incomingMessages.clear(); resetIncomingMessageSize(); - return FutureUtil.waitForAll(futures); + } + + @Override + public CompletableFuture seekAsync(Function function) { + return internalSeekAsync(consumer -> consumer.seekAsync(function)); } @Override @@ -789,20 +804,31 @@ public CompletableFuture seekAsync(MessageId messageId) { new PulsarClientException("Illegal messageId, messageId can only be earliest/latest") ); } - List> futures = new ArrayList<>(consumers.size()); - consumers.values().forEach(consumerImpl -> futures.add(consumerImpl.seekAsync(targetMessageId))); - - unAckedMessageTracker.clear(); - clearIncomingMessages(); - - return FutureUtil.waitForAll(futures); + return internalSeekAsync(consumer -> consumer.seekAsync(messageId)); } @Override public CompletableFuture seekAsync(long timestamp) { - List> futures = new ArrayList<>(consumers.size()); - consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp))); - return FutureUtil.waitForAll(futures); + return internalSeekAsync(consumer -> consumer.seekAsync(timestamp)); + } + + private CompletableFuture internalSeekAsync(Function> childSeekFunction) { + pause(); + CompletableFuture res = FutureUtil.waitForAll(receiveMessageFutures).thenCompose(__ -> { + List> futures = new ArrayList<>(consumers.size()); + consumers.values().forEach(consumer -> futures.add(childSeekFunction.apply(consumer))); + return FutureUtil.waitForAll(futures).thenAccept(ignore -> { + cleanAfterSeek(); + resume(); + }); + }); + res.whenComplete((ignore, ex) -> { + log.error("[{}] [{}] seek fail", topic, subscription, ex); + if (ex != null){ + resume(); + } + }); + return res; } @Override @@ -1310,6 +1336,12 @@ public void resume() { synchronized (pauseMutex) { paused = false; consumers.forEach((name, consumer) -> consumer.resume()); + internalPinnedExecutor.execute(() -> { + while (!pendingReceivesFromConsumer.isEmpty()){ + PendingReceiveFromConsumer pendingReceive = pendingReceivesFromConsumer.poll(); + receiveMessageFromConsumer((ConsumerImpl) pendingReceive.consumer, pendingReceive.batchReceive); + } + }); } } @@ -1528,4 +1560,10 @@ protected void setCurrentReceiverQueueSize(int newSize) { CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, newSize); resumeReceivingFromPausedConsumersIfNeeded(); } + + @AllArgsConstructor + private static class PendingReceiveFromConsumer { + private Consumer consumer; + private boolean batchReceive; + } }