Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][pip] PIP-392: Enable consistent hashing to select active consumer in partitioned topic for failover subscription #23583

Merged
merged 5 commits into from
Nov 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions pip/pip-392.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# PIP-392: Add configuration to enable consistent hashing to select active consumer for partitioned topic

# Background knowledge

After [#19502](https://github.com/apache/pulsar/pull/19502) will use consistent hashing to select active consumer for non-partitioned topic

# Motivation

Currently, for partitioned topics, the active consumer is selected using the formula [partitionedIndex % consumerSize](https://github.com/apache/pulsar/blob/137df29f85798b00de75460a1acb91c7bc25453f/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L129-L130).
This method can lead to uneven distribution of active consumers.

Consider a scenario with 100 topics named `public/default/topic-{0~100}`, each having `one partition`.
If 10 consumers are created using a `regex` subscription with the `Failover type`, all topic will be assigned to the same consumer(the first connected consumer). This results in an imbalanced distribution of consumers.

# Goals

## In Scope
- Address the issue of imbalance for `failover` subscription type consumers in single-partition or few-partition topics.
lhotari marked this conversation as resolved.
Show resolved Hide resolved

## Out of Scope
- Excluding the `exclusive` subscription type.

It's important to note that both the `modulo algorithm` and the `consistent hashing algorithm` can cause the consumer to be transferred.
This might result in messages being delivered multiple times to consumers, which is a known issue and has been mentioned in the documentation.
https://pulsar.apache.org/docs/4.0.x/concepts-messaging/#failover

# High Level Design
The solution involves adding a configuration setting that allows users to enable consistent hashing for partitioned topics.
When enabled, the consumer selection process will use consistent hashing instead of the modulo operation.

The algorithm already exists through [#19502](https://github.com/apache/pulsar/pull/19502)

In simple terms, the hash algorithm includes the following steps:

1. Hash Ring Creation: Traverse all consumers and use `consumer name` to calculate a hash ring with 100 virtual nodes.
shibd marked this conversation as resolved.
Show resolved Hide resolved

[Exist code](https://github.com/apache/pulsar/blob/1b1bd4b610dd768a6908964ef841a6790bb0f4f0/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L152-L162)
```java
private NavigableMap<Integer, Integer> makeHashRing(int consumerSize) {
NavigableMap<Integer, Integer> hashRing = new TreeMap<>();
for (int i = 0; i < consumerSize; i++) {
for (int j = 0; j < CONSUMER_CONSISTENT_HASH_REPLICAS; j++) {
String key = consumers.get(i).consumerName() + j;
int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
hashRing.put(hash, i);
}
}
return Collections.unmodifiableNavigableMap(hashRing);
}
```

2. Consumer Selection: Use the hash of the topic name to select the matching consumer from the hash ring.

[Exist code](https://github.com/apache/pulsar/blob/1b1bd4b610dd768a6908964ef841a6790bb0f4f0/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L146-L150)
```java
private int peekConsumerIndexFromHashRing(NavigableMap<Integer, Integer> hashRing) {
int hash = Murmur3Hash32.getInstance().makeHash(topicName);
Map.Entry<Integer, Integer> ceilingEntry = hashRing.ceilingEntry(hash);
return ceilingEntry != null ? ceilingEntry.getValue() : hashRing.firstEntry().getValue();
}
```

This approach ensures a more even distribution of active consumers across topics, improving load balancing and resource utilization.

# Detailed Design

## Design & Implementation Details
Refer to implementation PR: https://github.com/apache/pulsar/pull/23584

The implementation is simple. If this activeConsumerFailoverConsistentHashing is enabled, the consistent hashing algorithm is used regardless of whether the topic is partitioned.

## Public-facing Changes

If activeConsumerFailoverConsistentHashing is enabled, when users use the failover subscription model,
the `first consumer` will not necessarily consume `P1`, and the `second consumer` will not necessarily consume `P2`.

As described in the documentation:: https://pulsar.apache.org/docs/4.0.x/concepts-messaging/#failover--partitioned-topics

Instead, the hash algorithm will determine which consumer consumes which partition.

### Configuration

A new configuration field will be added:

```java
@FieldContext(
category = CATEGORY_POLICIES,
doc = "Enable consistent hashing for selecting the active consumer in partitioned "
+ "topics with Failover subscription type. "
+ "For non-partitioned topics, consistent hashing is used by default."
)
private boolean activeConsumerFailoverConsistentHashing = false;
```


# Backward & Forward Compatibility
The default value is false to keep original behavior.
Loading