Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] ServerCnx: go to Failed state when auth fails #18

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -969,10 +969,10 @@ protected void handleConnect(CommandConnect connect) {
}
} catch (Exception e) {
service.getPulsarStats().recordConnectionCreateFail();
state = State.Failed;
logAuthException(remoteAddress, "connect", getPrincipal(), Optional.empty(), e);
String msg = "Unable to authenticate";
writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
close();
ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Unable to authenticate");
NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
}
}

Expand All @@ -994,15 +994,17 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
authResponse.hasClientVersion() ? authResponse.getClientVersion() : EMPTY);
} catch (AuthenticationException e) {
service.getPulsarStats().recordConnectionCreateFail();
state = State.Failed;
log.warn("[{}] Authentication failed: {} ", remoteAddress, e.getMessage());
writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, e.getMessage()));
close();
ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Unable to authenticate");
NettyChannelUtil.writeAndFlushWithClosePromise(ctx, msg);
} catch (Exception e) {
service.getPulsarStats().recordConnectionCreateFail();
state = State.Failed;
String msg = "Unable to handleAuthResponse";
log.warn("[{}] {} ", remoteAddress, msg, e);
writeAndFlush(Commands.newError(-1, ServerError.UnknownError, msg));
close();
ByteBuf command = Commands.newError(-1, ServerError.UnknownError, msg);
NettyChannelUtil.writeAndFlushWithClosePromise(ctx, command);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.auth;

import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import java.net.SocketAddress;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.api.AuthData;

/**
* Class that provides the same authentication semantics as the {@link MockAuthenticationProvider} except
* that this one initializes the {@link MockMultiStageAuthenticationState} class to support testing
* multistage authentication.
*/
public class MockMultiStageAuthenticationProvider extends MockAuthenticationProvider {

@Override
public String getAuthMethodName() {
return "multi-stage";
}

@Override
public AuthenticationState newAuthState(AuthData authData,
SocketAddress remoteAddress,
SSLSession sslSession) throws AuthenticationException {
return new MockMultiStageAuthenticationState(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.auth;

import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.api.AuthData;

import javax.naming.AuthenticationException;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Performs multistage authentication by extending the paradigm created in {@link MockAuthenticationProvider}.
*/
public class MockMultiStageAuthenticationState implements AuthenticationState {

private final MockMultiStageAuthenticationProvider provider;
private String authRole = null;

MockMultiStageAuthenticationState(MockMultiStageAuthenticationProvider provider) {
this.provider = provider;
}

@Override
public String getAuthRole() throws AuthenticationException {
if (authRole == null) {
throw new AuthenticationException("Must authenticate first");
}
return null;
}

@Override
public AuthData authenticate(AuthData authData) throws AuthenticationException {
String data = new String(authData.getBytes(), UTF_8);
String[] parts = data.split("\\.");
if (parts.length == 2) {
if ("challenge".equals(parts[0])) {
return AuthData.of("challenged".getBytes());
} else {
AuthenticationDataCommand command = new AuthenticationDataCommand(data);
authRole = provider.authenticate(command);
// Auth successful, no more auth required
return null;
}
}
throw new AuthenticationException("Failed to authenticate");
}

@Override
public AuthenticationDataSource getAuthDataSource() {
return null;
}

@Override
public boolean isComplete() {
return authRole != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
Expand All @@ -103,6 +105,7 @@
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
Expand Down Expand Up @@ -450,11 +453,84 @@ public void testConnectCommandWithAuthenticationNegative() throws Exception {
ByteBuf clientCommand = Commands.newConnect("none", "", null);
channel.writeInbound(clientCommand);

assertEquals(serverCnx.getState(), State.Start);
assertEquals(serverCnx.getState(), State.Failed);
assertTrue(getResponse() instanceof CommandError);
channel.finish();
}

@Test(timeOut = 30000)
public void testConnectCommandWithFailingOriginalAuthData() throws Exception {
AuthenticationService authenticationService = mock(AuthenticationService.class);
AuthenticationProvider authenticationProvider = new MockAuthenticationProvider();
String authMethodName = authenticationProvider.getAuthMethodName();

when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
svcConfig.setAuthenticationEnabled(true);
svcConfig.setAuthenticateOriginalAuthData(true);
svcConfig.setProxyRoles(Collections.singleton("proxy"));

resetChannel();
assertTrue(channel.isActive());
assertEquals(serverCnx.getState(), State.Start);

ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1,null,
null, "client", "fail", authMethodName);
channel.writeInbound(clientCommand);

// We currently expect two responses because the originalAuthData is verified after sending
// a successful response to the proxy. Because this is a synchronous operation, there is currently
// no risk. It would be better to fix this. See https://github.com/apache/pulsar/issues/19311.
Object response1 = getResponse();
assertTrue(response1 instanceof CommandConnected);
Object response2 = getResponse();
assertTrue(response2 instanceof CommandError);
assertEquals(((CommandError) response2).getMessage(), "Unable to authenticate");
assertEquals(serverCnx.getState(), State.Failed);
assertFalse(serverCnx.isActive());
channel.finish();
}

@Test(timeOut = 30000)
public void testAuthResponseWithFailingAuthData() throws Exception {
AuthenticationService authenticationService = mock(AuthenticationService.class);
AuthenticationProvider authenticationProvider = new MockMultiStageAuthenticationProvider();
String authMethodName = authenticationProvider.getAuthMethodName();

when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
svcConfig.setAuthenticationEnabled(true);

resetChannel();
assertTrue(channel.isActive());
assertEquals(serverCnx.getState(), State.Start);

// Trigger connect command to result in AuthChallenge
ByteBuf clientCommand = Commands.newConnect(authMethodName, "challenge.client", "1");
channel.writeInbound(clientCommand);

Object challenge1 = getResponse();
assertTrue(challenge1 instanceof CommandAuthChallenge);

// Trigger another AuthChallenge to verify that code path continues to challenge
ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName, AuthData.of("challenge.client".getBytes()), 1, "1");
channel.writeInbound(authResponse1);

Object challenge2 = getResponse();
assertTrue(challenge2 instanceof CommandAuthChallenge);

// Trigger failure
ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName, AuthData.of("fail.client".getBytes()), 1, "1");
channel.writeInbound(authResponse2);

Object response3 = getResponse();
assertTrue(response3 instanceof CommandError);
assertEquals(((CommandError) response3).getMessage(), "Unable to authenticate");
assertEquals(serverCnx.getState(), State.Failed);
assertFalse(serverCnx.isActive());
channel.finish();
}

@Test(timeOut = 30000)
public void testProducerCommand() throws Exception {
resetChannel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Queue;
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
Expand Down Expand Up @@ -83,6 +84,11 @@ protected void handleConnected(CommandConnected connected) {
queue.offer(new CommandConnected().copyFrom(connected));
}

@Override
protected void handleAuthChallenge(CommandAuthChallenge challenge) {
queue.offer(new CommandAuthChallenge().copyFrom(challenge));
}

@Override
protected void handleSubscribe(CommandSubscribe subscribe) {
queue.offer(new CommandSubscribe().copyFrom(subscribe));
Expand Down