Skip to content

Commit

Permalink
Disable group ack for the consumer (#189)
Browse files Browse the repository at this point in the history
We actually don't need ack capabilities here at all, so better
to disable the scheduling of the grouped ack flush to avoid any leaks
and useless objects creation.

Cleanup issues should be fixed here
apache/pulsar#5204
  • Loading branch information
lanwen authored and bsideup committed Sep 18, 2019
1 parent 9a07bbf commit 0ddb3a3
Showing 1 changed file with 2 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -187,6 +188,7 @@ public Publisher<Record> getPublisher() {
return Flux.usingWhen(
Mono.fromCompletionStage(() -> {
val consumerBuilder = pulsarClient.newConsumer()
.acknowledgmentGroupTime(0, TimeUnit.SECONDS) // we don't ack here at all
.subscriptionName(groupName)
.subscriptionType(SubscriptionType.Failover)
.topic(TopicName.get(topic).getPartition(partition).toString());
Expand Down

0 comments on commit 0ddb3a3

Please sign in to comment.