diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index f8bc30f09667c..7ab3e545e981e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -545,4 +545,51 @@ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws Excepti consumer.close(); admin.topics().deletePartitionedTopic("persistent://public/default/" + topic); } + + @DataProvider(name = "negativeAckPrecisionBitCnt") + public Object[][] negativeAckPrecisionBitCnt() { + return new Object[][]{ + {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}, {12} + }; + } + + /** + * When negativeAckPrecisionBitCnt is greater than 0, the lower bits of the redelivery time will be truncated + * to reduce the memory occupation. If set to k, the redelivery time will be bucketed by 2^k ms, resulting in + * the redelivery time could be earlier(no later) than the expected time no more than 2^k ms. + * @throws Exception if an error occurs + */ + @Test(dataProvider = "negativeAckPrecisionBitCnt") + public void testConfigureNegativeAckPrecisionBitCnt(int negativeAckPrecisionBitCnt) throws Exception { + String topic = BrokerTestUtil.newUniqueName("testConfigureNegativeAckPrecisionBitCnt"); + long timeDeviation = 1L << negativeAckPrecisionBitCnt; + long delayInMs = 2000; + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub1") + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .negativeAckRedeliveryDelay(delayInMs, TimeUnit.MILLISECONDS) + .negativeAckRedeliveryDelayPrecision(negativeAckPrecisionBitCnt) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + producer.sendAsync("test-0"); + producer.flush(); + + // receive the message and negative ack + consumer.negativeAcknowledge(consumer.receive()); + long expectedTime = System.currentTimeMillis() + delayInMs; + + // receive the redelivered message and calculate the time deviation + // assert that the redelivery time is no earlier than the `expected time - timeDeviation` + Message msg1 = consumer.receive(); + assertTrue(System.currentTimeMillis() >= expectedTime - timeDeviation); + assertNotNull(msg1); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 3ce12b7741a8f..ed77652c82340 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -243,6 +243,19 @@ public interface ConsumerBuilder extends Cloneable { */ ConsumerBuilder negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit); + /** + * Sets the redelivery time precision bit count. The lower bits of the redelivery time will be + * trimmed to reduce the memory occupation. The default value is 8, which means the redelivery time + * will be bucketed by 256ms, the redelivery time could be earlier(no later) than the expected time, + * but no more than 256ms. If set to k, the redelivery time will be bucketed by 2^k ms. + * If the value is 0, the redelivery time will be accurate to ms. + * + * @param negativeAckPrecisionBitCnt + * The redelivery time precision bit count. + * @return the consumer builder instance + */ + ConsumerBuilder negativeAckRedeliveryDelayPrecision(int negativeAckPrecisionBitCount); + /** * Select the subscription type to be used when subscribing to a topic. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 35f772028f17a..478f93b56a0d2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -281,6 +281,13 @@ public ConsumerBuilder negativeAckRedeliveryDelay(long redeliveryDelay, TimeU return this; } + @Override + public ConsumerBuilder negativeAckRedeliveryDelayPrecision(int negativeAckPrecisionBitCount) { + checkArgument(negativeAckPrecisionBitCount >= 0, "negativeAckPrecisionBitCount needs to be >= 0"); + conf.setNegativeAckPrecisionBitCnt(negativeAckPrecisionBitCount); + return this; + } + @Override public ConsumerBuilder subscriptionType(@NonNull SubscriptionType subscriptionType) { conf.setSubscriptionType(subscriptionType);