Skip to content

Commit

Permalink
[fix][broker] Fix passing incorrect authentication data (apache#16201)
Browse files Browse the repository at this point in the history
### Motivation

apache#16065 fixes the race condition issue, but introduces a new issue. This issue is triggered when the Proxy and Broker work together, when we use the proxy to request the broker to do lookup/subscribe/produce operation, the broker always uses the original authentication data for authorization, not proxy authentication data, which causes this issue.

### Modification

- Fix passing authentication data, differentiate between original auth data and connected auth data by avoid to use the  `getAuthenticationData()`, this method name is easy to cause confusion and can not correctly get the authentication data
  • Loading branch information
nodece authored Jun 28, 2022
1 parent 3542500 commit 936bbbc
Show file tree
Hide file tree
Showing 3 changed files with 483 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private final int maxSubscriptionPatternLength;
private State state;
private volatile boolean isActive = true;
String authRole = null;
private String authRole = null;
private volatile AuthenticationDataSource authenticationData;
AuthenticationProvider authenticationProvider;
AuthenticationState authState;
private AuthenticationProvider authenticationProvider;
private AuthenticationState authState;
// In case of proxy, if the authentication credentials are forwardable,
// it will hold the credentials of the original client
AuthenticationState originalAuthState;
AuthenticationDataSource originalAuthData;
private AuthenticationState originalAuthState;
private AuthenticationDataSource originalAuthData;
private boolean pendingAuthChallengeResponse = false;

// Max number of pending requests per connections. If multiple producers are sharing the same connection the flow
Expand Down Expand Up @@ -382,19 +382,20 @@ private boolean invalidOriginalPrincipal(String originalPrincipal) {
// ////

private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, TopicOperation operation,
AuthenticationDataSource authData) {
AuthenticationDataSource authDataSource, AuthenticationDataSource originalAuthDataSource) {
if (!service.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
topicName, operation, originalPrincipal, authData);
topicName, operation, originalPrincipal,
originalAuthDataSource != null ? originalAuthDataSource : authDataSource);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
CompletableFuture<Boolean> isAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
topicName, operation, authRole, authData);
topicName, operation, authRole, authDataSource);
return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
if (!isProxyAuthorized) {
log.warn("OriginalRole {} is not authorized to perform operation {} on topic {}",
Expand All @@ -411,9 +412,13 @@ private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName,
private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, String subscriptionName,
TopicOperation operation) {
if (service.isAuthorizationEnabled()) {
AuthenticationDataSource authData =
new AuthenticationDataSubscription(getAuthenticationData(), subscriptionName);
return isTopicOperationAllowed(topicName, operation, authData);
AuthenticationDataSource authDataSource =
new AuthenticationDataSubscription(authenticationData, subscriptionName);
AuthenticationDataSource originalAuthDataSource = null;
if (originalAuthData != null) {
originalAuthDataSource = new AuthenticationDataSubscription(originalAuthData, subscriptionName);
}
return isTopicOperationAllowed(topicName, operation, authDataSource, originalAuthDataSource);
} else {
return CompletableFuture.completedFuture(true);
}
Expand Down Expand Up @@ -448,7 +453,7 @@ protected void handleLookup(CommandLookupTopic lookup) {
lookupSemaphore.release();
return;
}
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, getAuthenticationData()).thenApply(
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative,
Expand Down Expand Up @@ -512,7 +517,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
lookupSemaphore.release();
return;
}
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, getAuthenticationData()).thenApply(
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName)
Expand Down Expand Up @@ -875,6 +880,7 @@ protected void handleConnect(CommandConnect connect) {
}
} else {
originalPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null;

if (log.isDebugEnabled()) {
log.debug("[{}] Authenticate original role (forwarded from proxy): {}",
remoteAddress, originalPrincipal);
Expand Down Expand Up @@ -1200,13 +1206,14 @@ protected void handleProducer(final CommandProducer cmdProducer) {
}

CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
topicName, TopicOperation.PRODUCE, getAuthenticationData()
topicName, TopicOperation.PRODUCE, authenticationData, originalAuthData
);

if (!Strings.isNullOrEmpty(initialSubscriptionName)) {
isAuthorizedFuture =
isAuthorizedFuture.thenCombine(
isTopicOperationAllowed(topicName, TopicOperation.SUBSCRIBE, getAuthenticationData()),
isTopicOperationAllowed(topicName, TopicOperation.SUBSCRIBE, authenticationData,
originalAuthData),
(canProduce, canSubscribe) -> canProduce && canSubscribe);
}

Expand Down Expand Up @@ -1949,7 +1956,7 @@ private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName nam
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
namespaceName, operation, originalPrincipal, getAuthenticationData());
namespaceName, operation, originalPrincipal, originalAuthData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
Expand Down Expand Up @@ -2956,6 +2963,7 @@ public void execute(Runnable runnable) {

@Override
public String clientSourceAddress() {
AuthenticationDataSource authenticationDataSource = this.getAuthData();
if (proxyMessage != null) {
return proxyMessage.sourceAddress();
} else if (remoteAddress instanceof InetSocketAddress) {
Expand Down Expand Up @@ -2993,4 +3001,29 @@ private static void logNamespaceNameAuthException(SocketAddress remoteAddress, S
public boolean hasProducers() {
return !producers.isEmpty();
}

@VisibleForTesting
protected String getOriginalPrincipal() {
return originalPrincipal;
}

@VisibleForTesting
protected AuthenticationDataSource getAuthData() {
return authenticationData;
}

@VisibleForTesting
protected AuthenticationDataSource getOriginalAuthData() {
return originalAuthData;
}

@VisibleForTesting
protected AuthenticationState getOriginalAuthState() {
return originalAuthState;
}

@VisibleForTesting
protected void setAuthRole(String authRole) {
this.authRole = authRole;
}
}
Loading

0 comments on commit 936bbbc

Please sign in to comment.