Skip to content

Commit

Permalink
Put validateTopicOwnershipAsync before validateTopicOperationAsync (
Browse files Browse the repository at this point in the history
apache#15265)

(cherry picked from commit 41f40f0)
  • Loading branch information
Technoboy- authored and codelipenghui committed Apr 28, 2022
1 parent 642159c commit 90caa1c
Showing 1 changed file with 17 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,9 @@ protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean author

protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative,
boolean force, boolean deleteSchema) {
validateNamespaceOperationAsync(topicName.getNamespaceObject(), NamespaceOperation.DELETE_TOPIC)
.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateNamespaceOperationAsync(topicName.getNamespaceObject(),
NamespaceOperation.DELETE_TOPIC))
.thenCompose(__ -> pulsar().getBrokerService()
.fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(partitionedMeta -> {
Expand Down Expand Up @@ -963,8 +964,8 @@ protected CompletableFuture<Void> internalSetDeduplicationSnapshotInterval(Integ
}

private void internalUnloadNonPartitionedTopicAsync(AsyncResponse asyncResponse, boolean authoritative) {
validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
.thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative)
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> topic.close(false))
.thenRun(() -> {
Expand All @@ -982,8 +983,8 @@ private void internalUnloadNonPartitionedTopicAsync(AsyncResponse asyncResponse,
}

private void internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncResponse, boolean authoritative) {
validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
.thenCompose(v -> pulsar()
.getTransactionMetadataStoreService()
.removeTransactionMetadataStore(
Expand Down Expand Up @@ -1040,8 +1041,8 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut
future = CompletableFuture.completedFuture(null);
}
future.thenCompose(__ ->
validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS)
.thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative))
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS))
.thenAccept(unused1 -> {
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
Expand Down Expand Up @@ -1774,8 +1775,8 @@ protected void internalSkipMessages(AsyncResponse asyncResponse, String subName,
} else {
future = CompletableFuture.completedFuture(null);
}
future.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP))
.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP))
.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenCompose(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
Expand Down Expand Up @@ -1902,8 +1903,8 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy
int expireTimeInSeconds,
boolean authoritative) {
// validate ownership and redirect if current broker is not owner
validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)
.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
.thenCompose(__ -> getTopicReferenceAsync(topicName).thenAccept(t -> {
if (t == null) {
resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.NOT_FOUND,
Expand Down Expand Up @@ -3437,8 +3438,8 @@ protected void internalExpireMessagesByTimestamp(AsyncResponse asyncResponse, St
future = CompletableFuture.completedFuture(null);
}
future.thenCompose(__ ->
validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)
.thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative))
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
.thenCompose(unused2 ->
// If the topic name is a partition name, no need to get partition topic metadata again
getPartitionedTopicMetadataAsync(topicName, authoritative, false)
Expand Down Expand Up @@ -3586,8 +3587,8 @@ protected void internalExpireMessagesByPosition(AsyncResponse asyncResponse, Str
future = CompletableFuture.completedFuture(null);
}

future.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
.thenCompose(__ -> {
log.info("[{}][{}] received expire messages on subscription {} to position {}", clientAppId(),
topicName, subName, messageId);
Expand Down

0 comments on commit 90caa1c

Please sign in to comment.