-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][client] Fix for multiple hit at the interceptor for MultiTopicConsumer #23080
base: branch-2.10
Are you sure you want to change the base?
Conversation
…tiTopicConsumer" This reverts commit 3305c0a.
@@ -1113,7 +1114,7 @@ private ConsumerImpl<T> createInternalConsumer(ConsumerConfigurationData<T> conf | |||
return ConsumerImpl.newConsumerImpl(client, partitionName, | |||
configurationData, client.externalExecutorProvider(), | |||
partitionIndex, true, listener != null, subFuture, | |||
startMessageId, schema, interceptors, | |||
startMessageId, schema, null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can't remove interceptors from internal consumer directly this will make other interceptor methods not work
@@ -379,6 +379,7 @@ protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarC | |||
long callTime = System.nanoTime(); | |||
try { | |||
message = incomingMessages.poll(timeout, unit); | |||
message = beforeConsume(message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also need to call beforeConsume
on internalReceiveAsync、internalReceive(long timeout, TimeUnit unit) 、messageReceived
I open another PR(#23141) to fix this issue |
No description provided.