Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix update authentication data #18130

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
private final BrokerInterceptor brokerInterceptor;
private State state;
private volatile boolean isActive = true;
private SSLSession sslSession;
private String authRole = null;
private volatile AuthenticationDataSource authenticationData;
private AuthenticationProvider authenticationProvider;
private AuthenticationState authState;
// In case of proxy, if the authentication credentials are forwardable,
// it will hold the credentials of the original client
private AuthenticationProvider originalAuthenticationProvider;
private AuthenticationState originalAuthState;
private AuthenticationDataSource originalAuthData;
private boolean pendingAuthChallengeResponse = false;
Expand Down Expand Up @@ -707,73 +709,96 @@ private void completeConnect(int clientProtoVersion, String clientVersion, boole
}

// According to auth result, send newConnected or newAuthChallenge command.
private State doAuthentication(AuthData clientData,
int clientProtocolVersion,
String clientVersion) throws Exception {

private CompletableFuture<Void> doAuthenticationAsync(AuthData clientData, int clientProtocolVersion,
String clientVersion) {
// The original auth state can only be set on subsequent auth attempts (and only
// in presence of a proxy and if the proxy is forwarding the credentials).
// In this case, the re-validation needs to be done against the original client
// credentials.
boolean useOriginalAuthState = (originalAuthState != null);
AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState;
String authRole = useOriginalAuthState ? originalPrincipal : this.authRole;
AuthData brokerData = authState.authenticate(clientData);

if (log.isDebugEnabled()) {
log.debug("Authenticate using original auth state : {}, role = {}", useOriginalAuthState, authRole);
}
// credentials, but we only can new an authentication state, because some authentication data(TLS, SASL)
// based on outside service.
Comment on lines +717 to +718
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain this a bit more? In the case of TLS authentication, we are not able to refresh the originalAuthenticationState, that case seems unrelated.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, there is some confusion. For originalAuthentication, we don't call the authentication checks. Because there is some state in the AuthenticationState, which depends on outside service.

@Override
public AuthData authenticate(AuthData authData) throws AuthenticationException {
return pulsarSaslServer.response(authData);
}

originalAuthentication belongs to the user's client by the proxy forwarded, see

return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
clientAuthMethod);

Copy link
Member

@michaeljmarshall michaeljmarshall Jan 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For originalAuthentication, we don't call the authentication checks.

Isn't this a problem though? We aren't really authenticating the originalAuthData if we don't call the authenticate method and make sure authentication is "complete". In the ProxyConnectionToBroker case, we can send back AuthChallenge In the event that the proxy is forwarding authentication information, we can issue AuthChallenge responses. It might not work so easily in the ProxyLookupRequests state.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the current design, the originalAuthData is credible by the proxy forwarding authentication data. This authentication data was completed in the proxy and then sent this authentication data to the broker, so we don't check the authentication data in the broker.

For ProxyLookupRequests state, see #17831. I forward the AuthChallenge command to the user's client by the ProxyClient and Proxy Server.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the current design, the originalAuthData is credible by the proxy forwarding authentication data. This authentication data was completed in the proxy and then sent this authentication data to the broker, so we don't check the authentication data in the broker.

My primary objection is that the setting required to enter this code block is called isAuthenticateOriginalAuthData. Note that we are already verifying most auth data in the case that the newAuthState method actually authenticates the data. That leaves out cases like SASL which do not authentication on AuthenticationState initialization because it is a multi-stage auth. However, I've just realized that forwarding the authentication information will especially not work with SASL because the proxy only forwards the last AuthData that it receives:

private void doAuthentication(AuthData clientData)
throws Exception {
AuthData brokerData = authState.authenticate(clientData);
// authentication has completed, will send newConnected command.
if (authState.isComplete()) {
clientAuthRole = authState.getAuthRole();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
}
// First connection
if (this.connectionPool == null || state == State.Connecting) {
// authentication has completed, will send newConnected command.
completeConnect(clientData);

This seems like a gap in the protocol where multi-staged authentication is not able to be properly processed through the proxy. However, if its a gap, that means it is something that could be improved without breaking anything.

One question is whether it would help to have a distinct stages where we first authenticate the proxy and then authenticate its original client (when configured to do so). That could help to open up the design and to make the protocol a bit clearer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a gap in the protocol where multi-staged authentication is not able to be properly processed through the proxy. However, if its a gap, that means it is something that could be improved without breaking anything.

When using the multi-staged authentication, we need to set authenticateOriginalAuthData=false in the broker, and then it works fine, the broker just checks the proxy's authentication data.

One question is whether it would help to have a distinct stages where we first authenticate the proxy and then authenticate its original client (when configured to do so).

The protocol needs to be changed, distinguishing between client and proxy.

The authentication logic here is a bit confusing, but for the current design we need to give the correct configuration in our documentation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When using the multi-staged authentication, we need to set authenticateOriginalAuthData=false in the broker, and then it works fine, the broker just checks the proxy's authentication data.

I think this supports my statement that there is a gap in the protocol.

// If we can get the role from the authentication sate, the global variable need to be updated.

if (authState.isComplete()) {
// Authentication has completed. It was either:
// 1. the 1st time the authentication process was done, in which case we'll send
// a `CommandConnected` response
// 2. an authentication refresh, in which case we need to refresh authenticationData

String newAuthRole = authState.getAuthRole();

// Refresh the auth data.
this.authenticationData = authState.getAuthDataSource();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent many hours working on the authentication framework code today. Below are some of my thoughts. They aren't necessarily my final opinion.

In the PR description, this line was identified as the root cause of the issue, but based on the description and on this PR, I think I see it as a symptom of a larger issue. We do not have a clear enough definition for the state transitions in the ServerCnx class, which is essentially a finite state machine.

I want to describe part of the problem with this line of code. On the first pass through, the this.authenticationData is set by getting it from this.authState.getAuthDataSource() because useOriginalAuthState is false. Then, on subsequent AuthResponse commands from the original client, the authenticationData is set by getting it from this.originalAuthState.getAuthDataSource() because useOriginalAuthState is true. That means the this.authenticationData is incorrectly updated.

As a note, the broker gets AuthResponse commands from the original client when the connection through the client is in state ProxyConnectionToBroker. As of #17831, the broker also gets an AuthResponse from the original client when the proxy is in state ProxyLookupRequests with forwardAuthorizationCredentials=true.

Otherwise, the proxy sends its own authentication information.

Here are some problems with the current solution:

  1. The AuthResponse protocol message only has one field for AuthData, and there is no indication whether the AuthData is for the proxy or the original client. As a consequence, the broker does not know whether to update the authenticationData or the originalAuthData.
  2. The implementation for getAuthDataSource does not always get the most recent authenticationDataSource See TokenAuthenticationState. (I am already working on fixing this.)

Open questions:

  1. Can we just remove the originalAuthState and only keep track of one authState?
  2. Is the AuthenticationState object meant to last the whole lifecycle of a given ServerCnx? In my mind, the answer is yes, but this PR says otherwise. If not, it feels odd that we have a "state" object.

Copy link
Member Author

@nodece nodece Jan 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of your description is correct.

  1. The AuthResponse protocol message only has one field for AuthData, and there is no indication whether the AuthData is for the proxy or the original client. As a consequence, the broker does not know whether to update the authenticationData or the originalAuthData.

I started PIP a few months ago, see #17517

    1. Can we just remove the originalAuthState and only keep track of one authState?

No. We must check both the originalAuthState and authState, if you don't check, you will overstep your authority.

  1. Is the AuthenticationState object meant to last the whole lifecycle of a given ServerCnx? In my mind, the answer is yes, but this PR says otherwise. If not, it feels odd that we have a "state" object.

When authentication data changes, we need to renew the authentication state, and then we always get the correct data from that.

"state" is originalAuthState or authState.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. We must check both the originalAuthState and authState, if you don't check, you will overstep your authority.

I agree that we currently check authorization on the proxy role and the original principal as well as with the proxy's authenticationDataSource and the originalAuthenticationDataSource. However, I am not sure why that is necessary, and as we've discussed, this is not even done correctly for the auth data source.

What is the case that a client will overstep its authority?

When the proxy is forwarding authentication data, it seems sufficient to verify that the proxy's authentication data is valid for a proxy as a one time check during connection initialization. All subsequent auth challenges will go to the client anyway.

If the proxy is unable to forward authentication data, then the broker will only have the proxy's authentication data and the originalPrincipal.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, I am not sure why that is necessary, and as we've discussed, this is not even done correctly for the auth data source.

For the current design, the authenticationDataSource is credible by the proxy forwarding authentication data. This authentication data was completed in the proxy and then sent this authentication data to the broker, so we don't check the authentication data in the broker.

When the proxy is forwarding authentication data, it seems sufficient to verify that the proxy's authentication data is valid for a proxy as a one time check during connection initialization. All subsequent auth challenges will go to the client anyway.

I thought the same thing, but the Proxy's role usually is a superuser, if we don't check the client's role, this is overstepping its authority, and the client can get the lookup data and topic data.

If we don't consider this case, I agree with you.

No. We must check both the originalAuthState and authState, if you don't check, you will overstep your authority.

I agree that we currently check authorization on the proxy role and the original principal as well as with the proxy's authenticationDataSource and the originalAuthenticationDataSource. However, I am not sure why that is necessary, and as we've discussed, this is not even done correctly for the auth data source.

What is the case that a client will overstep its authority?

When the proxy is forwarding authentication data, it seems sufficient to verify that the proxy's authentication data is valid for a proxy as a one time check during connection initialization. All subsequent auth challenges will go to the client anyway.

If the proxy is unable to forward authentication data, then the broker will only have the proxy's authentication data and the originalPrincipal.

Right.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the same thing, but the Proxy's role usually is a superuser, if we don't check the client's role, this is overstepping its authority, and the client can get the lookup data and topic data.

If we don't consider this case, I agree with you.

Can you describe the attack vector? From my perspective, I don't see how a client would be able to assume the proxy's role.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the proxy is forwarding authentication data, it seems sufficient to verify that the proxy's authentication data is valid for a proxy as a one time check during connection initialization. All subsequent auth challenges will go to the client anyway.

This is ProxyConnectionToBroker operation, we also consider ProxyLookupRequests logic.

Can you describe the attack vector? From my perspective, I don't see how a client would be able to assume the proxy's role.

Would you like to pass only the proxy's authentication data? If yes, that's going to cross the line.

In proxy, the connection logic between ProxyLookupRequests and ProxyConnectionToBroker is different.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the proxy is forwarding authentication data, it seems sufficient to verify that the proxy's authentication data is valid for a proxy as a one time check during connection initialization. All subsequent auth challenges will go to the client anyway.

This is ProxyConnectionToBroker operation, we also consider ProxyLookupRequests logic.

After #17831, when a proxy connection is in state ProxyLookupRequests with forwardAuthorizationCredentials=true, the proxy connection responds to the AuthChallenge with the original client's auth data by forwarding the broker's challenge to the client. That is why I said the auth challenges go to the client in that case.

Would you like to pass only the proxy's authentication data? If yes, that's going to cross the line.

No, that's not my suggestion. I am suggesting that when the proxy sends both its authData and the client's authData, the broker needs to only verify the proxy's authData at the beginning and at no other time because in all of those cases, the broker's auth challenge is going back to the original client and the proxy's authData is never again updated. This is the case because the proxy only forwards the client's auth data when forwardAuthorizationCredentials=true. One issue with my proposed solution is that the solution introduced in #17831 is not present in older versions of the proxy, so we'd need a way to address those proxies.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good explanation!

boolean useOriginalAuthState = (originalAuthState != null);
if (state == State.Connected) {
// For auth challenge, the authentication state requires to be updated.
if (log.isDebugEnabled()) {
log.debug("[{}] Auth data refreshed for role={}", remoteAddress, this.authRole);
log.debug("Refreshing authenticate state, original auth state: {}, original auth role: {}, "
+ "auth role: {}",
useOriginalAuthState, originalPrincipal, authRole);
}

if (!useOriginalAuthState) {
this.authRole = newAuthRole;
try {
if (useOriginalAuthState) {
originalAuthState =
originalAuthenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
} else {
authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
Comment on lines +731 to +734
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not feel right based on the AuthenticationState interface, which provides hooks for calls to authenticate. Can you provide additional motivation for this change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When authentication data changes, we need to renew the authentication state, and then we always get the correct data from that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the underlying problem is that calling authenticate does not update the AuthenticationDataSource in our token authentication provider. One solution is to recreate an auth state object. However, this has not been the implicit contract in the ServerCnx for as long as we've had the AuthenticationState. The contract has been that the AuthenticationState lasts the whole life of the connection. I think we should update the TokenAuthenticationState so that authenticate updates the AuthenticationDataSource.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. We can fix built-in plugins easily, but we also need to consider third-party plugins.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually don't think third-party plugins are going to be a problem here. The AuthenticationState object has lived the whole life of the ServerCnx already, so updating the TokenAuthenticationState#authenticate implementation shouldn't be a problem. Or, when you say "third-party plugins", are you referencing to those plugins relying on the TokenAuthenticationState? Even so, I think it's a pretty easy change, and I'd even consider it a bug since the AuthenticationDataSource is static even though we try to update it in the ServerCnx.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created #19282 to start a concrete discussion.

Copy link
Member Author

@nodece nodece Jan 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Third-party plugins are the custom plug-in from users. I worry that breaks the user's plugin here if we don't recreate AuthenticationState.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I worry that breaks the user's plugin here if we don't recreate AuthenticationState.

We don't recreate the auth state now. Is there a specific case that you're worried about breaking?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't recreate the auth state now. Is there a specific case that you're worried about breaking?

Sure.

}
} catch (AuthenticationException e) {
return CompletableFuture.failedFuture(e);
}
}

if (log.isDebugEnabled()) {
log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}",
remoteAddress, authMethod, this.authRole, originalPrincipal);
CompletableFuture<AuthData> authFuture = CompletableFuture.completedFuture(null);
if (!useOriginalAuthState && !authState.isComplete()) {
try {
authFuture = CompletableFuture.completedFuture(authState.authenticate(clientData));
} catch (AuthenticationException e) {
authFuture = CompletableFuture.failedFuture(e);
}
}
return authFuture.thenCompose(nextAuthData -> {
if (nextAuthData == null) {
// Authentication has completed. It was either:
// 1. the 1st time the authentication process was done, in which case we'll send
// a `CommandConnected` response
// 2. an authentication refresh, in which case we need to refresh authenticationData and role

if (state != State.Connected) {
// First time authentication is done
completeConnect(clientProtocolVersion, clientVersion, enableSubscriptionPatternEvaluation);
} else {
// If the connection was already ready, it means we're doing a refresh
if (!StringUtils.isEmpty(authRole)) {
if (!authRole.equals(newAuthRole)) {
log.warn("[{}] Principal cannot change during an authentication refresh expected={} got={}",
remoteAddress, authRole, newAuthRole);
ctx.close();
} else {
log.info("[{}] Refreshed authentication credentials for role {}", remoteAddress, authRole);
AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState;
AuthenticationDataSource newAuthDataSource = authState.getAuthDataSource();

String newAuthRole = null;
try {
newAuthRole = authState.getAuthRole();
} catch (AuthenticationException e) {
log.warn("[{}] Failed to get auth role", remoteAddress, e);
if (!useOriginalAuthState) {
return CompletableFuture.failedFuture(e);
}
}
}

return State.Connected;
}
if (useOriginalAuthState) {
if (newAuthRole != null) {
this.originalPrincipal = newAuthRole;
}
this.originalAuthData = newAuthDataSource;
} else {
this.authRole = newAuthRole;
this.authenticationData = newAuthDataSource;
}

// auth not complete, continue auth with client side.
writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, clientProtocolVersion));
if (log.isDebugEnabled()) {
log.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
log.debug("[{}] connect state change to : [{}]", remoteAddress, State.Connecting.name());
}
return State.Connecting;
log.info("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}, "
+ "using original auth state: {}",
remoteAddress, authMethod, this.authRole, originalPrincipal, originalAuthState);
if (state != State.Connected) {
// First time authentication is done
completeConnect(clientProtocolVersion, clientVersion, enableSubscriptionPatternEvaluation);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we can move out from the thenCompose()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems unnecessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have checked if the state is connected at the beginning. But we going to check the connection is not connected again here. If we can merge them, it will make the code more easy to read.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have checked if the state is connected at the beginning.

The state == State.Connected means doing an authentication refresh operation.

The state != State.Connected means first authentication is done.

} else {
// Auth challenge is done, switch state to Connected.
state = State.Connected;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And if we can reach here. The state should be Connected right? Why need to set to Connected again?

And we should avoid change the state in another thread. Here is executed by the callback thread for completing the authFuture.

Copy link
Member Author

@nodece nodece Oct 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And if we can reach here. The state should be Connected right? Why need to set to Connected again?

You are right., For auth challenge multiple times, we still need to set to Conected, see line 764.

And we should avoid change the state in another thread. Here is executed by the callback thread for completing the authFuture.

Do you mean I should switch to Pulsar thread to change the state?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And we should avoid change the state in another thread. Here is executed by the callback thread for completing the authFuture.

This is safe, each connection has its own ServerCnx.java, don't warry.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe @codelipenghui is right here. We are updating the state from a callback which could be another thread, and that is not a safe operation here. Instead, I think we should remove all of the async references in this PR and just focus on fixing the underlying problem with authentication data. I already started work on implementing PIP 97, and I will follow up on this PR to replace the synchronous methods with asynchronous calls.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to know why this is not a safe operation. Are you worried that some other handler will change this state?

I think we should remove all of the async references in this PR and just focus on fixing the underlying problem with authentication data.

Good point out.

log.info("[{}] Refreshed authentication credentials", remoteAddress);
}
} else {
// auth not complete, continue auth with client side.
writeAndFlush(Commands.newAuthChallenge(authMethod, nextAuthData, clientProtocolVersion));
if (log.isDebugEnabled()) {
log.debug("[{}] Authentication in progress client by method {}, using original auth state: {}",
remoteAddress, authMethod, useOriginalAuthState);
log.debug("[{}] connect state change to : [{}]", remoteAddress, State.Connecting.name());
}
state = State.Connecting;
}
return CompletableFuture.completedFuture(null);
});
}

public void refreshAuthenticationCredentials() {
Expand Down Expand Up @@ -905,72 +930,62 @@ protected void handleConnect(CommandConnect connect) {

// init authState and other var
ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
SSLSession sslSession = null;
if (sslHandler != null) {
sslSession = ((SslHandler) sslHandler).engine().getSession();
}

authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
doAuthenticationAsync(clientData, clientProtocolVersion, clientVersion).thenCompose(__ -> {
// This will fail the check if:
// 1. client is coming through a proxy
// 2. we require to validate the original credentials
// 3. no credentials were passed
if (connect.hasOriginalPrincipal()
&& service.getPulsar().getConfig().isAuthenticateOriginalAuthData()) {
// init authentication
String originalAuthMethod;
if (connect.hasOriginalAuthMethod()) {
originalAuthMethod = connect.getOriginalAuthMethod();
} else {
originalAuthMethod = "none";
}

if (log.isDebugEnabled()) {
String role = "";
if (authState != null && authState.isComplete()) {
role = authState.getAuthRole();
} else {
role = "authentication incomplete or null";
}
log.debug("[{}] Authenticate role : {}", remoteAddress, role);
}

state = doAuthentication(clientData, clientProtocolVersion, clientVersion);

// This will fail the check if:
// 1. client is coming through a proxy
// 2. we require to validate the original credentials
// 3. no credentials were passed
if (connect.hasOriginalPrincipal() && service.getPulsar().getConfig().isAuthenticateOriginalAuthData()) {
// init authentication
String originalAuthMethod;
if (connect.hasOriginalAuthMethod()) {
originalAuthMethod = connect.getOriginalAuthMethod();
} else {
originalAuthMethod = "none";
}

AuthenticationProvider originalAuthenticationProvider = getBrokerService()
.getAuthenticationService()
.getAuthenticationProvider(originalAuthMethod);
originalAuthenticationProvider =
getBrokerService().getAuthenticationService().getAuthenticationProvider(originalAuthMethod);

if (originalAuthenticationProvider == null) {
throw new AuthenticationException(
String.format("Can't find AuthenticationProvider for original role"
+ " using auth method [%s] is not available", originalAuthMethod));
}
if (originalAuthenticationProvider == null) {
return CompletableFuture.failedFuture(new AuthenticationException(String.format(
"Can't find AuthenticationProvider for original role"
+ " using auth method [%s] is not available", originalAuthMethod)));
}

originalAuthState = originalAuthenticationProvider.newAuthState(
AuthData.of(connect.getOriginalAuthData().getBytes()),
remoteAddress,
sslSession);
originalAuthData = originalAuthState.getAuthDataSource();
originalPrincipal = originalAuthState.getAuthRole();
try {
originalAuthState = originalAuthenticationProvider.newAuthState(
AuthData.of(connect.getOriginalAuthData().getBytes()), remoteAddress, sslSession);
originalAuthData = originalAuthState.getAuthDataSource();
originalPrincipal = originalAuthState.getAuthRole();

if (log.isDebugEnabled()) {
log.debug("[{}] Authenticate original role : {}", remoteAddress, originalPrincipal);
}
} else {
originalPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null;
if (log.isDebugEnabled()) {
log.debug("[{}] Authenticate original role : {}", remoteAddress, originalPrincipal);
}
} catch (AuthenticationException e) {
return CompletableFuture.failedFuture(e);
}
} else {
originalPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null;

if (log.isDebugEnabled()) {
log.debug("[{}] Authenticate original role (forwarded from proxy): {}",
remoteAddress, originalPrincipal);
if (log.isDebugEnabled()) {
log.debug("[{}] Authenticate original role (forwarded from proxy): {}", remoteAddress,
originalPrincipal);
}
}
}
return CompletableFuture.completedFuture(null);
}).exceptionally(e -> {
closeWithAuthException("connect", e);
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
return null;
});
} catch (Exception e) {
service.getPulsarStats().recordConnectionCreateFail();
logAuthException(remoteAddress, "connect", getPrincipal(), Optional.empty(), e);
String msg = "Unable to authenticate";
writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
close();
closeWithAuthException("connect", e);
}
}

Expand All @@ -986,22 +1001,14 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
remoteAddress, authResponse.getResponse().getAuthMethodName());
}

try {
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
doAuthentication(clientData, authResponse.getProtocolVersion(),
authResponse.hasClientVersion() ? authResponse.getClientVersion() : EMPTY);
} catch (AuthenticationException e) {
service.getPulsarStats().recordConnectionCreateFail();
log.warn("[{}] Authentication failed: {} ", remoteAddress, e.getMessage());
writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, e.getMessage()));
close();
} catch (Exception e) {
service.getPulsarStats().recordConnectionCreateFail();
String msg = "Unable to handleAuthResponse";
log.warn("[{}] {} ", remoteAddress, msg, e);
writeAndFlush(Commands.newError(-1, ServerError.UnknownError, msg));
close();
}
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
String clientVersion = authResponse.hasClientVersion() ? authResponse.getClientVersion() : EMPTY;
doAuthenticationAsync(clientData, authResponse.getProtocolVersion(), clientVersion)
.whenComplete((__, e) -> {
if (e != null) {
closeWithAuthException("authResponse", e);
}
});
}

@Override
Expand Down Expand Up @@ -3172,11 +3179,19 @@ public String clientSourceAddress() {
}
}

private void closeWithAuthException(String operation, Throwable e) {
Throwable unwrapEx = FutureUtil.unwrapCompletionException(e);
service.getPulsarStats().recordConnectionCreateFail();
logAuthException(remoteAddress, operation, getPrincipal(), Optional.empty(), unwrapEx);
writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, unwrapEx.getMessage()));
close();
}

private static void logAuthException(SocketAddress remoteAddress, String operation,
String principal, Optional<TopicName> topic, Throwable ex) {
String topicString = topic.map(t -> ", topic=" + t.toString()).orElse("");
if (ex instanceof AuthenticationException) {
log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}",
log.error("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}",
remoteAddress, operation, principal, topicString, ex.getMessage());
} else {
log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ public void testPulsarSqlAuth() throws PulsarAdminException {
Assert.fail(); // should fail
} catch (TrinoException e){
Assert.assertEquals(PERMISSION_DENIED.toErrorCode(), e.getErrorCode());
Assert.assertTrue(e.getMessage().contains("not authorized"));
}

// test clean session
Expand All @@ -209,7 +208,6 @@ public void testPulsarSqlAuth() throws PulsarAdminException {
Assert.fail(); // should fail
} catch (TrinoException e){
Assert.assertEquals(PERMISSION_DENIED.toErrorCode(), e.getErrorCode());
Assert.assertTrue(e.getMessage().contains("Unable to authenticate"));
}

pulsarAuth.cleanSession(session);
Expand All @@ -226,7 +224,6 @@ public void testPulsarSqlAuth() throws PulsarAdminException {
Assert.fail(); // should fail
} catch (TrinoException e){
Assert.assertEquals(PERMISSION_DENIED.toErrorCode(), e.getErrorCode());
Assert.assertTrue(e.getMessage().contains("not authorized"));
}

pulsarAuth.cleanSession(session);
Expand Down
Loading