This repository has been archived by the owner on Apr 1, 2024. It is now read-only.
forked from apache/pulsar
-
Notifications
You must be signed in to change notification settings - Fork 25
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[improve][pip] PIP-315: Configurable max delay limit for delayed deli…
…very (apache#21490)
- Loading branch information
Showing
1 changed file
with
137 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
# PIP-315: Configurable max delay limit for delayed delivery | ||
|
||
# Background knowledge | ||
Delayed message delivery is an important feature which allows a producer to specify that a message should be delivered/consumed at a later time. Currently the broker will save a delayed message without any check. The message's `deliverAt` time is checked when the broker dispatches messages to the Consumer. If a message has a `deliverAt` time, then it is added to the `DelayedDeliveryTracker` and will be delivered later when eligible. | ||
|
||
Delayed message delivery is only available for persistent topics, and shared/key-shared subscription types. | ||
|
||
# Motivation | ||
Currently there is no max delay limit so a producer can specify any delay when publishing a message. | ||
|
||
This poses a few challenges: | ||
1. Producer may miscalculate/misconfigure a very large delay (ex. 1,000 day instead of 100 day delay) | ||
2. Pulsar administrators may want to limit the max allowed delay since unacked messages (ex. messages with a large delay) will be stored forever (unless TTL is configured) | ||
3. The configured delay may be greater than the configured TTL which means the delayed message may be deleted before the `deliverAt` time (before the consumer can process it) | ||
|
||
# Goals | ||
The purpose of this PIP is to introduce an optional configuration to limit the max allowed delay for delayed delivery. | ||
|
||
## In Scope | ||
- Add broker configuration to limit the max allowed delay for delayed delivery | ||
- Configurable at broker/topic/namespace-level | ||
|
||
# High Level Design | ||
We will add a configuration `maxDeliveryDelayInMillis` and if configured, the broker will check incoming delayed messages to see if the message's `deliverAt` time exceeds the configured limit. If it exceeds the limit, the broker will send an error back to the Producer. | ||
|
||
# Detailed Design | ||
|
||
## Design & Implementation Details | ||
|
||
### Broker Changes | ||
A new `maxDeliveryDelayInMillis` config will be added to the broker which is initially defaulted to 0 (disabled). The default (disabled) behavior will match the current delayed delivery behavior (no limit on delivery delay). | ||
``` | ||
# broker.conf | ||
delayedDeliveryMaxDeliveryDelayInMillis=0 | ||
``` | ||
|
||
This field will also be added to the existing `DelayedDeliveryPolicies` interface to support topic & namespace-level configuration: | ||
```java | ||
public interface DelayedDeliveryPolicies { | ||
long getMaxDeliveryDelayInMillis(); | ||
} | ||
``` | ||
|
||
The max delivery delay check will occur in the broker's `Producer` class inside of `checkAndStartPublish` (same place as other checks such as `isEncryptionEnabled`). | ||
|
||
We will give a `ServerError.NotAllowedError` error if all of the following are true: | ||
1. Sending to a persistent topic | ||
2. Topic has `delayedDeliveryEnabled=true` | ||
3. `MessageMetadata` `deliver_at_time` has been specified | ||
4. Topic has `>0` value for `maxDeliveryDelayInMillis` | ||
5. `deliver_at_time - publish_time` > `maxDeliveryDelayInMillis` | ||
|
||
```java | ||
// In org.apache.pulsar.broker.service.Producer#checkAndStartPublish | ||
if (topic.isPersistent()) { | ||
PersistentTopic pTopic = (PersistentTopic) topic; | ||
if (pTopic.isDelayedDeliveryEnabled()) { | ||
headersAndPayload.markReaderIndex(); | ||
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); | ||
headersAndPayload.resetReaderIndex(); | ||
if (msgMetadata.hasDeliverAtTime()) { | ||
long maxDeliveryDelayInMillis = pTopic.getMaxDeliveryDelayInMillis(); | ||
if (maxDeliveryDelayInMillis > 0 | ||
&& msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMillis) { | ||
cnx.execute(() -> { | ||
cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError, | ||
String.format("Exceeds max allowed delivery delay of %s milliseconds", maxDeliveryDelayInMillis)); | ||
cnx.completedSendOperation(false, headersAndPayload.readableBytes()); | ||
}); | ||
return false; | ||
} | ||
} | ||
} | ||
} | ||
``` | ||
|
||
### Consumer Impact | ||
The proposal does not involve any client changes, however it is important to note that setting a max delivery delay may impact the `Consumer` since the `Consumer` uses delayed delivery for retrying to the retry/dlq topic (ex. `reconsumeLater` API). So the max `Consumer` retry delay will be the same as the configured `maxDeliveryDelayInMillis` (if enabled). | ||
|
||
A problem will occur if max delivery delay is configured but a `Consumer` uses a larger custom retry delay. In this scenario, the `Consumer` will actually get stuck redelivering the message as the publish to the retry topic will fail. For this scenario, a larger retry delay should be configured specifically for the Consumer's retry topic (or no delay limit should be used for retry topics). | ||
|
||
A more elegant solution would require a protocol change (see `Alternatives` section below). | ||
|
||
## Public-facing Changes | ||
|
||
### Public API | ||
The optional `maxDeliveryDelayInMillis` field will be added to the admin REST APIs for configuring topic/namespace policies: | ||
- `POST /admin/v2/namespaces/{tenant}/{namespace}/delayedDelivery` | ||
- `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/delayedDelivery` | ||
|
||
And the corresponding `GET` APIs will show `maxDeliveryDelayInMillis` in the response: | ||
- `GET /admin/v2/namespaces/{tenant}/{namespace}/delayedDelivery` | ||
- `GET /admin/v2/persistent/{tenant}/{namespace}/{topic}/delayedDelivery` | ||
|
||
### Configuration | ||
Broker will have a new config in `broker.conf`: | ||
``` | ||
# The max allowed delay for delayed delivery (in milliseconds). If the broker receives a message which exceeds this max delay, then | ||
# it will return an error to the producer. | ||
# The default value is 0 which means there is no limit on the max delivery delay. | ||
delayedDeliveryMaxDeliveryDelayInMillis=0 | ||
``` | ||
|
||
### CLI | ||
Both `CmdTopics` and `CmdNamespaces` will be updated to include this additional optional configuration. | ||
|
||
# Backward & Forward Compatibility | ||
|
||
## Revert | ||
Reverting to a previous version will simply get rid of this config/limitation which is the previous behavior. | ||
|
||
## Upgrade | ||
We will default the value to 0/disabled (no limitation), so this is a backwards compatible change and will not cause any functional change when upgrading to this feature/version. This feature will only be applied once the config is changed. | ||
|
||
If configured, the `maxDeliveryDelayInMillis` limitation will affect: | ||
1. Producers who configure a longer max delivery delay (PIP-26: 2.4.0+) | ||
2. Consumers who configure a longer retry delay when using retry topic (PIP-58: 2.6.0+) | ||
|
||
# Alternatives | ||
## Add delayed delivery limit check at client-side | ||
An alternative is to add the limit check to the client-side which requires a protocol change so that client `Producer`/`Consumer` will receive the delayed delivery configurations from the broker. The client `Producer` can then throw an exception if the caller provides a delay greater than the configured limit. The client `Consumer` can more elegantly handle when the retry publish delay is greater than the configured limit as it can default to using the limit instead of being stuck waiting for the limit to be increased. | ||
|
||
This would still require the broker-side check as someone may be using a custom client. The main benefit is being able to elegantly handle the `Consumer` retry topic scenario. | ||
|
||
If we were to make this protocol change, then it might make sense to also have the `Producer` check the `delayedDeliveryEnabled` config. If delayed delivery is disabled and the `Producer` tries to send a delayed message, then an exception is thrown to the caller (current behavior is the broker will just deliver the message instantly and no error is provided to the `Producer` so it can be misleading). | ||
|
||
We would also need to add the client-side checks to other supported client libraries. | ||
|
||
Since the scope of this alternative would be quite expansive, we may want to pursue this in a follow-up PIP instead of trying to address it all at once. | ||
|
||
# Links | ||
|
||
<!-- | ||
Updated afterwards | ||
--> | ||
* Mailing List discussion thread: https://lists.apache.org/thread/285nm08842or324rxc2zy83wxgqxtcjp | ||
* Mailing List voting thread: https://lists.apache.org/thread/gkqrfrxx74j0dmrogg3now29v1of9zm9 |