Skip to content

Commit

Permalink
[fix][broker] Fix passing incorrect authentication data (#16201)
Browse files Browse the repository at this point in the history
### Motivation

#16065 fixes the race condition issue, but introduces a new issue. This issue is triggered when the Proxy and Broker work together, when we use the proxy to request the broker to do lookup/subscribe/produce operation, the broker always uses the original authentication data for authorization, not proxy authentication data, which causes this issue.

### Modification

- Fix passing authentication data, differentiate between original auth data and connected auth data by avoid to use the  `getAuthenticationData()`, this method name is easy to cause confusion and can not correctly get the authentication data

(cherry picked from commit 936bbbc)
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed Jun 29, 2022
1 parent aaa6ef5 commit 6d51424
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
private State state;
private volatile boolean isActive = true;
String authRole = null;
AuthenticationDataSource authenticationData;
AuthenticationProvider authenticationProvider;
AuthenticationState authState;
private String authRole = null;
private volatile AuthenticationDataSource authenticationData;
private AuthenticationProvider authenticationProvider;
private AuthenticationState authState;
// In case of proxy, if the authentication credentials are forwardable,
// it will hold the credentials of the original client
AuthenticationState originalAuthState;
AuthenticationDataSource originalAuthData;
private AuthenticationState originalAuthState;
private AuthenticationDataSource originalAuthData;
private boolean pendingAuthChallengeResponse = false;

// Max number of pending requests per connections. If multiple producers are sharing the same connection the flow
Expand Down Expand Up @@ -294,57 +294,46 @@ private boolean invalidOriginalPrincipal(String originalPrincipal) {
// ////

private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, TopicOperation operation,
AuthenticationDataSource authData) {
AuthenticationDataSource authDataSource, AuthenticationDataSource originalAuthDataSource) {
if (!service.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
CompletableFuture<Boolean> isProxyAuthorizedFuture;
CompletableFuture<Boolean> isAuthorizedFuture;
if (service.isAuthorizationEnabled()) {
if (originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
topicName, operation, originalPrincipal, authData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
isAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
topicName, operation, authRole, authData);
if (originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
topicName, operation, originalPrincipal,
originalAuthDataSource != null ? originalAuthDataSource : authDataSource);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
isAuthorizedFuture = CompletableFuture.completedFuture(true);
}
CompletableFuture<Boolean> isAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
topicName, operation, authRole, authDataSource);
return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
if (!isProxyAuthorized) {
log.warn("OriginalRole {} is not authorized to perform operation {} on topic {}",
originalPrincipal, operation, topicName);
originalPrincipal, operation, topicName);
}
if (!isAuthorized) {
log.warn("Role {} is not authorized to perform operation {} on topic {}",
authRole, operation, topicName);
authRole, operation, topicName);
}
return isProxyAuthorized && isAuthorized;
});
}

private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, String subscriptionName, TopicOperation operation) {
CompletableFuture<Boolean> isProxyAuthorizedFuture;
CompletableFuture<Boolean> isAuthorizedFuture;
private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, String subscriptionName,
TopicOperation operation) {
if (service.isAuthorizationEnabled()) {
AuthenticationDataSource authData =
new AuthenticationDataSubscription(getAuthenticationData(), subscriptionName);
return isTopicOperationAllowed(topicName, operation, authData);
AuthenticationDataSource authDataSource =
new AuthenticationDataSubscription(authenticationData, subscriptionName);
AuthenticationDataSource originalAuthDataSource = null;
if (originalAuthData != null) {
originalAuthDataSource = new AuthenticationDataSubscription(originalAuthData, subscriptionName);
}
return isTopicOperationAllowed(topicName, operation, authDataSource, originalAuthDataSource);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
isAuthorizedFuture = CompletableFuture.completedFuture(true);
return CompletableFuture.completedFuture(true);
}
return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
if (!isProxyAuthorized) {
log.warn("OriginalRole {} is not authorized to perform operation {} on topic {}, subscription {}",
originalPrincipal, operation, topicName, subscriptionName);
}
if (!isAuthorized) {
log.warn("Role {} is not authorized to perform operation {} on topic {}, subscription {}",
authRole, operation, topicName, subscriptionName);
}
return isProxyAuthorized && isAuthorized;
});
}

@Override
Expand All @@ -371,7 +360,7 @@ protected void handleLookup(CommandLookupTopic lookup) {
lookupSemaphore.release();
return;
}
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, getAuthenticationData()).thenApply(
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative,
Expand Down Expand Up @@ -436,7 +425,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
lookupSemaphore.release();
return;
}
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, getAuthenticationData()).thenApply(
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName)
Expand Down Expand Up @@ -632,7 +621,8 @@ public void refreshAuthenticationCredentials() {
}

ctx.executor().execute(SafeRun.safeRun(() -> {
log.info("[{}] Refreshing authentication credentials for originalPrincipal {} and authRole {}", remoteAddress, originalPrincipal, this.authRole);
log.info("[{}] Refreshing authentication credentials for originalPrincipal {} and authRole {}",
remoteAddress, originalPrincipal, this.authRole);

if (!supportsAuthenticationRefresh()) {
log.warn("[{}] Closing connection because client doesn't support auth credentials refresh", remoteAddress);
Expand Down Expand Up @@ -1039,7 +1029,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
}

CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
topicName, TopicOperation.PRODUCE, getAuthenticationData()
topicName, TopicOperation.PRODUCE, authenticationData, originalAuthData
);
isAuthorizedFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
Expand Down Expand Up @@ -1630,21 +1620,18 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {

private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName namespaceName,
NamespaceOperation operation) {
if (!service.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
CompletableFuture<Boolean> isProxyAuthorizedFuture;
CompletableFuture<Boolean> isAuthorizedFuture;
if (service.isAuthorizationEnabled()) {
if (originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
namespaceName, operation, originalPrincipal, getAuthenticationData());
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
isAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
namespaceName, operation, authRole, authenticationData);
if (originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
namespaceName, operation, originalPrincipal, originalAuthData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
isAuthorizedFuture = CompletableFuture.completedFuture(true);
}
CompletableFuture<Boolean> isAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
namespaceName, operation, authRole, authenticationData);
return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
if (!isProxyAuthorized) {
log.warn("OriginalRole {} is not authorized to perform operation {} on namespace {}",
Expand Down Expand Up @@ -2365,4 +2352,29 @@ private static void logNamespaceNameAuthException(SocketAddress remoteAddress, S
remoteAddress, operation, principal, namespaceNameString, ex);
}
}

@VisibleForTesting
protected String getOriginalPrincipal() {
return originalPrincipal;
}

@VisibleForTesting
protected AuthenticationDataSource getAuthData() {
return authenticationData;
}

@VisibleForTesting
protected AuthenticationDataSource getOriginalAuthData() {
return originalAuthData;
}

@VisibleForTesting
protected AuthenticationState getOriginalAuthState() {
return originalAuthState;
}

@VisibleForTesting
protected void setAuthRole(String authRole) {
this.authRole = authRole;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1535,7 +1535,7 @@ protected void resetChannel() throws Exception {
channel.close().get();
}
serverCnx = new ServerCnx(pulsar);
serverCnx.authRole = "";
serverCnx.setAuthRole("");
channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4), serverCnx);
}

Expand Down

0 comments on commit 6d51424

Please sign in to comment.