Skip to content

Commit

Permalink
Revert "[fix] [broker] Prevent long deduplication cursor backlog so t…
Browse files Browse the repository at this point in the history
…hat topic loading wouldn't timeout (#22479)"

This reverts commit 7cec82f.
  • Loading branch information
poorbarcode committed Apr 15, 2024
1 parent 7cec82f commit f91b519
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -576,10 +576,8 @@ protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpd
}

protected void startDeduplicationSnapshotMonitor() {
// We do not know whether users will enable deduplication on namespace level/topic level or not, so keep this
// scheduled task runs.
int interval = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
if (interval > 0) {
if (interval > 0 && pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
this.deduplicationSnapshotMonitor = OrderedScheduler.newSchedulerBuilder()
.name("deduplication-snapshot-monitor")
.numThreads(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,9 @@ private CompletableFuture<Void> recoverSequenceIdsMap() {

// Replay all the entries and apply all the sequence ids updates
log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries());
CompletableFuture<Position> future = new CompletableFuture<>();
CompletableFuture<Void> future = new CompletableFuture<>();
replayCursor(future);
return future.thenAccept(lastPosition -> {
if (lastPosition != null && snapshotCounter >= snapshotInterval) {
snapshotCounter = 0;
takeSnapshot(lastPosition);
}
});
return future;
}

/**
Expand All @@ -173,11 +168,11 @@ private CompletableFuture<Void> recoverSequenceIdsMap() {
*
* @param future future to trigger when the replay is complete
*/
private void replayCursor(CompletableFuture<Position> future) {
private void replayCursor(CompletableFuture<Void> future) {
managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
Position lastPosition = null;

for (Entry entry : entries) {
ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload);
Expand All @@ -187,8 +182,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
highestSequencedPushed.put(producerName, sequenceId);
highestSequencedPersisted.put(producerName, sequenceId);
producerRemoved(producerName);
snapshotCounter++;
lastPosition = entry.getPosition();

entry.release();
}

Expand All @@ -197,7 +191,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
pulsar.getExecutor().execute(() -> replayCursor(future));
} else {
// Done replaying
future.complete(lastPosition);
future.complete(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
private volatile List<String> shadowTopics;
private final TopicName shadowSourceTopic;

public static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";

public static boolean isDedupCursorName(String name) {
return DEDUPLICATION_CURSOR_NAME.equals(name);
Expand Down

This file was deleted.

0 comments on commit f91b519

Please sign in to comment.