diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 4e5b577b1a986..dd5d4687cd5c2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -379,6 +379,7 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC long callTime = System.nanoTime(); try { message = incomingMessages.poll(timeout, unit); + message = beforeConsume(message); if (message != null) { decreaseIncomingMessageSize(message); checkArgument(message instanceof TopicMessageImpl); @@ -1113,7 +1114,7 @@ private ConsumerImpl createInternalConsumer(ConsumerConfigurationData conf return ConsumerImpl.newConsumerImpl(client, partitionName, configurationData, client.externalExecutorProvider(), partitionIndex, true, listener != null, subFuture, - startMessageId, schema, interceptors, + startMessageId, schema, null, createIfDoesNotExist, startMessageRollbackDurationInSec); }