From fa6af432ef3d015b371121afd9324ba7f393994d Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Wed, 1 Feb 2023 00:55:53 -0600 Subject: [PATCH] [feat][proxy] PIP 97: Implement for ProxyConnection (#19292) PIP: #12105 ### Motivation Implement asynchronous auth for the proxy connection. This is one of the core PRs for implementing #12105. ### Modifications * Update `ProxyConnection` class to asynchronously handle the authentication result. The result is handled on the handler's event loop to ensure correctness. * Update `ProxyAuthenticationTest` class to implement async auth methods and to make authentication asynchronous to test that code path. ### Verifying this change There is an updated test, but it doesn't cover all code paths in this PR. ### Documentation - [x] `doc-not-needed` We do not need to document this portion of PIP 97. ### Matching PR in forked repository PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/16 --- .../pulsar/proxy/server/ProxyConnection.java | 84 ++++++++++++------- .../proxy/server/ProxyAuthenticationTest.java | 10 ++- 2 files changed, 61 insertions(+), 33 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index e27ab19ee4b00..5ee79f4ad23a1 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -311,13 +311,9 @@ protected static boolean isTlsChannel(Channel channel) { return channel.pipeline().get(ServiceChannelInitializer.TLS_HANDLER) != null; } - private synchronized void completeConnect(AuthData clientData) throws PulsarClientException { + private synchronized void completeConnect() throws PulsarClientException { Supplier clientCnxSupplier; if (service.getConfiguration().isAuthenticationEnabled()) { - if (service.getConfiguration().isForwardAuthorizationCredentials()) { - this.clientAuthData = clientData; - this.clientAuthMethod = authMethod; - } clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersionToAdvertise, service.getConfiguration().isForwardAuthorizationCredentials(), this); @@ -423,29 +419,51 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec // According to auth result, send newConnected or newAuthChallenge command. private void doAuthentication(AuthData clientData) throws Exception { - AuthData brokerData = authState.authenticate(clientData); - // authentication has completed, will send newConnected command. - if (authState.isComplete()) { - clientAuthRole = authState.getAuthRole(); - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] Client successfully authenticated with {} role {}", - remoteAddress, authMethod, clientAuthRole); - } + authState + .authenticateAsync(clientData) + .whenCompleteAsync((authChallenge, throwable) -> { + if (throwable == null) { + authChallengeSuccessCallback(authChallenge); + } else { + authenticationFailedCallback(throwable); + } + }, ctx.executor()); + } + + protected void authenticationFailedCallback(Throwable t) { + LOG.warn("[{}] Unable to authenticate: ", remoteAddress, t); + final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"); + writeAndFlushAndClose(msg); + } - // First connection - if (this.connectionPool == null || state == State.Connecting) { - // authentication has completed, will send newConnected command. - completeConnect(clientData); + // Always run in this class's event loop. + protected void authChallengeSuccessCallback(AuthData authChallenge) { + try { + // authentication has completed, will send newConnected command. + if (authChallenge == null) { + clientAuthRole = authState.getAuthRole(); + if (LOG.isDebugEnabled()) { + LOG.debug("[{}] Client successfully authenticated with {} role {}", + remoteAddress, authMethod, clientAuthRole); + } + + // First connection + if (this.connectionPool == null || state == State.Connecting) { + // authentication has completed, will send newConnected command. + completeConnect(); + } + return; } - return; - } - // auth not complete, continue auth with client side. - final ByteBuf msg = Commands.newAuthChallenge(authMethod, brokerData, protocolVersionToAdvertise); - writeAndFlush(msg); - if (LOG.isDebugEnabled()) { - LOG.debug("[{}] Authentication in progress client by method {}.", - remoteAddress, authMethod); + // auth not complete, continue auth with client side. + final ByteBuf msg = Commands.newAuthChallenge(authMethod, authChallenge, protocolVersionToAdvertise); + writeAndFlush(msg); + if (LOG.isDebugEnabled()) { + LOG.debug("[{}] Authentication in progress client by method {}.", + remoteAddress, authMethod); + } + } catch (Exception e) { + authenticationFailedCallback(e); } } @@ -479,7 +497,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), // authn not enabled, complete if (!service.getConfiguration().isAuthenticationEnabled()) { - completeConnect(null); + completeConnect(); return; } @@ -493,6 +511,14 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), authMethod = "none"; } + if (service.getConfiguration().isForwardAuthorizationCredentials()) { + // We store the first clientData here. Before this commit, we stored the last clientData. + // Since this only works when forwarding single staged authentication, first == last is true. + // Here is an issue to fix the protocol: https://github.com/apache/pulsar/issues/19291. + this.clientAuthData = clientData; + this.clientAuthMethod = authMethod; + } + authenticationProvider = service .getAuthenticationService() .getAuthenticationProvider(authMethod); @@ -504,7 +530,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), .orElseThrow(() -> new AuthenticationException("No anonymous role, and no authentication provider configured")); - completeConnect(clientData); + completeConnect(); return; } @@ -518,9 +544,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(), authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession); doAuthentication(clientData); } catch (Exception e) { - LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e); - final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"); - writeAndFlushAndClose(msg); + authenticationFailedCallback(e); } } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index eea5c26e66728..8229d929ee5e3 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -31,6 +31,7 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import javax.naming.AuthenticationException; @@ -136,7 +137,7 @@ public String getAuthMethodName() { } @Override - public String authenticate(AuthenticationDataSource authData) throws AuthenticationException { + public CompletableFuture authenticateAsync(AuthenticationDataSource authData) { String commandData = null; if (authData.hasDataFromCommand()) { commandData = authData.getCommandData(); @@ -150,9 +151,12 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat long currentTimeInMillis = System.currentTimeMillis(); if (expiryTimeInMillis < currentTimeInMillis) { log.warn("Auth failed due to timeout"); - throw new AuthenticationException("Authentication data has been expired"); + return CompletableFuture + .failedFuture(new AuthenticationException("Authentication data has been expired")); } - return element.get("entityType").getAsString(); + final String result = element.get("entityType").getAsString(); + // Run in another thread to attempt to test the async logic + return CompletableFuture.supplyAsync(() -> result); } }