Skip to content

Commit

Permalink
[improve][broker] Replaced checkBackloggedCursors with checkBacklogge…
Browse files Browse the repository at this point in the history
…dCursor(single subscription check) from subscribe
  • Loading branch information
heesung-sn committed Jan 29, 2023
1 parent 644be5f commit 3cb8c55
Showing 1 changed file with 14 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,9 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
readCompacted, keySharedMeta, startMessageId, consumerEpoch);

return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {
checkBackloggedCursors();
if (subscription instanceof PersistentSubscription persistentSubscription) {
checkBackloggedCursor(persistentSubscription);
}
if (!cnx.isActive()) {
try {
consumer.close();
Expand Down Expand Up @@ -2566,17 +2568,21 @@ public void checkInactiveSubscriptions() {

@Override
public void checkBackloggedCursors() {
// activate caught up cursors which include consumers
subscriptions.forEach((subName, subscription) -> {
if (!subscription.getConsumers().isEmpty()
&& subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
subscription.getCursor().setActive();
} else {
subscription.getCursor().setInactive();
}
checkBackloggedCursor(subscription);
});
}

private void checkBackloggedCursor(PersistentSubscription subscription) {
// activate caught up cursor which include consumers
if (!subscription.getConsumers().isEmpty()
&& subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
subscription.getCursor().setActive();
} else {
subscription.getCursor().setInactive();
}
}

public void checkInactiveLedgers() {
ledger.checkInactiveLedgerAndRollOver();
}
Expand Down

0 comments on commit 3cb8c55

Please sign in to comment.