Skip to content

Commit

Permalink
[fix][broker]: support missing broker level fine-granted permissions (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao authored Nov 29, 2024
1 parent d1753ee commit eb60d0a
Show file tree
Hide file tree
Showing 6 changed files with 437 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BrokerOperation;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
Expand Down Expand Up @@ -383,4 +384,13 @@ default CompletableFuture<Map<String, Set<AuthAction>>> getPermissionsAsync(Name
String.format("getPermissionsAsync on namespaceName %s is not supported by the Authorization",
namespaceName)));
}

default CompletableFuture<Boolean> allowBrokerOperationAsync(String clusterName,

This comment has been minimized.

Copy link
@lhotari

lhotari Nov 29, 2024

Member

@mattisonchao Does org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider implement this method? How could the default configuration work if it doesn't?

String brokerId,
BrokerOperation brokerOperation,
String role,
AuthenticationDataSource authData) {
return FutureUtil.failedFuture(
new UnsupportedOperationException("allowBrokerOperationAsync is not supported yet."));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BrokerOperation;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
Expand Down Expand Up @@ -544,6 +545,28 @@ public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName,
}
}

public CompletableFuture<Boolean> allowBrokerOperationAsync(String clusterName,
String brokerId,
BrokerOperation brokerOperation,
String originalRole,
String role,
AuthenticationDataSource authData) {
if (!isValidOriginalPrincipal(role, originalRole, authData)) {
return CompletableFuture.completedFuture(false);
}

if (isProxyRole(role)) {
final var isRoleAuthorizedFuture = provider.allowBrokerOperationAsync(clusterName, brokerId,
brokerOperation, role, authData);
final var isOriginalAuthorizedFuture = provider.allowBrokerOperationAsync(clusterName, brokerId,
brokerOperation, originalRole, authData);
return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
(isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized);
} else {
return provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData);
}
}

/**
* @deprecated - will be removed after 2.12. Use async variant.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.BrokerOperation;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ThreadDumpUtil;
Expand Down Expand Up @@ -107,7 +108,8 @@ public class BrokersBase extends AdminResource {
@ApiResponse(code = 404, message = "Cluster does not exist: cluster={clustername}") })
public void getActiveBrokers(@Suspended final AsyncResponse asyncResponse,
@PathParam("cluster") String cluster) {
validateSuperUserAccessAsync()
validateBothSuperuserAndBrokerOperation(cluster == null ? pulsar().getConfiguration().getClusterName()
: cluster, pulsar().getBrokerId(), BrokerOperation.LIST_BROKERS)
.thenCompose(__ -> validateClusterOwnershipAsync(cluster))
.thenCompose(__ -> pulsar().getLoadManager().get().getAvailableBrokersAsync())
.thenAccept(activeBrokers -> {
Expand Down Expand Up @@ -148,7 +150,9 @@ public void getActiveBrokers(@Suspended final AsyncResponse asyncResponse) throw
@ApiResponse(code = 403, message = "This operation requires super-user access"),
@ApiResponse(code = 404, message = "Leader broker not found") })
public void getLeaderBroker(@Suspended final AsyncResponse asyncResponse) {
validateSuperUserAccessAsync().thenAccept(__ -> {
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(),
pulsar().getBrokerId(), BrokerOperation.GET_LEADER_BROKER)
.thenAccept(__ -> {
LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader()
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find leader broker"));
BrokerInfo brokerInfo = BrokerInfo.builder()
Expand All @@ -175,7 +179,8 @@ public void getLeaderBroker(@Suspended final AsyncResponse asyncResponse) {
public void getOwnedNamespaces(@Suspended final AsyncResponse asyncResponse,
@PathParam("clusterName") String cluster,
@PathParam("brokerId") String brokerId) {
validateSuperUserAccessAsync()
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(),
pulsar().getBrokerId(), BrokerOperation.LIST_OWNED_NAMESPACES)
.thenCompose(__ -> maybeRedirectToBroker(brokerId))
.thenCompose(__ -> validateClusterOwnershipAsync(cluster))
.thenCompose(__ -> pulsar().getNamespaceService().getOwnedNameSpacesStatusAsync())
Expand Down Expand Up @@ -204,7 +209,8 @@ public void getOwnedNamespaces(@Suspended final AsyncResponse asyncResponse,
public void updateDynamicConfiguration(@Suspended AsyncResponse asyncResponse,
@PathParam("configName") String configName,
@PathParam("configValue") String configValue) {
validateSuperUserAccessAsync()
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.UPDATE_DYNAMIC_CONFIGURATION)
.thenCompose(__ -> persistDynamicConfigurationAsync(configName, configValue))
.thenAccept(__ -> {
LOG.info("[{}] Updated Service configuration {}/{}", clientAppId(), configName, configValue);
Expand All @@ -228,7 +234,8 @@ public void updateDynamicConfiguration(@Suspended AsyncResponse asyncResponse,
public void deleteDynamicConfiguration(
@Suspended AsyncResponse asyncResponse,
@PathParam("configName") String configName) {
validateSuperUserAccessAsync()
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.DELETE_DYNAMIC_CONFIGURATION)
.thenCompose(__ -> internalDeleteDynamicConfigurationOnMetadataAsync(configName))
.thenAccept(__ -> {
LOG.info("[{}] Successfully to delete dynamic configuration {}", clientAppId(), configName);
Expand All @@ -249,7 +256,8 @@ public void deleteDynamicConfiguration(
@ApiResponse(code = 404, message = "Configuration not found"),
@ApiResponse(code = 500, message = "Internal server error")})
public void getAllDynamicConfigurations(@Suspended AsyncResponse asyncResponse) {
validateSuperUserAccessAsync()
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.LIST_DYNAMIC_CONFIGURATIONS)
.thenCompose(__ -> dynamicConfigurationResources().getDynamicConfigurationAsync())
.thenAccept(configOpt -> asyncResponse.resume(configOpt.orElseGet(Collections::emptyMap)))
.exceptionally(ex -> {
Expand All @@ -266,7 +274,8 @@ public void getAllDynamicConfigurations(@Suspended AsyncResponse asyncResponse)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "You don't have admin permission to get configuration")})
public void getDynamicConfigurationName(@Suspended AsyncResponse asyncResponse) {
validateSuperUserAccessAsync()
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.LIST_DYNAMIC_CONFIGURATIONS)
.thenAccept(__ -> asyncResponse.resume(pulsar().getBrokerService().getDynamicConfiguration()))
.exceptionally(ex -> {
LOG.error("[{}] Failed to get all dynamic configuration names.", clientAppId(), ex);
Expand All @@ -281,7 +290,8 @@ public void getDynamicConfigurationName(@Suspended AsyncResponse asyncResponse)
response = String.class, responseContainer = "Map")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void getRuntimeConfiguration(@Suspended AsyncResponse asyncResponse) {
validateSuperUserAccessAsync()
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.LIST_RUNTIME_CONFIGURATIONS)
.thenAccept(__ -> asyncResponse.resume(pulsar().getBrokerService().getRuntimeConfiguration()))
.exceptionally(ex -> {
LOG.error("[{}] Failed to get runtime configuration.", clientAppId(), ex);
Expand Down Expand Up @@ -322,7 +332,8 @@ private synchronized CompletableFuture<Void> persistDynamicConfigurationAsync(
@ApiOperation(value = "Get the internal configuration data", response = InternalConfigurationData.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void getInternalConfigurationData(@Suspended AsyncResponse asyncResponse) {
validateSuperUserAccessAsync()
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.GET_INTERNAL_CONFIGURATION_DATA)
.thenAccept(__ -> asyncResponse.resume(pulsar().getInternalConfigurationData()))
.exceptionally(ex -> {
LOG.error("[{}] Failed to get internal configuration data.", clientAppId(), ex);
Expand All @@ -339,7 +350,8 @@ public void getInternalConfigurationData(@Suspended AsyncResponse asyncResponse)
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 500, message = "Internal server error")})
public void backlogQuotaCheck(@Suspended AsyncResponse asyncResponse) {
validateSuperUserAccessAsync()
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.CHECK_BACKLOG_QUOTA)
.thenAcceptAsync(__ -> {
pulsar().getBrokerService().monitorBacklogQuota();
asyncResponse.resume(Response.noContent().build());
Expand Down Expand Up @@ -378,7 +390,8 @@ public void healthCheck(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Topic Version")
@QueryParam("topicVersion") TopicVersion topicVersion,
@QueryParam("brokerId") String brokerId) {
validateSuperUserAccessAsync()
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), StringUtils.isBlank(brokerId)
? pulsar().getBrokerId() : brokerId, BrokerOperation.HEALTH_CHECK)
.thenAccept(__ -> checkDeadlockedThreads())
.thenCompose(__ -> maybeRedirectToBroker(
StringUtils.isBlank(brokerId) ? pulsar().getBrokerId() : brokerId))
Expand Down Expand Up @@ -596,8 +609,9 @@ public void shutDownBrokerGracefully(
@QueryParam("forcedTerminateTopic") @DefaultValue("true") boolean forcedTerminateTopic,
@Suspended final AsyncResponse asyncResponse
) {
validateSuperUserAccess();
doShutDownBrokerGracefullyAsync(maxConcurrentUnloadPerSec, forcedTerminateTopic)
validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(),
BrokerOperation.SHUTDOWN)
.thenCompose(__ -> doShutDownBrokerGracefullyAsync(maxConcurrentUnloadPerSec, forcedTerminateTopic))
.thenAccept(__ -> {
LOG.info("[{}] Successfully shutdown broker gracefully", clientAppId());
asyncResponse.resume(Response.noContent().build());
Expand All @@ -614,5 +628,65 @@ private CompletableFuture<Void> doShutDownBrokerGracefullyAsync(int maxConcurren
pulsar().getBrokerService().unloadNamespaceBundlesGracefully(maxConcurrentUnloadPerSec, forcedTerminateTopic);
return pulsar().closeAsync();
}


private CompletableFuture<Void> validateBothSuperuserAndBrokerOperation(String cluster, String brokerId,
BrokerOperation operation) {
final var superUserAccessValidation = validateSuperUserAccessAsync();
final var brokerOperationValidation = validateBrokerOperationAsync(cluster, brokerId, operation);
return FutureUtil.waitForAll(List.of(superUserAccessValidation, brokerOperationValidation))
.handle((result, err) -> {
if (!superUserAccessValidation.isCompletedExceptionally()
|| !brokerOperationValidation.isCompletedExceptionally()) {
return null;
}
if (LOG.isDebugEnabled()) {
Throwable superUserValidationException = null;
try {
superUserAccessValidation.join();
} catch (Throwable ex) {
superUserValidationException = FutureUtil.unwrapCompletionException(ex);
}
Throwable brokerOperationValidationException = null;
try {
brokerOperationValidation.join();
} catch (Throwable ex) {
brokerOperationValidationException = FutureUtil.unwrapCompletionException(ex);
}
LOG.debug("validateBothSuperuserAndBrokerOperation failed."
+ " originalPrincipal={} clientAppId={} operation={} broker={} "
+ "superuserValidationError={} brokerOperationValidationError={}",
originalPrincipal(), clientAppId(), operation.toString(), brokerId,
superUserValidationException, brokerOperationValidationException);
}
throw new RestException(Status.UNAUTHORIZED,
String.format("Unauthorized to validateBothSuperuserAndBrokerOperation for"
+ " originalPrincipal [%s] and clientAppId [%s] "
+ "about operation [%s] on broker [%s]",
originalPrincipal(), clientAppId(), operation.toString(), brokerId));
});
}


private CompletableFuture<Void> validateBrokerOperationAsync(String cluster, String brokerId,
BrokerOperation operation) {
final var pulsar = pulsar();
if (pulsar.getBrokerService().isAuthenticationEnabled()
&& pulsar.getBrokerService().isAuthorizationEnabled()) {
return pulsar.getBrokerService().getAuthorizationService()
.allowBrokerOperationAsync(cluster, brokerId, operation, originalPrincipal(),
clientAppId(), clientAuthData())
.thenAccept(isAuthorized -> {
if (!isAuthorized) {
throw new RestException(Status.UNAUTHORIZED,
String.format("Unauthorized to validateBrokerOperation for"
+ " originalPrincipal [%s] and clientAppId [%s] "
+ "about operation [%s] on broker [%s]",
originalPrincipal(), clientAppId(), operation.toString(), brokerId));
}
});
}
return CompletableFuture.completedFuture(null);
}
}

Loading

0 comments on commit eb60d0a

Please sign in to comment.