Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][proxy] PIP 97: Implement for ProxyConnection #19292

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -311,13 +313,9 @@ protected static boolean isTlsChannel(Channel channel) {
return channel.pipeline().get(ServiceChannelInitializer.TLS_HANDLER) != null;
}

private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
private synchronized void completeConnect() throws PulsarClientException {
Supplier<ClientCnx> clientCnxSupplier;
if (service.getConfiguration().isAuthenticationEnabled()) {
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole,
clientAuthData, clientAuthMethod, protocolVersionToAdvertise,
service.getConfiguration().isForwardAuthorizationCredentials(), this);
Expand Down Expand Up @@ -423,36 +421,57 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec
// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData)
throws Exception {
AuthData brokerData = authState.authenticate(clientData);
// authentication has completed, will send newConnected command.
if (authState.isComplete()) {
clientAuthRole = authState.getAuthRole();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
}
authState
.authenticateAsync(clientData)
.whenCompleteAsync((authChallenge, throwable) -> {
if (throwable == null) {
authChallengeSuccessCallback(authChallenge);
} else {
authenticationFailedCallback(throwable);
}}, ctx.executor());
}

// First connection
if (this.connectionPool == null || state == State.Connecting) {
// authentication has completed, will send newConnected command.
completeConnect(clientData);
protected void authenticationFailedCallback(Throwable t) {
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, t);
final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate");
writeAndFlushAndClose(msg);
}

// Always run in this class's event loop.
protected void authChallengeSuccessCallback(AuthData authChallenge) {
try {
// authentication has completed, will send newConnected command.
if (authChallenge == null) {
clientAuthRole = authState.getAuthRole();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
}

// First connection
if (this.connectionPool == null || state == State.Connecting) {
// authentication has completed, will send newConnected command.
completeConnect();
}
return;
}
return;
}

// auth not complete, continue auth with client side.
final ByteBuf msg = Commands.newAuthChallenge(authMethod, brokerData, protocolVersionToAdvertise);
writeAndFlush(msg);
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
// auth not complete, continue auth with client side.
final ByteBuf msg = Commands.newAuthChallenge(authMethod, authChallenge, protocolVersionToAdvertise);
writeAndFlush(msg);
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
}
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we catch blindly "Exception" here ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote it this way because we are in a callback and if we miss the exception, we will leak the connection forever. If we think we should only catch more specific exceptions, I can update it to catch AuthenticationExcpetion and RuntimeException.

authenticationFailedCallback(e);
}
state = State.Connecting;
}

@Override
protected void handleConnect(CommandConnect connect) {
checkArgument(state == State.Init);
state = State.Connecting;
this.setRemoteEndpointProtocolVersion(connect.getProtocolVersion());
this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl();
this.protocolVersionToAdvertise = getProtocolVersionToAdvertise(connect);
Expand All @@ -479,7 +498,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),

// authn not enabled, complete
if (!service.getConfiguration().isAuthenticationEnabled()) {
completeConnect(null);
completeConnect();
return;
}

Expand All @@ -493,6 +512,14 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
authMethod = "none";
}

if (service.getConfiguration().isForwardAuthorizationCredentials()) {
// We store the first clientData here. Before this commit, we stored the last clientData.
// Since this only works when forwarding single staged authentication, first == last is true.
// Here is an issue to fix the protocol: https://github.com/apache/pulsar/issues/19291.
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}

authenticationProvider = service
.getAuthenticationService()
.getAuthenticationProvider(authMethod);
Expand All @@ -504,7 +531,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
.orElseThrow(() ->
new AuthenticationException("No anonymous role, and no authentication provider configured"));

completeConnect(clientData);
completeConnect();
return;
}

Expand All @@ -518,9 +545,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
doAuthentication(clientData);
} catch (Exception e) {
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate");
writeAndFlushAndClose(msg);
authenticationFailedCallback(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import javax.naming.AuthenticationException;

Expand Down Expand Up @@ -136,7 +137,7 @@ public String getAuthMethodName() {
}

@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
public CompletableFuture<String> authenticateAsync(AuthenticationDataSource authData) {
String commandData = null;
if (authData.hasDataFromCommand()) {
commandData = authData.getCommandData();
Expand All @@ -150,9 +151,12 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
long currentTimeInMillis = System.currentTimeMillis();
if (expiryTimeInMillis < currentTimeInMillis) {
log.warn("Auth failed due to timeout");
throw new AuthenticationException("Authentication data has been expired");
return CompletableFuture
.failedFuture(new AuthenticationException("Authentication data has been expired"));
}
return element.get("entityType").getAsString();
final String result = element.get("entityType").getAsString();
eolivelli marked this conversation as resolved.
Show resolved Hide resolved
// Run in another thread to attempt to test the async logic
return CompletableFuture.supplyAsync(() -> result);
}
}

Expand Down