From f05d8ae0d57b7f23fbca79b727c506537896d78c Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 28 May 2024 23:47:32 +0300 Subject: [PATCH 1/3] Fix TLS stability issues with V2 protocol that caused data corruption - add the TLS handler after the FlushConsolidationHandler - This makes TLS connections from Pulsar Broker to Bookkeeper stable when bookkeeperUseV2WireProtocol=true is used - Fix test TestTLS for V2 - Fix inconsistency in client configuration in BookKeeperClusterTestCase --- .../apache/bookkeeper/proto/BookieNettyServer.java | 3 ++- .../bookkeeper/proto/BookieRequestProcessor.java | 3 ++- .../bookkeeper/proto/PerChannelBookieClient.java | 7 +++++-- .../bookkeeper/test/BookKeeperClusterTestCase.java | 2 +- .../java/org/apache/bookkeeper/tls/TestTLS.java | 13 ++++--------- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java index bfad643d1e2..98fe6c613ff 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java @@ -92,6 +92,7 @@ class BookieNettyServer { private static final Logger LOG = LoggerFactory.getLogger(BookieNettyServer.class); + public static final String CONSOLIDATION_HANDLER_NAME = "consolidation"; final int maxFrameSize; final ServerConfiguration conf; @@ -344,7 +345,7 @@ protected void initChannel(SocketChannel ch) throws Exception { new BookieSideConnectionPeerContextHandler(); ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true)); + pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true)); pipeline.addLast("bytebufList", ByteBufList.ENCODER); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 1a083519625..93c252c40d5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -580,9 +580,10 @@ private void processStartTLSRequestV3(final BookkeeperProtocol.Request r, response.setStatus(BookkeeperProtocol.StatusCode.EBADREQ); writeAndFlush(c, response.build()); } else { + LOG.info("Starting TLS handshake with client on channel {}", c); // there is no need to execute in a different thread as this operation is light SslHandler sslHandler = shFactory.newTLSHandler(); - c.pipeline().addFirst("tls", sslHandler); + c.pipeline().addAfter(BookieNettyServer.CONSOLIDATION_HANDLER_NAME, "tls", sslHandler); response.setStatus(BookkeeperProtocol.StatusCode.EOK); BookkeeperProtocol.StartTLSResponse.Builder builder = BookkeeperProtocol.StartTLSResponse.newBuilder(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 5fe1a6e2ef5..24adeb1c19e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -175,6 +175,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { BKException.Code.WriteOnReadOnlyBookieException)); private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later. private static final AtomicLong txnIdGenerator = new AtomicLong(0); + static final String CONSOLIDATION_HANDLER_NAME = "consolidation"; final BookieId bookieId; final BookieAddressResolver bookieAddressResolver; @@ -595,7 +596,7 @@ protected ChannelFuture connect() { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true)); + pipeline.addLast(CONSOLIDATION_HANDLER_NAME, new FlushConsolidationHandler(1024, true)); pipeline.addLast("bytebufList", ByteBufList.ENCODER); pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); @@ -1573,8 +1574,10 @@ void initTLSHandshake() { } else { throw new RuntimeException("Unexpected socket address type"); } + LOG.info("Starting TLS handshake with {}:{}", address.getHostString(), address.getPort()); SslHandler handler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort()); - channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), handler); + channel.pipeline() + .addAfter(CONSOLIDATION_HANDLER_NAME, parentObj.shFactory.getHandlerName(), handler); handler.handshakeFuture().addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index 83893922d69..b73a3ee7b44 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -318,7 +318,7 @@ protected ServerConfiguration newServerConfiguration() throws Exception { } protected ClientConfiguration newClientConfiguration() { - return new ClientConfiguration(baseConf); + return new ClientConfiguration(baseClientConf); } protected ServerConfiguration newServerConfiguration(int port, File journalDir, File[] ledgerDirs) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java index 1d789f6559e..1ab90d32521 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java @@ -350,11 +350,6 @@ public void testConnectToTLSClusterTLSClient() throws Exception { */ @Test public void testConnectToLocalTLSClusterTLSClient() throws Exception { - // skip test - if (useV2Protocol) { - return; - } - restartBookies(c -> { c.setDisableServerSocketBind(true); c.setEnableLocalTransport(true); @@ -622,10 +617,6 @@ public void testBookieAuthPluginRequireClientTLSAuthentication() throws Exceptio */ @Test public void testBookieAuthPluginRequireClientTLSAuthenticationLocal() throws Exception { - if (useV2Protocol) { - return; - } - restartBookies(c -> { c.setBookieAuthProviderFactoryClass( AllowOnlyClientsWithX509Certificates.class.getName()); @@ -756,6 +747,10 @@ public void testBookieAuthPluginDenyAccessToClientWithoutTLS() throws Exception testClient(clientConf, numBookies); fail("Shouldn't be able to connect"); } catch (BKException.BKUnauthorizedAccessException authFailed) { + } catch (BKException.BKNotEnoughBookiesException notEnoughBookiesException) { + if (!useV2Protocol) { + fail("Unexpected exception occurred."); + } } assertFalse(secureBookieSideChannel); From f08cad5309f02c2f50377df1f6e91bf51575bd90 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 29 May 2024 03:58:51 +0300 Subject: [PATCH 2/3] Fix test failures - revert changes to enable certain tests for V2 --- .../bookkeeper/proto/PerChannelBookieClient.java | 13 +++++++++---- .../java/org/apache/bookkeeper/tls/TestTLS.java | 9 +++++++++ 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 24adeb1c19e..1eefda45e0e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -1575,10 +1575,15 @@ void initTLSHandshake() { throw new RuntimeException("Unexpected socket address type"); } LOG.info("Starting TLS handshake with {}:{}", address.getHostString(), address.getPort()); - SslHandler handler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort()); - channel.pipeline() - .addAfter(CONSOLIDATION_HANDLER_NAME, parentObj.shFactory.getHandlerName(), handler); - handler.handshakeFuture().addListener(new GenericFutureListener>() { + SslHandler sslHandler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort()); + String sslHandlerName = parentObj.shFactory.getHandlerName(); + if (channel.pipeline().names().contains(CONSOLIDATION_HANDLER_NAME)) { + channel.pipeline().addAfter(CONSOLIDATION_HANDLER_NAME, sslHandlerName, sslHandler); + } else { + // local transport doesn't contain FlushConsolidationHandler + channel.pipeline().addFirst(sslHandlerName, sslHandler); + } + sslHandler.handshakeFuture().addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { int rc; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java index 1ab90d32521..2045f60a8ec 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java @@ -350,6 +350,11 @@ public void testConnectToTLSClusterTLSClient() throws Exception { */ @Test public void testConnectToLocalTLSClusterTLSClient() throws Exception { + // skip test + if (useV2Protocol) { + return; + } + restartBookies(c -> { c.setDisableServerSocketBind(true); c.setEnableLocalTransport(true); @@ -617,6 +622,10 @@ public void testBookieAuthPluginRequireClientTLSAuthentication() throws Exceptio */ @Test public void testBookieAuthPluginRequireClientTLSAuthenticationLocal() throws Exception { + if (useV2Protocol) { + return; + } + restartBookies(c -> { c.setBookieAuthProviderFactoryClass( AllowOnlyClientsWithX509Certificates.class.getName()); From b68d20d5e2104cc231abc0b636c3e02983f61953 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 29 May 2024 04:04:17 +0300 Subject: [PATCH 3/3] Fix local transport tests --- .../apache/bookkeeper/proto/BookieRequestProcessor.java | 8 +++++++- .../src/test/java/org/apache/bookkeeper/tls/TestTLS.java | 9 --------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 93c252c40d5..7b55545b5d2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -66,6 +66,7 @@ public class BookieRequestProcessor implements RequestProcessor { private static final Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class); + public static final String TLS_HANDLER_NAME = "tls"; /** * The server configuration. We use this for getting the number of add and read @@ -583,7 +584,12 @@ private void processStartTLSRequestV3(final BookkeeperProtocol.Request r, LOG.info("Starting TLS handshake with client on channel {}", c); // there is no need to execute in a different thread as this operation is light SslHandler sslHandler = shFactory.newTLSHandler(); - c.pipeline().addAfter(BookieNettyServer.CONSOLIDATION_HANDLER_NAME, "tls", sslHandler); + if (c.pipeline().names().contains(BookieNettyServer.CONSOLIDATION_HANDLER_NAME)) { + c.pipeline().addAfter(BookieNettyServer.CONSOLIDATION_HANDLER_NAME, TLS_HANDLER_NAME, sslHandler); + } else { + // local transport doesn't contain FlushConsolidationHandler + c.pipeline().addFirst(TLS_HANDLER_NAME, sslHandler); + } response.setStatus(BookkeeperProtocol.StatusCode.EOK); BookkeeperProtocol.StartTLSResponse.Builder builder = BookkeeperProtocol.StartTLSResponse.newBuilder(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java index 2045f60a8ec..1ab90d32521 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java @@ -350,11 +350,6 @@ public void testConnectToTLSClusterTLSClient() throws Exception { */ @Test public void testConnectToLocalTLSClusterTLSClient() throws Exception { - // skip test - if (useV2Protocol) { - return; - } - restartBookies(c -> { c.setDisableServerSocketBind(true); c.setEnableLocalTransport(true); @@ -622,10 +617,6 @@ public void testBookieAuthPluginRequireClientTLSAuthentication() throws Exceptio */ @Test public void testBookieAuthPluginRequireClientTLSAuthenticationLocal() throws Exception { - if (useV2Protocol) { - return; - } - restartBookies(c -> { c.setBookieAuthProviderFactoryClass( AllowOnlyClientsWithX509Certificates.class.getName());