diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java index bfad643d1e2..98fe6c613ff 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java @@ -92,6 +92,7 @@ class BookieNettyServer { private static final Logger LOG = LoggerFactory.getLogger(BookieNettyServer.class); + public static final String CONSOLIDATION_HANDLER_NAME = "consolidation"; final int maxFrameSize; final ServerConfiguration conf; @@ -344,7 +345,7 @@ protected void initChannel(SocketChannel ch) throws Exception { new BookieSideConnectionPeerContextHandler(); ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true)); + pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true)); pipeline.addLast("bytebufList", ByteBufList.ENCODER); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 1a083519625..7b55545b5d2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -66,6 +66,7 @@ public class BookieRequestProcessor implements RequestProcessor { private static final Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class); + public static final String TLS_HANDLER_NAME = "tls"; /** * The server configuration. We use this for getting the number of add and read @@ -580,9 +581,15 @@ private void processStartTLSRequestV3(final BookkeeperProtocol.Request r, response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ); writeAndFlush(c, response.build()); } else { + LOG.info("Starting TLS handshake with client on channel {}", c); // there is no need to execute in a different thread as this operation is light SslHandler sslHandler = shFactory.newTLSHandler(); - c.pipeline().addFirst("tls", sslHandler); + if (c.pipeline().names().contains(BookieNettyServer.CONSOLIDATION_HANDLER_NAME)) { + c.pipeline().addAfter(BookieNettyServer.CONSOLIDATION_HANDLER_NAME, TLS_HANDLER_NAME, sslHandler); + } else { + // local transport doesn't contain FlushConsolidationHandler + c.pipeline().addFirst(TLS_HANDLER_NAME, sslHandler); + } response.setStatus(BookkeeperProtocol.StatusCode.EOK); BookkeeperProtocol.StartTLSResponse.Builder builder = BookkeeperProtocol.StartTLSResponse.newBuilder(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 5fe1a6e2ef5..1eefda45e0e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -175,6 +175,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { BKException.Code.WriteOnReadOnlyBookieException)); private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later. private static final AtomicLong txnIdGenerator = new AtomicLong(0); + static final String CONSOLIDATION_HANDLER_NAME = "consolidation"; final BookieId bookieId; final BookieAddressResolver bookieAddressResolver; @@ -595,7 +596,7 @@ protected ChannelFuture connect() { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true)); + pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true)); pipeline.addLast("bytebufList", ByteBufList.ENCODER); pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); @@ -1573,9 +1574,16 @@ void initTLSHandshake() { } else { throw new RuntimeException("Unexpected socket address type"); } - SslHandler handler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort()); - channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), handler); - handler.handshakeFuture().addListener(new GenericFutureListener>() { + LOG.info("Starting TLS handshake with {}:{}", address.getHostString(), address.getPort()); + SslHandler sslHandler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort()); + String sslHandlerName = parentObj.shFactory.getHandlerName(); + if (channel.pipeline().names().contains(CONSOLIDATION_HANDLER_NAME)) { + channel.pipeline().addAfter(CONSOLIDATION_HANDLER_NAME, sslHandlerName, sslHandler); + } else { + // local transport doesn't contain FlushConsolidationHandler + channel.pipeline().addFirst(sslHandlerName, sslHandler); + } + sslHandler.handshakeFuture().addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { int rc; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index 83893922d69..b73a3ee7b44 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -318,7 +318,7 @@ protected ServerConfiguration newServerConfiguration() throws Exception { } protected ClientConfiguration newClientConfiguration() { - return new ClientConfiguration(baseConf); + return new ClientConfiguration(baseClientConf); } protected ServerConfiguration newServerConfiguration(int port, File journalDir, File[] ledgerDirs) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java index 1d789f6559e..1ab90d32521 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java @@ -350,11 +350,6 @@ public void testConnectToTLSClusterTLSClient() throws Exception { */ @Test public void testConnectToLocalTLSClusterTLSClient() throws Exception { - // skip test - if (useV2Protocol) { - return; - } - restartBookies(c -> { c.setDisableServerSocketBind(true); c.setEnableLocalTransport(true); @@ -622,10 +617,6 @@ public void testBookieAuthPluginRequireClientTLSAuthentication() throws Exceptio */ @Test public void testBookieAuthPluginRequireClientTLSAuthenticationLocal() throws Exception { - if (useV2Protocol) { - return; - } - restartBookies(c -> { c.setBookieAuthProviderFactoryClass( AllowOnlyClientsWithX509Certificates.class.getName()); @@ -756,6 +747,10 @@ public void testBookieAuthPluginDenyAccessToClientWithoutTLS() throws Exception testClient(clientConf, numBookies); fail("Shouldn't be able to connect"); } catch (BKException.BKUnauthorizedAccessException authFailed) { + } catch (BKException.BKNotEnoughBookiesException notEnoughBookiesException) { + if (!useV2Protocol) { + fail("Unexpected exception occurred."); + } } assertFalse(secureBookieSideChannel);