-
Notifications
You must be signed in to change notification settings - Fork 34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
expect always earliest offset on empty initial offset for the records #180
Conversation
Tried this: .delaySubscription(
initialOffset.map(PulsarRecordsStorage::fromOffset)
.switchIfEmpty(Mono.justOrEmpty(autoOffsetReset
.map(it -> {
switch (it) {
case "earliest":
return MessageId.earliest;
case "latest":
return MessageId.latest;
default:
return null;
}
})
))
.defaultIfEmpty(MessageId.earliest)
.flatMap(messageId -> {
return Mono.fromCompletionStage(consumer.seekAsync(messageId));
})
); it fixes the test, but fails with |
with this fix, |
@bsideup disabled the test for pulsar, so that PR could be merged on its own |
...ecords-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java
Outdated
Show resolved
Hide resolved
...ecords-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java
Show resolved
Hide resolved
...ecords-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java
Outdated
Show resolved
Hide resolved
...ecords-storage/src/test/java/com/github/bsideup/liiklus/pulsar/PulsarRecordsStorageTest.java
Outdated
Show resolved
Hide resolved
Pulsar by default caches the result of the seek, so in case you do seek, then create new subscription with the same group name without seek and `earliest` subscription type, it will get back to the last seeked offset what is different from the Kafka and should be aligned in the liiklus
Pulsar by default caches the result of the seek, so in case you do seek,
then create new subscription with the same group name without seek and
earliest
subscription type, it will get back to the last seeked offsetwhat is different from the Kafka and should be aligned in the liiklus