From 36ad0c46149c93424e0ef3a0d8f36456240d6295 Mon Sep 17 00:00:00 2001 From: Kirill Merkushev Date: Tue, 3 Sep 2019 17:07:04 +0200 Subject: [PATCH 1/4] expect always earliest offset on empty initial offset for the records Pulsar by default caches the result of the seek, so in case you do seek, then create new subscription with the same group name without seek and `earliest` subscription type, it will get back to the last seeked offset what is different from the Kafka and should be aligned in the liiklus --- .../records/tests/ConsumerGroupTest.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/tck/src/main/java/com/github/bsideup/liiklus/records/tests/ConsumerGroupTest.java b/tck/src/main/java/com/github/bsideup/liiklus/records/tests/ConsumerGroupTest.java index 2e918088..71835b80 100644 --- a/tck/src/main/java/com/github/bsideup/liiklus/records/tests/ConsumerGroupTest.java +++ b/tck/src/main/java/com/github/bsideup/liiklus/records/tests/ConsumerGroupTest.java @@ -1,6 +1,7 @@ package com.github.bsideup.liiklus.records.tests; import com.github.bsideup.liiklus.records.RecordStorageTestSupport; +import com.github.bsideup.liiklus.records.RecordsStorage; import com.github.bsideup.liiklus.records.RecordsStorage.OffsetInfo; import com.github.bsideup.liiklus.records.RecordsStorage.PartitionSource; import com.github.bsideup.liiklus.records.RecordsStorage.Subscription; @@ -12,16 +13,20 @@ import reactor.util.function.Tuple2; import reactor.util.function.Tuples; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -170,4 +175,43 @@ default void testExclusiveRecordDistribution() throws Exception { disposeAll.onNext(true); } } + + + @Test + default void shouldAlwaysUseEarliestOffsetOnEmptyOffsetsInTheInitialProvider() { + String groupName = "name"; + var earliest = "earliest"; + int count = 10; + int offsetShift = 5; + + var published = publishMany(UUID.randomUUID().toString().getBytes(), count); + var latest = published.get(published.size() - 1); + + RecordsStorage.Record latestRecord = subscribeToPartitionWithGroup(groupName, latest.getPartition(), earliest, () -> CompletableFuture.completedStage(Map.of(latest.getPartition(), latest.getOffset() - offsetShift))) + .flatMap(PartitionSource::getPublisher) + .takeUntil(next -> next.getOffset() == latest.getOffset()) + .blockLast(Duration.ofSeconds(10)); + + assertThat(latestRecord.getOffset()).describedAs("latest offset").isEqualTo(latest.getOffset()); + + List records = subscribeToPartitionWithGroup(groupName, latest.getPartition(), earliest, () -> CompletableFuture.completedStage(Map.of())) + .flatMap(PartitionSource::getPublisher) + .takeUntil(next -> next.getOffset() == latest.getOffset()) + .collectList() + .block(Duration.ofSeconds(10)); + + assertThat(records).hasSize(count); + } + + + default Flux subscribeToPartitionWithGroup( + String groupName, + int partition, + String offsetReset, + Supplier>> offsetsProvider + ) { + return Flux.from(getTarget().subscribe(getTopic(), groupName, Optional.of(offsetReset)).getPublisher(offsetsProvider)) + .flatMapIterable(it -> it::iterator) + .filter(it -> partition == it.getPartition()); + } } From de869ea129a2331b81004a66c1b4f494967504f3 Mon Sep 17 00:00:00 2001 From: Kirill Merkushev Date: Tue, 8 Oct 2019 12:19:50 +0200 Subject: [PATCH 2/4] Disable earliest offset test from tck for the pulsar records storage --- .../bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java b/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java index 1f79f7d6..1e534c88 100644 --- a/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java +++ b/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java @@ -11,6 +11,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.Murmur3_32Hash; import org.apache.pulsar.client.util.MathUtils; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.springframework.context.ApplicationContext; import org.testcontainers.containers.PulsarContainer; @@ -79,6 +80,12 @@ public int getNumberOfPartitions() { return NUM_OF_PARTITIONS; } + @Override + @Test + @Disabled("#180 - Pulsar should fix the way seek works, not disconnecting consumers (apache/pulsar/pull/5022)") + public void shouldAlwaysUseEarliestOffsetOnEmptyOffsetsInTheInitialProvider() { + } + @Test void shouldPreferEventTimeOverPublishTime() throws Exception { var topic = getTopic(); From 1e29ee466dced76ba30524f637468321c24294bd Mon Sep 17 00:00:00 2001 From: Kirill Merkushev Date: Mon, 2 Dec 2019 23:26:31 +0100 Subject: [PATCH 3/4] disable the test only until the Jan 1 2020 --- .../pulsar/PulsarRecordsStorageTest.java | 48 +++++++++++++++++-- 1 file changed, 45 insertions(+), 3 deletions(-) diff --git a/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java b/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java index 1e534c88..6e733a42 100644 --- a/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java +++ b/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java @@ -11,20 +11,30 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.Murmur3_32Hash; import org.apache.pulsar.client.util.MathUtils; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ConditionEvaluationResult; +import org.junit.jupiter.api.extension.ExecutionCondition; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; import org.springframework.context.ApplicationContext; import org.testcontainers.containers.PulsarContainer; import reactor.core.publisher.Mono; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; import java.time.Duration; import java.time.Instant; +import java.time.LocalDate; import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.platform.commons.support.AnnotationSupport.findAnnotation; public class PulsarRecordsStorageTest implements RecordStorageTests { @@ -82,7 +92,7 @@ public int getNumberOfPartitions() { @Override @Test - @Disabled("#180 - Pulsar should fix the way seek works, not disconnecting consumers (apache/pulsar/pull/5022)") + @DisabledUntil(value = "2020-01-01", comment = "#180 - Pulsar should fix the way seek works, not disconnecting consumers (apache/pulsar/pull/5022)") public void shouldAlwaysUseEarliestOffsetOnEmptyOffsetsInTheInitialProvider() { } @@ -93,7 +103,7 @@ void shouldPreferEventTimeOverPublishTime() throws Exception { var key = keyByPartition(partition); var eventTimestamp = Instant.now().minusSeconds(1000).truncatedTo(ChronoUnit.MILLIS); - try( + try ( var pulsarClient = PulsarClient.builder() .serviceUrl(pulsar.getPulsarBrokerUrl()) .build() @@ -117,4 +127,36 @@ var record = subscribeToPartition(partition) assertThat(it.getTimestamp()).isEqualTo(eventTimestamp); }); } + + + @Target({ElementType.TYPE, ElementType.METHOD}) + @Retention(RetentionPolicy.RUNTIME) + @ExtendWith(DisabledUntil.DisabledCondition.class) + public @interface DisabledUntil { + String value(); + + String comment(); + + class DisabledCondition implements ExecutionCondition { + + @Override + public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) { + Optional until = findAnnotation(context.getElement(), DisabledUntil.class); + if (until.isPresent() + && until.map(DisabledUntil::value) + .map(date -> LocalDate.parse(date).isAfter(LocalDate.now())) + .orElse(false) + ) { + String reason = until.map(DisabledUntil::comment).orElse("Disabled for now"); + return ConditionEvaluationResult.disabled(reason); + } + + return ConditionEvaluationResult.enabled("Enabled"); + } + + } + + } + + } \ No newline at end of file From 2fbf59b57a2b5d66388301298a964a89fa468f5e Mon Sep 17 00:00:00 2001 From: Kirill Merkushev Date: Tue, 3 Dec 2019 11:03:29 +0100 Subject: [PATCH 4/4] Move annotation to tck --- .../pulsar/PulsarRecordsStorageTest.java | 45 +------------------ .../liiklus/support/DisabledUntil.java | 42 +++++++++++++++++ 2 files changed, 44 insertions(+), 43 deletions(-) create mode 100644 tck/src/main/java/com/github/bsideup/liiklus/support/DisabledUntil.java diff --git a/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java b/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java index 6e733a42..c276c05b 100644 --- a/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java +++ b/plugins/pulsar-records-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java @@ -4,6 +4,7 @@ import com.github.bsideup.liiklus.records.RecordStorageTests; import com.github.bsideup.liiklus.records.RecordsStorage; import com.github.bsideup.liiklus.records.RecordsStorage.PartitionSource; +import com.github.bsideup.liiklus.support.DisabledUntil; import lombok.Getter; import lombok.SneakyThrows; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -12,29 +13,18 @@ import org.apache.pulsar.client.impl.Murmur3_32Hash; import org.apache.pulsar.client.util.MathUtils; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ConditionEvaluationResult; -import org.junit.jupiter.api.extension.ExecutionCondition; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.extension.ExtensionContext; import org.springframework.context.ApplicationContext; import org.testcontainers.containers.PulsarContainer; import reactor.core.publisher.Mono; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; import java.time.Duration; import java.time.Instant; -import java.time.LocalDate; import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.platform.commons.support.AnnotationSupport.findAnnotation; public class PulsarRecordsStorageTest implements RecordStorageTests { @@ -94,6 +84,7 @@ public int getNumberOfPartitions() { @Test @DisabledUntil(value = "2020-01-01", comment = "#180 - Pulsar should fix the way seek works, not disconnecting consumers (apache/pulsar/pull/5022)") public void shouldAlwaysUseEarliestOffsetOnEmptyOffsetsInTheInitialProvider() { + RecordStorageTests.super.shouldAlwaysUseEarliestOffsetOnEmptyOffsetsInTheInitialProvider(); } @Test @@ -127,36 +118,4 @@ var record = subscribeToPartition(partition) assertThat(it.getTimestamp()).isEqualTo(eventTimestamp); }); } - - - @Target({ElementType.TYPE, ElementType.METHOD}) - @Retention(RetentionPolicy.RUNTIME) - @ExtendWith(DisabledUntil.DisabledCondition.class) - public @interface DisabledUntil { - String value(); - - String comment(); - - class DisabledCondition implements ExecutionCondition { - - @Override - public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) { - Optional until = findAnnotation(context.getElement(), DisabledUntil.class); - if (until.isPresent() - && until.map(DisabledUntil::value) - .map(date -> LocalDate.parse(date).isAfter(LocalDate.now())) - .orElse(false) - ) { - String reason = until.map(DisabledUntil::comment).orElse("Disabled for now"); - return ConditionEvaluationResult.disabled(reason); - } - - return ConditionEvaluationResult.enabled("Enabled"); - } - - } - - } - - } \ No newline at end of file diff --git a/tck/src/main/java/com/github/bsideup/liiklus/support/DisabledUntil.java b/tck/src/main/java/com/github/bsideup/liiklus/support/DisabledUntil.java new file mode 100644 index 00000000..7888d6d1 --- /dev/null +++ b/tck/src/main/java/com/github/bsideup/liiklus/support/DisabledUntil.java @@ -0,0 +1,42 @@ +package com.github.bsideup.liiklus.support; + +import org.junit.jupiter.api.extension.ConditionEvaluationResult; +import org.junit.jupiter.api.extension.ExecutionCondition; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.time.LocalDate; +import java.util.Optional; + +import static org.junit.platform.commons.support.AnnotationSupport.findAnnotation; + + +@Target({ElementType.TYPE, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@ExtendWith(DisabledUntil.DisabledCondition.class) +public @interface DisabledUntil { + /** + * Local date, ISO formatted string such as 2000-01-30 + */ + String value(); + + String comment(); + + class DisabledCondition implements ExecutionCondition { + + @Override + public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) { + Optional until = findAnnotation(context.getElement(), DisabledUntil.class); + if (until.map(it -> LocalDate.parse(it.value()).isAfter(LocalDate.now())).orElse(false)) { + String reason = until.map(DisabledUntil::comment).orElse("Disabled for now"); + return ConditionEvaluationResult.disabled(reason); + } + + return ConditionEvaluationResult.enabled("Enabled"); + } + } +} \ No newline at end of file