From 8ca01cd42edfd4efd986f752f6f8538ea5bf4f94 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Wed, 17 Apr 2024 18:46:22 +0800 Subject: [PATCH] [improve][admin] Align the auth and check it at the first place for topic related API (#22507) --- .../admin/impl/PersistentTopicsBase.java | 419 ++++++++---------- .../broker/admin/v2/PersistentTopics.java | 44 +- .../pulsar/broker/admin/TopicAuthZTest.java | 257 +++++++++-- 3 files changed, 447 insertions(+), 273 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ab74b1e2bcc0e..1f8d06571908e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -128,8 +128,6 @@ import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.PolicyName; -import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; @@ -2727,14 +2725,14 @@ public String toString() { } protected CompletableFuture internalGetMessageIdByTimestampAsync(long timestamp, boolean authoritative) { - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES); return future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenCompose(__ -> { if (topicName.isPartitioned()) { return CompletableFuture.completedFuture(null); } else { @@ -2748,7 +2746,6 @@ protected CompletableFuture internalGetMessageIdByTimestampAsync(long }); } }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) - .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES)) .thenCompose(__ -> getTopicReferenceAsync(topicName)) .thenCompose(topic -> { if (!(topic instanceof PersistentTopic)) { @@ -3158,65 +3155,56 @@ protected CompletableFuture> in protected void internalGetBacklogSizeByMessageId(AsyncResponse asyncResponse, MessageIdImpl messageId, boolean authoritative) { - CompletableFuture ret; - // If the topic name is a partition name, no need to get partition topic metadata again - if (!topicName.isPartitioned()) { - ret = getPartitionedTopicMetadataAsync(topicName, authoritative, false) - .thenCompose(topicMetadata -> { - if (topicMetadata.partitions > 0) { - log.warn("[{}] Not supported calculate backlog size operation on partitioned-topic {}", - clientAppId(), topicName); - asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, - "calculate backlog size is not allowed for partitioned-topic")); - } - return CompletableFuture.completedFuture(null); - }); - } else { - ret = CompletableFuture.completedFuture(null); - } - CompletableFuture future; - if (topicName.isGlobal()) { - future = ret.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName)); - } else { - future = ret; - } - future.thenAccept(__ -> validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(unused -> validateTopicOperationAsync(topicName, - TopicOperation.GET_BACKLOG_SIZE)) - .thenCompose(unused -> getTopicReferenceAsync(topicName)) - .thenAccept(t -> { - PersistentTopic topic = (PersistentTopic) t; - PositionImpl pos = new PositionImpl(messageId.getLedgerId(), - messageId.getEntryId()); - if (topic == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - getTopicNotFoundErrorMessage(topicName.toString()))); - return; - } - ManagedLedgerImpl managedLedger = - (ManagedLedgerImpl) topic.getManagedLedger(); - if (messageId.getLedgerId() == -1) { - asyncResponse.resume(managedLedger.getTotalSize()); - } else { - asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos)); - } - }).exceptionally(ex -> { - // If the exception is not redirect exception we need to log it. - if (isNot307And404Exception(ex)) { - log.error("[{}] Failed to get backlog size for topic {}", clientAppId(), - topicName, ex); - } - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - })).exceptionally(ex -> { - // If the exception is not redirect exception we need to log it. - if (isNot307And404Exception(ex)) { - log.error("[{}] Failed to validate global namespace ownership " - + "to get backlog size for topic {}", clientAppId(), topicName, ex); - } - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + CompletableFuture ret = validateTopicOperationAsync(topicName, TopicOperation.GET_BACKLOG_SIZE); + ret.thenCompose(__ -> { + // If the topic name is a partition name, no need to get partition topic metadata again + if (!topicName.isPartitioned()) { + return getPartitionedTopicMetadataAsync(topicName, authoritative, false) + .thenCompose(topicMetadata -> { + if (topicMetadata.partitions > 0) { + log.warn("[{}] Not supported calculate backlog size operation on partitioned-topic {}", + clientAppId(), topicName); + asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, + "calculate backlog size is not allowed for partitioned-topic")); + } + return CompletableFuture.completedFuture(null); + }); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenCompose(unused -> getTopicReferenceAsync(topicName)) + .thenAccept(t -> { + PersistentTopic topic = (PersistentTopic) t; + PositionImpl pos = new PositionImpl(messageId.getLedgerId(), + messageId.getEntryId()); + if (topic == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString()))); + return; + } + ManagedLedgerImpl managedLedger = + (ManagedLedgerImpl) topic.getManagedLedger(); + if (messageId.getLedgerId() == -1) { + asyncResponse.resume(managedLedger.getTotalSize()); + } else { + asyncResponse.resume(managedLedger.getEstimatedBacklogSize(pos)); + } + }).exceptionally(ex -> { + // If the exception is not redirect exception we need to log it. + if (!isNot307And404Exception(ex)) { + log.error("[{}] Failed to validate global namespace ownership " + + "to get backlog size for topic {}", clientAppId(), topicName, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } protected CompletableFuture internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType, @@ -3224,8 +3212,7 @@ protected CompletableFuture internalSetBacklogQuota(BacklogQuota.BacklogQu BacklogQuota.BacklogQuotaType finalBacklogQuotaType = backlogQuotaType == null ? BacklogQuota.BacklogQuotaType.destination_storage : backlogQuotaType; - return validateTopicPolicyOperationAsync(topicName, PolicyName.BACKLOG, PolicyOperation.WRITE) - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + return validatePoliciesReadOnlyAccessAsync() .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal)) .thenCompose(op -> { TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); @@ -3266,9 +3253,7 @@ protected CompletableFuture internalSetBacklogQuota(BacklogQuota.BacklogQu } protected CompletableFuture internalSetReplicationClusters(List clusterIds) { - - return validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE) - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + return validatePoliciesReadOnlyAccessAsync() .thenCompose(__ -> { if (CollectionUtils.isEmpty(clusterIds)) { throw new RestException(Status.PRECONDITION_FAILED, "ClusterIds should not be null or empty"); @@ -3306,22 +3291,21 @@ protected CompletableFuture internalSetReplicationClusters(List cl } protected CompletableFuture internalRemoveReplicationClusters() { - return validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE) - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) - .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - topicPolicies.setReplicationClusters(null); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies) - .thenRun(() -> { - log.info("[{}] Successfully set replication clusters for namespace={}, " - + "topic={}, clusters={}", - clientAppId(), - namespaceName, - topicName.getLocalName(), - topicPolicies.getReplicationClusters()); - }); - }) - ); + return validatePoliciesReadOnlyAccessAsync() + .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName)) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setReplicationClusters(null); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies) + .thenRun(() -> { + log.info("[{}] Successfully set replication clusters for namespace={}, " + + "topic={}, clusters={}", + clientAppId(), + namespaceName, + topicName.getLocalName(), + topicPolicies.getReplicationClusters()); + }); + }); } protected CompletableFuture internalGetDeduplication(boolean applied, boolean isGlobal) { @@ -3683,29 +3667,29 @@ protected CompletableFuture internalTerminateAsync(boolean authoritat "Termination of a system topic is not allowed")); } - CompletableFuture ret; - if (topicName.isGlobal()) { - ret = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - ret = CompletableFuture.completedFuture(null); - } - return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) - .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.TERMINATE)) - .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)) - .thenAccept(partitionMetadata -> { - if (partitionMetadata.partitions > 0) { - throw new RestException(Status.METHOD_NOT_ALLOWED, - "Termination of a partitioned topic is not allowed"); - } - }) - .thenCompose(__ -> getTopicReferenceAsync(topicName)) - .thenCompose(topic -> { - if (!(topic instanceof PersistentTopic)) { - throw new RestException(Status.METHOD_NOT_ALLOWED, - "Termination of a non-persistent topic is not allowed"); - } - return ((PersistentTopic) topic).terminate(); - }); + CompletableFuture ret = validateTopicOperationAsync(topicName, TopicOperation.TERMINATE); + return ret.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) + .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)) + .thenAccept(partitionMetadata -> { + if (partitionMetadata.partitions > 0) { + throw new RestException(Status.METHOD_NOT_ALLOWED, + "Termination of a partitioned topic is not allowed"); + } + }) + .thenCompose(__ -> getTopicReferenceAsync(topicName)) + .thenCompose(topic -> { + if (!(topic instanceof PersistentTopic)) { + throw new RestException(Status.METHOD_NOT_ALLOWED, + "Termination of a non-persistent topic is not allowed"); + } + return ((PersistentTopic) topic).terminate(); + }); } protected void internalTerminatePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { @@ -3716,73 +3700,63 @@ protected void internalTerminatePartitionedTopic(AsyncResponse asyncResponse, bo asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, msg)); return; } + CompletableFuture future = validateTopicOperationAsync(topicName, TopicOperation.TERMINATE); + future.thenCompose(__ -> { + if (topicName.isGlobal()) { + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenCompose(unused -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)) + .thenAccept(partitionMetadata -> { + if (partitionMetadata.partitions == 0) { + String msg = "Termination of a non-partitioned topic is not allowed using partitioned-terminate" + + ", please use terminate commands"; + log.error("[{}] [{}] {}", clientAppId(), topicName, msg); + asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, msg)); + return; + } + if (partitionMetadata.partitions > 0) { + Map messageIds = new ConcurrentHashMap<>(partitionMetadata.partitions); + final List> futures = + new ArrayList<>(partitionMetadata.partitions); - CompletableFuture future; - if (topicName.isGlobal()) { - future = validateGlobalNamespaceOwnershipAsync(namespaceName); - } else { - future = CompletableFuture.completedFuture(null); - } - - future.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.TERMINATE) - .thenCompose(unused -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)) - .thenAccept(partitionMetadata -> { - if (partitionMetadata.partitions == 0) { - String msg = "Termination of a non-partitioned topic is not allowed using partitioned-terminate" - + ", please use terminate commands"; - log.error("[{}] [{}] {}", clientAppId(), topicName, msg); - asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED, msg)); - return; + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + int finalI = i; + futures.add(pulsar().getAdminClient().topics() + .terminateTopicAsync(topicNamePartition.toString()) + .whenComplete((messageId, throwable) -> { + if (throwable != null) { + log.error("[{}] Failed to terminate topic {}", clientAppId(), + topicNamePartition, throwable); + asyncResponse.resume(new RestException(throwable)); + } + messageIds.put(finalI, messageId); + })); + } catch (Exception e) { + log.error("[{}] Failed to terminate topic {}", clientAppId(), topicNamePartition, + e); + throw new RestException(e); } - if (partitionMetadata.partitions > 0) { - Map messageIds = new ConcurrentHashMap<>(partitionMetadata.partitions); - final List> futures = - new ArrayList<>(partitionMetadata.partitions); - - for (int i = 0; i < partitionMetadata.partitions; i++) { - TopicName topicNamePartition = topicName.getPartition(i); - try { - int finalI = i; - futures.add(pulsar().getAdminClient().topics() - .terminateTopicAsync(topicNamePartition.toString()) - .whenComplete((messageId, throwable) -> { - if (throwable != null) { - log.error("[{}] Failed to terminate topic {}", clientAppId(), - topicNamePartition, throwable); - asyncResponse.resume(new RestException(throwable)); - } - messageIds.put(finalI, messageId); - })); - } catch (Exception e) { - log.error("[{}] Failed to terminate topic {}", clientAppId(), topicNamePartition, - e); - throw new RestException(e); - } + } + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getTopicNotFoundErrorMessage(topicName.toString()))); + } else { + log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, t); + asyncResponse.resume(new RestException(t)); } - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - Throwable t = exception.getCause(); - if (t instanceof NotFoundException) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - getTopicNotFoundErrorMessage(topicName.toString()))); - } else { - log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, t); - asyncResponse.resume(new RestException(t)); - } - } - asyncResponse.resume(messageIds); - return null; - }); } - }).exceptionally(ex -> { - // If the exception is not redirect exception we need to log it. - if (isNot307And404Exception(ex)) { - log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, ex); - } - resumeAsyncResponseExceptionally(asyncResponse, ex); + asyncResponse.resume(messageIds); return null; - }) - ).exceptionally(ex -> { + }); + } + }).exceptionally(ex -> { // If the exception is not redirect exception we need to log it. if (isNot307And404Exception(ex)) { log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, ex); @@ -4186,16 +4160,16 @@ protected void internalTriggerCompactionNonPartitionedTopic(AsyncResponse asyncR } protected CompletableFuture internalCompactionStatusAsync(boolean authoritative) { - return validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.COMPACT)) + return validateTopicOperationAsync(topicName, TopicOperation.COMPACT) + .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) .thenCompose(__ -> getTopicReferenceAsync(topicName)) .thenApply(topic -> ((PersistentTopic) topic).compactionStatus()); } protected void internalTriggerOffload(AsyncResponse asyncResponse, boolean authoritative, MessageIdImpl messageId) { - validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.OFFLOAD)) + validateTopicOperationAsync(topicName, TopicOperation.OFFLOAD) + .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) .thenCompose(__ -> getTopicReferenceAsync(topicName)) .thenAccept(topic -> { try { @@ -4221,8 +4195,8 @@ protected void internalTriggerOffload(AsyncResponse asyncResponse, } protected void internalOffloadStatus(AsyncResponse asyncResponse, boolean authoritative) { - validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.OFFLOAD)) + validateTopicOperationAsync(topicName, TopicOperation.OFFLOAD) + .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) .thenCompose(__ -> getTopicReferenceAsync(topicName)) .thenAccept(topic -> { OffloadProcessStatus offloadProcessStatus = ((PersistentTopic) topic).offloadStatus(); @@ -4482,8 +4456,8 @@ private CompletableFuture validateNonPartitionTopicNameAsync(String topicN } protected void internalGetLastMessageId(AsyncResponse asyncResponse, boolean authoritative) { - validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES)) + validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES) + .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)) .thenCompose(__ -> getTopicReferenceAsync(topicName)) .thenAccept(topic -> { if (topic == null) { @@ -5207,33 +5181,27 @@ private void internalGetReplicatedSubscriptionStatusForNonPartitionedTopic( } protected CompletableFuture internalGetSchemaCompatibilityStrategy(boolean applied) { - CompletableFuture future = validateTopicPolicyOperationAsync(topicName, - PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ); if (applied) { - return future.thenCompose(__ -> getSchemaCompatibilityStrategyAsync()); + return getSchemaCompatibilityStrategyAsync(); } - return future - .thenCompose(n -> getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> { + return getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> { if (!op.isPresent()) { return null; } SchemaCompatibilityStrategy strategy = op.get().getSchemaCompatibilityStrategy(); return SchemaCompatibilityStrategy.isUndefined(strategy) ? null : strategy; - })); + }); } protected CompletableFuture internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) { - return validateTopicPolicyOperationAsync(topicName, - PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, - PolicyOperation.WRITE) - .thenCompose((__) -> getTopicPoliciesAsyncWithRetry(topicName) + return getTopicPoliciesAsyncWithRetry(topicName) .thenCompose(op -> { TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); topicPolicies.setSchemaCompatibilityStrategy( strategy == SchemaCompatibilityStrategy.UNDEFINED ? null : strategy); return pulsar().getTopicPoliciesService() .updateTopicPoliciesAsync(topicName, topicPolicies); - })); + }); } protected CompletableFuture internalGetSchemaValidationEnforced(boolean applied) { @@ -5257,54 +5225,47 @@ protected CompletableFuture internalSetSchemaValidationEnforced(boolean sc } protected CompletableFuture internalGetEntryFilters(boolean applied, boolean isGlobal) { - return validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.READ) - .thenCompose(__ -> { - if (!applied) { - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenApply(op -> op.map(TopicPolicies::getEntryFilters).orElse(null)); - } - if (!pulsar().getConfiguration().isAllowOverrideEntryFilters()) { - return CompletableFuture.completedFuture(new EntryFilters(String.join(",", - pulsar().getConfiguration().getEntryFilterNames()))); + if (!applied) { + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) + .thenApply(op -> op.map(TopicPolicies::getEntryFilters).orElse(null)); + } + if (!pulsar().getConfiguration().isAllowOverrideEntryFilters()) { + return CompletableFuture.completedFuture(new EntryFilters(String.join(",", + pulsar().getConfiguration().getEntryFilterNames()))); + } + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) + .thenApply(op -> op.map(TopicPolicies::getEntryFilters)) + .thenCompose(policyEntryFilters -> { + if (policyEntryFilters.isPresent()) { + return CompletableFuture.completedFuture(policyEntryFilters.get()); } - return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) - .thenApply(op -> op.map(TopicPolicies::getEntryFilters)) - .thenCompose(policyEntryFilters -> { - if (policyEntryFilters.isPresent()) { - return CompletableFuture.completedFuture(policyEntryFilters.get()); + return getNamespacePoliciesAsync(namespaceName) + .thenApply(policies -> policies.entryFilters) + .thenCompose(nsEntryFilters -> { + if (nsEntryFilters != null) { + return CompletableFuture.completedFuture(nsEntryFilters); } - return getNamespacePoliciesAsync(namespaceName) - .thenApply(policies -> policies.entryFilters) - .thenCompose(nsEntryFilters -> { - if (nsEntryFilters != null) { - return CompletableFuture.completedFuture(nsEntryFilters); - } - return CompletableFuture.completedFuture(new EntryFilters(String.join(",", - pulsar().getConfiguration().getEntryFilterNames()))); - }); + return CompletableFuture.completedFuture(new EntryFilters(String.join(",", + pulsar().getConfiguration().getEntryFilterNames()))); }); }); } protected CompletableFuture internalSetEntryFilters(EntryFilters entryFilters, boolean isGlobal) { - - return validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE) - .thenAccept(__ -> validateEntryFilters(entryFilters)) - .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal) + validateEntryFilters(entryFilters); + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) .thenCompose(op -> { TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); topicPolicies.setEntryFilters(entryFilters); topicPolicies.setIsGlobal(isGlobal); return pulsar().getTopicPoliciesService() .updateTopicPoliciesAsync(topicName, topicPolicies); - })); + }); } protected CompletableFuture internalRemoveEntryFilters(boolean isGlobal) { - return validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE) - .thenCompose(__ -> - getTopicPoliciesAsyncWithRetry(topicName, isGlobal) + return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) .thenCompose(op -> { if (!op.isPresent()) { return CompletableFuture.completedFuture(null); @@ -5312,7 +5273,7 @@ protected CompletableFuture internalRemoveEntryFilters(boolean isGlobal) { op.get().setEntryFilters(null); op.get().setIsGlobal(isGlobal); return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); - })); + }); } protected CompletableFuture validateShadowTopics(List shadowTopics) { @@ -5348,8 +5309,7 @@ protected CompletableFuture internalSetShadowTopic(List shadowTopi return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Cannot specify empty shadow topics, please use remove command instead.")); } - return validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, PolicyOperation.WRITE) - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + return validatePoliciesReadOnlyAccessAsync() .thenCompose(__ -> validateShadowTopics(shadowTopics)) .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName)) .thenCompose(op -> { @@ -5361,8 +5321,7 @@ protected CompletableFuture internalSetShadowTopic(List shadowTopi } protected CompletableFuture internalDeleteShadowTopics() { - return validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, PolicyOperation.WRITE) - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + return validatePoliciesReadOnlyAccessAsync() .thenCompose(shadowTopicName -> getTopicPoliciesAsyncWithRetry(topicName)) .thenCompose(op -> { TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 90f0208c81cd6..7e138442ae228 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -2149,7 +2149,8 @@ public void setBacklogQuota( @QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType, @ApiParam(value = "backlog quota policies for the specified topic") BacklogQuotaImpl backlogQuota) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.BACKLOG, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetBacklogQuota(backlogQuotaType, backlogQuota, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -2174,7 +2175,8 @@ public void removeBacklogQuota(@Suspended final AsyncResponse asyncResponse, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.BACKLOG, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetBacklogQuota(backlogQuotaType, null, isGlobal)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -2237,7 +2239,8 @@ public void setReplicationClusters( @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "List of replication clusters", required = true) List clusterIds) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetReplicationClusters(clusterIds)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -2260,7 +2263,8 @@ public void removeReplicationClusters(@Suspended final AsyncResponse asyncRespon @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.REPLICATION, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalRemoveReplicationClusters()) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -4405,8 +4409,8 @@ public void getSchemaCompatibilityStrategy( @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__-> internalGetSchemaCompatibilityStrategy(applied)) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { @@ -4436,8 +4440,8 @@ public void setSchemaCompatibilityStrategy( @ApiParam(value = "Strategy used to check the compatibility of new schema") SchemaCompatibilityStrategy strategy) { validateTopicName(tenant, namespace, encodedTopic); - - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetSchemaCompatibilityStrategy(strategy)) .thenRun(() -> { log.info( @@ -4476,8 +4480,8 @@ public void removeSchemaCompatibilityStrategy( @ApiParam(value = "Strategy used to check the compatibility of new schema") SchemaCompatibilityStrategy strategy) { validateTopicName(tenant, namespace, encodedTopic); - - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetSchemaCompatibilityStrategy(null)) .thenRun(() -> { log.info( @@ -4568,7 +4572,8 @@ public void getEntryFilters(@Suspended AsyncResponse asyncResponse, + "broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalGetEntryFilters(applied, isGlobal)) .thenAccept(asyncResponse::resume) .exceptionally(ex -> { @@ -4596,7 +4601,8 @@ public void setEntryFilters(@Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Entry filters for the specified topic") EntryFilters entryFilters) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetEntryFilters(entryFilters, isGlobal)) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -4622,7 +4628,8 @@ public void removeEntryFilters(@Suspended final AsyncResponse asyncResponse, + "call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalRemoveEntryFilters(isGlobal)) .thenRun(() -> { log.info( @@ -4655,9 +4662,8 @@ public void getShadowTopics( @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) - .thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, - PolicyOperation.READ)) + validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, PolicyOperation.READ) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName)) .thenAccept(op -> asyncResponse.resume(op.map(TopicPolicies::getShadowTopics).orElse(null))) .exceptionally(ex -> { @@ -4684,7 +4690,8 @@ public void setShadowTopics( @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "List of shadow topics", required = true) List shadowTopics) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalSetShadowTopic(shadowTopics)) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { @@ -4710,7 +4717,8 @@ public void deleteShadowTopics( @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(authoritative) + validateTopicPolicyOperationAsync(topicName, PolicyName.SHADOW_TOPIC, PolicyOperation.WRITE) + .thenCompose(__ -> preValidation(authoritative)) .thenCompose(__ -> internalDeleteShadowTopics()) .thenRun(() -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java index e6ff0ce2bb43a..3c0596d531f41 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAuthZTest.java @@ -19,48 +19,54 @@ package org.apache.pulsar.broker.admin; +import com.google.common.collect.Lists; import io.jsonwebtoken.Jwts; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; import lombok.SneakyThrows; import org.apache.commons.lang3.reflect.FieldUtils; -import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.broker.service.plugin.EntryFilterDefinition; +import org.apache.pulsar.broker.service.plugin.EntryFilterProvider; +import org.apache.pulsar.broker.service.plugin.EntryFilterTest; +import org.apache.pulsar.broker.testcontext.MockEntryFilterProvider; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.auth.AuthenticationToken; -import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; -import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.security.MockedPulsarStandalone; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.metadata.api.MetadataStoreException; -import org.apache.pulsar.security.MockedPulsarStandalone; import org.mockito.Mockito; -import org.testng.Assert; -import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; @Test(groups = "broker-admin") public class TopicAuthZTest extends MockedPulsarStandalone { @@ -1105,15 +1111,15 @@ public void testExamineMessage() { deleteTopic(topic, false); } - @Test(dataProvider = "partitioned", groups = "flaky") + @Test @SneakyThrows - public void testExpireMessage(boolean partitioned) { + public void testExpireMessage() { final String random = UUID.randomUUID().toString(); final String topic = "persistent://public/default/" + random; final String subject = UUID.randomUUID().toString(); final String token = Jwts.builder() .claim("sub", subject).signWith(SECRET_KEY).compact(); - createTopic(topic, partitioned); + createTopic(topic, false); @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) @@ -1153,7 +1159,7 @@ public void testExpireMessage(boolean partitioned) { } superUserAdmin.topics().revokePermissions(topic, subject); } - deleteTopic(topic, partitioned); + deleteTopic(topic, false); } @Test @@ -1373,6 +1379,37 @@ public Object[][] authFunction () throws Exception { }; } + @Test + @SneakyThrows + public void testSchemaCompatibility() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + superUserAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, true); + + // test tenant manager + tenantManagerAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, true); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false)); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getSchemaCompatibilityStrategy(topic, false)); + superUserAdmin.topics().revokePermissions(topic, subject); + } + deleteTopic(topic, false); + } + @Test(dataProvider = "authFunction") public void testSchemaAndTransactionAuthorization(ThrowingBiConsumer adminConsumer, OperationAuthType topicOpType) throws Exception { @@ -1380,6 +1417,7 @@ public void testSchemaAndTransactionAuthorization(ThrowingBiConsumer subAdmin.topicPolicies().getEntryFiltersPerTopic(topic, false)); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().getEntryFiltersPerTopic(topic, false)); + superUserAdmin.topics().revokePermissions(topic, subject); + } + deleteTopic(topic, false); + } + + @Test + @SneakyThrows + public void testSetEntryFilter() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // + final EntryFilterProvider oldEntryFilterProvider = getPulsarService().getBrokerService().getEntryFilterProvider(); + @Cleanup + final MockEntryFilterProvider testEntryFilterProvider = + new MockEntryFilterProvider(getServiceConfiguration()); + + testEntryFilterProvider + .setMockEntryFilters(new EntryFilterDefinition( + "test", + null, + EntryFilterTest.class.getName() + )); + FieldUtils.writeField(getPulsarService().getBrokerService(), + "entryFilterProvider", testEntryFilterProvider, true); + final EntryFilters entryFilter = new EntryFilters("test"); + superUserAdmin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilter); + + // test tenant manager + tenantManagerAdmin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilter); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilter)); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilter)); + superUserAdmin.topics().revokePermissions(topic, subject); + } + deleteTopic(topic, false); + FieldUtils.writeField(getPulsarService().getBrokerService(), + "entryFilterProvider", oldEntryFilterProvider, true); + } + + @Test + @SneakyThrows + public void testRemoveEntryFilter() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + final EntryFilterProvider oldEntryFilterProvider = getPulsarService().getBrokerService().getEntryFilterProvider(); + @Cleanup + final MockEntryFilterProvider testEntryFilterProvider = + new MockEntryFilterProvider(getServiceConfiguration()); + + testEntryFilterProvider + .setMockEntryFilters(new EntryFilterDefinition( + "test", + null, + EntryFilterTest.class.getName() + )); + FieldUtils.writeField(getPulsarService().getBrokerService(), + "entryFilterProvider", testEntryFilterProvider, true); + final EntryFilters entryFilter = new EntryFilters("test"); + superUserAdmin.topicPolicies().removeEntryFiltersPerTopic(topic); + // test tenant manager + tenantManagerAdmin.topicPolicies().removeEntryFiltersPerTopic(topic); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeEntryFiltersPerTopic(topic)); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topicPolicies().removeEntryFiltersPerTopic(topic)); + superUserAdmin.topics().revokePermissions(topic, subject); + } + deleteTopic(topic, false); + FieldUtils.writeField(getPulsarService().getBrokerService(), + "entryFilterProvider", oldEntryFilterProvider, true); + } + + @Test + @SneakyThrows + public void testShadowTopic() { + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://public/default/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + createTopic(topic, false); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + + String shadowTopic = topic + "-shadow-topic"; + superUserAdmin.topics().createShadowTopic(shadowTopic, topic); + superUserAdmin.topics().setShadowTopics(topic, Lists.newArrayList(shadowTopic)); + superUserAdmin.topics().getShadowTopics(topic); + superUserAdmin.topics().removeShadowTopics(topic); + + + // test tenant manager + tenantManagerAdmin.topics().setShadowTopics(topic, Lists.newArrayList(shadowTopic)); + tenantManagerAdmin.topics().getShadowTopics(topic); + tenantManagerAdmin.topics().removeShadowTopics(topic); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().setShadowTopics(topic, Lists.newArrayList(shadowTopic))); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getShadowTopics(topic)); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.topics().grantPermission(topic, subject, Set.of(action)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().setShadowTopics(topic, Lists.newArrayList(shadowTopic))); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.topics().getShadowTopics(topic)); + superUserAdmin.topics().revokePermissions(topic, subject); + } + deleteTopic(topic, false); + } + private void createTopic(String topic, boolean partitioned) throws Exception { if (partitioned) { superUserAdmin.topics().createPartitionedTopic(topic, 2);