From 22aa7f1bf2d146b60d0a72763aa039c1d75a85d0 Mon Sep 17 00:00:00 2001
From: jiangpengcheng <scjiangpengcheng@gmail.com>
Date: Mon, 30 Dec 2024 15:16:52 +0800
Subject: [PATCH 1/7] [improve][pip] Support set batching configurations for
 Pulsar Functions&Sources

---
 pip/pip-401.md | 98 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 98 insertions(+)
 create mode 100644 pip/pip-401.md

diff --git a/pip/pip-401.md b/pip/pip-401.md
new file mode 100644
index 0000000000000..d7bc26772cec5
--- /dev/null
+++ b/pip/pip-401.md
@@ -0,0 +1,98 @@
+# PIP-401: Support set batching configurations for Pulsar Functions&Sources
+
+# Background knowledge
+
+Pulsar Functions and Sources enable the batching feature hard-coded, and also set the `batchingMaxPublishDelay` to 10ms, it only
+supports set the `batch-builder` for now, this is not suitable for all the use cases, and also not feasible for users.
+
+# Motivation
+
+Support setting batching configurations for Pulsar Functions&Sources, to make it more flexible and suitable for users.
+
+# Goals
+
+## In Scope
+
+- Support setting batching configurations for Pulsar Functions&Sources.
+
+# High Level Design
+
+Make users able to enable&disable batching and set batching configurations for Pulsar Functions&Sources.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+- Add below cli arguments to create&update functions/sources command to support setting batching configurations.
+  - `--batching-enabled` with default value = true (to keep the compatibility)
+  - `--batching-max-publish-delay-ms` with default value = 10 (to keep the compatibility)
+  - `--batching-max-messages`
+  - `--batching-max-bytes`
+- Add below fields to the `FunctionConfig` and `SourceConfig`
+  - `bool batchingEnabled`
+  - `int batchingMaxPublishDelayMs`
+  - `int batchingMaxMessages`
+  - `int batchingMaxBytes`
+- Add a new message `BatchingSpec` with below fields in `Function.proto`, and add it as a new filed `batchingSpec` to the `ProducerSpec` message
+  - `bool enabled`
+  - `int32 batchingMaxPublishDelayMs`
+  - `int32 batchingMaxMessages`
+  - `int32 batchingMaxBytes`
+  - `string batchBuilder`
+- Add a new class `BatchingConfig` with below fields and add it as a new field `batchingConfig` to the `ProducerConfig`:
+  - `bool enabled`
+  - `int batchingMaxPublishDelayMs`
+  - `int batchingMaxMessages`
+  - `int batchingMaxBytes`
+  - `String batchBuilder`
+
+And related logic also will be added:
+- set the batching related configs in the `FunctionConfig` and `SourceConfig` according to the cli arguments
+- convert the batching related configs in `FunctionConfig`/`SourceConfig` to the `batchingSpec` field of the `ProducerSpec` in `FunctionDetails` and vice versa
+- convert the `batchingSpec` field of the `ProducerSpec` from `FunctionDetails` to the `batchingConfig` field of the `ProducerConfig` and vice versa
+
+To keep the compatibility, when the `batchingSpec` of the `ProducerSpec` is null when creating the `ProducerConfig` from the `ProducerSpec` of `FunctionDetails`,
+the `batchingConfig` field will be fallback to: `BatchingConfig(enabled=true, batchingMaxPublishDelayMs=10)`.
+
+## Public-facing Changes
+
+### CLI
+
+Four new arguments are added to the create&update functions/sources command:
+
+- `--batching-enabled`
+- `--batching-max-publish-delay-ms`
+- `--batching-max-messages`
+- `--batching-max-bytes`
+
+# Monitoring
+
+No new metrics are added in this proposal.
+
+# Security Considerations
+
+No new security considerations are added in this proposal.
+
+# Backward & Forward Compatibility
+
+## Revert
+
+No changes are needed to revert to the previous version.
+
+## Upgrade
+
+No other changes are needed to upgrade to the new version.
+
+# Alternatives
+
+None
+
+# General Notes
+
+# Links
+
+<!--
+Updated afterwards
+-->
+* Mailing List discussion thread: 
+* Mailing List voting thread: 

From 96d7bb8b5997d5f3d91edd96f14e460e679dfc2a Mon Sep 17 00:00:00 2001
From: jiangpengcheng <scjiangpengcheng@gmail.com>
Date: Mon, 30 Dec 2024 16:28:16 +0800
Subject: [PATCH 2/7] Update

---
 pip/pip-401.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pip/pip-401.md b/pip/pip-401.md
index d7bc26772cec5..91eb6d1026340 100644
--- a/pip/pip-401.md
+++ b/pip/pip-401.md
@@ -26,22 +26,26 @@ Make users able to enable&disable batching and set batching configurations for P
 - Add below cli arguments to create&update functions/sources command to support setting batching configurations.
   - `--batching-enabled` with default value = true (to keep the compatibility)
   - `--batching-max-publish-delay-ms` with default value = 10 (to keep the compatibility)
+  - `--batching-partition-switch-frequency`
   - `--batching-max-messages`
   - `--batching-max-bytes`
 - Add below fields to the `FunctionConfig` and `SourceConfig`
   - `bool batchingEnabled`
   - `int batchingMaxPublishDelayMs`
+  - `int roundRobinRouterBatchingPartitionSwitchFrequency`
   - `int batchingMaxMessages`
   - `int batchingMaxBytes`
 - Add a new message `BatchingSpec` with below fields in `Function.proto`, and add it as a new filed `batchingSpec` to the `ProducerSpec` message
   - `bool enabled`
   - `int32 batchingMaxPublishDelayMs`
+  - `int32 roundRobinRouterBatchingPartitionSwitchFrequency`
   - `int32 batchingMaxMessages`
   - `int32 batchingMaxBytes`
   - `string batchBuilder`
 - Add a new class `BatchingConfig` with below fields and add it as a new field `batchingConfig` to the `ProducerConfig`:
   - `bool enabled`
   - `int batchingMaxPublishDelayMs`
+  - `int roundRobinRouterBatchingPartitionSwitchFrequency`
   - `int batchingMaxMessages`
   - `int batchingMaxBytes`
   - `String batchBuilder`

From 8ed2201b34e773d4934dba4ef7234e7e3ba9a7e1 Mon Sep 17 00:00:00 2001
From: jiangpengcheng <scjiangpengcheng@gmail.com>
Date: Mon, 30 Dec 2024 16:37:23 +0800
Subject: [PATCH 3/7] Update

---
 pip/pip-401.md | 24 +-----------------------
 1 file changed, 1 insertion(+), 23 deletions(-)

diff --git a/pip/pip-401.md b/pip/pip-401.md
index 91eb6d1026340..b2ce1d713e382 100644
--- a/pip/pip-401.md
+++ b/pip/pip-401.md
@@ -23,18 +23,6 @@ Make users able to enable&disable batching and set batching configurations for P
 
 ## Design & Implementation Details
 
-- Add below cli arguments to create&update functions/sources command to support setting batching configurations.
-  - `--batching-enabled` with default value = true (to keep the compatibility)
-  - `--batching-max-publish-delay-ms` with default value = 10 (to keep the compatibility)
-  - `--batching-partition-switch-frequency`
-  - `--batching-max-messages`
-  - `--batching-max-bytes`
-- Add below fields to the `FunctionConfig` and `SourceConfig`
-  - `bool batchingEnabled`
-  - `int batchingMaxPublishDelayMs`
-  - `int roundRobinRouterBatchingPartitionSwitchFrequency`
-  - `int batchingMaxMessages`
-  - `int batchingMaxBytes`
 - Add a new message `BatchingSpec` with below fields in `Function.proto`, and add it as a new filed `batchingSpec` to the `ProducerSpec` message
   - `bool enabled`
   - `int32 batchingMaxPublishDelayMs`
@@ -51,31 +39,21 @@ Make users able to enable&disable batching and set batching configurations for P
   - `String batchBuilder`
 
 And related logic also will be added:
-- set the batching related configs in the `FunctionConfig` and `SourceConfig` according to the cli arguments
-- convert the batching related configs in `FunctionConfig`/`SourceConfig` to the `batchingSpec` field of the `ProducerSpec` in `FunctionDetails` and vice versa
 - convert the `batchingSpec` field of the `ProducerSpec` from `FunctionDetails` to the `batchingConfig` field of the `ProducerConfig` and vice versa
 
-To keep the compatibility, when the `batchingSpec` of the `ProducerSpec` is null when creating the `ProducerConfig` from the `ProducerSpec` of `FunctionDetails`,
+To keep the compatibility, when the `batchingSpec` of the `ProducerSpec` is null when creating the `ProducerConfig` from the `ProducerSpec`,
 the `batchingConfig` field will be fallback to: `BatchingConfig(enabled=true, batchingMaxPublishDelayMs=10)`.
 
 ## Public-facing Changes
 
 ### CLI
 
-Four new arguments are added to the create&update functions/sources command:
-
-- `--batching-enabled`
-- `--batching-max-publish-delay-ms`
-- `--batching-max-messages`
-- `--batching-max-bytes`
 
 # Monitoring
 
-No new metrics are added in this proposal.
 
 # Security Considerations
 
-No new security considerations are added in this proposal.
 
 # Backward & Forward Compatibility
 

From 1dba4331f95692b3ae3459df2d771e27016dd239 Mon Sep 17 00:00:00 2001
From: jiangpengcheng <scjiangpengcheng@gmail.com>
Date: Mon, 30 Dec 2024 16:44:34 +0800
Subject: [PATCH 4/7] Add discussion thread

---
 pip/pip-401.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pip/pip-401.md b/pip/pip-401.md
index b2ce1d713e382..973bd5757fb15 100644
--- a/pip/pip-401.md
+++ b/pip/pip-401.md
@@ -76,5 +76,5 @@ None
 <!--
 Updated afterwards
 -->
-* Mailing List discussion thread: 
+* Mailing List discussion thread: https://lists.apache.org/thread/olx4xm8cdy43omp5c0jm44sj1gp0grcr
 * Mailing List voting thread: 

From bca97be6fa059c63505a3d589b6ccf42b556087a Mon Sep 17 00:00:00 2001
From: jiangpengcheng <scjiangpengcheng@gmail.com>
Date: Wed, 1 Jan 2025 14:36:30 +0800
Subject: [PATCH 5/7] Update

---
 pip/pip-401.md | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/pip/pip-401.md b/pip/pip-401.md
index 973bd5757fb15..3f604854483c4 100644
--- a/pip/pip-401.md
+++ b/pip/pip-401.md
@@ -44,6 +44,19 @@ And related logic also will be added:
 To keep the compatibility, when the `batchingSpec` of the `ProducerSpec` is null when creating the `ProducerConfig` from the `ProducerSpec`,
 the `batchingConfig` field will be fallback to: `BatchingConfig(enabled=true, batchingMaxPublishDelayMs=10)`.
 
+After the changes, users can pass the batching configurations when creating the functions and sources, like below:
+
+```shell
+./bin/pulsar-admin functions create \
+    --tenant public \
+    --namespace default \
+    --name test-java \
+    --className org.apache.pulsar.functions.api.examples.ExclamationFunction \
+    --inputs persistent://public/default/test-java-input \
+    --producer-config '{"batchingConfig": {"enabled": true, "batchingMaxPublishDelayMs": 100, "roundRobinRouterBatchingPartitionSwitchFrequency": 10, "batchingMaxMessages": 1000}}' \
+    --jar /pulsar/examples/api-examples.jar
+```
+
 ## Public-facing Changes
 
 ### CLI

From 61a772e1aa7774fbbd8b92d788f02c4c781ca82e Mon Sep 17 00:00:00 2001
From: jiangpengcheng <scjiangpengcheng@gmail.com>
Date: Fri, 3 Jan 2025 12:46:44 +0800
Subject: [PATCH 6/7] Add vote thread

---
 pip/pip-401.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pip/pip-401.md b/pip/pip-401.md
index 3f604854483c4..144c030fc3898 100644
--- a/pip/pip-401.md
+++ b/pip/pip-401.md
@@ -90,4 +90,4 @@ None
 Updated afterwards
 -->
 * Mailing List discussion thread: https://lists.apache.org/thread/olx4xm8cdy43omp5c0jm44sj1gp0grcr
-* Mailing List voting thread: 
+* Mailing List voting thread: https://lists.apache.org/thread/vhq6ox4nh2rx59yoxowftqzv8f9lnm4q

From 3a9334ee163fd9698bb92f8f49c67f68a5e11952 Mon Sep 17 00:00:00 2001
From: jiangpengcheng <scjiangpengcheng@gmail.com>
Date: Sat, 4 Jan 2025 13:34:19 +0800
Subject: [PATCH 7/7] Add more examples

---
 pip/pip-401.md | 50 +++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 49 insertions(+), 1 deletion(-)

diff --git a/pip/pip-401.md b/pip/pip-401.md
index 144c030fc3898..8f1bc7851f7da 100644
--- a/pip/pip-401.md
+++ b/pip/pip-401.md
@@ -44,7 +44,7 @@ And related logic also will be added:
 To keep the compatibility, when the `batchingSpec` of the `ProducerSpec` is null when creating the `ProducerConfig` from the `ProducerSpec`,
 the `batchingConfig` field will be fallback to: `BatchingConfig(enabled=true, batchingMaxPublishDelayMs=10)`.
 
-After the changes, users can pass the batching configurations when creating the functions and sources, like below:
+After the changes, users can pass the batching configurations when creating the functions and sources, like below using CLI arguments:
 
 ```shell
 ./bin/pulsar-admin functions create \
@@ -57,6 +57,54 @@ After the changes, users can pass the batching configurations when creating the
     --jar /pulsar/examples/api-examples.jar
 ```
 
+```shell
+./bin/pulsar-admin sources create \
+    --name data-generator-source \
+    --source-type data-generator \
+    --destination-topic-name persistent://public/default/data-source-topic \
+    --producer-config '{"batchingConfig": {"enabled": true, "batchingMaxPublishDelayMs": 100, "roundRobinRouterBatchingPartitionSwitchFrequency": 10, "batchingMaxMessages": 1000}}' \
+    --source-config '{"sleepBetweenMessages": "1000"}'
+```
+
+Users can also use the function config file to set the batching configs for functions:
+
+```yaml
+tenant: "public"
+namespace: "default"
+name: "test-java"
+jar: "/pulsar/examples/api-examples.jar"
+className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
+inputs: ["persistent://public/default/test-java-input"]
+output: "persistent://public/default/test-java-output"
+autoAck: true
+parallelism: 1
+producerConfig:
+  batchingConfig:
+    enabled: true
+    batchingMaxPublishDelayMs: 100
+    roundRobinRouterBatchingPartitionSwitchFrequency: 10
+    batchingMaxMessages: 1000
+```
+
+And use source config file to set the batching configs for sources:
+
+```yaml
+tenant: "public"
+namespace: "default"
+name: "data-generator-source"
+topicName: "persistent://public/default/data-source-topic"
+archive: "builtin://data-generator"
+parallelism: 1
+configs:
+  sleepBetweenMessages: "5000"
+producerConfig:
+  batchingConfig:
+    enabled: true
+    batchingMaxPublishDelayMs: 100
+    roundRobinRouterBatchingPartitionSwitchFrequency: 10
+    batchingMaxMessages: 1000
+```
+
 ## Public-facing Changes
 
 ### CLI