[fix][broker] Fix multiple race conditions in topic unloading and loading #20540
base: master
@lhotari A quick update: I think these are helpful for these issues
@poorbarcode the PRs you referenced don't seem to make this current PR #20540 obsolete. Please review.
Nice work @lhotari. I left a few comments.
for (TopicLoadingContext topicLoadingContext = getNextPendingTopic(); topicLoadingContext != null;
        topicLoadingContext = getNextPendingTopic()) {
    topicLoadingContext.getTopicFuture()
            .completeExceptionally(new NotAllowedException("Broker is shutting down"));
Nit: what do you think about ServiceNotReady instead? I haven't looked too closely, but NotAllowed might not give the right client side behavior:
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
Lines 786 to 789 in 21c7c62
if (error.getError() == ServerError.NotAllowedError) {
    log.error("Get not allowed error, {}", error.getMessage());
    connectionFuture.completeExceptionally(new PulsarClientException.NotAllowedException(error.getMessage()));
}
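To illustrate the concern, here is a minimal, self-contained sketch. It is not Pulsar code: `ErrorKind`, `LoadFailure` and `shouldRetry` are hypothetical names, and the retry policy (retry on "service not ready", give up on "not allowed") is an assumption made for the example, not verified against `ClientCnx`. It only shows why the exception type used when failing pending futures matters to a caller that decides on retries by error kind.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ShutdownErrorSketch {
    // Hypothetical error kinds; they only mirror the two names discussed above.
    enum ErrorKind { SERVICE_NOT_READY, NOT_ALLOWED }

    // Hypothetical exception carrying the error kind.
    static class LoadFailure extends RuntimeException {
        final ErrorKind kind;

        LoadFailure(ErrorKind kind, String message) {
            super(message);
            this.kind = kind;
        }
    }

    // Assumed policy for illustration: only SERVICE_NOT_READY is treated as transient.
    static boolean shouldRetry(Throwable t) {
        // Dependent stages of a CompletableFuture wrap the cause in CompletionException.
        Throwable cause = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
        return cause instanceof LoadFailure && ((LoadFailure) cause).kind == ErrorKind.SERVICE_NOT_READY;
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        pending.completeExceptionally(
                new LoadFailure(ErrorKind.SERVICE_NOT_READY, "Broker is shutting down"));
        pending.whenComplete((topic, error) ->
                System.out.println("retry: " + shouldRetry(error)));
    }
}
```

Under this assumed policy, a future failed with the "not allowed" kind would surface as a terminal error to the caller, while the "service not ready" kind would lead to another attempt, possibly against a different broker.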
TopicLoadingContext pendingTopic = pendingTopicLoadingQueue.poll();
if (!pulsar().isRunning()) {
    log.warn("Pulsar is not running, skip create pending topic");
    failPendingTopics();
What is the purpose of calling this here if the closeAsync method also calls it? The logic is correct, but it seems like we could skip this line and leave the cleanup logic to the closeAsync method.
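For context on why calling the cleanup from two places is at least safe, here is a minimal sketch of a queue-draining method in the style of the loop quoted earlier. It is not the broker's implementation: `PendingTopicDrainSketch`, `enqueue` and the `String` topic type are hypothetical stand-ins, and `IllegalStateException` is used only to keep the example self-contained. Because `poll()` removes each entry and `completeExceptionally` returns `false` for an already completed future, a second call finds nothing left to fail.

```java
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

class PendingTopicDrainSketch {
    // Hypothetical stand-in for the queue of pending topic load requests.
    private final Queue<CompletableFuture<Optional<String>>> pending = new ConcurrentLinkedQueue<>();

    // Registers a pending load request and returns its future.
    CompletableFuture<Optional<String>> enqueue() {
        CompletableFuture<Optional<String>> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    // Drains the queue and fails every future that is still incomplete.
    // Returns how many futures this particular call failed.
    int failPendingTopics() {
        int failed = 0;
        for (CompletableFuture<Optional<String>> f = pending.poll(); f != null; f = pending.poll()) {
            // Returns false if the future was already completed elsewhere.
            if (f.completeExceptionally(new IllegalStateException("Broker is shutting down"))) {
                failed++;
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        PendingTopicDrainSketch sketch = new PendingTopicDrainSketch();
        sketch.enqueue();
        sketch.enqueue();
        System.out.println("first call failed: " + sketch.failPendingTopics());
        System.out.println("second call failed: " + sketch.failPendingTopics());
    }
}
```

In this sketch the second call is a no-op, so the question raised above is about redundancy rather than correctness: the early call fails waiting requests sooner, while a single call at close time would keep the cleanup in one place.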
-    private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle,
-                                      CompletableFuture<Optional<Topic>> createTopicFuture) {
-        String bundleName = namespaceBundle.toString();
+    private void removeTopicFromCache(String topic,
+                                      String bundleName, CompletableFuture<Optional<Topic>> createTopicFuture) {
Nit: I prefer to keep the types as long as possible to prevent misuse of a method in the future. Why remove the NamespaceBundle type here?
I agree that keeping types would make sense. In this case, there was already String topic, so I thought it would be more consistent to use String for the namespace bundle as well. We could also take it in another direction. :)
Codecov Report
@@ Coverage Diff @@
## master #20540 +/- ##
=============================================
- Coverage 73.00% 36.89% -36.12%
+ Complexity 32096 12083 -20013
=============================================
Files 1868 1691 -177
Lines 138999 129449 -9550
Branches 15292 14128 -1164
=============================================
- Hits 101479 47760 -53719
- Misses 29485 75426 +45941
+ Partials 8035 6263 -1772
Yes
The pr had no activity for 30 days, mark with Stale label.
LGTM. Is there a way to add a test to verify this?
The pr had no activity for 30 days, mark with Stale label.
Thanks for the fix. I tried using this PR but still encountered this problem during a rolling restart of the bookies, and I was unable to consume until I restarted all the brokers. I will try to troubleshoot this issue.
Pulsar version: v3.1.1
Log:
WARN org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/input_test2-partition-1] [subtest15] Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: {"errorMsg":"Topic is temporarily unavailable","reqId":1138682772387920139, "remote":"21.21.47.12/21.21.47.12:6650", "local":"/11.145.30.236:60372"}
2023-12-05T19:03:59,569+0800 [pulsar-client-io-1-1] WARN org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/input_test2-partition-1] [subtest15] Could not get connection to broker: org.apache.pulsar.client.api.PulsarClientException$LookupException: {"errorMsg":"Topic is temporarily unavailable","reqId":1138682772387920139, "remote":"21.21.47.12/21.21.47.12:6650", "local":"/11.145.30.236:60372"} -- Will try again in 0.779 s
bc9a806 to 76961f4
…ding - possible related issues are apache#5284, apache#14941 and apache#20526
76961f4 to 744581f
This test app ends up with a topic being fenced: https://github.com/lhotari/pulsar-playground/blob/master/src/main/java/com/github/lhotari/pulsar/playground/TestScenarioUnloading.java, at least with Pulsar 3.1.1
Motivation
In Pulsar, there is an unresolved issue around topic unloading and loading that can lead to "Attempting to add producer to a fenced topic" and "Topic is temporarily unavailable" errors.
Very recently #20526 was reported against Pulsar 2.11.1. Other related issues are #5284, #14941 and multiple others.
Modifications
[…] false, this means that the topic was possibly unloaded concurrently and the newly loaded instance should be closed.
[…] maxConcurrentTopicLoadRequest (default: 5000).
Stop processing the pendingTopicLoadingQueue if the Pulsar broker is closing.
Skip loading topics whose topic future has already been completed (this signals topic unloading).
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: lhotari#150