Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Fix compatibility issues for PIP-344 #23136

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,34 @@ public CompletableFuture<Boolean> checkNonPersistentNonPartitionedTopicExists(St
|| actEx instanceof PulsarClientException.TopicDoesNotExistException
|| actEx instanceof PulsarAdminException.NotFoundException) {
return CompletableFuture.completedFuture(false);
} else if (actEx instanceof PulsarClientException.NotSupportedException){
/**
* Summary: For compatibility of
* {@link org.apache.pulsar.client.impl.BinaryProtoLookupService
* #getPartitionedTopicMetadata(TopicName, boolean)}.
*
* Explanation:
* 1. Reason of why getting the error here.
* The feature method above was supported at "3.0.6" and "3.3.1", before that the API
* "getPartitionedTopicMetadata" will trigger a creation for partitioned topic
* metadata automatically even if you just want query it. So the brokers whose version
* is less than "3.0.6" and "3.3.1" do not support the new API.
* 2. The conditions to lead this error occur.
* There are 2 brokers in a cluster, and the version is less than "3.0.1", rolling
* upgrade brokers to "3.0.6". After the first broker restarted, there is one broker
* with version "3.0.6" and another is "3.0.1", and when the internal client tries
* to call "getPartitionedTopicMetadata" to the broker with lower version, it will
* get this error.
* 3. Compatibility
* Rollback to the original behavior before the fix #22838. Without the fix #22838,
* there is an issue that may cause a non-partitioned non-persistent topic and
* a partitioned non-persistent topic with the same name to exist at the same time.
*/
heesung-sn marked this conversation as resolved.
Show resolved Hide resolved
log.warn("{} The versions of the brokers in the same cluster are different( some are"
+ " less than 3.0.6), rollback to the original behavior before the bug fix that"
+ " may cause a non-partitioned non-persistent topic and a partitioned"
+ " non-persistent topic with the same name to exist at the same time.", topic);
lhotari marked this conversation as resolved.
Show resolved Hide resolved
return CompletableFuture.completedFuture(false);
lhotari marked this conversation as resolved.
Show resolved Hide resolved
} else {
log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex);
return CompletableFuture.failedFuture(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ private CompletableFuture<Boolean> checkDlqAlreadyExists(String topic) {
|| actEx instanceof PulsarClientException.TopicDoesNotExistException
|| actEx instanceof PulsarAdminException.NotFoundException) {
existsFuture.complete(false);
} else if (actEx instanceof PulsarClientException.NotSupportedException) {
lhotari marked this conversation as resolved.
Show resolved Hide resolved
existsFuture.completeExceptionally(new PulsarClientException.NotSupportedException("There is a bug that"
+ " the Retry/DLQ consumer will still trigger a Retry/DLQ topic with the old rule"
+ " ({namespace}/{subscription}-RETRY/DLQ), but the rule was changed to"
+ " {namespace}/{topic}-{subscription}-RETRY/DLQ after 2.8.0. Please upgrade the brokers' version"
+ " to >=3.0.6 or >=3.3.1; another solution is use HTTP protocol service URL instead of Binary"
+ " protocol service URL when building Pulsar Client"));
lhotari marked this conversation as resolved.
Show resolved Hide resolved
} else {
existsFuture.completeExceptionally(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,31 @@ private CompletableFuture<Integer> checkPartitions(String topic, boolean forceNo
|| actEx instanceof PulsarClientException.TopicDoesNotExistException
|| actEx instanceof PulsarAdminException.NotFoundException) {
checkPartitions.complete(0);
} else if (actEx instanceof PulsarClientException.NotSupportedException) {
/**
* Summary: For compatibility of
* {@link BinaryProtoLookupService#getPartitionedTopicMetadata(TopicName, boolean)}.
*
* Explanation:
* 1. This error will only occur when using Geo-Replication, and one version of the two cluster is
* larger or equals than "3.0.6" and "3.3.1" and another is smaller than "3.0.6" and "3.3.1".
* 2. Reason of why getting the error here.
* The feature method above was supported at "3.0.6" and "3.3.1", before that the API
* "getPartitionedTopicMetadata" will trigger a creation for partitioned topic
* metadata automatically even if you just want query it. So the brokers whose version
* is less than "3.0.6" and "3.3.1" do not support the new API.
* 3. Compatibility
* Skip the check of comparing of topic's partitions, and force connect to the non-partitioned topic,
* it may cause both partitioned topic and non-partitioned topic to exist at the same time. But this
* is still better than the behavior before the fix #22983, without the fix #22838, there is an issue
* that may cause replication stuck and topics being created in confusion, see more details in
* #22838's motivation.
*/
log.warn("{} {} Since the target cluster does not support to get topic's partitions without"
+ " auto-creation, skip the partitions check. It may cause both partitioned topic and"
+ " non-partitioned topic to exist at the same time, please upgrade clusters to the version"
+ " that >=3.0.6 or >=3.3.1", topic, producerNameForLog);
lhotari marked this conversation as resolved.
Show resolved Hide resolved
checkPartitions.complete(0);
lhotari marked this conversation as resolved.
Show resolved Hide resolved
} else {
checkPartitions.completeExceptionally(ex);
}
Expand Down
Loading