From d89f7c0b03744c6401c771e58cc601ffa55113ee Mon Sep 17 00:00:00 2001 From: Cong Zhao Date: Wed, 14 Aug 2024 10:26:47 +0800 Subject: [PATCH] [fix][client] Fix for early hit `beforeConsume` for MultiTopicConsumer (#23141) (cherry picked from commit c07b158f003c5a5623296189f0932d7058d2e75a) --- .../pulsar/client/api/InterceptorsTest.java | 45 +++++++++----- .../client/impl/MultiTopicsConsumerImpl.java | 58 +++++++++++++++++-- 2 files changed, 84 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java index b3f5ed3b487d6..8f239aea1f0b4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.api; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -27,8 +29,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Sets; import org.apache.commons.lang3.RandomUtils; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; @@ -66,6 +66,12 @@ public Object[][] getReceiverQueueSize() { return new Object[][] { { 0 }, { 1000 } }; } + @DataProvider(name = "topics") + public Object[][] getTopics() { + return new Object[][] {{Lists.newArrayList("persistent://my-property/my-ns/my-topic") }, + { Lists.newArrayList("persistent://my-property/my-ns/my-topic", "persistent://my-property/my-ns/my-topic1") }}; + } + @Test public void testProducerInterceptor() throws Exception { Map> ackCallback = new HashMap<>(); @@ -390,9 +396,9 @@ public void close() { @Override public Message beforeConsume(Consumer consumer, Message message) { - MessageImpl msg = (MessageImpl) message; + MessageImpl msg = ((MessageImpl) ((TopicMessageImpl) message).getMessage()); msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1"); - return msg; + return message; } @Override @@ -436,13 +442,19 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId int keyCount = 0; for (int i = 0; i < 2; i++) { - Message received = consumer.receive(); + Message received; + if (i % 2 == 0) { + received = consumer.receive(); + } else { + received = consumer.receiveAsync().join(); + } MessageImpl msg = (MessageImpl) ((TopicMessageImpl) received).getMessage(); for (KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) { if ("beforeConsumer".equals(keyValue.getKey())) { keyCount++; } } + Assert.assertEquals(keyCount, i + 1); consumer.acknowledge(received); } Assert.assertEquals(2, keyCount); @@ -462,9 +474,9 @@ public void close() { @Override public Message beforeConsume(Consumer consumer, Message message) { - MessageImpl msg = (MessageImpl) message; + MessageImpl msg = ((MessageImpl) ((TopicMessageImpl) message).getMessage()); msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1"); - return msg; + return message; } @Override @@ -599,8 +611,8 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId consumer.close(); } - @Test - public void testConsumerInterceptorForNegativeAcksSend() throws PulsarClientException, InterruptedException { + @Test(dataProvider = "topics") + public void testConsumerInterceptorForNegativeAcksSend(List topics) throws PulsarClientException, InterruptedException { final int totalNumOfMessages = 100; CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2); @@ -627,6 +639,7 @@ public void onAcknowledgeCumulative(Consumer consumer, MessageId message @Override public void onNegativeAcksSend(Consumer consumer, Set messageIds) { + Assert.assertTrue(latch.getCount() > 0); messageIds.forEach(messageId -> latch.countDown()); } @@ -637,7 +650,7 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId }; Consumer consumer = pulsarClient.newConsumer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic") + .topics(topics) .subscriptionType(SubscriptionType.Failover) .intercept(interceptor) .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) @@ -645,7 +658,7 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId .subscribe(); Producer producer = pulsarClient.newProducer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic") + .topic(topics.get(0)) .create(); for (int i = 0; i < totalNumOfMessages; i++) { @@ -669,8 +682,9 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId consumer.close(); } - @Test - public void testConsumerInterceptorForAckTimeoutSend() throws PulsarClientException, InterruptedException { + @Test(dataProvider = "topics") + public void testConsumerInterceptorForAckTimeoutSend(List topics) throws PulsarClientException, + InterruptedException { final int totalNumOfMessages = 100; CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2); @@ -701,16 +715,17 @@ public void onNegativeAcksSend(Consumer consumer, Set message @Override public void onAckTimeoutSend(Consumer consumer, Set messageIds) { + Assert.assertTrue(latch.getCount() > 0); messageIds.forEach(messageId -> latch.countDown()); } }; Producer producer = pulsarClient.newProducer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic") + .topic(topics.get(0)) .create(); Consumer consumer = pulsarClient.newConsumer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic") + .topics(topics) .subscriptionName("foo") .intercept(interceptor) .ackTimeout(2, TimeUnit.SECONDS) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index b01c25d215bdd..be618744180cc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -26,6 +26,7 @@ import com.google.common.collect.Lists; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -104,6 +105,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { private volatile BatchMessageIdImpl startMessageId = null; private final long startMessageRollbackDurationInSec; + private final ConsumerInterceptors internalConsumerInterceptors; MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, ExecutorProvider executorProvider, CompletableFuture> subscribeFuture, Schema schema, ConsumerInterceptors interceptors, boolean createTopicIfDoesNotExist) { @@ -133,6 +135,11 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { long startMessageRollbackDurationInSec) { super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), executorProvider, subscribeFuture, schema, interceptors); + if (interceptors != null) { + this.internalConsumerInterceptors = getInternalConsumerInterceptors(interceptors); + } else { + this.internalConsumerInterceptors = null; + } checkArgument(conf.getReceiverQueueSize() > 0, "Receiver queue size needs to be greater than 0 for Topics Consumer"); @@ -315,7 +322,8 @@ private void messageReceived(ConsumerImpl consumer, Message message) { CompletableFuture> receivedFuture = nextPendingReceive(); if (receivedFuture != null) { unAckedMessageTracker.add(topicMessage.getMessageId(), topicMessage.getRedeliveryCount()); - completePendingReceive(receivedFuture, topicMessage); + final Message interceptMessage = beforeConsume(topicMessage); + completePendingReceive(receivedFuture, interceptMessage); } else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) { notifyPendingBatchReceivedCallBack(); } @@ -366,7 +374,7 @@ protected Message internalReceive() throws PulsarClientException { } unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); resumeReceivingFromPausedConsumersIfNeeded(); - return message; + return beforeConsume(message); } catch (Exception e) { throw PulsarClientException.unwrap(e); } @@ -393,6 +401,7 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC } } unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); + message = beforeConsume(message); } resumeReceivingFromPausedConsumersIfNeeded(); return message; @@ -463,7 +472,7 @@ protected CompletableFuture> internalReceiveAsync() { checkState(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); resumeReceivingFromPausedConsumersIfNeeded(); - result.complete(message); + result.complete(beforeConsume(message)); } }); return result; @@ -1119,7 +1128,7 @@ private ConsumerImpl createInternalConsumer(ConsumerConfigurationData conf return ConsumerImpl.newConsumerImpl(client, partitionName, configurationData, client.externalExecutorProvider(), partitionIndex, true, listener != null, subFuture, - startMessageId, schema, interceptors, + startMessageId, schema, this.internalConsumerInterceptors, createIfDoesNotExist, startMessageRollbackDurationInSec); } @@ -1503,4 +1512,45 @@ public void tryAcknowledgeMessage(Message msg) { acknowledgeCumulativeAsync(msg); } } + + private ConsumerInterceptors getInternalConsumerInterceptors(ConsumerInterceptors multiTopicInterceptors) { + return new ConsumerInterceptors(new ArrayList<>()) { + + @Override + public Message beforeConsume(Consumer consumer, Message message) { + return message; + } + + @Override + public void onAcknowledge(Consumer consumer, MessageId messageId, Throwable exception) { + multiTopicInterceptors.onAcknowledge(consumer, messageId, exception); + } + + @Override + public void onAcknowledgeCumulative(Consumer consumer, + MessageId messageId, Throwable exception) { + multiTopicInterceptors.onAcknowledgeCumulative(consumer, messageId, exception); + } + + @Override + public void onNegativeAcksSend(Consumer consumer, Set set) { + multiTopicInterceptors.onNegativeAcksSend(consumer, set); + } + + @Override + public void onAckTimeoutSend(Consumer consumer, Set set) { + multiTopicInterceptors.onAckTimeoutSend(consumer, set); + } + + @Override + public void onPartitionsChange(String topicName, int partitions) { + multiTopicInterceptors.onPartitionsChange(topicName, partitions); + } + + @Override + public void close() throws IOException { + multiTopicInterceptors.close(); + } + }; + } }