From 216cb96a9d49494769c56fa34c1a554179c4239e Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 29 May 2024 09:49:18 +0300 Subject: [PATCH] Fix TLS stability issues with V2 protocol that caused data corruption (#4404) * Fix TLS stability issues with V2 protocol that caused data corruption - add the TLS handler after the FlushConsolidationHandler - This makes TLS connections from Pulsar Broker to Bookkeeper stable when bookkeeperUseV2WireProtocol=true is used - Fix test TestTLS for V2 - Fix inconsistency in client configuration in BookKeeperClusterTestCase (cherry picked from commit 5f73147a2803a5147d9d9ba2d28eaa6c79c998a3) --- .../bookkeeper/proto/BookieNettyServer.java | 3 ++- .../bookkeeper/proto/BookieRequestProcessor.java | 9 ++++++++- .../bookkeeper/proto/PerChannelBookieClient.java | 16 ++++++++++++---- .../test/BookKeeperClusterTestCase.java | 2 +- .../java/org/apache/bookkeeper/tls/TestTLS.java | 13 ++++--------- 5 files changed, 27 insertions(+), 16 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java index a834208ce06..a7e919c826c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java @@ -92,6 +92,7 @@ class BookieNettyServer { private static final Logger LOG = LoggerFactory.getLogger(BookieNettyServer.class); + public static final String CONSOLIDATION_HANDLER_NAME = "consolidation"; final int maxFrameSize; final ServerConfiguration conf; @@ -344,7 +345,7 @@ protected void initChannel(SocketChannel ch) throws Exception { new BookieSideConnectionPeerContextHandler(); ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true)); + pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true)); pipeline.addLast("bytebufList", ByteBufList.ENCODER); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index a77b3d7bb5b..0d9d61f634f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -66,6 +66,7 @@ public class BookieRequestProcessor implements RequestProcessor { private static final Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class); + public static final String TLS_HANDLER_NAME = "tls"; /** * The server configuration. We use this for getting the number of add and read @@ -576,9 +577,15 @@ private void processStartTLSRequestV3(final BookkeeperProtocol.Request r, response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ); writeAndFlush(c, response.build()); } else { + LOG.info("Starting TLS handshake with client on channel {}", c); // there is no need to execute in a different thread as this operation is light SslHandler sslHandler = shFactory.newTLSHandler(); - c.pipeline().addFirst("tls", sslHandler); + if (c.pipeline().names().contains(BookieNettyServer.CONSOLIDATION_HANDLER_NAME)) { + c.pipeline().addAfter(BookieNettyServer.CONSOLIDATION_HANDLER_NAME, TLS_HANDLER_NAME, sslHandler); + } else { + // local transport doesn't contain FlushConsolidationHandler + c.pipeline().addFirst(TLS_HANDLER_NAME, sslHandler); + } response.setStatus(BookkeeperProtocol.StatusCode.EOK); BookkeeperProtocol.StartTLSResponse.Builder builder = BookkeeperProtocol.StartTLSResponse.newBuilder(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index ca1448c7682..149f97fc28b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -174,6 +174,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { BKException.Code.WriteOnReadOnlyBookieException)); private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later. private static final AtomicLong txnIdGenerator = new AtomicLong(0); + static final String CONSOLIDATION_HANDLER_NAME = "consolidation"; final BookieId bookieId; final BookieAddressResolver bookieAddressResolver; @@ -594,7 +595,7 @@ protected ChannelFuture connect() { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true)); + pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true)); pipeline.addLast("bytebufList", ByteBufList.ENCODER); pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); @@ -1522,9 +1523,16 @@ void initTLSHandshake() { } else { throw new RuntimeException("Unexpected socket address type"); } - SslHandler handler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort()); - channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), handler); - handler.handshakeFuture().addListener(new GenericFutureListener>() { + LOG.info("Starting TLS handshake with {}:{}", address.getHostString(), address.getPort()); + SslHandler sslHandler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort()); + String sslHandlerName = parentObj.shFactory.getHandlerName(); + if (channel.pipeline().names().contains(CONSOLIDATION_HANDLER_NAME)) { + channel.pipeline().addAfter(CONSOLIDATION_HANDLER_NAME, sslHandlerName, sslHandler); + } else { + // local transport doesn't contain FlushConsolidationHandler + channel.pipeline().addFirst(sslHandlerName, sslHandler); + } + sslHandler.handshakeFuture().addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { int rc; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index 5e705f724ed..f6acefb5874 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -301,7 +301,7 @@ protected ServerConfiguration newServerConfiguration() throws Exception { } protected ClientConfiguration newClientConfiguration() { - return new ClientConfiguration(baseConf); + return new ClientConfiguration(baseClientConf); } protected ServerConfiguration newServerConfiguration(int port, File journalDir, File[] ledgerDirs) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java index b5719deea0d..f4d5d941978 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java @@ -349,11 +349,6 @@ public void testConnectToTLSClusterTLSClient() throws Exception { */ @Test public void testConnectToLocalTLSClusterTLSClient() throws Exception { - // skip test - if (useV2Protocol) { - return; - } - restartBookies(c -> { c.setDisableServerSocketBind(true); c.setEnableLocalTransport(true); @@ -621,10 +616,6 @@ public void testBookieAuthPluginRequireClientTLSAuthentication() throws Exceptio */ @Test public void testBookieAuthPluginRequireClientTLSAuthenticationLocal() throws Exception { - if (useV2Protocol) { - return; - } - restartBookies(c -> { c.setBookieAuthProviderFactoryClass( AllowOnlyClientsWithX509Certificates.class.getName()); @@ -755,6 +746,10 @@ public void testBookieAuthPluginDenyAccessToClientWithoutTLS() throws Exception testClient(clientConf, numBookies); fail("Shouldn't be able to connect"); } catch (BKException.BKUnauthorizedAccessException authFailed) { + } catch (BKException.BKNotEnoughBookiesException notEnoughBookiesException) { + if (!useV2Protocol) { + fail("Unexpected exception occurred."); + } } assertFalse(secureBookieSideChannel);