Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][pip] PIP-323: Complete Backlog Quota Telemetry #21709

Merged
merged 4 commits into from
Dec 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions pip/pip-323.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
# PIP-323: Complete Backlog Quota Telemetry

# Background knowledge

## Backlog

A topic in Pulsar is the place where messages are written to. They are consumed by subscriptions. A topic can have many
subscriptions, and it is those that maintains the state of message acknowledgment, per subscription - which messages
were acknowledged and which were not.

A subscription backlog is the set of unacknowledged messages in that subscription.
A subscription backlog size is the sum of the size of the unacknowledged messages (in bytes)..

Since a topic can have many subscriptions, and each has its own backlog, how does one define a backlog for a topic?
A topic backlog is defined as the backlog of the subscription which has the **oldest** unacknowledged message.
Since acknowledged messages can be interleaved with unacknowledged messages, calculating the exact size of that
subscription backlog can be expensive as it requires I/O operations to read the messages from the ledgers.
For that reason, the topic backlog size is actually defined to be the *estimated* backlog size of that subscription.
It does so by summarizing the size of all the ledgers, starting from the current active one (the one being written to),
up to the ledger which contains the oldest unacknowledged message for that subscription (There is actually a faster
way to calculate it, but this was the definition chosen for this estimation in Pulsar).

A topic backlog age is the age of the oldest unacknowledged message (same subscription as defined for topic backlog size).
If that message was written 30 minutes ago, its age is 30 minutes, and so is the topic backlog age.

## Backlog Quota

Pulsar has a feature called [backlog quota](https://pulsar.apache.org/docs/3.1.x/cookbooks-retention-expiry/#backlog-quotas).
It allows a user to define a quota - in effect, a limit - which limits the topic backlog.
There are two types of quotas:

1. Size based: The limit is for the topic backlog size (as we defined above).
2. Time based: The limit is for the topic backlog age (as we defined above).

Once a topic backlog exceeds either one of those limits, an action is taken to hold the backlog to that limit:

* The producer write is placed on hold for a certain amount of time before failing.
* The producer write is failed
* The subscriptions oldest unacknowledged messages will be acknowledged in-order until both the topic backlog size or
age will fall inside the limit (quota). The process is called backlog eviction (happens every interval).

The quotas can be defined as a default value for any topic, by using the following broker configuration keys:
`backlogQuotaDefaultLimitBytes` and `backlogQuotaDefaultLimitSecond`.

The quota can also be specified directly for all topics in a given namespace using the namespace policy,
or a specific topic using a topic policy.

## Monitoring Backlog Quota

The user today can calculate quota used for size based limit, since there are two metrics exposed today on
a topic level: `pulsar_storage_backlog_quota_limit` and `pulsar_storage_backlog_size`.
You can just divide the two to get a percentage and know how close the topic backlog to its size limit.

For the time-based limit, the only metric exposed today is the quota itself - `pulsar_storage_backlog_quota_limit_time`

## Backlog Quota Eviction in the Broker

The broker has a method called `BrokerService.monitorBacklogQuota()`. It is scheduled to run every x seconds,
as defined by the configuration `backlogQuotaCheckIntervalInSeconds`.
This method loops over all persistent topics, and for each topic is checks whether the topic backlog exceeded
either one of those topics.

As mentioned before, checking backlog size is a memory-only calculation, since
each topic has the list of ledgers stored in-memory, including the size of each ledger. Same goes for the subscriptions,
they are all stored in memory, and the `ManagedCursor` keeps track of the subscription with the oldest unacknowledged
message, thus retrieveing it is O(1). Checking backlog based on time is costly if configuration key
`preciseTimeBasedBacklogQuotaCheck` was set to true. In that case, it needs to read the oldest message to obtain
its public timestamp, which is expensive in terms of I/O. If it was set to false, it's in-memory access only, since
it uses the age of the ledger instead of the message, and the ledgers metadata is kept in memory.

For each topic which has exceeded its quota, if the policy chosen is eviction, then the process it performed
synchronously. This process consumes I/O, as it needs read messages (using skip) to know where to stop acknowledging
messages.


# Motivation

Users which have defined backlog quota based on time, have no means today to monitor the backlog quota usage,
time-wise, to know whether the topic backlog is close to its time limit or even passed it.

If it has passed it, the user has no means to know if it happened, when and how many times.


# Goals

## In Scope
- Allow the user to know the backlog quota usage for time-based quota, per topic
- Allow the user to know how many times backlog eviction happened, and for which backlog quota type

## Out of Scope

None


# High Level Design

We'll use the existing backlog monitoring process running in intervals. For each topic, the subscription with
the oldest unacknowledged message is retrieved, to calculate the topic backlog age. At that point, we will
cache the following for the oldest unacknowledged message:
* Subscription name
* Message position
* Message publish timestamp

That cache will allow us to add a metric exposing the topic backlog age - `pulsar_storage_backlog_age_seconds`,
which will be both consistent (same ones used for deciding on backlog eviction) and cheap to retrieve
(no additional I/O involved).
Coupled with the existing `pulsar_storage_backlog_quota_limit_time` metric, the user can use both to divide and
get the usage of the quota (both are in seconds units).

We will add the subscription name containing the oldest unacknowledged message to the Admin API
topic stats endpoints (`{tenant}/{namespace}/{topic}/stats` and `{tenant}/{namespace}/{topic}/partitioned-stats`),
allowing the user a complete workflow: alert using metrics when topic backlog is about to be exceeded, then
query topic stats for that topic to retrieve the subscription name which contains the oldest message.
For completeness, we will also add the backlog quota limits, both age and size, and the age of oldest
unacknowledged message.

We will add a metric allowing the user to know how many times the usage exceeded the quota, both for time or size -
`pulsar_storage_backlog_quota_exceeded_evictions_total`, where the `quota_type` label will be either `time` or
`size`. Monitoring that counter over time will allow the user to know when a topic backlog exceeded its quota,
and if backlog eviction was chosen as action, then it happened, and how many times.

Some users may want the backlog quota check to happen more frequently, and as a consequence, the backlog age
metric more frequently updated. They can modify `backlogQuotaCheckIntervalInSeconds` configuration key, but without
knowing how long this check takes, it will be hard for them. Hence, we will add the metric
`pulsar_storage_backlog_quota_check_duration_seconds` which will be of histogram type.
asafm marked this conversation as resolved.
Show resolved Hide resolved

# Detailed Design

## Public-facing Changes

### Public API
Adding the following to the response of topic stats, of both `{tenant}/{namespace}/{topic}/stats`
and `{tenant}/{namespace}/{topic}/partitioned-stats`:

* `backlogQuotaLimitSize` - the size in bytes of the topic backlog quota
* `backlogQuotaLimitTime` - the topic backlog age quota, in seconds.
* `oldestBacklogMessageAgeSeconds` - the age of the oldest unacknowledged (i.e. backlog) message, measured by
the time elapsed from its published time, in seconds. This value is recorded every backlog quota check
interval, hence it represents the value seen in the last check.
* `oldestBacklogMessageSubscriptionName` - the name of the subscription containing the oldest unacknowledged message.
This value is recorded every backlog quota check interval, hence it represents the value seen in the last check.


### Metrics

| Name | Description | Attributes | Units |
|----------------------------------------------------------------|-----------------------------------------------------------------------------------------------------|--------------------------------------------------------|---------|
| `pulsar_storage_backlog_age_seconds` | Gauge. The age of the oldest unacknowledged message (backlog) | cluster, namespace, topic | seconds |
| `pulsar_storage_backlog_quota_exceeded_evictions_total` | Counter. The number of times a backlog was evicted since it has exceeded its quota | cluster, namespace, topic, quota_type = (time \| size) | |
| `pulsar_storage_backlog_quota_check_duration_seconds` | Histogram. The duration of the backlog quota check process. | cluster | seconds |
| `pulsar_broker_storage_backlog_quota_exceeded_evictions_total` | Counter. The number of times a backlog was evicted since it has exceeded its quota, in broker level | cluster, quota_type = (time \| size) | |

* Since `pulsar_storage_backlog_age_seconds` can not be aggregated, with proper meaning, to a namespace-level, it will
not be included as a metric when configuration key `exposeTopicLevelMetricsInPrometheus` is set to false.
* `pulsar_storage_backlog_quota_exceeded_evictions_total` will be included as a metric also in namespace aggregation.

# Alternatives

One alternative is to separate the backlog quota check into 2 separate processes, running in their own frequency:
1. Check backlog quota exceeded for all persistent topics. The result will be marked in memory.
If precise time backlog quota was configured then this will the I/O cost as described before.
2. Evict messages for those topics marked.

This *may* enable more frequent updates to the backlog age metric making it more fresh, but the cost associated with it
might be high, since it might result in more frequent I/O calls, especially with many topics.
Another disadvantage is that it makes the backlog check and eviction more complex.

# Links

* Mailing List discussion thread: https://lists.apache.org/thread/xv33xjjzc3t2n06ynz2gmcd4s06ckrqh
* Mailing List voting thread: https://lists.apache.org/thread/x2ypnft3x5jdyyxbwgvzxgcw20o44vps