Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[branch-2.7][fix][broker] Fix passing incorrect authentication data #16278

Merged
merged 1 commit into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
private State state;
private volatile boolean isActive = true;
String authRole = null;
AuthenticationDataSource authenticationData;
AuthenticationProvider authenticationProvider;
AuthenticationState authState;
private String authRole = null;
private volatile AuthenticationDataSource authenticationData;
private AuthenticationProvider authenticationProvider;
private AuthenticationState authState;
// In case of proxy, if the authentication credentials are forwardable,
// it will hold the credentials of the original client
AuthenticationState originalAuthState;
AuthenticationDataSource originalAuthData;
private AuthenticationState originalAuthState;
private AuthenticationDataSource originalAuthData;
private boolean pendingAuthChallengeResponse = false;

// Max number of pending requests per connections. If multiple producers are sharing the same connection the flow
Expand Down Expand Up @@ -294,57 +294,46 @@ private boolean invalidOriginalPrincipal(String originalPrincipal) {
// ////

private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, TopicOperation operation,
AuthenticationDataSource authData) {
AuthenticationDataSource authDataSource, AuthenticationDataSource originalAuthDataSource) {
if (!service.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
CompletableFuture<Boolean> isProxyAuthorizedFuture;
CompletableFuture<Boolean> isAuthorizedFuture;
if (service.isAuthorizationEnabled()) {
if (originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
topicName, operation, originalPrincipal, authData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
isAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
topicName, operation, authRole, authData);
if (originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
topicName, operation, originalPrincipal,
originalAuthDataSource != null ? originalAuthDataSource : authDataSource);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
isAuthorizedFuture = CompletableFuture.completedFuture(true);
}
CompletableFuture<Boolean> isAuthorizedFuture = service.getAuthorizationService().allowTopicOperationAsync(
topicName, operation, authRole, authDataSource);
return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
if (!isProxyAuthorized) {
log.warn("OriginalRole {} is not authorized to perform operation {} on topic {}",
originalPrincipal, operation, topicName);
originalPrincipal, operation, topicName);
}
if (!isAuthorized) {
log.warn("Role {} is not authorized to perform operation {} on topic {}",
authRole, operation, topicName);
authRole, operation, topicName);
}
return isProxyAuthorized && isAuthorized;
});
}

private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, String subscriptionName, TopicOperation operation) {
CompletableFuture<Boolean> isProxyAuthorizedFuture;
CompletableFuture<Boolean> isAuthorizedFuture;
private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, String subscriptionName,
TopicOperation operation) {
if (service.isAuthorizationEnabled()) {
AuthenticationDataSource authData =
new AuthenticationDataSubscription(getAuthenticationData(), subscriptionName);
return isTopicOperationAllowed(topicName, operation, authData);
AuthenticationDataSource authDataSource =
new AuthenticationDataSubscription(authenticationData, subscriptionName);
AuthenticationDataSource originalAuthDataSource = null;
if (originalAuthData != null) {
originalAuthDataSource = new AuthenticationDataSubscription(originalAuthData, subscriptionName);
}
return isTopicOperationAllowed(topicName, operation, authDataSource, originalAuthDataSource);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
isAuthorizedFuture = CompletableFuture.completedFuture(true);
return CompletableFuture.completedFuture(true);
}
return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
if (!isProxyAuthorized) {
log.warn("OriginalRole {} is not authorized to perform operation {} on topic {}, subscription {}",
originalPrincipal, operation, topicName, subscriptionName);
}
if (!isAuthorized) {
log.warn("Role {} is not authorized to perform operation {} on topic {}, subscription {}",
authRole, operation, topicName, subscriptionName);
}
return isProxyAuthorized && isAuthorized;
});
}

@Override
Expand All @@ -371,7 +360,7 @@ protected void handleLookup(CommandLookupTopic lookup) {
lookupSemaphore.release();
return;
}
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, getAuthenticationData()).thenApply(
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative,
Expand Down Expand Up @@ -436,7 +425,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
lookupSemaphore.release();
return;
}
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, getAuthenticationData()).thenApply(
isTopicOperationAllowed(topicName, TopicOperation.LOOKUP, authenticationData, originalAuthData).thenApply(
isAuthorized -> {
if (isAuthorized) {
unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName)
Expand Down Expand Up @@ -632,7 +621,8 @@ public void refreshAuthenticationCredentials() {
}

ctx.executor().execute(SafeRun.safeRun(() -> {
log.info("[{}] Refreshing authentication credentials for originalPrincipal {} and authRole {}", remoteAddress, originalPrincipal, this.authRole);
log.info("[{}] Refreshing authentication credentials for originalPrincipal {} and authRole {}",
remoteAddress, originalPrincipal, this.authRole);

if (!supportsAuthenticationRefresh()) {
log.warn("[{}] Closing connection because client doesn't support auth credentials refresh", remoteAddress);
Expand Down Expand Up @@ -1039,7 +1029,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
}

CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
topicName, TopicOperation.PRODUCE, getAuthenticationData()
topicName, TopicOperation.PRODUCE, authenticationData, originalAuthData
);
isAuthorizedFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
Expand Down Expand Up @@ -1658,21 +1648,18 @@ private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTo

private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName namespaceName,
NamespaceOperation operation) {
if (!service.isAuthorizationEnabled()) {
return CompletableFuture.completedFuture(true);
}
CompletableFuture<Boolean> isProxyAuthorizedFuture;
CompletableFuture<Boolean> isAuthorizedFuture;
if (service.isAuthorizationEnabled()) {
if (originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
namespaceName, operation, originalPrincipal, getAuthenticationData());
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
isAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
namespaceName, operation, authRole, authenticationData);
if (originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
namespaceName, operation, originalPrincipal, originalAuthData);
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
isAuthorizedFuture = CompletableFuture.completedFuture(true);
}
CompletableFuture<Boolean> isAuthorizedFuture = service.getAuthorizationService().allowNamespaceOperationAsync(
namespaceName, operation, authRole, authenticationData);
return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> {
if (!isProxyAuthorized) {
log.warn("OriginalRole {} is not authorized to perform operation {} on namespace {}",
Expand Down Expand Up @@ -2393,4 +2380,29 @@ private static void logNamespaceNameAuthException(SocketAddress remoteAddress, S
remoteAddress, operation, principal, namespaceNameString, ex);
}
}

@VisibleForTesting
protected String getOriginalPrincipal() {
return originalPrincipal;
}

@VisibleForTesting
protected AuthenticationDataSource getAuthData() {
return authenticationData;
}

@VisibleForTesting
protected AuthenticationDataSource getOriginalAuthData() {
return originalAuthData;
}

@VisibleForTesting
protected AuthenticationState getOriginalAuthState() {
return originalAuthState;
}

@VisibleForTesting
protected void setAuthRole(String authRole) {
this.authRole = authRole;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1535,7 +1535,7 @@ protected void resetChannel() throws Exception {
channel.close().get();
}
serverCnx = new ServerCnx(pulsar);
serverCnx.authRole = "";
serverCnx.setAuthRole("");
channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4), serverCnx);
}

Expand Down