diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 93f8a12c55219..e6e20707ef2a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -615,9 +615,9 @@ private void completeConnect(int clientProtoVersion, String clientVersion) { // According to auth result, send newConnected or newAuthChallenge command. private void doAuthentication(AuthData clientData, - boolean useOriginalAuthState, - int clientProtocolVersion, - String clientVersion) throws Exception { + boolean useOriginalAuthState, + int clientProtocolVersion, + String clientVersion) throws Exception { // The original auth state can only be set on subsequent auth attempts (and only // in presence of a proxy and if the proxy is forwarding the credentials). diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java deleted file mode 100644 index 6d108ce675d17..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java +++ /dev/null @@ -1,441 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.broker.service; - -import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; -import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper; -import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; -import com.google.common.collect.Sets; -import io.jsonwebtoken.Jwts; -import io.jsonwebtoken.SignatureAlgorithm; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import javax.crypto.SecretKey; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.charset.StandardCharsets; -import java.util.Base64; -import java.util.Collections; -import java.util.Optional; -import java.util.Properties; -import java.util.concurrent.CompletableFuture; -import org.apache.bookkeeper.common.util.OrderedExecutor; -import org.apache.bookkeeper.mledger.ManagedLedgerFactory; -import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.PulsarServiceMockSupport; -import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.authentication.AuthenticationDataSource; -import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; -import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; -import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; -import org.apache.pulsar.broker.authorization.AuthorizationService; -import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; -import org.apache.pulsar.broker.intercept.BrokerInterceptor; -import org.apache.pulsar.broker.resources.NamespaceResources; -import org.apache.pulsar.broker.resources.PulsarResources; -import org.apache.pulsar.broker.resources.TenantResources; -import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.auth.AuthenticationToken; -import org.apache.pulsar.common.api.proto.CommandConnect; -import org.apache.pulsar.common.api.proto.CommandLookupTopic; -import org.apache.pulsar.common.api.proto.CommandProducer; -import org.apache.pulsar.common.api.proto.CommandSubscribe; -import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.policies.data.TopicOperation; -import org.apache.pulsar.metadata.api.MetadataStore; -import org.apache.pulsar.metadata.impl.ZKMetadataStore; -import org.apache.zookeeper.ZooKeeper; -import org.mockito.ArgumentMatcher; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -@Test(groups = "broker") -public class ServerCnxAuthorizationTest { - private final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); - private final String CLIENT_PRINCIPAL = "client"; - private final String PROXY_PRINCIPAL = "proxy"; - private final String CLIENT_TOKEN = Jwts.builder().setSubject(CLIENT_PRINCIPAL).signWith(SECRET_KEY).compact(); - private final String PROXY_TOKEN = Jwts.builder().setSubject(PROXY_PRINCIPAL).signWith(SECRET_KEY).compact(); - - private PulsarService pulsar; - private PulsarResources pulsarResources; - private BrokerService brokerService; - private ServiceConfiguration svcConfig; - - @BeforeMethod(alwaysRun = true) - public void beforeMethod() throws Exception { - EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); - svcConfig = spy(ServiceConfiguration.class); - svcConfig.setKeepAliveIntervalSeconds(0); - svcConfig.setBrokerShutdownTimeoutMs(0L); - svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); - svcConfig.setClusterName("pulsar-cluster"); - svcConfig.setSuperUserRoles(Collections.singleton(PROXY_PRINCIPAL)); - svcConfig.setAuthenticationEnabled(true); - svcConfig.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName())); - svcConfig.setAuthorizationEnabled(true); - svcConfig.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName()); - Properties properties = new Properties(); - properties.setProperty("tokenSecretKey", "data:;base64," - + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded())); - svcConfig.setProperties(properties); - - pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); - doReturn(new DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService(); - - doReturn(svcConfig).when(pulsar).getConfiguration(); - PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { - doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources(); - }); - - - ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class); - doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory(); - - ZooKeeper mockZk = createMockZooKeeper(); - OrderedExecutor executor = OrderedExecutor.newBuilder().numThreads(1).build(); - doReturn(createMockBookKeeper(executor)) - .when(pulsar).getBookKeeperClient(); - - MetadataStore store = new ZKMetadataStore(mockZk); - - doReturn(store).when(pulsar).getLocalMetadataStore(); - doReturn(store).when(pulsar).getConfigurationMetadataStore(); - - pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store); - PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { - doReturn(pulsarResources).when(pulsar).getPulsarResources(); - }); - NamespaceResources namespaceResources = - spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30); - doReturn(namespaceResources).when(pulsarResources).getNamespaceResources(); - - TenantResources tenantResources = spyWithClassAndConstructorArgs(TenantResources.class, store, 30); - doReturn(tenantResources).when(pulsarResources).getTenantResources(); - - doReturn(CompletableFuture.completedFuture(Optional.of(TenantInfo.builder().build()))).when(tenantResources) - .getTenantAsync("public"); - - brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup); - BrokerInterceptor interceptor = mock(BrokerInterceptor.class); - doReturn(interceptor).when(brokerService).getInterceptor(); - PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { - doReturn(brokerService).when(pulsar).getBrokerService(); - doReturn(executor).when(pulsar).getOrderedExecutor(); - }); - } - - @Test - public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() throws Exception { - doReturn(true).when(svcConfig).isAuthenticateOriginalAuthData(); - - ServerCnx serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar); - ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class); - Channel channel = mock(Channel.class); - ChannelPipeline channelPipeline = mock(ChannelPipeline.class); - doReturn(channelPipeline).when(channel).pipeline(); - doReturn(null).when(channelPipeline).get(PulsarChannelInitializer.TLS_HANDLER); - - SocketAddress socketAddress = new InetSocketAddress(0); - doReturn(socketAddress).when(channel).remoteAddress(); - doReturn(channel).when(channelHandlerContext).channel(); - channelHandlerContext.channel().remoteAddress(); - serverCnx.channelActive(channelHandlerContext); - - // connect - AuthenticationToken clientAuthenticationToken = new AuthenticationToken(CLIENT_TOKEN); - AuthenticationToken proxyAuthenticationToken = new AuthenticationToken(PROXY_TOKEN); - CommandConnect connect = new CommandConnect(); - connect.setAuthMethodName(proxyAuthenticationToken.getAuthMethodName()); - connect.setAuthData(proxyAuthenticationToken.getAuthData().getCommandData().getBytes(StandardCharsets.UTF_8)); - connect.setClientVersion("test"); - connect.setProtocolVersion(1); - connect.setOriginalPrincipal(CLIENT_PRINCIPAL); - connect.setOriginalAuthData(clientAuthenticationToken.getAuthData().getCommandData()); - connect.setOriginalAuthMethod(clientAuthenticationToken.getAuthMethodName()); - - serverCnx.handleConnect(connect); - assertEquals(serverCnx.getOriginalAuthData().getCommandData(), - clientAuthenticationToken.getAuthData().getCommandData()); - assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), CLIENT_PRINCIPAL); - assertEquals(serverCnx.getOriginalPrincipal(), CLIENT_PRINCIPAL); - assertEquals(serverCnx.getAuthData().getCommandData(), - proxyAuthenticationToken.getAuthData().getCommandData()); - assertEquals(serverCnx.getAuthRole(), PROXY_PRINCIPAL); - assertEquals(serverCnx.getAuthState().getAuthRole(), PROXY_PRINCIPAL); - - AuthorizationService authorizationService = - spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsarResources); - doReturn(authorizationService).when(brokerService).getAuthorizationService(); - - // lookup - CommandLookupTopic commandLookupTopic = new CommandLookupTopic(); - TopicName topicName = TopicName.get("persistent://public/default/test-topic"); - commandLookupTopic.setTopic(topicName.toString()); - commandLookupTopic.setRequestId(1); - serverCnx.handleLookup(commandLookupTopic); - verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, - CLIENT_PRINCIPAL, - serverCnx.getOriginalAuthData()); - verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, - PROXY_PRINCIPAL, - serverCnx.getAuthData()); - - // producer - CommandProducer commandProducer = new CommandProducer(); - commandProducer.setRequestId(1); - commandProducer.setProducerId(1); - commandProducer.setProducerName("test-producer"); - commandProducer.setTopic(topicName.toString()); - serverCnx.handleProducer(commandProducer); - verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, - CLIENT_PRINCIPAL, - serverCnx.getOriginalAuthData()); - verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, - PROXY_PRINCIPAL, - serverCnx.getAuthData()); - - // consumer - CommandSubscribe commandSubscribe = new CommandSubscribe(); - commandSubscribe.setTopic(topicName.toString()); - commandSubscribe.setRequestId(1); - commandSubscribe.setConsumerId(1); - final String subscriptionName = "test-subscribe"; - commandSubscribe.setSubscription("test-subscribe"); - commandSubscribe.setSubType(CommandSubscribe.SubType.Shared); - serverCnx.handleSubscribe(commandSubscribe); - - verify(authorizationService, times(1)).allowTopicOperationAsync( - eq(topicName), eq(TopicOperation.CONSUME), - eq(CLIENT_PRINCIPAL), argThat(arg -> { - assertTrue(arg instanceof AuthenticationDataSubscription); - try { - assertEquals(arg.getCommandData(), clientAuthenticationToken.getAuthData().getCommandData()); - } catch (PulsarClientException e) { - fail(e.getMessage()); - } - assertEquals(arg.getSubscription(), subscriptionName); - return true; - })); - verify(authorizationService, times(1)).allowTopicOperationAsync( - eq(topicName), eq(TopicOperation.CONSUME), - eq(PROXY_PRINCIPAL), argThat(arg -> { - assertTrue(arg instanceof AuthenticationDataSubscription); - try { - assertEquals(arg.getCommandData(), proxyAuthenticationToken.getAuthData().getCommandData()); - } catch (PulsarClientException e) { - fail(e.getMessage()); - } - assertEquals(arg.getSubscription(), subscriptionName); - return true; - })); - } - - @Test - public void testVerifyOriginalPrincipalWithoutAuthDataForwardedFromProxy() throws Exception { - doReturn(false).when(svcConfig).isAuthenticateOriginalAuthData(); - - ServerCnx serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar); - ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class); - Channel channel = mock(Channel.class); - ChannelPipeline channelPipeline = mock(ChannelPipeline.class); - doReturn(channelPipeline).when(channel).pipeline(); - doReturn(null).when(channelPipeline).get(PulsarChannelInitializer.TLS_HANDLER); - - SocketAddress socketAddress = new InetSocketAddress(0); - doReturn(socketAddress).when(channel).remoteAddress(); - doReturn(channel).when(channelHandlerContext).channel(); - channelHandlerContext.channel().remoteAddress(); - serverCnx.channelActive(channelHandlerContext); - - // connect - AuthenticationToken proxyAuthenticationToken = new AuthenticationToken(PROXY_TOKEN); - CommandConnect connect = new CommandConnect(); - connect.setAuthMethodName(proxyAuthenticationToken.getAuthMethodName()); - connect.setAuthData(proxyAuthenticationToken.getAuthData().getCommandData().getBytes(StandardCharsets.UTF_8)); - connect.setClientVersion("test"); - connect.setProtocolVersion(1); - connect.setOriginalPrincipal(CLIENT_PRINCIPAL); - serverCnx.handleConnect(connect); - assertNull(serverCnx.getOriginalAuthData()); - assertNull(serverCnx.getOriginalAuthState()); - assertEquals(serverCnx.getOriginalPrincipal(), CLIENT_PRINCIPAL); - assertEquals(serverCnx.getAuthData().getCommandData(), - proxyAuthenticationToken.getAuthData().getCommandData()); - assertEquals(serverCnx.getAuthRole(), PROXY_PRINCIPAL); - assertEquals(serverCnx.getAuthState().getAuthRole(), PROXY_PRINCIPAL); - - AuthorizationService authorizationService = - spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsarResources); - doReturn(authorizationService).when(brokerService).getAuthorizationService(); - - // lookup - CommandLookupTopic commandLookupTopic = new CommandLookupTopic(); - TopicName topicName = TopicName.get("persistent://public/default/test-topic"); - commandLookupTopic.setTopic(topicName.toString()); - commandLookupTopic.setRequestId(1); - serverCnx.handleLookup(commandLookupTopic); - verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, - CLIENT_PRINCIPAL, - serverCnx.getAuthData()); - verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, - PROXY_PRINCIPAL, - serverCnx.getAuthData()); - - // producer - CommandProducer commandProducer = new CommandProducer(); - commandProducer.setRequestId(1); - commandProducer.setProducerId(1); - commandProducer.setProducerName("test-producer"); - commandProducer.setTopic(topicName.toString()); - serverCnx.handleProducer(commandProducer); - verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, - CLIENT_PRINCIPAL, - serverCnx.getAuthData()); - verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, - PROXY_PRINCIPAL, - serverCnx.getAuthData()); - - // consumer - CommandSubscribe commandSubscribe = new CommandSubscribe(); - commandSubscribe.setTopic(topicName.toString()); - commandSubscribe.setRequestId(1); - commandSubscribe.setConsumerId(1); - final String subscriptionName = "test-subscribe"; - commandSubscribe.setSubscription("test-subscribe"); - commandSubscribe.setSubType(CommandSubscribe.SubType.Shared); - serverCnx.handleSubscribe(commandSubscribe); - - ArgumentMatcher authenticationDataSourceArgumentMatcher = arg -> { - assertTrue(arg instanceof AuthenticationDataSubscription); - try { - assertEquals(arg.getCommandData(), proxyAuthenticationToken.getAuthData().getCommandData()); - } catch (PulsarClientException e) { - fail(e.getMessage()); - } - assertEquals(arg.getSubscription(), subscriptionName); - return true; - }; - - verify(authorizationService, times(1)).allowTopicOperationAsync( - eq(topicName), eq(TopicOperation.CONSUME), - eq(CLIENT_PRINCIPAL), argThat(authenticationDataSourceArgumentMatcher)); - verify(authorizationService, times(1)).allowTopicOperationAsync( - eq(topicName), eq(TopicOperation.CONSUME), - eq(PROXY_PRINCIPAL), argThat(authenticationDataSourceArgumentMatcher)); - } - - @Test - public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() throws Exception { - ServerCnx serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar); - - ChannelHandlerContext channelHandlerContext = mock(ChannelHandlerContext.class); - Channel channel = mock(Channel.class); - ChannelPipeline channelPipeline = mock(ChannelPipeline.class); - doReturn(channelPipeline).when(channel).pipeline(); - doReturn(null).when(channelPipeline).get(PulsarChannelInitializer.TLS_HANDLER); - - SocketAddress socketAddress = new InetSocketAddress(0); - doReturn(socketAddress).when(channel).remoteAddress(); - doReturn(channel).when(channelHandlerContext).channel(); - channelHandlerContext.channel().remoteAddress(); - serverCnx.channelActive(channelHandlerContext); - - // connect - AuthenticationToken clientAuthenticationToken = new AuthenticationToken(CLIENT_TOKEN); - CommandConnect connect = new CommandConnect(); - connect.setAuthMethodName(clientAuthenticationToken.getAuthMethodName()); - connect.setAuthData(clientAuthenticationToken.getAuthData().getCommandData().getBytes(StandardCharsets.UTF_8)); - connect.setClientVersion("test"); - connect.setProtocolVersion(1); - serverCnx.handleConnect(connect); - assertNull(serverCnx.getOriginalAuthData()); - assertNull(serverCnx.getOriginalAuthState()); - assertNull(serverCnx.getOriginalPrincipal()); - assertEquals(serverCnx.getAuthData().getCommandData(), - clientAuthenticationToken.getAuthData().getCommandData()); - assertEquals(serverCnx.getAuthRole(), CLIENT_PRINCIPAL); - assertEquals(serverCnx.getAuthState().getAuthRole(), CLIENT_PRINCIPAL); - - AuthorizationService authorizationService = - spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsarResources); - doReturn(authorizationService).when(brokerService).getAuthorizationService(); - - // lookup - CommandLookupTopic commandLookupTopic = new CommandLookupTopic(); - TopicName topicName = TopicName.get("persistent://public/default/test-topic"); - commandLookupTopic.setTopic(topicName.toString()); - commandLookupTopic.setRequestId(1); - serverCnx.handleLookup(commandLookupTopic); - verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, - CLIENT_PRINCIPAL, - serverCnx.getAuthData()); - - // producer - CommandProducer commandProducer = new CommandProducer(); - commandProducer.setRequestId(1); - commandProducer.setProducerId(1); - commandProducer.setProducerName("test-producer"); - commandProducer.setTopic(topicName.toString()); - serverCnx.handleProducer(commandProducer); - verify(authorizationService, times(1)).allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, - CLIENT_PRINCIPAL, - serverCnx.getAuthData()); - - // consumer - CommandSubscribe commandSubscribe = new CommandSubscribe(); - commandSubscribe.setTopic(topicName.toString()); - commandSubscribe.setRequestId(1); - commandSubscribe.setConsumerId(1); - final String subscriptionName = "test-subscribe"; - commandSubscribe.setSubscription("test-subscribe"); - commandSubscribe.setSubType(CommandSubscribe.SubType.Shared); - serverCnx.handleSubscribe(commandSubscribe); - - verify(authorizationService, times(1)).allowTopicOperationAsync( - eq(topicName), eq(TopicOperation.CONSUME), - eq(CLIENT_PRINCIPAL), argThat(arg -> { - assertTrue(arg instanceof AuthenticationDataSubscription); - try { - assertEquals(arg.getCommandData(), clientAuthenticationToken.getAuthData().getCommandData()); - } catch (PulsarClientException e) { - fail(e.getMessage()); - } - assertEquals(arg.getSubscription(), subscriptionName); - return true; - })); - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 7d30b91e8d2c1..1ebea21d226c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -23,6 +23,8 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -50,6 +52,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -72,12 +75,11 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.auth.MockAuthenticationProvider; import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider; -import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationService; -import org.apache.pulsar.broker.authentication.AuthenticationState; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; import org.apache.pulsar.broker.cache.ConfigurationCacheService; @@ -104,6 +106,7 @@ import org.apache.pulsar.common.api.proto.CommandProducerSuccess; import org.apache.pulsar.common.api.proto.CommandSendError; import org.apache.pulsar.common.api.proto.CommandSendReceipt; +import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.CommandSuccess; @@ -113,6 +116,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Commands.ChecksumType; @@ -369,35 +373,117 @@ public void testKeepAliveBeforeHandshake() throws Exception { @Test(timeOut = 30000) public void testConnectCommandWithAuthenticationPositive() throws Exception { AuthenticationService authenticationService = mock(AuthenticationService.class); - AuthenticationProvider authenticationProvider = mock(AuthenticationProvider.class); - AuthenticationState authenticationState = mock(AuthenticationState.class); - AuthenticationDataSource authenticationDataSource = mock(AuthenticationDataSource.class); - AuthData authData = AuthData.of(null); + AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); - doReturn(authenticationService).when(brokerService).getAuthenticationService(); - doReturn(authenticationProvider).when(authenticationService).getAuthenticationProvider(Mockito.anyString()); - doReturn(authenticationState).when(authenticationProvider) - .newAuthState(Mockito.any(), Mockito.any(), Mockito.any()); - doReturn(authData).when(authenticationState) - .authenticate(authData); - doReturn(true).when(authenticationState) - .isComplete(); + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); - doReturn("appid1").when(authenticationState) - .getAuthRole(); + resetChannel(); + assertTrue(channel.isActive()); + assertEquals(serverCnx.getState(), State.Start); - doReturn(true).when(brokerService).isAuthenticationEnabled(); + // test server response to CONNECT + ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", null); + channel.writeInbound(clientCommand); + + assertTrue(getResponse() instanceof CommandConnected); + assertEquals(serverCnx.getState(), State.Connected); + assertEquals(serverCnx.getPrincipal(), "pass.client"); + assertTrue(serverCnx.isActive()); + channel.finish(); + } + + @Test(timeOut = 30000) + public void testConnectCommandWithoutOriginalAuthInfoWhenAuthenticateOriginalAuthData() throws Exception { + AuthenticationService authenticationService = mock(AuthenticationService.class); + AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); + + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + svcConfig.setAuthenticateOriginalAuthData(true); resetChannel(); assertTrue(channel.isActive()); assertEquals(serverCnx.getState(), State.Start); - // test server response to CONNECT - ByteBuf clientCommand = Commands.newConnect("none", "", null); + ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.client", ""); channel.writeInbound(clientCommand); + Object response1 = getResponse(); + assertTrue(response1 instanceof CommandConnected); assertEquals(serverCnx.getState(), State.Connected); - assertTrue(getResponse() instanceof CommandConnected); + assertEquals(serverCnx.getAuthRole(), "pass.client"); + assertEquals(serverCnx.getPrincipal(), "pass.client"); + assertNull(serverCnx.getOriginalPrincipal()); + assertTrue(serverCnx.isActive()); + channel.finish(); + } + + @Test(timeOut = 30000) + public void testConnectCommandWithPassingOriginalAuthData() throws Exception { + AuthenticationService authenticationService = mock(AuthenticationService.class); + AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); + + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + svcConfig.setAuthenticateOriginalAuthData(true); + svcConfig.setProxyRoles(Collections.singleton("pass.proxy")); + + resetChannel(); + assertTrue(channel.isActive()); + assertEquals(serverCnx.getState(), State.Start); + + ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null, + null, "client", "pass.client", authMethodName); + channel.writeInbound(clientCommand); + + Object response1 = getResponse(); + assertTrue(response1 instanceof CommandConnected); + assertEquals(serverCnx.getState(), State.Connected); + // Note that this value will change to the client's data if the broker sends an AuthChallenge to the + // proxy/client. Details described here https://github.com/apache/pulsar/issues/19332. + assertEquals(serverCnx.getAuthRole(), "pass.proxy"); + // These are all taken without verifying the auth data + assertEquals(serverCnx.getPrincipal(), "pass.client"); + assertEquals(serverCnx.getOriginalPrincipal(), "pass.client"); + assertTrue(serverCnx.isActive()); + channel.finish(); + } + + @Test(timeOut = 30000) + public void testConnectCommandWithPassingOriginalPrincipal() throws Exception { + AuthenticationService authenticationService = mock(AuthenticationService.class); + AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); + + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + svcConfig.setAuthenticateOriginalAuthData(false); + svcConfig.setProxyRoles(Collections.singleton("pass.proxy")); + + resetChannel(); + assertTrue(channel.isActive()); + assertEquals(serverCnx.getState(), State.Start); + + ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, null, + null, "client", "pass.client", authMethodName); + channel.writeInbound(clientCommand); + + Object response1 = getResponse(); + assertTrue(response1 instanceof CommandConnected); + assertEquals(serverCnx.getState(), State.Connected); + assertEquals(serverCnx.getAuthRole(), "pass.proxy"); + // These are all taken without verifying the auth data + assertEquals(serverCnx.getPrincipal(), "client"); + assertEquals(serverCnx.getOriginalPrincipal(), "client"); + assertTrue(serverCnx.isActive()); channel.finish(); } @@ -471,7 +557,7 @@ public void testConnectCommandWithFailingOriginalAuthData() throws Exception { when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); svcConfig.setAuthenticationEnabled(true); svcConfig.setAuthenticateOriginalAuthData(true); - svcConfig.setProxyRoles(Collections.singleton("proxy")); + svcConfig.setProxyRoles(Collections.singleton("pass.proxy")); resetChannel(); assertTrue(channel.isActive()); @@ -481,14 +567,9 @@ public void testConnectCommandWithFailingOriginalAuthData() throws Exception { null, "client", "fail", authMethodName); channel.writeInbound(clientCommand); - // We currently expect two responses because the originalAuthData is verified after sending - // a successful response to the proxy. Because this is a synchronous operation, there is currently - // no risk. It would be better to fix this. See https://github.com/apache/pulsar/issues/19311. Object response1 = getResponse(); - assertTrue(response1 instanceof CommandConnected); - Object response2 = getResponse(); - assertTrue(response2 instanceof CommandError); - assertEquals(((CommandError) response2).getMessage(), "Unable to authenticate"); + assertTrue(response1 instanceof CommandError); + assertEquals(((CommandError) response1).getMessage(), "Unable to authenticate"); assertEquals(serverCnx.getState(), State.Failed); assertFalse(serverCnx.isActive()); channel.finish(); @@ -514,6 +595,7 @@ public void testAuthResponseWithFailingAuthData() throws Exception { Object challenge1 = getResponse(); assertTrue(challenge1 instanceof CommandAuthChallenge); + assertEquals(serverCnx.getState(), State.Connecting); // Trigger another AuthChallenge to verify that code path continues to challenge ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName, AuthData.of("challenge.client".getBytes()), 1, "1"); @@ -521,6 +603,7 @@ public void testAuthResponseWithFailingAuthData() throws Exception { Object challenge2 = getResponse(); assertTrue(challenge2 instanceof CommandAuthChallenge); + assertEquals(serverCnx.getState(), State.Connecting); // Trigger failure ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName, AuthData.of("fail.client".getBytes()), 1, "1"); @@ -528,12 +611,320 @@ public void testAuthResponseWithFailingAuthData() throws Exception { Object response3 = getResponse(); assertTrue(response3 instanceof CommandError); - assertEquals(((CommandError) response3).getMessage(), "Unable to authenticate"); + assertEquals(((CommandError) response3).getMessage(), "Do not pass"); assertEquals(serverCnx.getState(), State.Failed); assertFalse(serverCnx.isActive()); channel.finish(); } + @Test(timeOut = 30000) + public void testOriginalAuthDataTriggersAuthChallengeFailure() throws Exception { + // Test verifies the current behavior in the absence of a solution for + // https://github.com/apache/pulsar/issues/19291. When that issue is completed, we can update this test + // to correctly verify that behavior. + AuthenticationService authenticationService = mock(AuthenticationService.class); + AuthenticationProvider authenticationProvider = new MockMultiStageAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); + + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + svcConfig.setAuthenticateOriginalAuthData(true); + svcConfig.setProxyRoles(Collections.singleton("pass.proxy")); + + resetChannel(); + assertTrue(channel.isActive()); + assertEquals(serverCnx.getState(), State.Start); + + // Trigger connect command to result in AuthChallenge + ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1, "1", + "localhost", "client", "challenge.client", authMethodName); + channel.writeInbound(clientCommand); + + Object response = getResponse(); + assertTrue(response instanceof CommandError); + + assertEquals(((CommandError) response).getMessage(), "Unable to authenticate"); + assertEquals(serverCnx.getState(), State.Failed); + assertFalse(serverCnx.isActive()); + channel.finish(); + } + + // This test used to be in the ServerCnxAuthorizationTest class, but it was migrated here because the mocking + // in that class was too extensive. There is some overlap with this test and other tests in this class. The primary + // role of this test is verifying that the correct role and AuthenticationDataSource are passed to the + // AuthorizationService. + @Test + public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() throws Exception { + AuthenticationService authenticationService = mock(AuthenticationService.class); + AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + svcConfig.setAuthenticateOriginalAuthData(true); + svcConfig.setProxyRoles(Collections.singleton("pass.pass")); + + svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider"); + AuthorizationService authorizationService = + spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, + pulsar.getPulsarResources()); + when(brokerService.getAuthorizationService()).thenReturn(authorizationService); + svcConfig.setAuthorizationEnabled(true); + + resetChannel(); + assertTrue(channel.isActive()); + assertEquals(serverCnx.getState(), State.Start); + + // Connect + // This client role integrates with the MockAuthenticationProvider and MockAuthorizationProvider + // to pass authentication and fail authorization + String proxyRole = "pass.pass"; + String clientRole = "pass.fail"; + // Submit a failing originalPrincipal to show that it is not used at all. + ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost", + "fail.fail", clientRole, authMethodName); + channel.writeInbound(connect); + Object connectResponse = getResponse(); + assertTrue(connectResponse instanceof CommandConnected); + assertEquals(serverCnx.getOriginalAuthData().getCommandData(), clientRole); + assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), clientRole); + assertEquals(serverCnx.getOriginalPrincipal(), clientRole); + assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole); + assertEquals(serverCnx.getAuthRole(), proxyRole); + assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole); + + // Lookup + TopicName topicName = TopicName.get("persistent://public/default/test-topic"); + ByteBuf lookup = Commands.newLookup(topicName.toString(), false, 1); + channel.writeInbound(lookup); + Object lookupResponse = getResponse(); + assertTrue(lookupResponse instanceof CommandLookupTopicResponse); + assertEquals(((CommandLookupTopicResponse) lookupResponse).getError(), ServerError.AuthorizationError); + assertEquals(((CommandLookupTopicResponse) lookupResponse).getRequestId(), 1); + verify(authorizationService, times(1)) + .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, proxyRole, serverCnx.getAuthData()); + verify(authorizationService, times(1)) + .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, clientRole, serverCnx.getOriginalAuthData()); + + // producer + ByteBuf producer = Commands.newProducer(topicName.toString(), 1, 2, "test-producer", new HashMap<>(), false); + channel.writeInbound(producer); + Object producerResponse = getResponse(); + assertTrue(producerResponse instanceof CommandError); + assertEquals(((CommandError) producerResponse).getError(), ServerError.AuthorizationError); + assertEquals(((CommandError) producerResponse).getRequestId(), 2); + verify(authorizationService, times(1)) + .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, clientRole, serverCnx.getOriginalAuthData()); + verify(authorizationService, times(1)) + .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, proxyRole, serverCnx.getAuthData()); + + // consumer + String subscriptionName = "test-subscribe"; + ByteBuf subscribe = Commands.newSubscribe(topicName.toString(), subscriptionName, 1, 3, + CommandSubscribe.SubType.Shared, 0, "consumer", 0); + channel.writeInbound(subscribe); + Object subscribeResponse = getResponse(); + assertTrue(subscribeResponse instanceof CommandError); + assertEquals(((CommandError) subscribeResponse).getError(), ServerError.AuthorizationError); + assertEquals(((CommandError) subscribeResponse).getRequestId(), 3); + verify(authorizationService, times(1)).allowTopicOperationAsync( + eq(topicName), eq(TopicOperation.CONSUME), + eq(clientRole), argThat(arg -> { + assertTrue(arg instanceof AuthenticationDataSubscription); + assertEquals(arg.getCommandData(), clientRole); + assertEquals(arg.getSubscription(), subscriptionName); + return true; + })); + verify(authorizationService, times(1)).allowTopicOperationAsync( + eq(topicName), eq(TopicOperation.CONSUME), + eq(proxyRole), argThat(arg -> { + assertTrue(arg instanceof AuthenticationDataSubscription); + assertEquals(arg.getCommandData(), proxyRole); + assertEquals(arg.getSubscription(), subscriptionName); + return true; + })); + } + + // This test used to be in the ServerCnxAuthorizationTest class, but it was migrated here because the mocking + // in that class was too extensive. There is some overlap with this test and other tests in this class. The primary + // role of this test is verifying that the correct role and AuthenticationDataSource are passed to the + // AuthorizationService. + public void testVerifyOriginalPrincipalWithoutAuthDataForwardedFromProxy() throws Exception { + AuthenticationService authenticationService = mock(AuthenticationService.class); + AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + svcConfig.setAuthenticateOriginalAuthData(false); + svcConfig.setProxyRoles(Collections.singleton("pass.pass")); + + svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider"); + AuthorizationService authorizationService = + spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, + pulsar.getPulsarResources()); + when(brokerService.getAuthorizationService()).thenReturn(authorizationService); + svcConfig.setAuthorizationEnabled(true); + + resetChannel(); + assertTrue(channel.isActive()); + assertEquals(serverCnx.getState(), State.Start); + + // Connect + // This client role integrates with the MockAuthenticationProvider and MockAuthorizationProvider + // to pass authentication and fail authorization + String proxyRole = "pass.pass"; + String clientRole = "pass.fail"; + ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost", + clientRole, null, null); + channel.writeInbound(connect); + Object connectResponse = getResponse(); + assertTrue(connectResponse instanceof CommandConnected); + assertNull(serverCnx.getOriginalAuthData()); + assertNull(serverCnx.getOriginalAuthState()); + assertEquals(serverCnx.getOriginalPrincipal(), clientRole); + assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole); + assertEquals(serverCnx.getAuthRole(), proxyRole); + assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole); + + // Lookup + TopicName topicName = TopicName.get("persistent://public/default/test-topic"); + ByteBuf lookup = Commands.newLookup(topicName.toString(), false, 1); + channel.writeInbound(lookup); + Object lookupResponse = getResponse(); + assertTrue(lookupResponse instanceof CommandLookupTopicResponse); + assertEquals(((CommandLookupTopicResponse) lookupResponse).getError(), ServerError.AuthorizationError); + assertEquals(((CommandLookupTopicResponse) lookupResponse).getRequestId(), 1); + verify(authorizationService, times(1)) + .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, proxyRole, serverCnx.getAuthData()); + // This test is an example of https://github.com/apache/pulsar/issues/19332. Essentially, we're passing + // the proxy's auth data because it is all we have. This test should be updated when we resolve that issue. + verify(authorizationService, times(1)) + .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, clientRole, serverCnx.getAuthData()); + + // producer + ByteBuf producer = Commands.newProducer(topicName.toString(), 1, 2, "test-producer", new HashMap<>(), false); + channel.writeInbound(producer); + Object producerResponse = getResponse(); + assertTrue(producerResponse instanceof CommandError); + assertEquals(((CommandError) producerResponse).getError(), ServerError.AuthorizationError); + assertEquals(((CommandError) producerResponse).getRequestId(), 2); + // See https://github.com/apache/pulsar/issues/19332 for justification of this assertion. + verify(authorizationService, times(1)) + .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, clientRole, serverCnx.getAuthData()); + verify(authorizationService, times(1)) + .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, proxyRole, serverCnx.getAuthData()); + + // consumer + String subscriptionName = "test-subscribe"; + ByteBuf subscribe = Commands.newSubscribe(topicName.toString(), subscriptionName, 1, 3, + CommandSubscribe.SubType.Shared, 0, "consumer", 0); + channel.writeInbound(subscribe); + Object subscribeResponse = getResponse(); + assertTrue(subscribeResponse instanceof CommandError); + assertEquals(((CommandError) subscribeResponse).getError(), ServerError.AuthorizationError); + assertEquals(((CommandError) subscribeResponse).getRequestId(), 3); + verify(authorizationService, times(1)).allowTopicOperationAsync( + eq(topicName), eq(TopicOperation.CONSUME), + eq(clientRole), argThat(arg -> { + assertTrue(arg instanceof AuthenticationDataSubscription); + // We assert that the role is clientRole and commandData is proxyRole due to + // https://github.com/apache/pulsar/issues/19332. + assertEquals(arg.getCommandData(), proxyRole); + assertEquals(arg.getSubscription(), subscriptionName); + return true; + })); + verify(authorizationService, times(1)).allowTopicOperationAsync( + eq(topicName), eq(TopicOperation.CONSUME), + eq(proxyRole), argThat(arg -> { + assertTrue(arg instanceof AuthenticationDataSubscription); + assertEquals(arg.getCommandData(), proxyRole); + assertEquals(arg.getSubscription(), subscriptionName); + return true; + })); + } + + // This test used to be in the ServerCnxAuthorizationTest class, but it was migrated here because the mocking + // in that class was too extensive. There is some overlap with this test and other tests in this class. The primary + // role of this test is verifying that the correct role and AuthenticationDataSource are passed to the + // AuthorizationService. + @Test + public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() throws Exception { + AuthenticationService authenticationService = mock(AuthenticationService.class); + AuthenticationProvider authenticationProvider = new MockAuthenticationProvider(); + String authMethodName = authenticationProvider.getAuthMethodName(); + when(brokerService.getAuthenticationService()).thenReturn(authenticationService); + when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider); + svcConfig.setAuthenticationEnabled(true); + + svcConfig.setAuthorizationProvider("org.apache.pulsar.broker.auth.MockAuthorizationProvider"); + AuthorizationService authorizationService = + spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, + pulsar.getPulsarResources()); + when(brokerService.getAuthorizationService()).thenReturn(authorizationService); + svcConfig.setAuthorizationEnabled(true); + + resetChannel(); + assertTrue(channel.isActive()); + assertEquals(serverCnx.getState(), State.Start); + + // connect + // This client role integrates with the MockAuthenticationProvider and MockAuthorizationProvider + // to pass authentication and fail authorization + String clientRole = "pass.fail"; + ByteBuf connect = Commands.newConnect(authMethodName, clientRole, "test"); + channel.writeInbound(connect); + + Object connectResponse = getResponse(); + assertTrue(connectResponse instanceof CommandConnected); + assertNull(serverCnx.getOriginalAuthData()); + assertNull(serverCnx.getOriginalAuthState()); + assertNull(serverCnx.getOriginalPrincipal()); + assertEquals(serverCnx.getAuthData().getCommandData(), clientRole); + assertEquals(serverCnx.getAuthRole(), clientRole); + assertEquals(serverCnx.getAuthState().getAuthRole(), clientRole); + + // lookup + TopicName topicName = TopicName.get("persistent://public/default/test-topic"); + ByteBuf lookup = Commands.newLookup(topicName.toString(), false, 1); + channel.writeInbound(lookup); + Object lookupResponse = getResponse(); + assertTrue(lookupResponse instanceof CommandLookupTopicResponse); + assertEquals(((CommandLookupTopicResponse) lookupResponse).getError(), ServerError.AuthorizationError); + assertEquals(((CommandLookupTopicResponse) lookupResponse).getRequestId(), 1); + verify(authorizationService, times(1)) + .allowTopicOperationAsync(topicName, TopicOperation.LOOKUP, clientRole, serverCnx.getAuthData()); + + // producer + ByteBuf producer = Commands.newProducer(topicName.toString(), 1, 2, "test-producer", new HashMap<>(), false); + channel.writeInbound(producer); + Object producerResponse = getResponse(); + assertTrue(producerResponse instanceof CommandError); + assertEquals(((CommandError) producerResponse).getError(), ServerError.AuthorizationError); + assertEquals(((CommandError) producerResponse).getRequestId(), 2); + verify(authorizationService, times(1)) + .allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, clientRole, serverCnx.getAuthData()); + + // consumer + String subscriptionName = "test-subscribe"; + ByteBuf subscribe = Commands.newSubscribe(topicName.toString(), subscriptionName, 1, 3, + CommandSubscribe.SubType.Shared, 0, "consumer", 0); + channel.writeInbound(subscribe); + Object subscribeResponse = getResponse(); + assertTrue(subscribeResponse instanceof CommandError); + assertEquals(((CommandError) subscribeResponse).getError(), ServerError.AuthorizationError); + assertEquals(((CommandError) subscribeResponse).getRequestId(), 3); + verify(authorizationService, times(1)).allowTopicOperationAsync( + eq(topicName), eq(TopicOperation.CONSUME), + eq(clientRole), argThat(arg -> { + assertTrue(arg instanceof AuthenticationDataSubscription); + assertEquals(arg.getCommandData(), clientRole); + assertEquals(arg.getSubscription(), subscriptionName); + return true; + })); + } + @Test(timeOut = 30000) public void testProducerCommand() throws Exception { resetChannel();