Skip to content

Commit

Permalink
Make PulsarRecordsStorage implement FiniteRecordsStorage (#164)
Browse files Browse the repository at this point in the history
* WIP: implement `FiniteRecordsStorage` interface in Pulsar

* Make `PulsarRecordsStorage` implement `FiniteRecordsStorage`
  • Loading branch information
bsideup authored Aug 6, 2019
1 parent 3391973 commit a5534a6
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.github.bsideup.liiklus.pulsar;

import com.github.bsideup.liiklus.records.RecordsStorage;
import com.github.bsideup.liiklus.records.FiniteRecordsStorage;
import lombok.*;
import lombok.experimental.FieldDefaults;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
import org.apache.pulsar.client.impl.ConsumerImplAccessor;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.reactivestreams.Publisher;
Expand All @@ -17,6 +18,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -26,7 +28,7 @@

@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class PulsarRecordsStorage implements RecordsStorage {
public class PulsarRecordsStorage implements FiniteRecordsStorage {

public static MessageId fromOffset(long offset) {
return new MessageIdImpl(offset >>> 28, offset & 0x0F_FF_FF_FFL, -1);
Expand Down Expand Up @@ -83,6 +85,35 @@ public Subscription subscribe(String topic, String groupName, Optional<String> a
return new PulsarSubscription(topic, groupName, autoOffsetReset);
}

@Override
public CompletionStage<Map<Integer, Long>> getEndOffsets(String topic) {
return Mono
.fromCompletionStage(() -> pulsarClient.getPartitionsForTopic(topic))
.flatMapIterable(it -> it)
.flatMap(partitionTopic -> {
var partitionIndex = TopicName.getPartitionIndex(partitionTopic);

var consumerFuture = pulsarClient.newConsumer()
.subscriptionName(UUID.randomUUID().toString())
.subscriptionType(SubscriptionType.Failover)
.topic(partitionTopic)
.subscribeAsync();

return Mono.usingWhen(
Mono.fromCompletionStage(() -> consumerFuture),
consumer -> {
return Mono
.fromCompletionStage(() -> ConsumerImplAccessor.getLastMessageIdAsync(consumer))
.map(messageId -> Map.entry(partitionIndex, toOffset(messageId)));
},
consumer -> Mono.fromCompletionStage(consumer.closeAsync()),
consumer -> Mono.fromCompletionStage(consumer.closeAsync())
);
})
.collectMap(Map.Entry::getKey, Map.Entry::getValue)
.toFuture();
}

@Value
private class PulsarSubscription implements Subscription {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;

import java.util.concurrent.CompletableFuture;

public class ConsumerImplAccessor {

public static CompletableFuture<MessageId> getLastMessageIdAsync(Consumer<?> consumer) {
return ((ConsumerImpl<?>) consumer).getLastMessageIdAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.junit.jupiter.api.TestInfo;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

Expand All @@ -34,21 +35,17 @@ default void blah(TestInfo testInfo) {
default void testEndOffsets() throws Exception {
var topic = getTopic();

var lastReceivedOffsets = new HashMap<Integer, Long>();
for (int partition = 0; partition < getNumberOfPartitions(); partition++) {
for (int i = 0; i < partition + 1; i++) {
publish(keyByPartition(partition).getBytes(), new byte[1]);
var offset = publish(keyByPartition(partition).getBytes(), new byte[1]).getOffset();
lastReceivedOffsets.put(partition, offset);
}
}

var offsets = getFiniteTarget().getEndOffsets(topic).toCompletableFuture().get(10, TimeUnit.SECONDS);

assertThat(offsets)
.hasSize(getNumberOfPartitions())
.allSatisfy((partition, offset) -> {
assertThat(offset)
.as("offset of p" + partition)
.isEqualTo(partition.longValue());
});
assertThat(offsets).isEqualTo(lastReceivedOffsets);
}

@Test
Expand Down

0 comments on commit a5534a6

Please sign in to comment.