Regarding receiverQueueSize and multitopic consumer behavior #22317
Unanswered
Debashish-Mallick
asked this question in
Q&A
Replies: 1 comment
-
hi Team, |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
Hi Everyone,
Pulsar allowed receiveQ size as 0 for a single topic consumer .
We have the following scenario.
Assume that we have 5 topics. We want to have a single Shared Subscription consumer across the 5 topics to consume the message in round robin fashion across the 5 topics. We do not want the consumer to hold the message in receive queue. The consumer needs to pick the message only it intends to handle and handle over the message to some other process. In my case, the number of topics 5 is not constant. It can keep changing. To handle the above situation with multi-topic consumer, when a new topic gets added, I am trying to close the existing consumer and create a new consumer with additional topics. Also, please note that the topics will be across the pulsar tenants (i.e, the topics will not be from same single pulsar tenant).
I am trying to use multitopic consumer and setting receiverQueueSize as 1 (as this queue size should be >0). then after some time I am creating one more new consumer with the newly added topic from different tenant, and started receiving message from newly created consumer .
Below are the observations , need to know weather this is valid behavior in pulsar.
With the new consumer messages are receiving properly for newly added topic.
But for old consumer we have 3 messages left ( checked availablePermits for old consumer , It was showing 0) ,
and once we close the old consumer , these 3 messages started consuming in new consumer with the same message id with the redelivery count as 1.
Tried with new pulsar client and new consumer name while creation of new consumer , behavior is same.
allTopicsConsumerOld = pulsarClient.newConsumer() .topic(**"pulsartenant_1/default/test"**) // this is for multiconsumer .consumerName("my-consumer") .subscriptionName("my-sub") .subscriptionType(SubscriptionType.Shared) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .receiverQueueSize(1) .subscribe();
allTopicsConsumerNew = pulsarClient.newConsumer() .topic(**"pulsartenant_1/default/test","pulsartenant_2/default/test"**) // this is for multiconsumer .consumerName("my-consumer") .subscriptionName("my-sub") .subscriptionType(SubscriptionType.Shared) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .receiverQueueSize(1) .subscribe();
Beta Was this translation helpful? Give feedback.
All reactions