-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[improve][pip] PIP-401: Support set batching configurations for Pulsa…
…r Functions&Sources (#23793)
- Loading branch information
1 parent
6d59b1a
commit 9149720
Showing
1 changed file
with
141 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
# PIP-401: Support set batching configurations for Pulsar Functions&Sources | ||
|
||
# Background knowledge | ||
|
||
Pulsar Functions and Sources enable the batching feature hard-coded, and also set the `batchingMaxPublishDelay` to 10ms, it only | ||
supports set the `batch-builder` for now, this is not suitable for all the use cases, and also not feasible for users. | ||
|
||
# Motivation | ||
|
||
Support setting batching configurations for Pulsar Functions&Sources, to make it more flexible and suitable for users. | ||
|
||
# Goals | ||
|
||
## In Scope | ||
|
||
- Support setting batching configurations for Pulsar Functions&Sources. | ||
|
||
# High Level Design | ||
|
||
Make users able to enable&disable batching and set batching configurations for Pulsar Functions&Sources. | ||
|
||
# Detailed Design | ||
|
||
## Design & Implementation Details | ||
|
||
- Add a new message `BatchingSpec` with below fields in `Function.proto`, and add it as a new filed `batchingSpec` to the `ProducerSpec` message | ||
- `bool enabled` | ||
- `int32 batchingMaxPublishDelayMs` | ||
- `int32 roundRobinRouterBatchingPartitionSwitchFrequency` | ||
- `int32 batchingMaxMessages` | ||
- `int32 batchingMaxBytes` | ||
- `string batchBuilder` | ||
- Add a new class `BatchingConfig` with below fields and add it as a new field `batchingConfig` to the `ProducerConfig`: | ||
- `bool enabled` | ||
- `int batchingMaxPublishDelayMs` | ||
- `int roundRobinRouterBatchingPartitionSwitchFrequency` | ||
- `int batchingMaxMessages` | ||
- `int batchingMaxBytes` | ||
- `String batchBuilder` | ||
|
||
And related logic also will be added: | ||
- convert the `batchingSpec` field of the `ProducerSpec` from `FunctionDetails` to the `batchingConfig` field of the `ProducerConfig` and vice versa | ||
|
||
To keep the compatibility, when the `batchingSpec` of the `ProducerSpec` is null when creating the `ProducerConfig` from the `ProducerSpec`, | ||
the `batchingConfig` field will be fallback to: `BatchingConfig(enabled=true, batchingMaxPublishDelayMs=10)`. | ||
|
||
After the changes, users can pass the batching configurations when creating the functions and sources, like below using CLI arguments: | ||
|
||
```shell | ||
./bin/pulsar-admin functions create \ | ||
--tenant public \ | ||
--namespace default \ | ||
--name test-java \ | ||
--className org.apache.pulsar.functions.api.examples.ExclamationFunction \ | ||
--inputs persistent://public/default/test-java-input \ | ||
--producer-config '{"batchingConfig": {"enabled": true, "batchingMaxPublishDelayMs": 100, "roundRobinRouterBatchingPartitionSwitchFrequency": 10, "batchingMaxMessages": 1000}}' \ | ||
--jar /pulsar/examples/api-examples.jar | ||
``` | ||
|
||
```shell | ||
./bin/pulsar-admin sources create \ | ||
--name data-generator-source \ | ||
--source-type data-generator \ | ||
--destination-topic-name persistent://public/default/data-source-topic \ | ||
--producer-config '{"batchingConfig": {"enabled": true, "batchingMaxPublishDelayMs": 100, "roundRobinRouterBatchingPartitionSwitchFrequency": 10, "batchingMaxMessages": 1000}}' \ | ||
--source-config '{"sleepBetweenMessages": "1000"}' | ||
``` | ||
|
||
Users can also use the function config file to set the batching configs for functions: | ||
|
||
```yaml | ||
tenant: "public" | ||
namespace: "default" | ||
name: "test-java" | ||
jar: "/pulsar/examples/api-examples.jar" | ||
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction" | ||
inputs: ["persistent://public/default/test-java-input"] | ||
output: "persistent://public/default/test-java-output" | ||
autoAck: true | ||
parallelism: 1 | ||
producerConfig: | ||
batchingConfig: | ||
enabled: true | ||
batchingMaxPublishDelayMs: 100 | ||
roundRobinRouterBatchingPartitionSwitchFrequency: 10 | ||
batchingMaxMessages: 1000 | ||
``` | ||
And use source config file to set the batching configs for sources: | ||
```yaml | ||
tenant: "public" | ||
namespace: "default" | ||
name: "data-generator-source" | ||
topicName: "persistent://public/default/data-source-topic" | ||
archive: "builtin://data-generator" | ||
parallelism: 1 | ||
configs: | ||
sleepBetweenMessages: "5000" | ||
producerConfig: | ||
batchingConfig: | ||
enabled: true | ||
batchingMaxPublishDelayMs: 100 | ||
roundRobinRouterBatchingPartitionSwitchFrequency: 10 | ||
batchingMaxMessages: 1000 | ||
``` | ||
## Public-facing Changes | ||
### CLI | ||
# Monitoring | ||
# Security Considerations | ||
# Backward & Forward Compatibility | ||
## Revert | ||
No changes are needed to revert to the previous version. | ||
## Upgrade | ||
No other changes are needed to upgrade to the new version. | ||
# Alternatives | ||
None | ||
# General Notes | ||
# Links | ||
<!-- | ||
Updated afterwards | ||
--> | ||
* Mailing List discussion thread: https://lists.apache.org/thread/olx4xm8cdy43omp5c0jm44sj1gp0grcr | ||
* Mailing List voting thread: https://lists.apache.org/thread/vhq6ox4nh2rx59yoxowftqzv8f9lnm4q |