Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP-167][Authorization] Make it Configurable to Require Subscription Permission #15576

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,21 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
return pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
.thenCompose(policies -> {
if (!policies.isPresent()) {
// TODO this case seems like it could bypass authorization checks.
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for topic : {}", topicName);
}
} else {
if (isNotBlank(subscription)) {
// validate if role is authorized to access subscription. (skip validation if authorization
// list is empty)
// Reject request if role is unauthorized to access subscription.
// If implicitSubscriptionAuthentication is enabled, set of roles must be null or empty, or
// role must be in set of roles. Otherwise, role must be in the set of roles.
Set<String> roles = policies.get().auth_policies
.getSubscriptionAuthentication().get(subscription);
if (roles != null && !roles.isEmpty() && !roles.contains(role)) {
boolean isUnauthorized = policies.get().auth_policies.isImplicitSubscriptionAuth()
? (roles != null && !roles.isEmpty() && !roles.contains(role))
: (roles == null || roles.isEmpty() || !roles.contains(role));
if (isUnauthorized) {
log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription);
return CompletableFuture.completedFuture(false);
}
Expand Down Expand Up @@ -482,6 +487,8 @@ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName nam
case GET_TOPICS:
case GET_BUNDLE:
return allowConsumeOrProduceOpsAsync(namespaceName, role, authData);
// TODO these only require ability to consume on namespace; ignore namespace's subscription
// permission.
case UNSUBSCRIBE:
case CLEAR_BACKLOG:
return allowTheSpecifiedActionOpsAsync(
Expand Down Expand Up @@ -536,6 +543,7 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
return canLookupAsync(topicName, role, authData);
case PRODUCE:
return canProduceAsync(topicName, role, authData);
// TODO consume from single subscription lets role view all subscriptions on a topic
case GET_SUBSCRIPTIONS:
case CONSUME:
case SUBSCRIBE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,24 @@ protected void internalGrantPermissionOnNamespace(String role, Set<AuthAction> a
}
}

protected boolean getImplicitPermissionOnSubscription() {
validateNamespaceOperation(namespaceName, NamespaceOperation.GET_PERMISSION);
Policies policies = getNamespacePolicies(namespaceName);
return policies.auth_policies.isImplicitSubscriptionAuth();
}

protected void internalSetImplicitPermissionOnSubscription(boolean isImplicitPermissionOnSubscription) {
if (isImplicitPermissionOnSubscription) {
validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
} else {
validateNamespaceOperation(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
}
validatePoliciesReadOnlyAccess();
updatePolicies(namespaceName, policies -> {
policies.auth_policies.setImplicitSubscriptionAuth(isImplicitPermissionOnSubscription);
return policies;
});
}
michaeljmarshall marked this conversation as resolved.
Show resolved Hide resolved

protected void internalGrantPermissionOnSubscription(String subscription, Set<String> roles) {
validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,49 @@ public void revokePermissionOnSubscription(@PathParam("property") String propert
internalRevokePermissionsOnSubscription(subscription, role);
}

@PUT
@Path("/{property}/{cluster}/{namespace}/implicitPermissionOnSubscription")
@ApiOperation(hidden = true, value = "Allow a consumer's role to have implicit permission to consume from a"
+ " subscription.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 501, message = "Authorization is not enabled")})
public void grantImplicitPermissionOnSubscription(
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
internalSetImplicitPermissionOnSubscription(true);
}

@DELETE
@Path("/{property}/{cluster}/{namespace}/implicitPermissionOnSubscription")
@ApiOperation(hidden = true, value = "Require a consumer's role to have explicit permission to consume from a"
+ " subscription.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 501, message = "Authorization is not enabled")})
public void revokeImplicitPermissionOnSubscription(
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
internalSetImplicitPermissionOnSubscription(false);
}

@GET
@Path("/{property}/{cluster}/{namespace}/implicitPermissionOnSubscription")
@ApiOperation(value = "Get permission on subscription required for namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace is not empty")})
public boolean getImplicitPermissionOnSubscription(@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return getImplicitPermissionOnSubscription();
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this have to be added to the v1 API at all?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, I'm not sure. I don't know how we're approaching new features like this. I'm open to removing it, if that's the consensus.

@GET
@Path("/{property}/{cluster}/{namespace}/replication")
@ApiOperation(hidden = true, value = "Get the replication clusters for a namespace.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,48 @@ public void revokePermissionOnSubscription(@PathParam("property") String propert
internalRevokePermissionsOnSubscription(subscription, role);
}

@PUT
michaeljmarshall marked this conversation as resolved.
Show resolved Hide resolved
@Path("/{property}/{namespace}/implicitPermissionOnSubscription")
@ApiOperation(hidden = true, value = "Allow a consumer's role to have implicit permission to consume from a"
+ " subscription.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 501, message = "Authorization is not enabled")})
public void grantImplicitPermissionOnSubscription(
@PathParam("property") String property,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, namespace);
internalSetImplicitPermissionOnSubscription(true);
michaeljmarshall marked this conversation as resolved.
Show resolved Hide resolved
}

@DELETE
@Path("/{property}/{namespace}/implicitPermissionOnSubscription")
@ApiOperation(hidden = true, value = "Require a consumer's role to have explicit permission to consume from a"
+ " subscription.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 501, message = "Authorization is not enabled")})
public void revokeImplicitPermissionOnSubscription(
@PathParam("property") String property,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, namespace);
internalSetImplicitPermissionOnSubscription(false);
}

@GET
@Path("/{property}/{namespace}/implicitPermissionOnSubscription")
@ApiOperation(value = "Get permission on subscription required for namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace is not empty")})
public boolean getRequirePermissionOnSubscriptions(@PathParam("property") String property,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, namespace);
return getImplicitPermissionOnSubscription();
}

@GET
@Path("/{tenant}/{namespace}/replication")
@ApiOperation(value = "Get the replication clusters for a namespace.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,57 @@ CompletableFuture<Void> grantPermissionOnSubscriptionAsync(
*/
CompletableFuture<Void> revokePermissionOnSubscriptionAsync(String namespace, String subscription, String role);

/**
* Get whether a namespace allows implicit permission to consume from a subscription.
*
* @param namespace Pulsar namespace name
* @return
* @throws PulsarAdminException
*/
boolean getImplicitPermissionOnSubscription(String namespace) throws PulsarAdminException;

/**
* Get whether a namespace allows implicit permission to consume from a subscription.
* @param namespace Pulsar namespace name
* @return
*/
CompletableFuture<Boolean> getImplicitPermissionOnSubscriptionAsync(String namespace);

/**
* Grant all roles implicit permission to consume from a subscription if no subscription permission is defined
* for that subscription in the namespace.
* @param namespace Pulsar namespace name
* @throws PulsarAdminException
*/
void grantImplicitPermissionOnSubscription(String namespace)
throws PulsarAdminException;

/**
* Grant all roles implicit permission to consume from a subscription if no subscription permission is defined
* for that subscription in the namespace.
* @param namespace Pulsar namespace name
* @return
*/
CompletableFuture<Void> grantImplicitPermissionOnSubscriptionAsync(String namespace);

/**
* Revoke implicit permission for any role to consume from a subscription if no subscription permission is defined
* for that subscription in the namespace.
* @param namespace Pulsar namespace name
* @throws PulsarAdminException
*/
void revokeImplicitPermissionOnSubscription(String namespace)
throws PulsarAdminException;

/**
* Revoke implicit permission for any role to consume from a subscription if no subscription permission is defined
* for that subscription in the namespace.
* @param namespace Pulsar namespace name
* @return
*/
CompletableFuture<Void> revokeImplicitPermissionOnSubscriptionAsync(String namespace);


/**
* Get the replication clusters for a namespace.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ public interface AuthPolicies {
Map<String, Map<String, Set<AuthAction>>> getTopicAuthentication();
Map<String, Set<String>> getSubscriptionAuthentication();

/**
* Whether an empty set of subscription authentication roles returned by {@link #getSubscriptionAuthentication()}
* implicitly grants permission to consume from the target subscription.
* @return
*/
boolean isImplicitSubscriptionAuth();
void setImplicitSubscriptionAuth(boolean implicitSubscriptionAuth);

static Builder builder() {
return ReflectionUtils.newBuilder("org.apache.pulsar.client.admin.internal.data.AuthPoliciesImpl");
}
Expand All @@ -39,5 +47,6 @@ interface Builder {
Builder namespaceAuthentication(Map<String, Set<AuthAction>> namespaceAuthentication);
Builder topicAuthentication(Map<String, Map<String, Set<AuthAction>>> topicAuthentication);
Builder subscriptionAuthentication(Map<String, Set<String>> subscriptionAuthentication);
Builder implicitSubscriptionAuth(boolean implicitSubscriptionAuth);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,55 @@ public CompletableFuture<Void> revokePermissionOnSubscriptionAsync(
return asyncDeleteRequest(path);
}

@Override
public void grantImplicitPermissionOnSubscription(String namespace) throws PulsarAdminException {
sync(() -> grantImplicitPermissionOnSubscriptionAsync(namespace));
}

@Override
public CompletableFuture<Void> grantImplicitPermissionOnSubscriptionAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "implicitPermissionOnSubscription");
return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}

@Override
public void revokeImplicitPermissionOnSubscription(String namespace) throws PulsarAdminException {
sync(() -> revokeImplicitPermissionOnSubscriptionAsync(namespace));
}

@Override
public CompletableFuture<Void> revokeImplicitPermissionOnSubscriptionAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "implicitPermissionOnSubscription");
return asyncDeleteRequest(path);
}

@Override
public boolean getImplicitPermissionOnSubscription(String namespace) throws PulsarAdminException {
return sync(() -> getImplicitPermissionOnSubscriptionAsync(namespace));
}

@Override
public CompletableFuture<Boolean> getImplicitPermissionOnSubscriptionAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "implicitPermissionOnSubscription");
final CompletableFuture<Boolean> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Boolean>() {
@Override
public void completed(Boolean enabled) {
future.complete(enabled);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public List<String> getNamespaceReplicationClusters(String namespace) throws PulsarAdminException {
return sync(() -> getNamespaceReplicationClustersAsync(namespace));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,45 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription =
"Get whether a namespace allows implicit permission to consume from a subscription.")
private class ImplicitSubscriptionPermission extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
print(getAdmin().namespaces().getImplicitPermissionOnSubscription(namespace));
}
}

@Parameters(commandDescription = "Grant all roles implicit permission to consume from a subscription if no "
+ "subscription permission is defined for that subscription in the namespace.")
private class GrantImplicitSubscriptionPermission extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
getAdmin().namespaces().grantImplicitPermissionOnSubscription(namespace);
}
}

@Parameters(commandDescription = "Revoke implicit permission for any role to consume from a subscription if no "
+ "subscription permission is defined for that subscription in the namespace.")
private class RevokeImplicitSubscriptionPermission extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
getAdmin().namespaces().revokeImplicitPermissionOnSubscription(namespace);
}
}

@Parameters(commandDescription = "Get the permissions on a namespace")
private class Permissions extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
Expand Down Expand Up @@ -2581,6 +2620,10 @@ public CmdNamespaces(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("grant-subscription-permission", new GrantSubscriptionPermissions());
jcommander.addCommand("revoke-subscription-permission", new RevokeSubscriptionPermissions());

jcommander.addCommand("implicit-subscription-permission", new ImplicitSubscriptionPermission());
jcommander.addCommand("grant-implicit-subscription-permission", new GrantImplicitSubscriptionPermission());
jcommander.addCommand("revoke-implicit-subscription-permission", new RevokeImplicitSubscriptionPermission());

jcommander.addCommand("set-clusters", new SetReplicationClusters());
jcommander.addCommand("get-clusters", new GetReplicationClusters());

Expand Down
Loading