Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pulsar intrinsic bugs #176

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
package com.github.bsideup.liiklus.pulsar;

import com.github.bsideup.liiklus.records.FiniteRecordsStorage;
import lombok.*;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.Value;
import lombok.experimental.FieldDefaults;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImplAccessor;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -14,11 +27,13 @@

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -27,6 +42,7 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

@Slf4j
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class PulsarRecordsStorage implements FiniteRecordsStorage {
Expand All @@ -43,6 +59,14 @@ public static long toOffset(MessageId messageId) {
return (msgId.getLedgerId() << 28) | msgId.getEntryId();
}

// For a closed ledger, Pulsar treats the entry id differently when seeking:
// it has to be lowered by 1 so the seek lands on the intended position.
// https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2729
// https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2744
private static MessageId adaptForSeek(MessageId messageId) {
    MessageIdImpl id = (MessageIdImpl) messageId;
    // Math.max keeps entry id 0 from going negative — NOTE(review): presumably seeking to
    // entry 0 of a closed ledger should stay at 0 rather than underflow; confirm against Pulsar.
    return new MessageIdImpl(id.getLedgerId(), Math.max(0, id.getEntryId() - 1), id.getPartitionIndex());
}

private static Instant extractTime(Message<byte[]> message) {
// event time does not always exist
if (message.getEventTime() == 0) {
Expand Down Expand Up @@ -181,16 +205,51 @@ private class PulsarPartitionSource implements PartitionSource {

Optional<String> autoOffsetReset;

Mono<Long> initialOffset;
Mono<Long> lastOffset;

@Override
public Publisher<Record> getPublisher() {
return Flux.usingWhen(
Mono.fromCompletionStage(() -> {
return Flux.usingWhen(createConsumer(), this::consumeMessage, this::cleanConsumer)
.doOnSubscribe(__ -> log.debug(
"subscription {}, topic {}, partition {} subscribed",
groupName, topic, partition
))
.doOnComplete(() -> log.debug(
"subscription {}, topic {}, partition {} completed",
groupName, topic, partition
))
.onErrorMap(CompletionException.class, Throwable::getCause)
.doOnError(
e -> !(e instanceof PulsarClientException.ConsumerBusyException),
e -> log.error(
"subscription {}, topic {}, partition {} failed",
groupName, topic, partition, e
)
)
.doOnError(
e -> e instanceof PulsarClientException.ConsumerBusyException,
e -> log.trace(
"subscription {}, topic {}, partition {} already connected",
groupName, topic, partition
)
)
// retry for connecting if the other exclusive pulsar consumer died
// also need to retry, as resetting the subscription offset disconnects all pulsar consumers of a group
// this should also be taken care of by liiklus transparently for the liiklus consumer
.retryBackoff(Long.MAX_VALUE, Duration.ofSeconds(1), Duration.ofSeconds(30));
}

private Mono<Consumer<byte[]>> createConsumer() {
return Mono
.fromCompletionStage(() -> {
val consumerBuilder = pulsarClient.newConsumer()
.acknowledgmentGroupTime(0, TimeUnit.SECONDS) // we don't ack here at all
.subscriptionName(groupName)
.subscriptionType(SubscriptionType.Failover)
// failover subscription type does not failover properly
// even when it works, unacked messages will be re-delivered to another consumer
// the only model which is compatible with liiklus is exclusive
// which is similar to the reader interface of pulsar
.subscriptionType(SubscriptionType.Exclusive)
.topic(TopicName.get(topic).getPartition(partition).toString());

autoOffsetReset
Expand All @@ -207,33 +266,106 @@ public Publisher<Record> getPublisher() {
.ifPresent(consumerBuilder::subscriptionInitialPosition);

return consumerBuilder.subscribeAsync();
}),
consumer -> {
return Mono
.fromCompletionStage(consumer::receiveAsync)
.repeat()
.onErrorResume(AlreadyClosedException.class, __ -> Mono.empty())
.map(message -> {
var key = message.getKey();
return new Record(
new Envelope(
topic,
key != null ? ByteBuffer.wrap(key.getBytes()) : null,
ByteBuffer.wrap(message.getValue())
),
extractTime(message),
partition,
toOffset(message.getMessageId())
);
})
.delaySubscription(
initialOffset.flatMap(offset -> {
return Mono.fromCompletionStage(consumer.seekAsync(fromOffset(offset)));
})
);
},
consumer -> Mono.fromCompletionStage(consumer.closeAsync())
})
.doOnNext(__ -> log.debug(
"subscription {}, topic {}, partition {} consumer created",
groupName, topic, partition
))
.onErrorMap(CompletionException.class, Throwable::getCause)
.doOnError(
e -> !(e instanceof PulsarClientException.ConsumerBusyException),
e -> log.error(
"subscription {}, topic {}, partition {} failed to create consumer",
groupName, topic, partition, e
)
)
.doOnError(
e -> e instanceof PulsarClientException.ConsumerBusyException,
e -> log.trace(
"subscription {}, topic {}, partition {} already connected",
groupName, topic, partition
)
);
}

/**
 * Polls the given consumer forever, mapping every received Pulsar message into a
 * liiklus {@link Record}. Subscription is delayed until the subscription offset
 * has been reset via {@link #resetSubscriptionOffset(Consumer)}.
 */
private Flux<Record> consumeMessage(Consumer<byte[]> consumer) {
    return Mono
            .fromCompletionStage(consumer::receiveAsync)
            .repeat()
            .map(this::toRecord)
            .delaySubscription(resetSubscriptionOffset(consumer));
}

// Converts one received Pulsar message into a liiklus Record for this partition.
private Record toRecord(Message<byte[]> message) {
    String messageKey = message.getKey();
    ByteBuffer keyBuffer = messageKey != null ? ByteBuffer.wrap(messageKey.getBytes()) : null;
    Envelope envelope = new Envelope(
            topic,
            keyBuffer,
            ByteBuffer.wrap(message.getValue())
    );
    return new Record(
            envelope,
            extractTime(message),
            partition,
            toOffset(message.getMessageId())
    );
}

/**
 * Seeks the consumer to the position it should resume from: the stored last offset
 * (adapted for closed ledgers via {@link #convertToOffset(Consumer, MessageIdImpl)}),
 * falling back to the configured initial position when no offset was stored.
 */
private Mono<Void> resetSubscriptionOffset(Consumer<byte[]> consumer) {
    Mono<MessageId> storedPosition = lastOffset
            .map(PulsarRecordsStorage::fromOffset)
            .cast(MessageIdImpl.class)
            .flatMap(messageId -> convertToOffset(consumer, messageId));

    return storedPosition
            .switchIfEmpty(initialOffset())
            .flatMap(target -> Mono
                    .fromCompletionStage(consumer.seekAsync(target))
                    .doOnSuccess(__ -> log.debug(
                            "succeeded to seek offset {} at consumer {} for {} {}",
                            target, groupName, topic, partition
                    ))
                    .doOnError(e -> log.error(
                            "failed to seek offset {} at consumer {} for {} {}",
                            target, groupName, topic, partition, e
                    ))
            );
}

/**
 * Decides which message id to seek to: the stored id as-is when the ledger is still
 * the current one, otherwise the id adapted for a closed ledger (entry id lowered).
 */
private Mono<MessageId> convertToOffset(Consumer<byte[]> consumer, MessageIdImpl messageId) {
    return Mono
            .fromCompletionStage(() -> ConsumerImplAccessor.getLastMessageIdAsync(consumer))
            .map(lastMessageId -> {
                MessageIdImpl last = (MessageIdImpl) lastMessageId;
                boolean sameLedger = last.getLedgerId() == messageId.getLedgerId();
                return sameLedger ? messageId : adaptForSeek(messageId);
            });
}

/**
 * Maps the configured auto-offset-reset policy to a seekable position:
 * "earliest"/"latest" to the corresponding well-known MessageId,
 * anything else (or no configuration) to an empty Mono.
 */
private Mono<MessageId> initialOffset() {
    return Mono.fromSupplier(() -> {
        if (!autoOffsetReset.isPresent()) {
            return null;
        }
        switch (autoOffsetReset.get()) {
            case "earliest":
                return MessageId.earliest;
            case "latest":
                return MessageId.latest;
            default:
                return null;
        }
    });
}

/**
 * Asynchronously closes the consumer, logging the outcome either way.
 * Errors are logged at debug level only — cleanup failure is not fatal.
 */
private Mono<Void> cleanConsumer(Consumer<byte[]> consumer) {
    Mono<Void> close = Mono.fromCompletionStage(consumer::closeAsync);
    return close
            .doOnSuccess(__ -> log.debug(
                    "subscription {}, topic {}, partition {} cleanup succeed",
                    groupName, topic, partition
            ))
            .doOnError(e -> log.debug(
                    "subscription {}, topic {}, partition {} cleanup failed",
                    groupName, topic, partition, e
            ));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void shouldPreferEventTimeOverPublishTime() throws Exception {
var key = keyByPartition(partition);
var eventTimestamp = Instant.now().minusSeconds(1000).truncatedTo(ChronoUnit.MILLIS);

try(
try (
var pulsarClient = PulsarClient.builder()
.serviceUrl(pulsar.getPulsarBrokerUrl())
.build()
Expand Down