From 23d484ad238da0428ba4bdeda5bf7f53ac704a90 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 14 May 2024 14:14:08 +0800 Subject: [PATCH 1/5] Optimize get last dispatchable position. --- .../service/persistent/PersistentTopic.java | 63 +++++++++++++++---- .../buffer/impl/TopicTransactionBuffer.java | 11 ++++ 2 files changed, 63 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 69c7f404fdd57..c66f1fe3f938a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -309,6 +309,9 @@ private static class EstimateTimeBasedBacklogQuotaCheckResult { Long estimatedOldestUnacknowledgedMessageTimestamp; } + // The last position that can be dispatched to consumers + private volatile Position lastDispatchablePosition; + /*** * We use 3 futures to prevent a new closing if there is an in-progress deletion or closing. We make Pulsar return * the in-progress one when it is called the second time. @@ -3774,18 +3777,56 @@ public Position getLastPosition() { @Override public CompletableFuture getLastDispatchablePosition() { + if (lastDispatchablePosition != null) { + return CompletableFuture.completedFuture(lastDispatchablePosition); + } return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> { - MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer()); - // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer - if (Markers.isServerOnlyMarker(md)) { - return false; - } else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) { - // Filter-out transaction aborted messages. - TxnID txnID = new TxnID(md.getTxnidMostBits(), md.getTxnidLeastBits()); - return !isTxnAborted(txnID, (PositionImpl) entry.getPosition()); - } - return true; - }, getMaxReadPosition()); + MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer()); + // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer + if (Markers.isServerOnlyMarker(md)) { + return false; + } else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) { + // Filter-out transaction aborted messages. + TxnID txnID = new TxnID(md.getTxnidMostBits(), md.getTxnidLeastBits()); + return !isTxnAborted(txnID, (PositionImpl) entry.getPosition()); + } + return true; + }, getMaxReadPosition()) + .thenApply(position -> { + // Update lastDispatchablePosition to the given position + updateLastDispatchablePosition(position); + return position; + }); + } + + /** + * Update lastDispatchablePosition if the given position is greater than the lastDispatchablePosition. + * + * @param position + */ + public synchronized void updateLastDispatchablePosition(Position position) { + // Update lastDispatchablePosition to null if the position is null, fallback to + // ManagedLedgerImplUtils#asyncGetLastValidPosition + if (position == null) { + lastDispatchablePosition = null; + return; + } + + PositionImpl position0 = (PositionImpl) position; + // If the position is greater than the maxReadPosition, ignore + if (position0.compareTo(getMaxReadPosition()) > 0) { + return; + } + // If the lastDispatchablePosition is null, set it to the position + if (lastDispatchablePosition == null) { + lastDispatchablePosition = position; + return; + } + // If the position is greater than the lastDispatchablePosition, update it + PositionImpl lastDispatchablePosition0 = (PositionImpl) lastDispatchablePosition; + if (position0.compareTo(lastDispatchablePosition0) > 0) { + lastDispatchablePosition = position; + } } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index dfb73815e08d7..fbd4ddf7da053 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -297,6 +297,11 @@ private void handleTransactionMessage(TxnID txnId, Position position) { } } + // ThreadSafe + private void updateLastDispatchablePosition(Position position) { + topic.updateLastDispatchablePosition(position); + } + @Override public CompletableFuture openTransactionBufferReader(TxnID txnID, long startSequenceId) { return null; @@ -459,6 +464,8 @@ void removeTxnAndUpdateMaxReadPosition(TxnID txnID) { } else { updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false); } + // Update the last dispatchable position to null if there is a TXN finished. + updateLastDispatchablePosition(null); } /** @@ -523,6 +530,10 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean i } } } + // If the message is a normal message, update the last dispatchable position. + if (!isMarkerMessage) { + updateLastDispatchablePosition(position); + } } @Override From 3a745a6e3c89caf790921c17fde71598629412d4 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 14 May 2024 15:27:13 +0800 Subject: [PATCH 2/5] Apply changes to related classes. --- .../buffer/impl/InMemTransactionBuffer.java | 16 ++++++++++++++-- .../buffer/impl/TransactionBufferDisable.java | 14 ++++++++++++-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java index bab7b64c608c4..4f3ae78fb2ed1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java @@ -307,6 +307,7 @@ public CompletableFuture commitTxn(TxnID txnID, long lowWaterMark) { txnBuffer.commitAt(committedAtLedgerId, committedAtEntryId); addTxnToTxnIdex(txnID, committedAtLedgerId); } + updateLastDispatchablePosition(null); commitFuture.complete(null); } catch (TransactionBufferException.TransactionNotFoundException | TransactionBufferException.TransactionStatusException e) { @@ -331,6 +332,7 @@ public CompletableFuture abortTxn(TxnID txnID, long lowWaterMark) { TxnBuffer txnBuffer = getTxnBufferOrThrowNotFoundException(txnID); txnBuffer.abort(); buffers.remove(txnID, txnBuffer); + updateLastDispatchablePosition(null); abortFuture.complete(null); } catch (TransactionBufferException.TransactionNotFoundException | TransactionBufferException.TransactionStatusException e) { @@ -377,8 +379,11 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) { @Override public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) { - if (!isMarkerMessage && maxReadPositionCallBack != null) { - maxReadPositionCallBack.maxReadPositionMovedForward(null, position); + if (!isMarkerMessage) { + updateLastDispatchablePosition(position); + if (maxReadPositionCallBack != null) { + maxReadPositionCallBack.maxReadPositionMovedForward(null, position); + } } } @@ -436,4 +441,11 @@ public long getCommittedTxnCount() { .filter(txnBuffer -> txnBuffer.status.equals(TxnStatus.COMMITTED)) .count(); } + + // ThreadSafe + private void updateLastDispatchablePosition(Position position) { + if (topic instanceof PersistentTopic t) { + t.updateLastDispatchablePosition(position); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index ebd61dbaa82ec..6f5dc0cd4d0dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -99,8 +99,11 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) { @Override public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) { - if (!isMarkerMessage && maxReadPositionCallBack != null) { - maxReadPositionCallBack.maxReadPositionMovedForward(null, position); + if (!isMarkerMessage) { + updateLastDispatchablePosition(position); + if (maxReadPositionCallBack != null) { + maxReadPositionCallBack.maxReadPositionMovedForward(null, position); + } } } @@ -148,4 +151,11 @@ public long getAbortedTxnCount() { public long getCommittedTxnCount() { return 0; } + + // ThreadSafe + private void updateLastDispatchablePosition(Position position) { + if (topic instanceof PersistentTopic t) { + t.updateLastDispatchablePosition(position); + } + } } From 93f9b43778dca357896f68f63cd7f5b55d53ecb0 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 14 May 2024 16:43:19 +0800 Subject: [PATCH 3/5] format --- .../pulsar/broker/service/persistent/PersistentTopic.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c66f1fe3f938a..47dc988c8ca68 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3780,7 +3780,8 @@ public CompletableFuture getLastDispatchablePosition() { if (lastDispatchablePosition != null) { return CompletableFuture.completedFuture(lastDispatchablePosition); } - return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> { + return ManagedLedgerImplUtils + .asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> { MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer()); // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer if (Markers.isServerOnlyMarker(md)) { From 3c5aabb15bc3b3309bc8e3bddd773f40e3467116 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 10 Jun 2024 12:50:13 +0800 Subject: [PATCH 4/5] Address review comments --- .../buffer/impl/InMemTransactionBuffer.java | 16 ++-------------- .../buffer/impl/TransactionBufferDisable.java | 14 ++------------ 2 files changed, 4 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java index 4f3ae78fb2ed1..bab7b64c608c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java @@ -307,7 +307,6 @@ public CompletableFuture commitTxn(TxnID txnID, long lowWaterMark) { txnBuffer.commitAt(committedAtLedgerId, committedAtEntryId); addTxnToTxnIdex(txnID, committedAtLedgerId); } - updateLastDispatchablePosition(null); commitFuture.complete(null); } catch (TransactionBufferException.TransactionNotFoundException | TransactionBufferException.TransactionStatusException e) { @@ -332,7 +331,6 @@ public CompletableFuture abortTxn(TxnID txnID, long lowWaterMark) { TxnBuffer txnBuffer = getTxnBufferOrThrowNotFoundException(txnID); txnBuffer.abort(); buffers.remove(txnID, txnBuffer); - updateLastDispatchablePosition(null); abortFuture.complete(null); } catch (TransactionBufferException.TransactionNotFoundException | TransactionBufferException.TransactionStatusException e) { @@ -379,11 +377,8 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) { @Override public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) { - if (!isMarkerMessage) { - updateLastDispatchablePosition(position); - if (maxReadPositionCallBack != null) { - maxReadPositionCallBack.maxReadPositionMovedForward(null, position); - } + if (!isMarkerMessage && maxReadPositionCallBack != null) { + maxReadPositionCallBack.maxReadPositionMovedForward(null, position); } } @@ -441,11 +436,4 @@ public long getCommittedTxnCount() { .filter(txnBuffer -> txnBuffer.status.equals(TxnStatus.COMMITTED)) .count(); } - - // ThreadSafe - private void updateLastDispatchablePosition(Position position) { - if (topic instanceof PersistentTopic t) { - t.updateLastDispatchablePosition(position); - } - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index 6f5dc0cd4d0dd..ebd61dbaa82ec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -99,11 +99,8 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) { @Override public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) { - if (!isMarkerMessage) { - updateLastDispatchablePosition(position); - if (maxReadPositionCallBack != null) { - maxReadPositionCallBack.maxReadPositionMovedForward(null, position); - } + if (!isMarkerMessage && maxReadPositionCallBack != null) { + maxReadPositionCallBack.maxReadPositionMovedForward(null, position); } } @@ -151,11 +148,4 @@ public long getAbortedTxnCount() { public long getCommittedTxnCount() { return 0; } - - // ThreadSafe - private void updateLastDispatchablePosition(Position position) { - if (topic instanceof PersistentTopic t) { - t.updateLastDispatchablePosition(position); - } - } } From 780000b99f0c26bee8604ba2517554003688db81 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 10 Jun 2024 12:54:43 +0800 Subject: [PATCH 5/5] Address review comments --- .../buffer/impl/InMemTransactionBuffer.java | 14 ++++++++++++-- .../buffer/impl/TransactionBufferDisable.java | 14 ++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java index bab7b64c608c4..533d0716d413c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java @@ -377,8 +377,11 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) { @Override public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) { - if (!isMarkerMessage && maxReadPositionCallBack != null) { - maxReadPositionCallBack.maxReadPositionMovedForward(null, position); + if (!isMarkerMessage) { + updateLastDispatchablePosition(position); + if (maxReadPositionCallBack != null) { + maxReadPositionCallBack.maxReadPositionMovedForward(null, position); + } } } @@ -436,4 +439,11 @@ public long getCommittedTxnCount() { .filter(txnBuffer -> txnBuffer.status.equals(TxnStatus.COMMITTED)) .count(); } + + // ThreadSafe + private void updateLastDispatchablePosition(Position position) { + if (topic instanceof PersistentTopic t) { + t.updateLastDispatchablePosition(position); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index ebd61dbaa82ec..6f5dc0cd4d0dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -99,8 +99,11 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) { @Override public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) { - if (!isMarkerMessage && maxReadPositionCallBack != null) { - maxReadPositionCallBack.maxReadPositionMovedForward(null, position); + if (!isMarkerMessage) { + updateLastDispatchablePosition(position); + if (maxReadPositionCallBack != null) { + maxReadPositionCallBack.maxReadPositionMovedForward(null, position); + } } } @@ -148,4 +151,11 @@ public long getAbortedTxnCount() { public long getCommittedTxnCount() { return 0; } + + // ThreadSafe + private void updateLastDispatchablePosition(Position position) { + if (topic instanceof PersistentTopic t) { + t.updateLastDispatchablePosition(position); + } + } }