-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
105 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
## Background | ||
|
||
In the implementation of the Pulsar Transaction, each topic is configured with a `Transaction Buffer` to prevent consumers from reading uncommitted messages, which are invisible until the transaction is committed. Transaction Buffer works with Position (maxReadPosition) and `TxnID` Set (aborts). The broker only dispatches messages, before the maxReadPosition, to the consumers. When the broker dispatches the messages before maxReadPosition to the consumer, the messages sent by aborted transactions will get filtered by the Transaction Buffer. | ||
|
||
## Motivation | ||
|
||
Currently, Pulsar transactions do not have configurable isolation levels. By introducing isolation level configuration for consumers, we can enhance the flexibility of Pulsar transactions. | ||
|
||
## Goal | ||
|
||
### In Scope | ||
|
||
Implement Read Committed and Read Uncommitted isolation levels for Pulsar transactions.Allow consumers to configure isolation levels during the building process. | ||
|
||
### Out of Scope | ||
|
||
None. | ||
|
||
## API Changes | ||
|
||
Update the PulsarConsumer builder process to include isolation level configurations for Read Committed and Read Uncommitted. | ||
|
||
### Before the Change | ||
|
||
The PulsarConsumer builder process currently does not include isolation level configurations. The consumer creation process might look like this: | ||
|
||
``` | ||
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); | ||
Consumer<String> consumer = client.newConsumer(Schema.STRING) | ||
.topic("persistent://my-tenant/my-namespace/my-topic") | ||
.subscriptionName("my-subscription") | ||
.subscriptionType(SubscriptionType.Shared) | ||
.subscribe(); | ||
``` | ||
|
||
### After the Change | ||
|
||
Update the PulsarConsumer builder process to include isolation level configurations for Read Committed and Read Uncommitted. Introduce a new method isolationLevel() in the consumer builder, which accepts an enumeration value representing the isolation level: | ||
|
||
``` | ||
public enum IsolationLevel { | ||
READ_COMMITTED, | ||
READ_UNCOMMITTED | ||
} | ||
``` | ||
|
||
Then, modify the consumer creation process to include the new isolation level configuration: | ||
|
||
``` | ||
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); | ||
Consumer<String> consumer = client.newConsumer(Schema.STRING) | ||
.topic("persistent://my-tenant/my-namespace/my-topic") | ||
.subscriptionName("my-subscription") | ||
.subscriptionType(SubscriptionType.Shared) | ||
.subscriptionIsolationLevel(IsolationLevel.READ_COMMITTED) // Adding the isolation level configuration | ||
.subscribe(); | ||
``` | ||
|
||
With this change, users can now choose between Read Committed and Read Uncommitted isolation levels when creating a new | ||
consumer. If the isolationLevel() method is not called during the builder process, the default isolation level will be | ||
Read Committed. | ||
Note that this is a subscription dimension configuration, and all consumers under the same subscription need to be | ||
configured with the same IsolationLevel. | ||
|
||
## Implementation | ||
|
||
### Client Changes | ||
|
||
Update the PulsarConsumer builder to accept isolation level configurations for Read Committed and Read Uncommitted levels. | ||
|
||
In order to achieve the above goals, the following modifications need to be made: | ||
|
||
- Added `IsolationLevel` related fields and methods in `ConsumerConfigurationData` and `ConsumerBuilderImpl` and `ConsumerImpl` | ||
|
||
- Modify PulsarApi.CommandSubscribe, add field -- IsolationLevel | ||
|
||
``` | ||
message CommandSubscribe { | ||
enum IsolationLevel { | ||
READ_COMMITTED = 0; | ||
READ_UNCOMMITTED = 1; | ||
} | ||
optional IsolationLevel isolation_level = 20 [default = READ_COMMITTED]; | ||
} | ||
``` | ||
|
||
### Broker changes | ||
|
||
Modify the transaction buffer and dispatching mechanisms to handle messages based on the chosen isolation level. | ||
|
||
In order to achieve the above goals, the following modifications need to be made: | ||
|
||
- Determine in the `readMoreEntries` method of Dispatchers such as `PersistentDispatcherSingleActiveConsumer` and `PersistentDispatcherMultipleConsumers`: | ||
|
||
- If Subscription.isolationLevel == ReadCommitted, then MaxReadPosition = topic.getMaxReadPosition(), that is, transactionBuffer.getMaxReadPosition() | ||
|
||
- If Subscription.isolationLevel == ReadUnCommitted, then MaxReadPosition = PositionImpl.LATEST | ||
|
||
|
||
## Reject Alternatives | ||
|
||
None |