From 0ddb3a307378dfb1f69c8d48ad586d8a30d4e967 Mon Sep 17 00:00:00 2001 From: Kirill Merkushev Date: Wed, 18 Sep 2019 07:09:41 +0200 Subject: [PATCH] Disable group ack for the consumer (#189) We actually don't need ack capabilities here at all, so better to disable the scheduling of the grouped ack flush to avoid any leaks and useless objects creation. Cleanup issues should be fixed here https://github.com/apache/pulsar/pull/5204 --- .../com/github/bsideup/liiklus/pulsar/PulsarRecordsStorage.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugins/pulsar-records-storage/src/main/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorage.java b/plugins/pulsar-records-storage/src/main/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorage.java index 5e184a92..86313267 100644 --- a/plugins/pulsar-records-storage/src/main/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorage.java +++ b/plugins/pulsar-records-storage/src/main/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorage.java @@ -22,6 +22,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -187,6 +188,7 @@ public Publisher getPublisher() { return Flux.usingWhen( Mono.fromCompletionStage(() -> { val consumerBuilder = pulsarClient.newConsumer() + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) // we don't ack here at all .subscriptionName(groupName) .subscriptionType(SubscriptionType.Failover) .topic(TopicName.get(topic).getPartition(partition).toString());