Skip to content

Commit

Permalink
[improve][broker] Support get/remove permissions for AuthorizationPro…
Browse files Browse the repository at this point in the history
…vider (#20496)
  • Loading branch information
Technoboy- authored Jul 10, 2023
1 parent c749872 commit e96b339
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -428,4 +429,48 @@ default Boolean allowTopicPolicyOperation(TopicName topicName,
throw new RestException(e.getCause());
}
}

/**
* Remove authorization-action permissions on a topic.
* @param topicName
* @return CompletableFuture<Void>
*/
default CompletableFuture<Void> removePermissionsAsync(TopicName topicName) {
return FutureUtil.failedFuture(new IllegalStateException(
String.format("removePermissionsAsync on topicName %s is not supported by the Authorization",
topicName)));
}

/**
* Get authorization-action permissions on a topic.
* @param topicName
* @return CompletableFuture<Map<String, Set<AuthAction>>>
*/
default CompletableFuture<Map<String, Set<AuthAction>>> getPermissionsAsync(TopicName topicName) {
return FutureUtil.failedFuture(new IllegalStateException(
String.format("getPermissionsAsync on topicName %s is not supported by the Authorization",
topicName)));
}

/**
* Get authorization-action permissions on a topic.
* @param namespaceName
* @return CompletableFuture<Map<String, Set<String>>>
*/
default CompletableFuture<Map<String, Set<String>>> getSubscriptionPermissionsAsync(NamespaceName namespaceName) {
return FutureUtil.failedFuture(new IllegalStateException(
String.format("getSubscriptionPermissionsAsync on namespace %s is not supported by the Authorization",
namespaceName)));
}

/**
* Get authorization-action permissions on a namespace.
* @param namespaceName
* @return CompletableFuture<Map<String, Set<AuthAction>>>
*/
default CompletableFuture<Map<String, Set<AuthAction>>> getPermissionsAsync(NamespaceName namespaceName) {
return FutureUtil.failedFuture(new IllegalStateException(
String.format("getPermissionsAsync on namespaceName %s is not supported by the Authorization",
namespaceName)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.util.concurrent.TimeUnit.SECONDS;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -815,4 +816,20 @@ public Boolean allowTopicOperation(TopicName topicName,
throw new RestException(e.getCause());
}
}

public CompletableFuture<Void> removePermissionsAsync(TopicName topicName) {
return provider.removePermissionsAsync(topicName);
}

public CompletableFuture<Map<String, Set<AuthAction>>> getPermissionsAsync(TopicName topicName) {
return provider.getPermissionsAsync(topicName);
}

public CompletableFuture<Map<String, Set<AuthAction>>> getPermissionsAsync(NamespaceName namespaceName) {
return provider.getPermissionsAsync(namespaceName);
}

public CompletableFuture<Map<String, Set<String>>> getSubscriptionPermissionsAsync(NamespaceName namespaceName) {
return provider.getSubscriptionPermissionsAsync(namespaceName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -34,6 +35,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AuthPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
Expand Down Expand Up @@ -641,4 +643,131 @@ public CompletableFuture<Boolean> validateTenantAdminAccess(String tenantName, S
});
}

@Override
public CompletableFuture<Void> removePermissionsAsync(TopicName topicName) {
return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
if (readonly) {
if (log.isDebugEnabled()) {
log.debug("Policies are read-only. Broker cannot do read-write operations");
}
throw new IllegalStateException("policies are in readonly mode");
}
return pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
.thenCompose(policies -> {
if (!policies.isPresent()
|| !policies.get().auth_policies.getTopicAuthentication()
.containsKey(topicName.toString())) {
return CompletableFuture.completedFuture(null);
}
return pulsarResources.getNamespaceResources().
setPoliciesAsync(topicName.getNamespaceObject(), policies2 -> {
policies2.auth_policies.getTopicAuthentication().remove(topicName.toString());
return policies2;
}).whenComplete((__, ex) -> {
if (ex != null) {
log.error("Failed to remove permissions on topic {}", topicName, ex);
} else {
log.info("Successfully remove permissions on topic {}", topicName);
}
});
});
});
}

@Override
public CompletableFuture<Map<String, Set<AuthAction>>> getPermissionsAsync(TopicName topicName) {
return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
if (readonly) {
if (log.isDebugEnabled()) {
log.debug("Policies are read-only. Broker cannot do read-write operations");
}
throw new IllegalStateException("policies are in readonly mode");
}
return pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
.thenApply(policies -> {
if (!policies.isPresent()) {
throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
}
Map<String, Set<AuthAction>> permissions = new HashMap<>();
String topicUri = topicName.toString();
AuthPolicies auth = policies.get().auth_policies;
// First add namespace level permissions
permissions.putAll(auth.getNamespaceAuthentication());
// Then add topic level permissions
if (auth.getTopicAuthentication().containsKey(topicUri)) {
for (Map.Entry<String, Set<AuthAction>> entry :
auth.getTopicAuthentication().get(topicUri).entrySet()) {
String role = entry.getKey();
Set<AuthAction> topicPermissions = entry.getValue();

if (!permissions.containsKey(role)) {
permissions.put(role, topicPermissions);
} else {
// Do the union between namespace and topic level
Set<AuthAction> union = Sets.union(permissions.get(role), topicPermissions);
permissions.put(role, union);
}
}
}
return permissions;
}).whenComplete((__, ex) -> {
if (ex != null) {
log.error("Failed to get permissions on topic {}", topicName, ex);
} else {
log.info("Successfully get permissions on topic {}", topicName);
}
});
});
}

@Override
public CompletableFuture<Map<String, Set<String>>> getSubscriptionPermissionsAsync(NamespaceName namespaceName) {
return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
if (readonly) {
if (log.isDebugEnabled()) {
log.debug("Policies are read-only. Broker cannot do read-write operations");
}
throw new IllegalStateException("policies are in readonly mode");
}
return pulsarResources.getNamespaceResources().getPoliciesAsync(namespaceName)
.thenApply(policies -> {
if (!policies.isPresent()) {
throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
}

return policies.get().auth_policies.getSubscriptionAuthentication();
}).whenComplete((__, ex) -> {
if (ex != null) {
log.error("Failed to get subscription permissions on namespace {}", namespaceName, ex);
} else {
log.info("Successfully get subscription permissions on namespaceName {}", namespaceName);
}
});
});
}

@Override
public CompletableFuture<Map<String, Set<AuthAction>>> getPermissionsAsync(NamespaceName namespaceName) {
return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
if (readonly) {
if (log.isDebugEnabled()) {
log.debug("Policies are read-only. Broker cannot do read-write operations");
}
throw new IllegalStateException("policies are in readonly mode");
}
return pulsarResources.getNamespaceResources().getPoliciesAsync(namespaceName)
.thenApply(policies -> {
if (!policies.isPresent()) {
throw new RestException(Response.Status.NOT_FOUND, "Namespace does not exist");
}
return policies.get().auth_policies.getNamespaceAuthentication();
}).whenComplete((__, ex) -> {
if (ex != null) {
log.error("Failed to get permissions on namespaceName {}", namespaceName, ex);
} else {
log.info("Successfully get permissions on namespaceName {}", namespaceName);
}
});
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AuthPolicies;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
Expand Down Expand Up @@ -218,36 +217,7 @@ protected CompletableFuture<List<String>> internalGetPartitionedTopicListAsync()
protected CompletableFuture<Map<String, Set<AuthAction>>> internalGetPermissionsOnTopic() {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
return validateAdminAccessForTenantAsync(namespaceName.getTenant())
.thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName)
.thenApply(policies -> {
if (!policies.isPresent()) {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
}

Map<String, Set<AuthAction>> permissions = new HashMap<>();
String topicUri = topicName.toString();
AuthPolicies auth = policies.get().auth_policies;
// First add namespace level permissions
permissions.putAll(auth.getNamespaceAuthentication());

// Then add topic level permissions
if (auth.getTopicAuthentication().containsKey(topicUri)) {
for (Map.Entry<String, Set<AuthAction>> entry :
auth.getTopicAuthentication().get(topicUri).entrySet()) {
String role = entry.getKey();
Set<AuthAction> topicPermissions = entry.getValue();

if (!permissions.containsKey(role)) {
permissions.put(role, topicPermissions);
} else {
// Do the union between namespace and topic level
Set<AuthAction> union = Sets.union(permissions.get(role), topicPermissions);
permissions.put(role, union);
}
}
}
return permissions;
}));
.thenCompose(__ -> getAuthorizationService().getPermissionsAsync(topicName));
}

protected void validateCreateTopic(TopicName topicName) {
Expand Down Expand Up @@ -746,7 +716,7 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
if (numPartitions < 1) {
return CompletableFuture.completedFuture(null);
}
return internalRemovePartitionsAuthenticationPoliciesAsync(numPartitions)
return internalRemovePartitionsAuthenticationPoliciesAsync()
.thenCompose(unused -> internalRemovePartitionsTopicAsync(numPartitions, force));
})
// Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted
Expand Down Expand Up @@ -788,10 +758,10 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
private CompletableFuture<Void> internalRemovePartitionsTopicAsync(int numPartitions, boolean force) {
return pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
.runWithMarkDeleteAsync(topicName,
() -> internalRemovePartitionsTopicNoAutocreationDisableAsync(numPartitions, force));
() -> internalRemovePartitionsTopicNoAutoCreationDisableAsync(numPartitions, force));
}

private CompletableFuture<Void> internalRemovePartitionsTopicNoAutocreationDisableAsync(int numPartitions,
private CompletableFuture<Void> internalRemovePartitionsTopicNoAutoCreationDisableAsync(int numPartitions,
boolean force) {
return FutureUtil.waitForAll(IntStream.range(0, numPartitions)
.mapToObj(i -> {
Expand Down Expand Up @@ -833,16 +803,9 @@ private CompletableFuture<Void> internalRemovePartitionsTopicNoAutocreationDisab
}).collect(Collectors.toList()));
}

private CompletableFuture<Void> internalRemovePartitionsAuthenticationPoliciesAsync(int numPartitions) {
private CompletableFuture<Void> internalRemovePartitionsAuthenticationPoliciesAsync() {
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar().getPulsarResources().getNamespaceResources()
.setPoliciesAsync(topicName.getNamespaceObject(), p -> {
IntStream.range(0, numPartitions)
.forEach(i -> p.auth_policies.getTopicAuthentication()
.remove(topicName.getPartition(i).toString()));
p.auth_policies.getTopicAuthentication().remove(topicName.toString());
return p;
})
getAuthorizationService().removePermissionsAsync(topicName)
.whenComplete((r, ex) -> {
if (ex != null){
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ public void getPermissions(@Suspended AsyncResponse response,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_PERMISSION)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> response.resume(policies.auth_policies.getNamespaceAuthentication()))
.thenCompose(__ -> getAuthorizationService().getPermissionsAsync(namespaceName))
.thenAccept(permissions -> response.resume(permissions))
.exceptionally(ex -> {
log.error("Failed to get permissions for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(response, ex);
Expand All @@ -314,8 +314,8 @@ public void getPermissionOnSubscription(@Suspended AsyncResponse response,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
validateNamespaceOperationAsync(NamespaceName.get(property, namespace), NamespaceOperation.GET_PERMISSION)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> response.resume(policies.auth_policies.getSubscriptionAuthentication()))
.thenCompose(__ -> getAuthorizationService().getSubscriptionPermissionsAsync(namespaceName))
.thenAccept(permissions -> response.resume(permissions))
.exceptionally(ex -> {
log.error("[{}] Failed to get permissions on subscription for namespace {}: {} ",
clientAppId(), namespaceName,
Expand Down
Loading

0 comments on commit e96b339

Please sign in to comment.