diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index dbeee2fd37329f..abb4450a2d6b8a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3283,4 +3283,12 @@ public void setState(State state) { public ManagedLedgerConfig getConfig() { return config; } + + /** + * check cursor reset status + * @return true if the cursor reset in progress + */ + public boolean resetCursorInProgress() { + return RESET_CURSOR_IN_PROGRESS_UPDATER.get(this) == TRUE; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index a0af118f6e677d..22bfa8fccbda21 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -34,6 +34,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.commons.lang3.tuple.Pair; @@ -278,6 +279,11 @@ private synchronized void internalConsumerFlow(Consumer consumer) { log.debug("[{}-{}] Ignoring flow control message since consumer is waiting for cursor to be rewinded", name, consumer); } + } else if (((ManagedCursorImpl) cursor).resetCursorInProgress()) { + if (log.isDebugEnabled()) { + log.debug("[{}-{}] Ignoring flow control message since cursor reset in progress - cursor {}", + name, consumer, cursor.getName()); + } } else { if (log.isDebugEnabled()) { log.debug("[{}-{}] Trigger new read after receiving flow control message", name, consumer);