Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][proxy] Update clientAuthData in ProxyConnection as needed #19026

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,11 @@
*/
public class ProxyClientCnx extends ClientCnx {
private final boolean forwardClientAuthData;
private final String clientAuthMethod;
private final String clientAuthRole;
private final AuthData clientAuthData;
private final ProxyConnection proxyConnection;

public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
AuthData clientAuthData, String clientAuthMethod, int protocolVersion,
public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, int protocolVersion,
boolean forwardClientAuthData, ProxyConnection proxyConnection) {
super(conf, eventLoopGroup, protocolVersion);
this.clientAuthRole = clientAuthRole;
this.clientAuthData = clientAuthData;
this.clientAuthMethod = clientAuthMethod;
this.forwardClientAuthData = forwardClientAuthData;
this.proxyConnection = proxyConnection;
}
Expand All @@ -60,14 +53,16 @@ protected ByteBuf newConnectCommand() throws Exception {
if (log.isDebugEnabled()) {
log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {},"
+ " clientAuthData = {}, clientAuthMethod = {}",
clientAuthRole, clientAuthData, clientAuthMethod);
proxyConnection.clientAuthRole, proxyConnection.clientAuthData, proxyConnection.clientAuthMethod);
}

authenticationDataProvider = authentication.getAuthData(remoteHostName);
AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
// Get the auth related information dynamically because it can change over time and new connections
// require recent auth information.
return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
clientAuthMethod);
PulsarVersion.getVersion(), proxyToTargetBrokerAddress, proxyConnection.clientAuthRole,
proxyConnection.clientAuthData, proxyConnection.clientAuthMethod);
}

@Override
Expand All @@ -87,14 +82,15 @@ protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+ "the proxy client {}", proxyConnection.ctx().channel(), ctx.channel());
}

proxyConnection.ctx().writeAndFlush(Commands.newAuthChallenge(clientAuthMethod, AuthData.REFRESH_AUTH_DATA,
protocolVersion))
proxyConnection.ctx().writeAndFlush(Commands.newAuthChallenge(proxyConnection.clientAuthMethod,
AuthData.REFRESH_AUTH_DATA, protocolVersion))
.addListener(writeFuture -> {
if (writeFuture.isSuccess()) {
if (log.isDebugEnabled()) {
log.debug("Proxy {} sent the auth challenge to original client to refresh credentials "
+ "with method {} for the proxy client {}",
proxyConnection.ctx().channel(), clientAuthMethod, ctx.channel());
proxyConnection.ctx().channel(), proxyConnection.clientAuthMethod,
ctx.channel());
}
} else {
log.error("Failed to send the auth challenge to original client by the proxy {} "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ public class ProxyConnection extends PulsarHandler {
private DirectProxyHandler directProxyHandler = null;
private final BrokerProxyValidator brokerProxyValidator;
private final ConnectionController connectionController;
String clientAuthRole;
AuthData clientAuthData;
String clientAuthMethod;
volatile String clientAuthRole;
volatile AuthData clientAuthData;
volatile String clientAuthMethod;

private String authMethod = "none";
AuthenticationProvider authenticationProvider;
Expand Down Expand Up @@ -315,13 +315,8 @@ protected static boolean isTlsChannel(Channel channel) {
private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
Supplier<ClientCnx> clientCnxSupplier;
if (service.getConfiguration().isAuthenticationEnabled()) {
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole,
clientAuthData, clientAuthMethod, protocolVersionToAdvertise,
service.getConfiguration().isForwardAuthorizationCredentials(), this);
clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(),
protocolVersionToAdvertise, service.getConfiguration().isForwardAuthorizationCredentials(), this);
} else {
clientCnxSupplier = () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise);
}
Expand Down Expand Up @@ -425,23 +420,42 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec
}

// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData)
private boolean doAuthentication(AuthData clientData)
throws Exception {
AuthData brokerData = authState.authenticate(clientData);
// authentication has completed, will send newConnected command.
if (authState.isComplete()) {
clientAuthRole = authState.getAuthRole();
String newClientAuthRole = authState.getAuthRole();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
}

// First connection
if (this.connectionPool == null || state == State.Connecting) {
clientAuthRole = newClientAuthRole;
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
// authentication has completed, will send newConnected command.
completeConnect(clientData);
} else {
// If the connection was already ready, it means we're doing a refresh
// This code path is currently only relevant when State == ProxyLookupRequests
if (!clientAuthRole.equals(newClientAuthRole)) {
LOG.warn("[{}] Principal cannot change during an authentication refresh expected={} got={}",
remoteAddress, clientAuthRole, newClientAuthRole);
ctx.close();
return false;
} else {
LOG.info("[{}] Refreshed authentication credentials for role {}", remoteAddress, clientAuthRole);
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
clientAuthData = clientData;
}
}
}
return;
return true;
}

// auth not complete, continue auth with client side.
Expand All @@ -452,6 +466,8 @@ private void doAuthentication(AuthData clientData)
remoteAddress, authMethod);
}
state = State.Connecting;
// TODO how does multi-stage auth map to proxy auth refresh?
return false;
}

@Override
Expand Down Expand Up @@ -541,8 +557,8 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {

try {
AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData());
doAuthentication(clientData);
if (service.getConfiguration().isForwardAuthorizationCredentials()
boolean success = doAuthentication(clientData);
if (success && service.getConfiguration().isForwardAuthorizationCredentials()
&& connectionPool != null && state == State.ProxyLookupRequests) {
connectionPool.getConnections().forEach(toBrokerCnxFuture -> {
String clientVersion;
Expand Down