Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Optimize PersistentTopic.getLastDispatchablePosition #22707

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ private static class EstimateTimeBasedBacklogQuotaCheckResult {
Long estimatedOldestUnacknowledgedMessageTimestamp;
}

// The last position that can be dispatched to consumers
private volatile Position lastDispatchablePosition;

/***
* We use 3 futures to prevent a new closing if there is an in-progress deletion or closing. We make Pulsar return
* the in-progress one when it is called the second time.
Expand Down Expand Up @@ -3774,18 +3777,57 @@ public Position getLastPosition() {

@Override
public CompletableFuture<Position> getLastDispatchablePosition() {
return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
if (Markers.isServerOnlyMarker(md)) {
return false;
} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
// Filter-out transaction aborted messages.
TxnID txnID = new TxnID(md.getTxnidMostBits(), md.getTxnidLeastBits());
return !isTxnAborted(txnID, (PositionImpl) entry.getPosition());
}
return true;
}, getMaxReadPosition());
if (lastDispatchablePosition != null) {
return CompletableFuture.completedFuture(lastDispatchablePosition);
}
return ManagedLedgerImplUtils
.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
if (Markers.isServerOnlyMarker(md)) {
return false;
} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
// Filter-out transaction aborted messages.
TxnID txnID = new TxnID(md.getTxnidMostBits(), md.getTxnidLeastBits());
return !isTxnAborted(txnID, (PositionImpl) entry.getPosition());
}
return true;
}, getMaxReadPosition())
.thenApply(position -> {
// Update lastDispatchablePosition to the given position
updateLastDispatchablePosition(position);
return position;
});
}

/**
* Update lastDispatchablePosition if the given position is greater than the lastDispatchablePosition.
*
* @param position
*/
public synchronized void updateLastDispatchablePosition(Position position) {
// Update lastDispatchablePosition to null if the position is null, fallback to
// ManagedLedgerImplUtils#asyncGetLastValidPosition
if (position == null) {
lastDispatchablePosition = null;
return;
}

PositionImpl position0 = (PositionImpl) position;
// If the position is greater than the maxReadPosition, ignore
if (position0.compareTo(getMaxReadPosition()) > 0) {
return;
}
// If the lastDispatchablePosition is null, set it to the position
if (lastDispatchablePosition == null) {
lastDispatchablePosition = position;
return;
}
// If the position is greater than the lastDispatchablePosition, update it
PositionImpl lastDispatchablePosition0 = (PositionImpl) lastDispatchablePosition;
if (position0.compareTo(lastDispatchablePosition0) > 0) {
lastDispatchablePosition = position;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,11 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {

@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
if (!isMarkerMessage && maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
if (!isMarkerMessage) {
updateLastDispatchablePosition(position);
dao-jun marked this conversation as resolved.
Show resolved Hide resolved
if (maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
}
}
}

Expand Down Expand Up @@ -436,4 +439,11 @@ public long getCommittedTxnCount() {
.filter(txnBuffer -> txnBuffer.status.equals(TxnStatus.COMMITTED))
.count();
}

// ThreadSafe
private void updateLastDispatchablePosition(Position position) {
if (topic instanceof PersistentTopic t) {
t.updateLastDispatchablePosition(position);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ private void handleTransactionMessage(TxnID txnId, Position position) {
}
}

// ThreadSafe
private void updateLastDispatchablePosition(Position position) {
topic.updateLastDispatchablePosition(position);
}

@Override
public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(TxnID txnID, long startSequenceId) {
return null;
Expand Down Expand Up @@ -459,6 +464,8 @@ void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
} else {
updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
}
// Update the last dispatchable position to null if there is a TXN finished.
updateLastDispatchablePosition(null);
Copy link
Contributor

@liangyepianzhou liangyepianzhou Jun 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, Sorry, I overlooked this. If you do not set it to null here, but set a flag that requires reading BK, you can reduce the response time of the next request. I mentioned here before, but maybe not in detail.

  1. end txn.
  2. set a flag 'readFromBK' as true
  3. update normal message
  4. set a flag 'readFromBK' as false
  5. when call getLastDispatchablePosition, if the flag == true, the lastDispatchablePosition is valid to return. And read from BK after return.
  6. Call getLastDispatchablePosition again, it can get the last getLastDispatchablePosition.

How do you think about this?

}

/**
Expand Down Expand Up @@ -523,6 +530,10 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean i
}
}
}
// If the message is a normal message, update the last dispatchable position.
if (!isMarkerMessage) {
updateLastDispatchablePosition(position);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,11 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {

@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
if (!isMarkerMessage && maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
if (!isMarkerMessage) {
updateLastDispatchablePosition(position);
if (maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
}
}
}

Expand Down Expand Up @@ -148,4 +151,11 @@ public long getAbortedTxnCount() {
public long getCommittedTxnCount() {
return 0;
}

// ThreadSafe
private void updateLastDispatchablePosition(Position position) {
if (topic instanceof PersistentTopic t) {
t.updateLastDispatchablePosition(position);
}
}
}
Loading