diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 86604f87def524..355e252db19ea3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2489,6 +2489,12 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int transactionBufferClientMaxConcurrentRequests = 1000; + @FieldContext( + category = CATEGORY_TRANSACTION, + doc = "The transaction buffer client's operation timeout in milliseconds." + ) + private long transactionBufferClientOperationTimeoutInMills = 3000L; + /**** --- KeyStore TLS config variables. --- ****/ @FieldContext( category = CATEGORY_KEYSTORE_TLS, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 93fc75e0ff5183..b060cbbcfcf70e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -752,7 +752,8 @@ public void start() throws PulsarServerException { this.transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer")); transactionBufferClient = TransactionBufferClientImpl.create(getClient(), transactionTimer, - config.getTransactionBufferClientMaxConcurrentRequests()); + config.getTransactionBufferClientMaxConcurrentRequests(), + config.getTransactionBufferClientOperationTimeoutInMills()); transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider .newProvider(config.getTransactionMetadataStoreProviderClassName()), this, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java index 454e9a6d53beb4..060476e573c2ae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java @@ -40,9 +40,9 @@ private TransactionBufferClientImpl(TransactionBufferHandler tbHandler) { } public static TransactionBufferClient create(PulsarClient pulsarClient, HashedWheelTimer timer, - int maxConcurrentRequests) { + int maxConcurrentRequests, long operationTimeoutInMills) { TransactionBufferHandler handler = new TransactionBufferHandlerImpl(pulsarClient, timer, - maxConcurrentRequests); + maxConcurrentRequests, operationTimeoutInMills); return new TransactionBufferClientImpl(handler); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java index 6ea53a3edd263d..b80a273bc6fcd9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java @@ -75,12 +75,12 @@ public CompletableFuture load(String topic) { } }); - public TransactionBufferHandlerImpl(PulsarClient pulsarClient, - HashedWheelTimer timer, int maxConcurrentRequests) { + public TransactionBufferHandlerImpl(PulsarClient pulsarClient, HashedWheelTimer timer, + int maxConcurrentRequests, long operationTimeoutInMills) { this.pulsarClient = pulsarClient; this.outstandingRequests = new ConcurrentSkipListMap<>(); this.pendingRequests = new GrowableArrayBlockingQueue<>(); - this.operationTimeoutInMills = 3000L; + this.operationTimeoutInMills = operationTimeoutInMills; this.timer = timer; this.requestCredits = Math.max(100, maxConcurrentRequests); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java index fa1b9e7f287e61..d85f6b42f23f61 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java @@ -81,7 +81,7 @@ protected void setup() throws Exception { admin.namespaces().createNamespace(namespace, 10); admin.topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(), partitions); tbClient = TransactionBufferClientImpl.create(pulsarClient, - new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer")), 1000); + new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer")), 1000, 3000); } @Override @@ -160,7 +160,7 @@ public void testTransactionBufferClientTimeout() throws Exception { @Cleanup("stop") HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(); TransactionBufferHandlerImpl transactionBufferHandler = - new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000); + new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000, 3000); CompletableFuture endFuture = transactionBufferHandler.endTxnOnTopic("test", 1, 1, TxnAction.ABORT, 1); @@ -203,7 +203,7 @@ public void testTransactionBufferChannelUnActive() { @Cleanup("stop") HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(); TransactionBufferHandlerImpl transactionBufferHandler = - new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000); + new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000, 3000); try { transactionBufferHandler.endTxnOnTopic("test", 1, 1, TxnAction.ABORT, 1).get(); fail(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java index ef0cf037772b6e..5241342635b881 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferHandlerImplTest.java @@ -42,7 +42,8 @@ public class TransactionBufferHandlerImplTest { public void testRequestCredits() { PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); when(pulsarClient.getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class))); - TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(pulsarClient, null, 1000)); + TransactionBufferHandlerImpl handler = spy( + new TransactionBufferHandlerImpl(pulsarClient, null, 1000, 3000)); doNothing().when(handler).endTxn(any()); for (int i = 0; i < 500; i++) { handler.endTxnOnTopic("t", 1L, 1L, TxnAction.COMMIT, 1L); @@ -61,7 +62,8 @@ public void testRequestCredits() { @Test public void testMinRequestCredits() { - TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(null, null, 50)); + TransactionBufferHandlerImpl handler = spy( + new TransactionBufferHandlerImpl(null, null, 50, 3000)); assertEquals(handler.getAvailableRequestCredits(), 100); } }