PIP-192: New Pulsar Broker Load Balancer #16691
…18084)

Master Issue: #16691

### Motivation

We will start raising PRs to implement PIP-192, #16691

### Modifications

The PR adds base classes for the new broker load balance project and does not integrate with the existing load balance logic. This PR should not impact the existing broker load balance behavior.

For the pip-192 project, this PR
* defines the base interface under `org.apache.pulsar.broker.loadbalance.extensible` package.
* defines this `BrokerRegistry` public interface and its expected behaviors.
* defines `BrokerFilter` interfaces.
* defines `LoadDataReporter` interfaces.
* defines `NamespaceBundleSplitStrategy` interfaces.
* defines `LoadManagerScheduler` interfaces.
* defines `NamespaceUnloadStrategy` interfaces.
* defines `LoadDataStore` interfaces.
* defines `ExtensibleLoadManager` interfaces.
* defines `LoadManagerContext` interfaces.
* defines `BrokerLoadData` and `BrokerLookupData` data classes.
…t-forward cursor behavior after compaction (#20110)

Master Issue: #16691

### Motivation

Raising a PR to implement: #16691

After the compaction, the cursor can fast-forward to the compacted horizon when a large number of messages are compacted before the next read. Hence, ServiceUnitStateCompactionStrategy also needs to cover this case. Currently, the existing and slow (their states are far behind) tableviews with ServiceUnitStateCompactionStrategy could not accept those compacted messages. In the load balance extension context, this means the ownership data could be inconsistent among brokers.

### Modifications

This PR
- fixes ServiceUnitStateCompactionStrategy to accept the state data if its version is bigger than the current version +1.
- (minor fix) does not repeatedly update the replication_clusters in the policies when creating the system namespace. This update redundantly triggers ZK watchers when restarting brokers.
- sets closeWithoutWaitingClientDisconnect=true, upon unload (following the same setting as the modular LM's)

(cherry picked from commit 6cfa468)
@merlimat this is marked for 3.0. Is everything done for this PIP? Can it be closed?
Hi @heesung-sn thanks for introducing this great feature! I see some PRs related to this PIP were labeled with
Hi, Yes we are working on docs too. @Demogorgon314 and I will raise PRs for the doc update.
@heesung-sn @Demogorgon314 thanks! Feel free to ping me if you need a review.
The code work is done, and we are working on documentation.
PIP: #16691

### Motivation

When upgrading the pulsar version and changing the pulsar load manager to `ExtensibleLoadManagerImpl` it might cause NPE. The root cause is the old version of pulsar does not contain the `loadManagerClassName` field.

```
2023-05-18T05:42:50,557+0000 [pulsar-io-4-1] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:51345] connected with role=pulsarinstance-v3-0-n@test.dev using authMethod=token, clientVersion=Pulsar Go 0.9.0, clientProtocolVersion=18, proxyVersion=null
2023-05-18T05:42:50,558+0000 [pulsar-io-4-1] WARN org.apache.pulsar.broker.lookup.TopicLookupBase - Failed to lookup pulsarinstance-v3-0-n@test.dev for topic persistent://xxx with error java.lang.NullPointerException: Cannot invoke "String.equals(Object)" because the return value of "org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData.getLoadManagerClassName()" is null
java.util.concurrent.CompletionException: java.lang.NullPointerException: Cannot invoke "String.equals(Object)" because the return value of "org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData.getLoadManagerClassName()" is null
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[?:?]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1194) ~[?:?]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?]
	at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.selectAsync(ExtensibleLoadManagerImpl.java:385) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
	at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$assign$6(ExtensibleLoadManagerImpl.java:336) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187) ~[?:?]
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309) ~[?:?]
	at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.lambda$assign$10(ExtensibleLoadManagerImpl.java:333) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:409) ~[io.streamnative-pulsar-common-3.0.0.1.jar:3.0.0.1]
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:243) ~[io.streamnative-pulsar-common-3.0.0.1.jar:3.0.0.1]
	at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.assign(ExtensibleLoadManagerImpl.java:327) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
	at org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper.findBrokerServiceUrl(ExtensibleLoadManagerWrapper.java:66) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
	at org.apache.pulsar.broker.namespace.NamespaceService.lambda$getBrokerServiceUrlAsync$0(NamespaceService.java:191) ~[io.streamnative-pulsar-broker-3.0.0.1.jar:3.0.0.1]
```

### Modifications

* Add null check when using `getLoadManagerClassName`.
* Add test to cover this case.
* Add `RedirectManager` unit test.
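The null check described in this PR can be illustrated with a small sketch. This is not the actual patch: the helper class `LoadManagerClassNames` and its method are hypothetical, and only the fully qualified class name is taken from the stack trace. Calling `equals` on the known constant, rather than on the value read from lookup data, avoids the NPE when an older broker did not publish the field.

```java
// Hypothetical helper, not part of Pulsar; shown only to illustrate the null-safe comparison.
final class LoadManagerClassNames {
    // Class name as it appears in the stack trace of this PR description.
    static final String EXTENSIBLE =
            "org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl";

    private LoadManagerClassNames() {
    }

    /**
     * Returns true only when the given class name is the extensible load manager.
     * The constant is the receiver of equals(), so a null argument (for example,
     * lookup data written by an older broker) yields false instead of an NPE.
     */
    static boolean isExtensible(String loadManagerClassName) {
        return EXTENSIBLE.equals(loadManagerClassName);
    }
}
```

With this shape, lookup data from a broker that predates the `loadManagerClassName` field is simply treated as "not the extensible load manager".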
Currently, the system topics are non-partitioned and owned by a single leader. We can further partition these system topics with multi-leaders for really large clusters. This can be a follow-up task if requested.
Should we use a single partition as system topic instead of non-partitioned topic? Since the non-partitioned topic can't transfer to a partitioned topic. If I understand correct.
I expect the single partition could have some overhead over non-partitioned topic, but I agree that starting from a single partitioned topic can be easier to extend to multi-partitions in the future. For this first iteration, I think we can stick to non-partitioned topics.
…list in bundle admin API (#20528)

PIP: #16691

### Motivation

When using `ExtensibleLoadManager` and list in bundle admin API, it will redirect forever because `isServiceUnitOwned` method is checking the `ownershipCache` as the ownership storage, however, when using `ExtensibleLoadManager`, it stored the ownership to table view.

### Modifications

* Call `isServiceUnitOwnedAsync` when using `isServiceUnitOwned`.
* Add unit test to cover this case.
Proposal: New Pulsar Broker Load Balancer
Motivation
As previously shared with the community, we observed many improvement areas around the Pulsar load balancer[1]. Since the improvement requires significant changes, first, we would like to share the overall goals for this project and the high-level components to design. This doc will highlight the architecture of the new broker load balancer.
Goals
We set up the project goals in the following areas.
User-facing goals
Logic
Logs / Metrics
Admin API / Configurations
Internal Implementation goals
Logic
Implementation
Logs / Metrics
Admin API / Configurations
Testing
API Changes
We will add the transfer unload option `--dest` to specifically unload the topic (bundle) to the destination broker.
Implementation (High-Level Components)
New Load Manager
Load Data Models
LocalBrokerData: broker’s factual data
BrokerLoadData: broker’s load data
BundlesLoadData: bundle’s load data
TopBundlesLoadData: top-n high-loaded bundle load data from the broker
Load Data Write-Read Flow
LocalBrokerData
Write:
Read:
BrokerLoadData
Write:
Read:
BundlesLoadData
Write:
Read:
TopBundlesLoadData
Write:
Read:
Load Data Flow
Major Modifications on Bundle Split, Unload, and Assignment Flow
Bundle State Channel
The bundle state channel is a table-view over a persistent topic, used as a write-ahead log (WAL) to broadcast the total order of all bundle state changes in the cluster. All brokers will asynchronously consume messages in this channel in the same order and react to bundle state changes (sequential consistency). With the table-view compaction, the bundle state channel will eventually materialize the current bundle-broker ownership. Read operations on this channel (e.g., clients’ topic lookup requests) can be deferred for a few seconds, depending on the current state of the bundle.
Bundle State Lifecycles
We define the following states and actions and linearize the bundle state changes.
(This is a high-level design to explain the concept here. The final version may differ.)
Bundle Actions
Bundle States
*New client connections to the bundle are deferred(with timeouts) in the Assigning state.
Bundle State Change Examples
The bundle state channel can be used as follows.
Bundle Transfer Example
(State, Action) Sequence:
(Assigned, Transfer) => (Assigning, Return) => (Assigned,)
e.g. {key:bundleName, value:{flow:transfer, action:transfer, state:assigning, from:A, to:B}}
Bundle Split Example
(State, Action) Sequence:
(Assigned, Split) => (Splitting, Unload | Create) => {(Unassigned, ) | (Assigned, ), (Assigned, )}
e.g. {key:bundleName, value:{flow: split, action:split, state: splitting, from: A, to: B, transfer: true}}
a. After the “Split,” the owner broadcasts the children bundles’ ownership creation(state=assigned) and the parent bundle’s ownership unload(empty message).
b. By default, the owner publishes a message to the TopBundlesLoadData store asking the leader to unload(or transfer) the children bundles.
Bundle Assignment Example
(State, Action) Sequence:
(Unassigned, Own) => (Assigning, Return) => (Assigned,)
e.g. {key:bundleName, value:{flow: assignment, action:own, state:assigning, to: B}}
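The three (State, Action) sequences above can be read as one small transition table. The following is a minimal sketch under that reading, not the final design: the enum and class names (`BundleState`, `BundleAction`, `BundleStateMachine`) are hypothetical, and the transitions are inferred only from the transfer, split, and assignment examples.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical names inferred from the example sequences; the final design may differ.
enum BundleState { UNASSIGNED, ASSIGNING, ASSIGNED, SPLITTING }

enum BundleAction { OWN, TRANSFER, RETURN, SPLIT, UNLOAD, CREATE }

final class BundleStateMachine {
    // Actions that the examples show as valid from each state.
    private static final Map<BundleState, Set<BundleAction>> VALID_ACTIONS =
            new EnumMap<>(BundleState.class);

    static {
        VALID_ACTIONS.put(BundleState.UNASSIGNED, EnumSet.of(BundleAction.OWN));
        VALID_ACTIONS.put(BundleState.ASSIGNING, EnumSet.of(BundleAction.RETURN));
        VALID_ACTIONS.put(BundleState.ASSIGNED,
                EnumSet.of(BundleAction.TRANSFER, BundleAction.SPLIT));
        VALID_ACTIONS.put(BundleState.SPLITTING,
                EnumSet.of(BundleAction.UNLOAD, BundleAction.CREATE));
    }

    private BundleStateMachine() {
    }

    static boolean isValid(BundleState from, BundleAction action) {
        return VALID_ACTIONS.get(from).contains(action);
    }

    /** Returns the state reached by applying the action, or throws if the action is invalid. */
    static BundleState next(BundleState from, BundleAction action) {
        if (!isValid(from, action)) {
            throw new IllegalStateException(action + " is not valid from " + from);
        }
        switch (action) {
            case OWN:
            case TRANSFER:
                return BundleState.ASSIGNING;
            case RETURN:
            case CREATE:
                return BundleState.ASSIGNED;
            case SPLIT:
                return BundleState.SPLITTING;
            case UNLOAD:
                return BundleState.UNASSIGNED;
            default:
                throw new AssertionError("unreachable: " + action);
        }
    }
}
```

In this sketch, `UNLOAD` from `SPLITTING` models the parent bundle's ownership unload and `CREATE` models the children bundles' ownership creation.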
Bundle-Broker Ownership State
Because the bundle state channel shows the current bundle-broker ownership, we can remove the redundant bundle ownership store (ZK znodes). Each broker will look up the bundle state channel to check which broker currently owns the requested bundles or is in the ownership assignment/unload (transfer) process. In addition, before returning the lookup result, the broker could check the broker availability metadata store (LocalBrokerData znode existence) to further confirm that the owner broker is available.
Bundle State Channel Owner Selection and Discovery
The Bundle State Channel (BSC) is itself a topic, and because of this circular dependency, we can't use the BSC to find the owner broker of the BSC topic. For example, when a cluster starts, each broker needs to initiate a BSC TopicLookUp (to find the owner broker) in order to consume the messages in the BSC. However, initially, no broker knows which broker owns the BSC.
The ZK leader election can be a good option to break this circular dependency, as follows.
Channel Owner Selection
The cluster can use the ZK leader election to select the owner broker. If the owner becomes unavailable, one of the followers will become the new owner. We can elect the owner for each bundle state partition.
Channel Owner Discovery
Then, in brokers’ TopicLookUp logic, we will add a special case to return the current leader(the elected BSC owner) for the BSC topics.
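The special case in the lookup path can be sketched as follows. This is only an illustration under stated assumptions: `ChannelAwareLookup`, `findOwner`, and the topic name in `CHANNEL_TOPIC` are hypothetical, the elected leader is modeled as a `Supplier`, and the normal channel-backed lookup is modeled as a `Function`.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch of the lookup special case; not the actual broker code.
final class ChannelAwareLookup {
    // Hypothetical topic name, used only for this illustration.
    static final String CHANNEL_TOPIC = "persistent://pulsar/system/bundle-state-channel";

    // Result of the ZK leader election: the current BSC owner, if one is elected.
    private final Supplier<Optional<String>> electedLeader;
    // Normal lookup path that resolves owners from the bundle state channel.
    private final Function<String, Optional<String>> channelBackedLookup;

    ChannelAwareLookup(Supplier<Optional<String>> electedLeader,
                       Function<String, Optional<String>> channelBackedLookup) {
        this.electedLeader = electedLeader;
        this.channelBackedLookup = channelBackedLookup;
    }

    /** Returns the owner broker for a topic, short-circuiting to the leader for the BSC topic. */
    Optional<String> findOwner(String topic) {
        if (CHANNEL_TOPIC.equals(topic)) {
            // Breaks the circular dependency: the channel is never consulted to find itself.
            return electedLeader.get();
        }
        return channelBackedLookup.apply(topic);
    }
}
```

Because the leader is read through a supplier on every call, a lookup made after a leader change returns the newly elected owner.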
Conflict State Resolution(Race Conditions)
Without distributed locks, we can resolve conflicting state changes by a conflict state resolution algorithm in an optimistic and eventual manner. Brokers can take the first valid state change in the linearized view as the winner state and ignore the later ones.
One caveat is that because the current table-view compaction keeps only the last value for each key, we need to introduce an internal compaction algorithm for this channel that follows the conflict resolution algorithm (the first valid state change as the result value).
For instance, let’s say for bundle x, there are two conflicting assignments initiated. The linearized state change messages will be like the following.
(own, to:B), (own, to:A)
By the conflict resolution algorithm, the second state change (own, to:A) will be ignored by all brokers (and by the compaction algorithm). Eventually, the “return” message will be broadcast, declaring that the owner is “B.”
(own, to:B), (own, to:A), (return, to:B)
Let’s take another example. Let’s say bundle x is already assigned to broker B, but another broker initiates the “own” action(before consuming the “return” action). This last “own” state change will be ignored since this action “own” is invalid from the previous state “assigned.” (in the above state diagram, there is no “own” action arrow from the “assigned” state.)
(own, to:B), (return, to:B), (own, to:A)
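The two examples above can be replayed with a small sketch of the "first valid state change wins" rule. It is a simplification that covers only the `own` and `return` actions for a single bundle; the class `FirstValidWinsResolver` and its members are hypothetical and are not the proposal's compaction implementation.

```java
import java.util.List;

// Hypothetical sketch of first-valid-wins conflict resolution for one bundle.
final class FirstValidWinsResolver {
    enum State { UNASSIGNED, ASSIGNING, ASSIGNED }

    enum Action { OWN, RETURN }

    /** One state-change message in the linearized view. */
    static final class Change {
        final Action action;
        final String to;

        Change(Action action, String to) {
            this.action = action;
            this.to = to;
        }
    }

    private State state = State.UNASSIGNED;
    private String owner; // null until the first valid "own" is seen

    /** Applies the change if it is valid from the current state; returns false if it is ignored. */
    boolean apply(Change change) {
        if (change.action == Action.OWN && state == State.UNASSIGNED) {
            state = State.ASSIGNING;
            owner = change.to;
            return true;
        }
        if (change.action == Action.RETURN && state == State.ASSIGNING
                && change.to.equals(owner)) {
            state = State.ASSIGNED;
            return true;
        }
        // Any other change conflicts with an earlier valid one and is ignored.
        return false;
    }

    State state() {
        return state;
    }

    String owner() {
        return owner;
    }

    /** Replays a linearized log in order, as every broker would. */
    static FirstValidWinsResolver replay(List<Change> log) {
        FirstValidWinsResolver resolver = new FirstValidWinsResolver();
        for (Change change : log) {
            resolver.apply(change);
        }
        return resolver;
    }
}
```

Replaying either sequence from the examples leaves broker B as the owner, because every broker applies the same messages in the same order and rejects the same conflicting ones.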
Failure Recovery
When a broker is down
When state change participants (brokers) suddenly become unavailable, the state change could become an orphan, because the participants can no longer play their roles. For these orphan state changes, the leader broker will run orphan state clean-up logic. For instance, the leader can add the bundle state clean-up logic to the broker unavailability notification handler (znode watcher) in order to clean the pending bundle state changes and ownerships from unavailable brokers. Also, to make the clean-up logic more fault-tolerant, the leader broker will run the clean-up function when it initializes. Additionally, we could make the leader periodically call the clean-up in a separate monitor thread (we shouldn’t redundantly call this clean-up too often).
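The orphan detection step of this clean-up can be sketched as a scan over the materialized bundle state view. This is a simplified illustration, not the proposed implementation: `OrphanStateCleanup`, `BundleStateData`, and `findOrphans` are hypothetical names, and the sketch assumes `from` is set only while a transfer is pending.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of orphan detection run by the leader broker.
final class OrphanStateCleanup {
    /** Simplified latest state of one bundle (hypothetical fields). */
    static final class BundleStateData {
        final String from; // source broker while a transfer is pending, otherwise null
        final String to;   // owner broker, or destination broker while assigning

        BundleStateData(String from, String to) {
            this.from = from;
            this.to = to;
        }
    }

    private OrphanStateCleanup() {
    }

    /**
     * Returns the bundle keys whose latest state references a broker that is no
     * longer available, sorted for deterministic processing.
     */
    static List<String> findOrphans(Map<String, BundleStateData> tableView,
                                    Set<String> availableBrokers) {
        List<String> orphans = new ArrayList<>();
        for (Map.Entry<String, BundleStateData> entry : tableView.entrySet()) {
            BundleStateData data = entry.getValue();
            boolean fromGone = data.from != null && !availableBrokers.contains(data.from);
            boolean toGone = data.to != null && !availableBrokers.contains(data.to);
            if (fromGone || toGone) {
                orphans.add(entry.getKey());
            }
        }
        Collections.sort(orphans);
        return orphans;
    }
}
```

The same function could be called from the unavailability notification handler, at leader initialization, and from the periodic monitor, since it only reads the view and the set of available brokers.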
When the entire ZK is down and comes back
Every broker will be notified when its ZK session experiences a connection issue. The brokers will then enter a "safe" mode, serving the existing topics as-is but not allowing ZK-related operations. When the leader knows ZK is down, it won't run the bundle clean-up, transfer, or unload logic.
When ZK comes back, each broker will learn that its ZK session is re-established. The brokers will wait 2-3 minutes for all brokers to complete the ZK handshake. Then, they will recover the bundle state table-view and return to normal mode.
Bundle State and Load Data TableView Scalability
Expected read/write traffic:
Write: there will be relatively fewer messages from the write path with occasional spikes
Read: the fan-out broadcast could cause bottlenecks when the cluster is enormous.
The bundle state channel is relatively lightweight on the producer side because bundle state changes are relatively infrequent. Still, message dispatch to consumers could be heavier if the cluster is very large. The same issue can happen to other table-views (BrokerLoadDataStorage) introduced in this proposal. We could consider the following methods to scale the table views’ produce/consume rates in a large cluster.
Split Broker Cluster to multiple clusters
Simply, one can split a massive broker cluster into multiple clusters with different endpoints. The bookkeeper and configuration layer can be shared among the broker clusters.
Partitioned Table-View (short-term)
One can make the table views based on partitioned topics. Then, we can distribute message load to multiple partition owner brokers.
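One way to picture the partitioned table-view is a stable mapping from a bundle key to a partition index, so that all state changes of one bundle stay ordered within a single partition. The sketch below is an assumption for illustration only: `TableViewPartitioner` is a hypothetical name, and it uses `String.hashCode()` rather than whatever routing the Pulsar client would actually apply.

```java
// Hypothetical sketch: maps a bundle key to one of N table-view partitions.
final class TableViewPartitioner {
    private TableViewPartitioner() {
    }

    /**
     * Returns a partition index in [0, numPartitions). The same key always maps
     * to the same partition, which preserves per-bundle ordering.
     */
    static int partitionFor(String bundleKey, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(bundleKey.hashCode(), numPartitions);
    }
}
```

Each partition can then be owned by a different broker, which spreads the fan-out dispatch load described above.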
Sharding (long-term)
As the conventional scalability method, one could shard the cluster into multiple groups of brokers. Then, we can create a separate channel for each shard of brokers. This means we need an additional discovery layer to map topics to broker shards (which also needs to align with Namespace Isolation Policies).
We need to mention that this metadata sync scalability issue is not new in Pulsar, as the current Pulsar uses n-replication. For instance, all brokers' and all bundles' load metadata are replicated to all brokers via ZK watchers. Currently, distributed ZK servers send znode watch notifications to their clients (brokers). In this proposal, multiple table-view owner brokers (with partitioned table-views) can dispatch metadata change messages to the participants (brokers).
We think this metadata sync scalability is relatively low-priority, as only a few customers run Pulsar clusters on such a large scale. We could ask the customers first to split the cluster into multiple clusters and then enable partitioned table views. It is not practical for a single cluster to have thousands of brokers. However, we still want to ensure this design is seamlessly extensible, as a two-way-door decision.
Rejected Alternatives
As the PIP changes almost every place (data models, event handlers, cache/storage, logs/metrics), creating a new load balancer and isolating the new code is safer and cleaner. Then, customers could safely enable/disable the new load balancer by a configuration before deprecating the old one.
It gives the flexibility to start fresh without the existing baggage of choices and try a significantly different approach. The current ModularLoadManagerImpl will not go away. Once the new load manager is ready and considered stable enough, there might be a new discussion on whether to change the default implementation. Even then, users will still be able to opt for the old load manager.
Modification Summary
The following excludes logic and algorithm modifications, as this PIP does not focus on logic and algorithm improvements.
Post Update
Added ServiceConfiguration