Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][pip] PIP-401: Support set batching configurations for Pulsar Functions&Sources #23793

Merged
merged 7 commits into from
Jan 9, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions pip/pip-401.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# PIP-401: Support set batching configurations for Pulsar Functions&Sources

# Background knowledge

Pulsar Functions and Sources enable the batching feature hard-coded, and also set the `batchingMaxPublishDelay` to 10ms, it only
supports set the `batch-builder` for now, this is not suitable for all the use cases, and also not feasible for users.

# Motivation

Support setting batching configurations for Pulsar Functions&Sources, to make it more flexible and suitable for users.

# Goals

## In Scope

- Support setting batching configurations for Pulsar Functions&Sources.

# High Level Design

Make users able to enable&disable batching and set batching configurations for Pulsar Functions&Sources.

# Detailed Design

## Design & Implementation Details

- Add a new message `BatchingSpec` with below fields in `Function.proto`, and add it as a new filed `batchingSpec` to the `ProducerSpec` message
- `bool enabled`
- `int32 batchingMaxPublishDelayMs`
- `int32 roundRobinRouterBatchingPartitionSwitchFrequency`
- `int32 batchingMaxMessages`
- `int32 batchingMaxBytes`
- `string batchBuilder`
- Add a new class `BatchingConfig` with below fields and add it as a new field `batchingConfig` to the `ProducerConfig`:
- `bool enabled`
- `int batchingMaxPublishDelayMs`
- `int roundRobinRouterBatchingPartitionSwitchFrequency`
- `int batchingMaxMessages`
- `int batchingMaxBytes`
- `String batchBuilder`

And related logic also will be added:
- convert the `batchingSpec` field of the `ProducerSpec` from `FunctionDetails` to the `batchingConfig` field of the `ProducerConfig` and vice versa

To keep the compatibility, when the `batchingSpec` of the `ProducerSpec` is null when creating the `ProducerConfig` from the `ProducerSpec`,
the `batchingConfig` field will be fallback to: `BatchingConfig(enabled=true, batchingMaxPublishDelayMs=10)`.

After the changes, users can pass the batching configurations when creating the functions and sources, like below using CLI arguments:

```shell
./bin/pulsar-admin functions create \
--tenant public \
--namespace default \
--name test-java \
--className org.apache.pulsar.functions.api.examples.ExclamationFunction \
--inputs persistent://public/default/test-java-input \
--producer-config '{"batchingConfig": {"enabled": true, "batchingMaxPublishDelayMs": 100, "roundRobinRouterBatchingPartitionSwitchFrequency": 10, "batchingMaxMessages": 1000}}' \
--jar /pulsar/examples/api-examples.jar
```

```shell
./bin/pulsar-admin sources create \
--name data-generator-source \
--source-type data-generator \
--destination-topic-name persistent://public/default/data-source-topic \
--producer-config '{"batchingConfig": {"enabled": true, "batchingMaxPublishDelayMs": 100, "roundRobinRouterBatchingPartitionSwitchFrequency": 10, "batchingMaxMessages": 1000}}' \
--source-config '{"sleepBetweenMessages": "1000"}'
```

Users can also use the function config file to set the batching configs for functions:

```yaml
tenant: "public"
namespace: "default"
name: "test-java"
jar: "/pulsar/examples/api-examples.jar"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
inputs: ["persistent://public/default/test-java-input"]
output: "persistent://public/default/test-java-output"
autoAck: true
parallelism: 1
producerConfig:
batchingConfig:
enabled: true
batchingMaxPublishDelayMs: 100
roundRobinRouterBatchingPartitionSwitchFrequency: 10
batchingMaxMessages: 1000
```

And use source config file to set the batching configs for sources:

```yaml
tenant: "public"
namespace: "default"
name: "data-generator-source"
topicName: "persistent://public/default/data-source-topic"
archive: "builtin://data-generator"
parallelism: 1
configs:
sleepBetweenMessages: "5000"
producerConfig:
batchingConfig:
enabled: true
batchingMaxPublishDelayMs: 100
roundRobinRouterBatchingPartitionSwitchFrequency: 10
batchingMaxMessages: 1000
```

## Public-facing Changes

### CLI


# Monitoring


# Security Considerations


# Backward & Forward Compatibility

## Revert

No changes are needed to revert to the previous version.

## Upgrade

No other changes are needed to upgrade to the new version.

# Alternatives

None

# General Notes

# Links

<!--
Updated afterwards
-->
* Mailing List discussion thread: https://lists.apache.org/thread/olx4xm8cdy43omp5c0jm44sj1gp0grcr
* Mailing List voting thread: https://lists.apache.org/thread/vhq6ox4nh2rx59yoxowftqzv8f9lnm4q
Loading