From 9149720841c15be2c83579e783abb959d34a23c1 Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Thu, 9 Jan 2025 12:40:10 +0800 Subject: [PATCH] [improve][pip] PIP-401: Support set batching configurations for Pulsar Functions&Sources (#23793) --- pip/pip-401.md | 141 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 pip/pip-401.md diff --git a/pip/pip-401.md b/pip/pip-401.md new file mode 100644 index 0000000000000..8f1bc7851f7da --- /dev/null +++ b/pip/pip-401.md @@ -0,0 +1,141 @@ +# PIP-401: Support set batching configurations for Pulsar Functions&Sources + +# Background knowledge + +Pulsar Functions and Sources enable the batching feature hard-coded, and also set the `batchingMaxPublishDelay` to 10ms, it only +supports set the `batch-builder` for now, this is not suitable for all the use cases, and also not feasible for users. + +# Motivation + +Support setting batching configurations for Pulsar Functions&Sources, to make it more flexible and suitable for users. + +# Goals + +## In Scope + +- Support setting batching configurations for Pulsar Functions&Sources. + +# High Level Design + +Make users able to enable&disable batching and set batching configurations for Pulsar Functions&Sources. + +# Detailed Design + +## Design & Implementation Details + +- Add a new message `BatchingSpec` with below fields in `Function.proto`, and add it as a new filed `batchingSpec` to the `ProducerSpec` message + - `bool enabled` + - `int32 batchingMaxPublishDelayMs` + - `int32 roundRobinRouterBatchingPartitionSwitchFrequency` + - `int32 batchingMaxMessages` + - `int32 batchingMaxBytes` + - `string batchBuilder` +- Add a new class `BatchingConfig` with below fields and add it as a new field `batchingConfig` to the `ProducerConfig`: + - `bool enabled` + - `int batchingMaxPublishDelayMs` + - `int roundRobinRouterBatchingPartitionSwitchFrequency` + - `int batchingMaxMessages` + - `int batchingMaxBytes` + - `String batchBuilder` + +And related logic also will be added: +- convert the `batchingSpec` field of the `ProducerSpec` from `FunctionDetails` to the `batchingConfig` field of the `ProducerConfig` and vice versa + +To keep the compatibility, when the `batchingSpec` of the `ProducerSpec` is null when creating the `ProducerConfig` from the `ProducerSpec`, +the `batchingConfig` field will be fallback to: `BatchingConfig(enabled=true, batchingMaxPublishDelayMs=10)`. + +After the changes, users can pass the batching configurations when creating the functions and sources, like below using CLI arguments: + +```shell +./bin/pulsar-admin functions create \ + --tenant public \ + --namespace default \ + --name test-java \ + --className org.apache.pulsar.functions.api.examples.ExclamationFunction \ + --inputs persistent://public/default/test-java-input \ + --producer-config '{"batchingConfig": {"enabled": true, "batchingMaxPublishDelayMs": 100, "roundRobinRouterBatchingPartitionSwitchFrequency": 10, "batchingMaxMessages": 1000}}' \ + --jar /pulsar/examples/api-examples.jar +``` + +```shell +./bin/pulsar-admin sources create \ + --name data-generator-source \ + --source-type data-generator \ + --destination-topic-name persistent://public/default/data-source-topic \ + --producer-config '{"batchingConfig": {"enabled": true, "batchingMaxPublishDelayMs": 100, "roundRobinRouterBatchingPartitionSwitchFrequency": 10, "batchingMaxMessages": 1000}}' \ + --source-config '{"sleepBetweenMessages": "1000"}' +``` + +Users can also use the function config file to set the batching configs for functions: + +```yaml +tenant: "public" +namespace: "default" +name: "test-java" +jar: "/pulsar/examples/api-examples.jar" +className: "org.apache.pulsar.functions.api.examples.ExclamationFunction" +inputs: ["persistent://public/default/test-java-input"] +output: "persistent://public/default/test-java-output" +autoAck: true +parallelism: 1 +producerConfig: + batchingConfig: + enabled: true + batchingMaxPublishDelayMs: 100 + roundRobinRouterBatchingPartitionSwitchFrequency: 10 + batchingMaxMessages: 1000 +``` + +And use source config file to set the batching configs for sources: + +```yaml +tenant: "public" +namespace: "default" +name: "data-generator-source" +topicName: "persistent://public/default/data-source-topic" +archive: "builtin://data-generator" +parallelism: 1 +configs: + sleepBetweenMessages: "5000" +producerConfig: + batchingConfig: + enabled: true + batchingMaxPublishDelayMs: 100 + roundRobinRouterBatchingPartitionSwitchFrequency: 10 + batchingMaxMessages: 1000 +``` + +## Public-facing Changes + +### CLI + + +# Monitoring + + +# Security Considerations + + +# Backward & Forward Compatibility + +## Revert + +No changes are needed to revert to the previous version. + +## Upgrade + +No other changes are needed to upgrade to the new version. + +# Alternatives + +None + +# General Notes + +# Links + + +* Mailing List discussion thread: https://lists.apache.org/thread/olx4xm8cdy43omp5c0jm44sj1gp0grcr +* Mailing List voting thread: https://lists.apache.org/thread/vhq6ox4nh2rx59yoxowftqzv8f9lnm4q