-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][client] Fix duplicate messages caused by seek #16171
Conversation
/pulsarbot rerun-failure-checks |
Why didn't it fail before? |
@BewareMyPower I don't know how to explain, the master branch cannot reproduce this issue, but the #15568 can reproduce this issue. |
Did you mean after applying the changes of #15568 will this test fail? |
Yes, could you take a look this issue? |
Sure. |
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
Outdated
Show resolved
Hide resolved
Yes this PR fixes the race condition when org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING).startMessageIdInclusive()
.topics(Arrays.asList(topicName, topicName2)).subscriptionName("my-sub").subscribe();
Thread.sleep(2000);
consumer.seek((partitionedTopic) -> { I think the root cause is that |
|
79a9fae
to
6b8ef55
Compare
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
6b8ef55
to
25abae8
Compare
…ting logic for better handle the checkpoint. (#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (apache#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171 (cherry picked from commit 18d21a0)
…ting logic for better handle the checkpoint. (apache#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (apache#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (apache#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (apache#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (#19972) (#20565) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (#19972) (#20564) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
The pr had no activity for 30 days, mark with Stale label. |
@@ -252,6 +252,10 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR | |||
messagesFuture = consumer.receiveAsync().thenApply(Collections::singletonList); | |||
} | |||
messagesFuture.thenAcceptAsync(messages -> { | |||
if (consumer.isDuringSeek()) { | |||
receiveMessageFromConsumer(consumer, batchReceive); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When incomingMessages
is not empty, there are two possibilities:
- loop until there is no message in the
incomingMessages
- If
ConsumerImpl.seek
fails, the messages have popped fromincomingMessages
cannot be consumed untilredeliver
executes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also have a question about this part.
It looks like not only from the MultiTopicConsumer. If it is from the user side, using a consumer to receive the messages and another thread try to seek the subscription to another position, they will also receive the duplicated messages right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2. If
ConsumerImpl.seek
fails, the messages have popped fromincomingMessages
cannot be consumed untilredeliver
executes
Good catch! We need to consider this.
It looks like not only from the MultiTopicConsumer.
Yes.
If it is from the user side, using a consumer to receive the messages and another thread try to seek the subscription to another position, they will also receive the duplicated messages right?
For MultiTopicConsumer
, this is right, because the MultiTopicConsumer
has a loop to pulling the messsage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I guess the fix can only fix the case that the MultiTopicConsumer poll messages from the internal consumer, but it can't fix the issue that the user facing. After the message polled from the internal consumer to the queue of the MultiTopicConsumer, the user will still have chance to get duplicated messages during the seek operation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I guess the fix can only fix the case that the MultiTopicConsumer poll messages from the internal consumer.
Right.
the user will still have chance to get duplicated messages during the seek operation?
I think we need to figure out the details of this seek.
- When to clean
incomingMessages
, after or before seek? - During the seek, can the client continue to consume the
incomingMessages
?
…ting logic for better handle the checkpoint. (apache#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
No reviewer, so close this PR. |
Signed-off-by: Zixuan Liu nodeces@gmail.com
Motivation
When subscribing to a message on a partitioned topic, do the seek operation and then consume the messages, which sometimes will receive duplicate messages.
The root cause is that when a seek operation is performed if have the task in the
pendingReceives
queue, we will get old messages from before doing the seek operation.Modifications
incomingMessages
queueVerifying this change
org.apache.pulsar.broker.service.SubscriptionSeekTest#testSeekByFunctionAndMultiTopic
cover this changes.Documentation
doc-not-needed