Skip to content

Commit

Permalink
[PIP-167][Authorization] Make it Configurable to Require Subscription…
Browse files Browse the repository at this point in the history
… Permission (apache#15576)
  • Loading branch information
michaeljmarshall authored and Demogorgon314 committed Dec 29, 2022
1 parent 170d125 commit dc66031
Show file tree
Hide file tree
Showing 10 changed files with 321 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,21 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
return pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
.thenCompose(policies -> {
if (!policies.isPresent()) {
// TODO this case seems like it could bypass authorization checks.
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for topic : {}", topicName);
}
} else {
if (isNotBlank(subscription)) {
// validate if role is authorized to access subscription. (skip validation if authorization
// list is empty)
// Reject request if role is unauthorized to access subscription.
// If subscriptionAuthRequired is enabled, role must be in the set of roles.
// Otherwise, set of roles must be null or empty, or role must be in set of roles.
Set<String> roles = policies.get().auth_policies
.getSubscriptionAuthentication().get(subscription);
if (roles != null && !roles.isEmpty() && !roles.contains(role)) {
boolean isUnauthorized = policies.get().auth_policies.isSubscriptionAuthRequired()
? (roles == null || roles.isEmpty() || !roles.contains(role))
: (roles != null && !roles.isEmpty() && !roles.contains(role));
if (isUnauthorized) {
log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription);
return CompletableFuture.completedFuture(false);
}
Expand Down Expand Up @@ -483,6 +488,8 @@ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName nam
case GET_TOPICS:
case GET_BUNDLE:
return allowConsumeOrProduceOpsAsync(namespaceName, role, authData);
// TODO these only require ability to consume on namespace; ignore namespace's subscription
// permission.
case UNSUBSCRIBE:
case CLEAR_BACKLOG:
return allowTheSpecifiedActionOpsAsync(
Expand Down Expand Up @@ -537,6 +544,7 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
return canLookupAsync(topicName, role, authData);
case PRODUCE:
return canProduceAsync(topicName, role, authData);
// TODO consume from single subscription lets role view all subscriptions on a topic
case GET_SUBSCRIPTIONS:
case CONSUME:
case SUBSCRIBE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2561,4 +2561,39 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse, BacklogQu
return null;
});
}

protected void getPermissionOnSubscriptionRequired(AsyncResponse asyncResponse) {
validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_PERMISSION)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName).thenApply(policies ->
asyncResponse.resume(Response.ok(policies.auth_policies.isSubscriptionAuthRequired()).build())
)).exceptionally(ex -> {
log.error("[{}] Failed to get PermissionOnSubscriptionRequired", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

protected void internalSetPermissionOnSubscriptionRequired(AsyncResponse asyncResponse,
boolean permissionOnSubscriptionRequired) {
CompletableFuture<Void> isAuthorized;
if (permissionOnSubscriptionRequired) {
isAuthorized = validateNamespaceOperationAsync(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
} else {
isAuthorized = validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GRANT_PERMISSION);
}
isAuthorized
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.auth_policies.setSubscriptionAuthRequired(permissionOnSubscriptionRequired);
return policies;
})).thenAccept(__ -> {
log.info("[{}] Updated PermissionOnSubscriptionRequired for namespace {} to {}", clientAppId(),
namespaceName, permissionOnSubscriptionRequired);
asyncResponse.resume(Response.ok().build());
}).exceptionally(ex -> {
log.error("[{}] Failed to update PermissionOnSubscriptionRequired", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,37 @@ public void revokePermissionOnSubscription(@Suspended AsyncResponse asyncRespons
});
}

@POST
@Path("/{property}/{cluster}/{namespace}/permissionOnSubscriptionRequired")
@ApiOperation(hidden = true, value = "Set whether a role requires explicit permission to consume from a "
+ "subscription that has no subscription permission defined in the namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 501, message = "Authorization is not enabled")})
public void setPermissionOnSubscriptionRequired(
@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
boolean permissionOnSubscriptionRequired) {
validateNamespaceName(property, cluster, namespace);
internalSetPermissionOnSubscriptionRequired(asyncResponse, permissionOnSubscriptionRequired);
}

@GET
@Path("/{property}/{cluster}/{namespace}/permissionOnSubscriptionRequired")
@ApiOperation(value = "Get whether a role requires explicit permission to consume from a "
+ "subscription that has no subscription permission defined in the namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace is not empty")})
public void getPermissionOnSubscriptionRequired(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
getPermissionOnSubscriptionRequired(asyncResponse);
}

@GET
@Path("/{property}/{cluster}/{namespace}/replication")
@ApiOperation(hidden = true, value = "Get the replication clusters for a namespace.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,36 @@ public void revokePermissionOnSubscription(@Suspended AsyncResponse asyncRespons
});
}

@POST
@Path("/{property}/{namespace}/permissionOnSubscriptionRequired")
@ApiOperation(hidden = true, value = "Allow a consumer's role to have implicit permission to consume from a"
+ " subscription.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 501, message = "Authorization is not enabled")})
public void setPermissionOnSubscriptionRequired(
@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("namespace") String namespace,
boolean required) {
validateNamespaceName(property, namespace);
internalSetPermissionOnSubscriptionRequired(asyncResponse, required);
}

@GET
@Path("/{property}/{namespace}/permissionOnSubscriptionRequired")
@ApiOperation(value = "Get permission on subscription required for namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace is not empty")})
public void getPermissionOnSubscriptionRequired(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, namespace);
getPermissionOnSubscriptionRequired(asyncResponse);
}

@GET
@Path("/{tenant}/{namespace}/replication")
@ApiOperation(value = "Get the replication clusters for a namespace.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -365,6 +366,83 @@ public void testSubscriberPermission() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testSubscriberPermissionRequired() throws Exception {
log.info("-- Starting {} test --", methodName);

conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
setup();

final String tenantRole = "tenant-role";
final String subscriptionRole = "sub-role";
final String subscriptionName = "sub";
final String namespace = "my-property/ns-sub-auth-req";
final String topicName = "persistent://" + namespace + "/my-topic";
Authentication adminAuthentication = new ClientAuthentication("superUser");

clientAuthProviderSupportedRoles.add(subscriptionRole);

@Cleanup
PulsarAdmin superAdmin = spy(
PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).authentication(adminAuthentication).build());

Authentication tenantAdminAuthentication = new ClientAuthentication(tenantRole);
@Cleanup
PulsarAdmin tenantAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(tenantAdminAuthentication).build());

Authentication subAdminAuthentication = new ClientAuthentication(subscriptionRole);
@Cleanup
PulsarAdmin sub1Admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(subAdminAuthentication).build());

Authentication authentication = new ClientAuthentication(subscriptionRole);

superAdmin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());

// Initialize cluster and configure namespace to require permission on subscription
superAdmin.tenants().createTenant("my-property",
new TenantInfoImpl(Sets.newHashSet(tenantRole), Sets.newHashSet("test")));
superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
assertFalse(superAdmin.namespaces().getPermissionOnSubscriptionRequired(namespace), "Defaults to false.");
superAdmin.namespaces().setPermissionOnSubscriptionRequired(namespace, true);
tenantAdmin.topics().createNonPartitionedTopic(topicName);
tenantAdmin.topics().grantPermission(topicName, subscriptionRole,
Collections.singleton(AuthAction.consume));
assertNull(superAdmin.namespaces().getPublishRate(namespace));
assertTrue(superAdmin.namespaces().getPermissionOnSubscriptionRequired(namespace));
replacePulsarClient(PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(authentication));

// Cluster is initialized; the subscriptionRole has permission consume on the topic, but doesn't have
// explicit subscription permission. Verify that several operations which rely on subscription permission fail.
try {
sub1Admin.topics().resetCursor(topicName, subscriptionName, 0);
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith(
"Unauthorized to validateTopicOperation for operation [RESET_CURSOR]"));
}
try {
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().contains("Client is not authorized to subscribe"), e.getMessage());
}

// Grant the role permission.
tenantAdmin.namespaces().grantPermissionOnSubscription(namespace, subscriptionName, Set.of(subscriptionRole));

// Verify the role now has permission to consume (reset cursor second to avoid 404 on subscription)
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.subscribe();
consumer.close();
sub1Admin.topics().resetCursor(topicName, subscriptionName, 0);

log.info("-- Exiting {} test --", methodName);
}

@Test
public void testClearBacklogPermission() throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,42 @@ CompletableFuture<Void> grantPermissionOnSubscriptionAsync(
*/
CompletableFuture<Void> revokePermissionOnSubscriptionAsync(String namespace, String subscription, String role);

/**
* Get whether a role requires explicit permission to consume from a subscription that has no subscription
* permission defined in the namespace.
*
* @param namespace Pulsar namespace name
* @return
* @throws PulsarAdminException
*/
boolean getPermissionOnSubscriptionRequired(String namespace) throws PulsarAdminException;

/**
* Get whether a role requires explicit permission to consume from a subscription that has no subscription
* permission defined in the namespace.
* @param namespace Pulsar namespace name
* @return
*/
CompletableFuture<Boolean> getPermissionOnSubscriptionRequiredAsync(String namespace);

/**
* Set whether a role requires explicit permission to consume from a subscription that has no subscription
* permission defined in the namespace.
* @param namespace Pulsar namespace name
* @throws PulsarAdminException
*/
void setPermissionOnSubscriptionRequired(String namespace, boolean permissionOnSubscriptionRequired)
throws PulsarAdminException;

/**
* Set whether a role requires explicit permission to consume from a subscription that has no subscription
* permission defined in the namespace.
* @param namespace Pulsar namespace name
* @return
*/
CompletableFuture<Void> setPermissionOnSubscriptionRequiredAsync(String namespace,
boolean permissionOnSubscriptionRequired);

/**
* Get the replication clusters for a namespace.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ public interface AuthPolicies {
Map<String, Map<String, Set<AuthAction>>> getTopicAuthentication();
Map<String, Set<String>> getSubscriptionAuthentication();

/**
* Whether an empty set of subscription authentication roles returned by {@link #getSubscriptionAuthentication()}
* requires explicit permission to consume from the target subscription.
* @return
*/
boolean isSubscriptionAuthRequired();
void setSubscriptionAuthRequired(boolean subscriptionAuthRequired);

static Builder builder() {
return ReflectionUtils.newBuilder("org.apache.pulsar.client.admin.internal.data.AuthPoliciesImpl");
}
Expand All @@ -39,5 +47,6 @@ interface Builder {
Builder namespaceAuthentication(Map<String, Set<AuthAction>> namespaceAuthentication);
Builder topicAuthentication(Map<String, Map<String, Set<AuthAction>>> topicAuthentication);
Builder subscriptionAuthentication(Map<String, Set<String>> subscriptionAuthentication);
Builder subscriptionAuthRequired(boolean subscriptionAuthRequired);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,45 @@ public CompletableFuture<Void> revokePermissionOnSubscriptionAsync(
return asyncDeleteRequest(path);
}

@Override
public void setPermissionOnSubscriptionRequired(String namespace, boolean permissionOnSubscriptionRequired)
throws PulsarAdminException {
sync(() -> setPermissionOnSubscriptionRequiredAsync(namespace, permissionOnSubscriptionRequired));
}

@Override
public CompletableFuture<Void> setPermissionOnSubscriptionRequiredAsync(String namespace,
boolean permissionOnSubscriptionRequired) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "permissionOnSubscriptionRequired");
return asyncPostRequest(path, Entity.entity(permissionOnSubscriptionRequired, MediaType.APPLICATION_JSON));
}

@Override
public boolean getPermissionOnSubscriptionRequired(String namespace) throws PulsarAdminException {
return sync(() -> getPermissionOnSubscriptionRequiredAsync(namespace));
}

@Override
public CompletableFuture<Boolean> getPermissionOnSubscriptionRequiredAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "permissionOnSubscriptionRequired");
final CompletableFuture<Boolean> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Boolean>() {
@Override
public void completed(Boolean enabled) {
future.complete(enabled);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public List<String> getNamespaceReplicationClusters(String namespace) throws PulsarAdminException {
return sync(() -> getNamespaceReplicationClustersAsync(namespace));
Expand Down
Loading

0 comments on commit dc66031

Please sign in to comment.