Skip to content

Commit

Permalink
Handle non-partitioned Pulsar topics (#260)
Browse files Browse the repository at this point in the history
Apparently, Pulsar uses a magical -1 value for the partition number when the topic is not partitioned.
  • Loading branch information
bsideup authored Feb 6, 2020
1 parent f99da14 commit ba08641
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
Expand All @@ -24,7 +23,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;

@RequiredArgsConstructor
Expand Down Expand Up @@ -140,10 +138,9 @@ public Publisher<Stream<? extends PartitionSource>> getPublisher(
) {
return Mono
.defer(() -> Mono.fromCompletionStage(pulsarClient.getPartitionsForTopic(topic)))
.map(List::size)
.mergeWith(Flux.never()) // Never complete
.map(numberOfPartitions -> {
return IntStream.range(0, numberOfPartitions).mapToObj(partition -> new PulsarPartitionSource(
.map(partitions -> {
return partitions.stream().map(TopicName::getPartitionIndex).map(partition -> new PulsarPartitionSource(
topic,
partition,
groupName,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.github.bsideup.liiklus.pulsar;

import com.github.bsideup.liiklus.ApplicationRunner;
import com.github.bsideup.liiklus.records.RecordStorageTests;
import com.github.bsideup.liiklus.records.RecordsStorage;
import com.github.bsideup.liiklus.support.DisabledUntil;
import lombok.Getter;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.junit.jupiter.api.Test;
import org.springframework.context.ApplicationContext;
import org.testcontainers.containers.PulsarContainer;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

abstract class AbstractPulsarRecordsStorageTest implements RecordStorageTests {

static final PulsarContainer pulsar = new PulsarContainer("2.5.0")
.withReuse(true);

private static final ApplicationContext applicationContext;

static {
pulsar.start();

applicationContext = new ApplicationRunner("PULSAR", "MEMORY")
.withProperty("pulsar.serviceUrl", pulsar.getPulsarBrokerUrl())
.run();
}

@Getter
RecordsStorage target = applicationContext.getBean(RecordsStorage.class);

@Getter
String topic = UUID.randomUUID().toString();

@Override
@Test
@DisabledUntil(value = "2020-03-01", comment = "#180 - Pulsar should fix the way seek works, not disconnecting consumers (apache/pulsar/pull/5022)")
public void shouldAlwaysUseEarliestOffsetOnEmptyOffsetsInTheInitialProvider() {
RecordStorageTests.super.shouldAlwaysUseEarliestOffsetOnEmptyOffsetsInTheInitialProvider();
}

@Test
void shouldPreferEventTimeOverPublishTime() throws Exception {
var topic = getTopic();
var eventTimestamp = Instant.now().minusSeconds(1000).truncatedTo(ChronoUnit.MILLIS);

int partition;
try (
var pulsarClient = PulsarClient.builder()
.serviceUrl(pulsar.getPulsarBrokerUrl())
.build()
) {
var messageId = pulsarClient.newProducer()
.topic(topic)
.create()
.newMessage()
.value("hello".getBytes())
.eventTime(eventTimestamp.toEpochMilli())
.send();

partition = ((MessageIdImpl) messageId).getPartitionIndex();
}

var record = subscribeToPartition(partition)
.flatMap(RecordsStorage.PartitionSource::getPublisher)
.blockFirst(Duration.ofSeconds(10));

assertThat(record).satisfies(it -> {
assertThat(it.getTimestamp()).isEqualTo(eventTimestamp);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.github.bsideup.liiklus.pulsar;

import lombok.SneakyThrows;
import org.apache.pulsar.client.admin.PulsarAdmin;

public class NonPartitionedPulsarRecordsStorageTest extends AbstractPulsarRecordsStorageTest {

@SneakyThrows
public NonPartitionedPulsarRecordsStorageTest() {
PulsarAdmin pulsarAdmin = PulsarAdmin.builder()
.serviceHttpUrl(pulsar.getHttpServiceUrl())
.build();

pulsarAdmin.topics().createNonPartitionedTopic(topic);
}

@Override
public String keyByPartition(int partition) {
return "foo";
}

@Override
public int getNumberOfPartitions() {
return 1;
}
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,17 @@
package com.github.bsideup.liiklus.pulsar;

import com.github.bsideup.liiklus.ApplicationRunner;
import com.github.bsideup.liiklus.records.RecordStorageTests;
import com.github.bsideup.liiklus.records.RecordsStorage;
import com.github.bsideup.liiklus.records.RecordsStorage.PartitionSource;
import com.github.bsideup.liiklus.support.DisabledUntil;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.Murmur3_32Hash;
import org.apache.pulsar.client.util.MathUtils;
import org.junit.jupiter.api.Test;
import org.springframework.context.ApplicationContext;
import org.testcontainers.containers.PulsarContainer;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

public class PulsarRecordsStorageTest implements RecordStorageTests {
public class PulsarRecordsStorageTest extends AbstractPulsarRecordsStorageTest {

private static final int NUM_OF_PARTITIONS = 4;

Expand All @@ -43,24 +28,6 @@ public class PulsarRecordsStorageTest implements RecordStorageTests {
.filter(it -> it.size() == NUM_OF_PARTITIONS)
.blockFirst(Duration.ofSeconds(10));

private static final PulsarContainer pulsar = new PulsarContainer();

static final ApplicationContext applicationContext;

static {
pulsar.start();

applicationContext = new ApplicationRunner("PULSAR", "MEMORY")
.withProperty("pulsar.serviceUrl", pulsar.getPulsarBrokerUrl())
.run();
}

@Getter
RecordsStorage target = applicationContext.getBean(RecordsStorage.class);

@Getter
String topic = UUID.randomUUID().toString();

@SneakyThrows
public PulsarRecordsStorageTest() {
PulsarAdmin pulsarAdmin = PulsarAdmin.builder()
Expand All @@ -79,43 +46,4 @@ public String keyByPartition(int partition) {
public int getNumberOfPartitions() {
return NUM_OF_PARTITIONS;
}

@Override
@Test
@DisabledUntil(value = "2020-03-01", comment = "#180 - Pulsar should fix the way seek works, not disconnecting consumers (apache/pulsar/pull/5022)")
public void shouldAlwaysUseEarliestOffsetOnEmptyOffsetsInTheInitialProvider() {
RecordStorageTests.super.shouldAlwaysUseEarliestOffsetOnEmptyOffsetsInTheInitialProvider();
}

@Test
void shouldPreferEventTimeOverPublishTime() throws Exception {
var topic = getTopic();
var partition = 0;
var key = keyByPartition(partition);
var eventTimestamp = Instant.now().minusSeconds(1000).truncatedTo(ChronoUnit.MILLIS);

try (
var pulsarClient = PulsarClient.builder()
.serviceUrl(pulsar.getPulsarBrokerUrl())
.build()
) {
pulsarClient.newProducer()
.topic(topic)
.hashingScheme(HashingScheme.Murmur3_32Hash)
.create()
.newMessage()
.key(key)
.value("hello".getBytes())
.eventTime(eventTimestamp.toEpochMilli())
.send();
}

var record = subscribeToPartition(partition)
.flatMap(PartitionSource::getPublisher)
.blockFirst(Duration.ofSeconds(10));

assertThat(record).satisfies(it -> {
assertThat(it.getTimestamp()).isEqualTo(eventTimestamp);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ default void testEndOffsets() throws Exception {
var lastReceivedOffsets = new HashMap<Integer, Long>();
for (int partition = 0; partition < getNumberOfPartitions(); partition++) {
for (int i = 0; i < partition + 1; i++) {
var offset = publish(keyByPartition(partition).getBytes(), new byte[1]).getOffset();
lastReceivedOffsets.put(partition, offset);
var offsetInfo = publish(keyByPartition(partition).getBytes(), new byte[1]);
lastReceivedOffsets.put(offsetInfo.getPartition(), offsetInfo.getOffset());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ var record = createEnvelope("key".getBytes());
assertThat(offsetInfo)
.satisfies(info -> {
assertThat(info.getTopic()).as("topic").isEqualTo(getTopic());
assertThat(info.getPartition()).as("partition").isNotNegative();
assertThat(info.getOffset()).as("offset").isNotNegative();
});

Expand Down

0 comments on commit ba08641

Please sign in to comment.