From 3305c0a7f5edddff5c6f7c987aa9ca7b11d68f35 Mon Sep 17 00:00:00 2001 From: vbhat6 Date: Fri, 26 Jul 2024 18:56:55 +0530 Subject: [PATCH 1/7] [fix][client] Fix for multiple hit at the interceptor for MultiTopicConsumer --- .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 4e5b577b1a986..8f74187ccc4bd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -243,7 +243,11 @@ private void startReceivingMessages(List> newConsumers) { if (getState() == State.Ready) { newConsumers.forEach(consumer -> { consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize()); - internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer, true)); + if (conf.getBatchReceivePolicy() != null) { + internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer, true)); + } else { + internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer, false)); + } }); } } From e0f72f80e5ec1c12fc53cd54ff459c626d112a04 Mon Sep 17 00:00:00 2001 From: vbhat6 Date: Tue, 30 Jul 2024 10:23:03 +0530 Subject: [PATCH 2/7] Revert "[fix][client] Fix for multiple hit at the interceptor for MultiTopicConsumer" This reverts commit 3305c0a7f5edddff5c6f7c987aa9ca7b11d68f35. --- .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 8f74187ccc4bd..4e5b577b1a986 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -243,11 +243,7 @@ private void startReceivingMessages(List> newConsumers) { if (getState() == State.Ready) { newConsumers.forEach(consumer -> { consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize()); - if (conf.getBatchReceivePolicy() != null) { - internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer, true)); - } else { - internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer, false)); - } + internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer, true)); }); } } From 5705a1df67c227df787f091dd0a8e5ea2f24492e Mon Sep 17 00:00:00 2001 From: vbhat6 Date: Tue, 30 Jul 2024 11:46:45 +0530 Subject: [PATCH 3/7] [fix][client] Fix for multiple hit at the interceptor for MultiTopicConsumer --- .../main/java/org/apache/pulsar/client/impl/ConsumerBase.java | 4 ++-- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 33c2a3cc266cf..7db5c9d1c89e0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -877,8 +877,8 @@ protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive opBatc Message msg = incomingMessages.poll(); if (msg != null) { messageProcessed(msg); - Message interceptMsg = beforeConsume(msg); - messages.add(interceptMsg); + //Message interceptMsg = beforeConsume(msg); + messages.add(msg); } msgPeeked = incomingMessages.peek(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 190ddd8b435cb..86bcc9b49660e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -532,8 +532,8 @@ protected CompletableFuture> internalBatchReceiveAsync() { msgPeeked = incomingMessages.peek(); continue; } - Message interceptMsg = beforeConsume(msg); - messages.add(interceptMsg); + //Message interceptMsg = beforeConsume(msg); + messages.add(msg); } msgPeeked = incomingMessages.peek(); } From 2b6df0af3aa544d668683a96e408f2f4803dd0ec Mon Sep 17 00:00:00 2001 From: vbhat6 Date: Mon, 5 Aug 2024 10:04:44 +0530 Subject: [PATCH 4/7] [fix][client] Fix for multiple hit at the interceptor for MultiTopicConsumer --- .../org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 4e5b577b1a986..4ffc5f5f6cb7c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -379,6 +379,7 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC long callTime = System.nanoTime(); try { message = incomingMessages.poll(timeout, unit); + message = beforeConsume(message); if (message != null) { decreaseIncomingMessageSize(message); checkArgument(message instanceof TopicMessageImpl); From 36acf31fcad4c8f6ee11c58a1cdcbafbab823ff7 Mon Sep 17 00:00:00 2001 From: vbhat6 Date: Wed, 7 Aug 2024 11:48:17 +0530 Subject: [PATCH 5/7] [fix][client] Fix for multiple hit at the interceptor for MultiTopicConsumer --- .../java/org/apache/pulsar/client/impl/ConsumerBase.java | 4 ++-- .../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 8 ++++---- .../pulsar/client/impl/MultiTopicsConsumerImpl.java | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 7db5c9d1c89e0..33c2a3cc266cf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -877,8 +877,8 @@ protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive opBatc Message msg = incomingMessages.poll(); if (msg != null) { messageProcessed(msg); - //Message interceptMsg = beforeConsume(msg); - messages.add(msg); + Message interceptMsg = beforeConsume(msg); + messages.add(interceptMsg); } msgPeeked = incomingMessages.peek(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 86bcc9b49660e..47e97ac0834a6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -532,8 +532,8 @@ protected CompletableFuture> internalBatchReceiveAsync() { msgPeeked = incomingMessages.peek(); continue; } - //Message interceptMsg = beforeConsume(msg); - messages.add(msg); + Message interceptMsg = beforeConsume(msg); + messages.add(interceptMsg); } msgPeeked = incomingMessages.peek(); } @@ -1490,9 +1490,9 @@ void notifyPendingReceivedCallback(final Message message, Exception exception private void interceptAndComplete(final Message message, final CompletableFuture> receivedFuture) { // call proper interceptor - final Message interceptMessage = beforeConsume(message); + //final Message interceptMessage = beforeConsume(message); // return message to receivedCallback - completePendingReceive(receivedFuture, interceptMessage); + completePendingReceive(receivedFuture, message); } void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 4ffc5f5f6cb7c..dd5d4687cd5c2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1114,7 +1114,7 @@ private ConsumerImpl createInternalConsumer(ConsumerConfigurationData conf return ConsumerImpl.newConsumerImpl(client, partitionName, configurationData, client.externalExecutorProvider(), partitionIndex, true, listener != null, subFuture, - startMessageId, schema, interceptors, + startMessageId, schema, null, createIfDoesNotExist, startMessageRollbackDurationInSec); } From e930443f417b4033cf5a97d7de68c4fbf37bc7dd Mon Sep 17 00:00:00 2001 From: vbhat6 Date: Wed, 7 Aug 2024 11:49:20 +0530 Subject: [PATCH 6/7] [fix][client] Fix for multiple hit at the interceptor for MultiTopicConsumer --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 47e97ac0834a6..33da07488e7a9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1490,7 +1490,7 @@ void notifyPendingReceivedCallback(final Message message, Exception exception private void interceptAndComplete(final Message message, final CompletableFuture> receivedFuture) { // call proper interceptor - //final Message interceptMessage = beforeConsume(message); + final Message interceptMessage = beforeConsume(message); // return message to receivedCallback completePendingReceive(receivedFuture, message); } From d15a04de9cadb339a2b0a38be25fac04cef65486 Mon Sep 17 00:00:00 2001 From: vbhat6 Date: Wed, 7 Aug 2024 11:49:56 +0530 Subject: [PATCH 7/7] [fix][client] Fix for multiple hit at the interceptor for MultiTopicConsumer --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 33da07488e7a9..190ddd8b435cb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1492,7 +1492,7 @@ private void interceptAndComplete(final Message message, final CompletableFut // call proper interceptor final Message interceptMessage = beforeConsume(message); // return message to receivedCallback - completePendingReceive(receivedFuture, message); + completePendingReceive(receivedFuture, interceptMessage); } void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata,