From ba08641890e83454b64d97861354a0ee4b588335 Mon Sep 17 00:00:00 2001 From: Sergei Egorov Date: Thu, 6 Feb 2020 22:06:05 +0100 Subject: [PATCH] Handle non-partitioned Pulsar topics (#260) Apparently, Pulsar uses a magical -1 value for the partition number when the topic is not partitioned. --- .../liiklus/pulsar/PulsarRecordsStorage.java | 7 +- .../AbstractPulsarRecordsStorageTest.java | 79 +++++++++++++++++++ ...onPartitionedPulsarRecordsStorageTest.java | 26 ++++++ .../pulsar/PulsarRecordsStorageTest.java | 74 +---------------- .../liiklus/records/tests/EndOffsetsTest.java | 4 +- .../liiklus/records/tests/PublishTest.java | 1 - 6 files changed, 110 insertions(+), 81 deletions(-) create mode 100644 plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/AbstractPulsarRecordsStorageTest.java create mode 100644 plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/NonPartitionedPulsarRecordsStorageTest.java diff --git a/plugins/pulsar-records-storage/src/main/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorage.java b/plugins/pulsar-records-storage/src/main/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorage.java index bda63146..7eb97ce7 100644 --- a/plugins/pulsar-records-storage/src/main/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorage.java +++ b/plugins/pulsar-records-storage/src/main/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorage.java @@ -15,7 +15,6 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Instant; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -24,7 +23,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import java.util.stream.IntStream; import java.util.stream.Stream; @RequiredArgsConstructor @@ -140,10 +138,9 @@ public Publisher> getPublisher( ) { return Mono .defer(() -> Mono.fromCompletionStage(pulsarClient.getPartitionsForTopic(topic))) - .map(List::size) .mergeWith(Flux.never()) // Never complete - .map(numberOfPartitions -> { - return IntStream.range(0, numberOfPartitions).mapToObj(partition -> new PulsarPartitionSource( + .map(partitions -> { + return partitions.stream().map(TopicName::getPartitionIndex).map(partition -> new PulsarPartitionSource( topic, partition, groupName, diff --git a/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/AbstractPulsarRecordsStorageTest.java b/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/AbstractPulsarRecordsStorageTest.java new file mode 100644 index 00000000..e8fb2728 --- /dev/null +++ b/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/AbstractPulsarRecordsStorageTest.java @@ -0,0 +1,79 @@ +package com.github.bsideup.liiklus.pulsar; + +import com.github.bsideup.liiklus.ApplicationRunner; +import com.github.bsideup.liiklus.records.RecordStorageTests; +import com.github.bsideup.liiklus.records.RecordsStorage; +import com.github.bsideup.liiklus.support.DisabledUntil; +import lombok.Getter; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.junit.jupiter.api.Test; +import org.springframework.context.ApplicationContext; +import org.testcontainers.containers.PulsarContainer; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +abstract class AbstractPulsarRecordsStorageTest implements RecordStorageTests { + + static final PulsarContainer pulsar = new PulsarContainer("2.5.0") + .withReuse(true); + + private static final ApplicationContext applicationContext; + + static { + pulsar.start(); + + applicationContext = new ApplicationRunner("PULSAR", "MEMORY") + .withProperty("pulsar.serviceUrl", pulsar.getPulsarBrokerUrl()) + .run(); + } + + @Getter + RecordsStorage target = applicationContext.getBean(RecordsStorage.class); + + @Getter + String topic = UUID.randomUUID().toString(); + + @Override + @Test + @DisabledUntil(value = "2020-03-01", comment = "#180 - Pulsar should fix the way seek works, not disconnecting consumers (apache/pulsar/pull/5022)") + public void shouldAlwaysUseEarliestOffsetOnEmptyOffsetsInTheInitialProvider() { + RecordStorageTests.super.shouldAlwaysUseEarliestOffsetOnEmptyOffsetsInTheInitialProvider(); + } + + @Test + void shouldPreferEventTimeOverPublishTime() throws Exception { + var topic = getTopic(); + var eventTimestamp = Instant.now().minusSeconds(1000).truncatedTo(ChronoUnit.MILLIS); + + int partition; + try ( + var pulsarClient = PulsarClient.builder() + .serviceUrl(pulsar.getPulsarBrokerUrl()) + .build() + ) { + var messageId = pulsarClient.newProducer() + .topic(topic) + .create() + .newMessage() + .value("hello".getBytes()) + .eventTime(eventTimestamp.toEpochMilli()) + .send(); + + partition = ((MessageIdImpl) messageId).getPartitionIndex(); + } + + var record = subscribeToPartition(partition) + .flatMap(RecordsStorage.PartitionSource::getPublisher) + .blockFirst(Duration.ofSeconds(10)); + + assertThat(record).satisfies(it -> { + assertThat(it.getTimestamp()).isEqualTo(eventTimestamp); + }); + } +} diff --git a/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/NonPartitionedPulsarRecordsStorageTest.java b/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/NonPartitionedPulsarRecordsStorageTest.java new file mode 100644 index 00000000..1594fb62 --- /dev/null +++ b/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/NonPartitionedPulsarRecordsStorageTest.java @@ -0,0 +1,26 @@ +package com.github.bsideup.liiklus.pulsar; + +import lombok.SneakyThrows; +import org.apache.pulsar.client.admin.PulsarAdmin; + +public class NonPartitionedPulsarRecordsStorageTest extends AbstractPulsarRecordsStorageTest { + + @SneakyThrows + public NonPartitionedPulsarRecordsStorageTest() { + PulsarAdmin pulsarAdmin = PulsarAdmin.builder() + .serviceHttpUrl(pulsar.getHttpServiceUrl()) + .build(); + + pulsarAdmin.topics().createNonPartitionedTopic(topic); + } + + @Override + public String keyByPartition(int partition) { + return "foo"; + } + + @Override + public int getNumberOfPartitions() { + return 1; + } +} \ No newline at end of file diff --git a/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java b/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java index 883444b5..261e6abd 100644 --- a/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java +++ b/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java @@ -1,32 +1,17 @@ package com.github.bsideup.liiklus.pulsar; -import com.github.bsideup.liiklus.ApplicationRunner; -import com.github.bsideup.liiklus.records.RecordStorageTests; -import com.github.bsideup.liiklus.records.RecordsStorage; -import com.github.bsideup.liiklus.records.RecordsStorage.PartitionSource; -import com.github.bsideup.liiklus.support.DisabledUntil; -import lombok.Getter; import lombok.SneakyThrows; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.api.HashingScheme; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.Murmur3_32Hash; import org.apache.pulsar.client.util.MathUtils; -import org.junit.jupiter.api.Test; -import org.springframework.context.ApplicationContext; -import org.testcontainers.containers.PulsarContainer; import reactor.core.publisher.Mono; import java.time.Duration; -import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; import java.util.UUID; -import static org.assertj.core.api.Assertions.assertThat; - -public class PulsarRecordsStorageTest implements RecordStorageTests { +public class PulsarRecordsStorageTest extends AbstractPulsarRecordsStorageTest { private static final int NUM_OF_PARTITIONS = 4; @@ -43,24 +28,6 @@ public class PulsarRecordsStorageTest implements RecordStorageTests { .filter(it -> it.size() == NUM_OF_PARTITIONS) .blockFirst(Duration.ofSeconds(10)); - private static final PulsarContainer pulsar = new PulsarContainer(); - - static final ApplicationContext applicationContext; - - static { - pulsar.start(); - - applicationContext = new ApplicationRunner("PULSAR", "MEMORY") - .withProperty("pulsar.serviceUrl", pulsar.getPulsarBrokerUrl()) - .run(); - } - - @Getter - RecordsStorage target = applicationContext.getBean(RecordsStorage.class); - - @Getter - String topic = UUID.randomUUID().toString(); - @SneakyThrows public PulsarRecordsStorageTest() { PulsarAdmin pulsarAdmin = PulsarAdmin.builder() @@ -79,43 +46,4 @@ public String keyByPartition(int partition) { public int getNumberOfPartitions() { return NUM_OF_PARTITIONS; } - - @Override - @Test - @DisabledUntil(value = "2020-03-01", comment = "#180 - Pulsar should fix the way seek works, not disconnecting consumers (apache/pulsar/pull/5022)") - public void shouldAlwaysUseEarliestOffsetOnEmptyOffsetsInTheInitialProvider() { - RecordStorageTests.super.shouldAlwaysUseEarliestOffsetOnEmptyOffsetsInTheInitialProvider(); - } - - @Test - void shouldPreferEventTimeOverPublishTime() throws Exception { - var topic = getTopic(); - var partition = 0; - var key = keyByPartition(partition); - var eventTimestamp = Instant.now().minusSeconds(1000).truncatedTo(ChronoUnit.MILLIS); - - try ( - var pulsarClient = PulsarClient.builder() - .serviceUrl(pulsar.getPulsarBrokerUrl()) - .build() - ) { - pulsarClient.newProducer() - .topic(topic) - .hashingScheme(HashingScheme.Murmur3_32Hash) - .create() - .newMessage() - .key(key) - .value("hello".getBytes()) - .eventTime(eventTimestamp.toEpochMilli()) - .send(); - } - - var record = subscribeToPartition(partition) - .flatMap(PartitionSource::getPublisher) - .blockFirst(Duration.ofSeconds(10)); - - assertThat(record).satisfies(it -> { - assertThat(it.getTimestamp()).isEqualTo(eventTimestamp); - }); - } } \ No newline at end of file diff --git a/tck/src/main/java/com/github/bsideup/liiklus/records/tests/EndOffsetsTest.java b/tck/src/main/java/com/github/bsideup/liiklus/records/tests/EndOffsetsTest.java index aaa58fbd..6ecb83e0 100644 --- a/tck/src/main/java/com/github/bsideup/liiklus/records/tests/EndOffsetsTest.java +++ b/tck/src/main/java/com/github/bsideup/liiklus/records/tests/EndOffsetsTest.java @@ -38,8 +38,8 @@ default void testEndOffsets() throws Exception { var lastReceivedOffsets = new HashMap(); for (int partition = 0; partition < getNumberOfPartitions(); partition++) { for (int i = 0; i < partition + 1; i++) { - var offset = publish(keyByPartition(partition).getBytes(), new byte[1]).getOffset(); - lastReceivedOffsets.put(partition, offset); + var offsetInfo = publish(keyByPartition(partition).getBytes(), new byte[1]); + lastReceivedOffsets.put(offsetInfo.getPartition(), offsetInfo.getOffset()); } } diff --git a/tck/src/main/java/com/github/bsideup/liiklus/records/tests/PublishTest.java b/tck/src/main/java/com/github/bsideup/liiklus/records/tests/PublishTest.java index d9c0049b..4f54f8f9 100644 --- a/tck/src/main/java/com/github/bsideup/liiklus/records/tests/PublishTest.java +++ b/tck/src/main/java/com/github/bsideup/liiklus/records/tests/PublishTest.java @@ -19,7 +19,6 @@ var record = createEnvelope("key".getBytes()); assertThat(offsetInfo) .satisfies(info -> { assertThat(info.getTopic()).as("topic").isEqualTo(getTopic()); - assertThat(info.getPartition()).as("partition").isNotNegative(); assertThat(info.getOffset()).as("offset").isNotNegative(); });