From 0b0731a3a92b1ba2d5f6754d56d481fe807a3ff8 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Mon, 23 Jan 2023 14:17:52 -0600 Subject: [PATCH 1/3] [improve][broker] ServerCnx: go to Failed state when auth fails --- .../pulsar/broker/service/ServerCnx.java | 1 + .../pulsar/broker/service/ServerCnxTest.java | 36 ++++++++++++++++++- 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 935934f6bdfc1..feea80a2dc372 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -969,6 +969,7 @@ protected void handleConnect(CommandConnect connect) { } } catch (Exception e) { service.getPulsarStats().recordConnectionCreateFail(); + state = State.Failed; logAuthException(remoteAddress, "connect", getPrincipal(), Optional.empty(), e); String msg = "Unable to authenticate"; writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 1a98822340fc3..235c70cc9fc4f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -80,6 +80,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.TransactionMetadataStoreService; +import org.apache.pulsar.broker.auth.MockAuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationService; @@ -450,11 +451,44 @@ public void testConnectCommandWithAuthenticationNegative() throws Exception { ByteBuf clientCommand = Commands.newConnect("none", "", null); channel.writeInbound(clientCommand); - assertEquals(serverCnx.getState(), State.Start); + assertEquals(serverCnx.getState(), State.Failed); assertTrue(getResponse() instanceof CommandError); channel.finish(); } + @Test(timeOut = 30000) + public void testConnectCommandWithInvalidOriginalAuthData() throws Exception { + AuthenticationService authenticationService = mock(AuthenticationService.class); + AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); + + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + svcConfig.setAuthenticateOriginalAuthData(true); + svcConfig.setProxyRoles(Collections.singleton("proxy")); + + resetChannel(); + assertTrue(channel.isActive()); + assertEquals(serverCnx.getState(), State.Start); + + ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1,null, + null, "client", "fail", authMethodName); + channel.writeInbound(clientCommand); + + // We currently expect two responses because the originalAuthData is verified after sending + // a successful response to the proxy. Because this is a synchronous operation, there is currently + // no risk. It would be better to fix this. See https://github.com/apache/pulsar/issues/19311. + Object response1 = getResponse(); + assertTrue(response1 instanceof CommandConnected); + Object response2 = getResponse(); + assertTrue(response2 instanceof CommandError); + assertEquals(((CommandError) response2).getMessage(), "Unable to authenticate"); + assertEquals(serverCnx.getState(), State.Failed); + assertFalse(serverCnx.isActive()); + channel.finish(); + } + @Test(timeOut = 30000) public void testProducerCommand() throws Exception { resetChannel(); From fa62ba51cf37d75146c697ccc3e4c1c222d4a0c9 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Tue, 24 Jan 2023 17:31:02 -0600 Subject: [PATCH 2/3] Use Close callback in ServerCnx; go to failed in AuthResponse --- .../pulsar/broker/service/ServerCnx.java | 15 ++-- .../MockMultiStageAuthenticationProvider.java | 45 +++++++++++ .../MockMultiStageAuthenticationState.java | 76 +++++++++++++++++++ .../pulsar/broker/service/ServerCnxTest.java | 44 ++++++++++- .../service/utils/ClientChannelHelper.java | 6 ++ 5 files changed, 178 insertions(+), 8 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationState.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index feea80a2dc372..2a83252c3091b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -971,9 +971,8 @@ protected void handleConnect(CommandConnect connect) { service.getPulsarStats().recordConnectionCreateFail(); state = State.Failed; logAuthException(remoteAddress, "connect", getPrincipal(), Optional.empty(), e); - String msg = "Unable to authenticate"; - writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg)); - close(); + ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Unable to authenticate"); + NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg); } } @@ -995,15 +994,17 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) { authResponse.hasClientVersion() ? authResponse.getClientVersion() : EMPTY); } catch (AuthenticationException e) { service.getPulsarStats().recordConnectionCreateFail(); + state = State.Failed; log.warn("[{}] Authentication failed: {} ", remoteAddress, e.getMessage()); - writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, e.getMessage())); - close(); + ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Unable to authenticate"); + NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg); } catch (Exception e) { service.getPulsarStats().recordConnectionCreateFail(); + state = State.Failed; String msg = "Unable to handleAuthResponse"; log.warn("[{}] {} ", remoteAddress, msg, e); - writeAndFlush(Commands.newError(-1, ServerError.UnknownError, msg)); - close(); + ByteBuf command = Commands.newError(-1, ServerError.UnknownError, msg); + NettyChannelUtil.writeAndFlushWithClosePromise(ctx, command); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java new file mode 100644 index 0000000000000..c429677ac72e1 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.auth; + +import javax.naming.AuthenticationException; +import javax.net.ssl.SSLSession; +import java.net.SocketAddress; +import org.apache.pulsar.broker.authentication.AuthenticationState; +import org.apache.pulsar.common.api.AuthData; + +/** + * Class that provides the same authentication semantics as the {@link MockAuthenticationProvider} except + * that this one initializes the {@link MockMultiStageAuthenticationState} class to support testing + * multi-staged authentication. + */ +public class MockMultiStageAuthenticationProvider extends MockAuthenticationProvider { + + @Override + public String getAuthMethodName() { + return "multi-stage"; + } + + @Override + public AuthenticationState newAuthState(AuthData authData, + SocketAddress remoteAddress, + SSLSession sslSession) throws AuthenticationException { + return new MockMultiStageAuthenticationState(this); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationState.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationState.java new file mode 100644 index 0000000000000..1ea56748867d7 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationState.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.auth; + +import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationState; +import org.apache.pulsar.common.api.AuthData; + +import javax.naming.AuthenticationException; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Performs multistage authentication by extending the paradigm created in {@link MockAuthenticationProvider}. + */ +public class MockMultiStageAuthenticationState implements AuthenticationState { + + private final MockMultiStageAuthenticationProvider provider; + private String authRole = null; + + MockMultiStageAuthenticationState(MockMultiStageAuthenticationProvider provider) { + this.provider = provider; + } + + @Override + public String getAuthRole() throws AuthenticationException { + if (authRole == null) { + throw new AuthenticationException("Must authenticate first"); + } + return null; + } + + @Override + public AuthData authenticate(AuthData authData) throws AuthenticationException { + String data = new String(authData.getBytes(), UTF_8); + String[] parts = data.split("\\."); + if (parts.length == 2) { + if ("challenge".equals(parts[0])) { + return AuthData.of("challenged".getBytes()); + } else { + AuthenticationDataCommand command = new AuthenticationDataCommand(data); + authRole = provider.authenticate(command); + // Auth successful, no more auth required + return null; + } + } + throw new AuthenticationException("Failed to authenticate"); + } + + @Override + public AuthenticationDataSource getAuthDataSource() { + return null; + } + + @Override + public boolean isComplete() { + return authRole != null; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 235c70cc9fc4f..665ac51446b1f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -81,6 +81,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.TransactionMetadataStoreService; import org.apache.pulsar.broker.auth.MockAuthenticationProvider; +import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationService; @@ -104,6 +105,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse; import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse; +import org.apache.pulsar.common.api.proto.CommandAuthChallenge; import org.apache.pulsar.common.api.proto.CommandAuthResponse; import org.apache.pulsar.common.api.proto.CommandConnected; import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse; @@ -457,7 +459,7 @@ public void testConnectCommandWithAuthenticationNegative() throws Exception { } @Test(timeOut = 30000) - public void testConnectCommandWithInvalidOriginalAuthData() throws Exception { + public void testConnectCommandWithFailingOriginalAuthData() throws Exception { AuthenticationService authenticationService = mock(AuthenticationService.class); AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); String authMethodName = authenticationProvider.getAuthMethodName(); @@ -489,6 +491,46 @@ public void testConnectCommandWithInvalidOriginalAuthData() throws Exception { channel.finish(); } + @Test(timeOut = 30000) + public void testAuthResponseWithFailingAuthData() throws Exception { + AuthenticationService authenticationService = mock(AuthenticationService.class); + AuthenticationProvider authenticationProvider = new MockMultiStageAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); + + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + + resetChannel(); + assertTrue(channel.isActive()); + assertEquals(serverCnx.getState(), State.Start); + + // Trigger connect command to result in AuthChallenge + ByteBuf clientCommand = Commands.newConnect(authMethodName, "challenge.client", "1"); + channel.writeInbound(clientCommand); + + Object challenge1 = getResponse(); + assertTrue(challenge1 instanceof CommandAuthChallenge); + + // Trigger another AuthChallenge to verify that code path continues to challenge + ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName, AuthData.of("challenge.client".getBytes()), 1, "1"); + channel.writeInbound(authResponse1); + + Object challenge2 = getResponse(); + assertTrue(challenge2 instanceof CommandAuthChallenge); + + // Trigger failure + ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName, AuthData.of("fail.client".getBytes()), 1, "1"); + channel.writeInbound(authResponse2); + + Object response3 = getResponse(); + assertTrue(response3 instanceof CommandError); + assertEquals(((CommandError) response3).getMessage(), "Unable to authenticate"); + assertEquals(serverCnx.getState(), State.Failed); + assertFalse(serverCnx.isActive()); + channel.finish(); + } + @Test(timeOut = 30000) public void testProducerCommand() throws Exception { resetChannel(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java index 2dc56282a7974..bf0dd3aa9c1c5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java @@ -21,6 +21,7 @@ import java.util.Queue; import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse; import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse; +import org.apache.pulsar.common.api.proto.CommandAuthChallenge; import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse; import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse; import org.apache.pulsar.common.api.proto.CommandEndTxnResponse; @@ -83,6 +84,11 @@ protected void handleConnected(CommandConnected connected) { queue.offer(new CommandConnected().copyFrom(connected)); } + @Override + protected void handleAuthChallenge(CommandAuthChallenge challenge) { + queue.offer(new CommandAuthChallenge().copyFrom(challenge)); + } + @Override protected void handleSubscribe(CommandSubscribe subscribe) { queue.offer(new CommandSubscribe().copyFrom(subscribe)); From 6fecb6fe38c86d0121f1e68230f3eaca52b92531 Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Tue, 24 Jan 2023 17:33:07 -0600 Subject: [PATCH 3/3] fix typo --- .../broker/auth/MockMultiStageAuthenticationProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java index c429677ac72e1..c62ff537bb00f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockMultiStageAuthenticationProvider.java @@ -27,7 +27,7 @@ /** * Class that provides the same authentication semantics as the {@link MockAuthenticationProvider} except * that this one initializes the {@link MockMultiStageAuthenticationState} class to support testing - * multi-staged authentication. + * multistage authentication. */ public class MockMultiStageAuthenticationProvider extends MockAuthenticationProvider {