Skip to content

Commit

Permalink
[improve][broker] Optimize subscription seek (cursor reset) by timest…
Browse files Browse the repository at this point in the history
…amp (#22792)

Co-authored-by: Lari Hotari <lhotari@apache.org>
  • Loading branch information
dao-jun and lhotari authored Jan 9, 2025
1 parent 9149720 commit 2eb4eab
Show file tree
Hide file tree
Showing 7 changed files with 690 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,31 @@ void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry>
void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
FindEntryCallback callback, Object ctx, boolean isFindFromLedger);


/**
* Find the newest entry that matches the given predicate.
*
* @param constraint
* search only active entries or all entries
* @param condition
* predicate that reads an entry an applies a condition
* @param callback
* callback object returning the resultant position
* @param startPosition
* start position to search from.
* @param endPosition
* end position to search to.
* @param ctx
* opaque context
* @param isFindFromLedger
* find the newest entry from ledger
*/
default void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
Position startPosition, Position endPosition, FindEntryCallback callback,
Object ctx, boolean isFindFromLedger) {
asyncFindNewestMatching(constraint, condition, callback, ctx, isFindFromLedger);
}

/**
* reset the cursor to specified position to enable replay of messages.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1272,27 +1272,55 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate
@Override
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
OpFindNewest op;
Position startPosition = null;
long max = 0;
asyncFindNewestMatching(constraint, condition, null, null, callback, ctx,
isFindFromLedger);
}


@Override
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
Position start, Position end, FindEntryCallback callback,
Object ctx, boolean isFindFromLedger) {
Position startPosition;
switch (constraint) {
case SearchAllAvailableEntries:
startPosition = getFirstPosition();
max = ledger.getNumberOfEntries() - 1;
break;
case SearchActiveEntries:
startPosition = ledger.getNextValidPosition(markDeletePosition);
max = getNumberOfEntriesInStorage();
break;
default:
callback.findEntryFailed(new ManagedLedgerException("Unknown position constraint"), Optional.empty(), ctx);
return;
case SearchAllAvailableEntries ->
startPosition = start == null ? getFirstPosition() : start;
case SearchActiveEntries -> {
if (start == null) {
startPosition = ledger.getNextValidPosition(markDeletePosition);
} else {
startPosition = start;
startPosition = startPosition.compareTo(markDeletePosition) <= 0
? ledger.getNextValidPosition(startPosition) : startPosition;
}
}
default -> {
callback.findEntryFailed(
new ManagedLedgerException("Unknown position constraint"), Optional.empty(), ctx);
return;
}
}
// startPosition can't be null, should never go here.
if (startPosition == null) {
callback.findEntryFailed(new ManagedLedgerException("Couldn't find start position"),
Optional.empty(), ctx);
return;
}
// Calculate the end position
Position endPosition = end == null ? ledger.lastConfirmedEntry : end;
endPosition = endPosition.compareTo(ledger.lastConfirmedEntry) > 0 ? ledger.lastConfirmedEntry : endPosition;
// Calculate the number of entries between the startPosition and endPosition
long max = 0;
if (startPosition.compareTo(endPosition) <= 0) {
max = ledger.getNumberOfEntries(Range.closed(startPosition, endPosition));
}

if (max <= 0) {
callback.findEntryComplete(null, ctx);
return;
}

OpFindNewest op;
if (isFindFromLedger) {
op = new OpFindNewest(this.ledger, startPosition, condition, max, callback, ctx);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4873,6 +4873,297 @@ public void operationFailed(ManagedLedgerException exception) {
assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
}

@Test
public void testFindNewestMatching_SearchAllAvailableEntries_ByStartAndEnd() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(2);
managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
@Cleanup
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testFindNewestMatching_SearchAllAvailableEntries_ByStartAndEnd", managedLedgerConfig);
@Cleanup
ManagedCursor managedCursor = ledger.openCursor("test");

Position position = ledger.addEntry("test".getBytes(Encoding));
Position position1 = ledger.addEntry("test1".getBytes(Encoding));
Position position2 = ledger.addEntry("test2".getBytes(Encoding));
Position position3 = ledger.addEntry("test3".getBytes(Encoding));

Predicate<Entry> condition = entry -> {
try {
Position p = entry.getPosition();
return p.compareTo(position1) <= 0;
} finally {
entry.release();
}
};

// find the newest entry with start and end position
AtomicBoolean failed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Position> positionRef = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, condition, position, position2, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef.set(position);
latch.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed.set(true);
latch.countDown();
}
}, null, true);

latch.await();
assertFalse(failed.get());
assertNotNull(positionRef.get());
assertEquals(positionRef.get(), position1);

// find the newest entry with start
AtomicBoolean failed1 = new AtomicBoolean(false);
CountDownLatch latch1 = new CountDownLatch(1);
AtomicReference<Position> positionRef1 = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, condition, position, null, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef1.set(position);
latch1.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed1.set(true);
latch1.countDown();
}
}, null, true);
latch1.await();
assertFalse(failed1.get());
assertNotNull(positionRef1.get());
assertEquals(positionRef1.get(), position1);

// find the newest entry with end
AtomicBoolean failed2 = new AtomicBoolean(false);
CountDownLatch latch2 = new CountDownLatch(1);
AtomicReference<Position> positionRef2 = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, condition, null, position2, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef2.set(position);
latch2.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed2.set(true);
latch2.countDown();
}
}, null, true);
latch2.await();
assertFalse(failed2.get());
assertNotNull(positionRef2.get());
assertEquals(positionRef2.get(), position1);

// find the newest entry without start and end position
AtomicBoolean failed3 = new AtomicBoolean(false);
CountDownLatch latch3 = new CountDownLatch(1);
AtomicReference<Position> positionRef3 = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, condition, null, null, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef3.set(position);
latch3.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed3.set(true);
latch3.countDown();
}
}, null, true);
latch3.await();
assertFalse(failed3.get());
assertNotNull(positionRef3.get());
assertEquals(positionRef3.get(), position1);

// find position3
AtomicBoolean failed4 = new AtomicBoolean(false);
CountDownLatch latch4 = new CountDownLatch(1);
AtomicReference<Position> positionRef4 = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> {
try {
Position p = entry.getPosition();
return p.compareTo(position3) <= 0;
} finally {
entry.release();
}
}, position3, position3, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef4.set(position);
latch4.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed4.set(true);
latch4.countDown();
}
}, null, true);
latch4.await();
assertFalse(failed4.get());
assertNotNull(positionRef4.get());
assertEquals(positionRef4.get(), position3);
}


@Test
public void testFindNewestMatching_SearchActiveEntries_ByStartAndEnd() throws Exception {
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(2);
managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
@Cleanup
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testFindNewestMatching_SearchActiveEntries_ByStartAndEnd", managedLedgerConfig);
@Cleanup
ManagedCursorImpl managedCursor = (ManagedCursorImpl) ledger.openCursor("test");

Position position = ledger.addEntry("test".getBytes(Encoding));
Position position1 = ledger.addEntry("test1".getBytes(Encoding));
Position position2 = ledger.addEntry("test2".getBytes(Encoding));
Position position3 = ledger.addEntry("test3".getBytes(Encoding));
Position position4 = ledger.addEntry("test4".getBytes(Encoding));
managedCursor.markDelete(position1);
assertEquals(managedCursor.getNumberOfEntries(), 3);

Predicate<Entry> condition = entry -> {
try {
Position p = entry.getPosition();
return p.compareTo(position3) <= 0;
} finally {
entry.release();
}
};

// find the newest entry with start and end position
AtomicBoolean failed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Position> positionRef = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, position2, position4, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef.set(position);
latch.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed.set(true);
latch.countDown();
}
}, null, true);
latch.await();
assertFalse(failed.get());
assertNotNull(positionRef.get());
assertEquals(positionRef.get(), position3);

// find the newest entry with start
AtomicBoolean failed1 = new AtomicBoolean(false);
CountDownLatch latch1 = new CountDownLatch(1);
AtomicReference<Position> positionRef1 = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, position2, null, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef1.set(position);
latch1.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed1.set(true);
latch1.countDown();
}
}, null, true);

latch1.await();
assertFalse(failed1.get());
assertNotNull(positionRef1.get());
assertEquals(positionRef1.get(), position3);

// find the newest entry with end
AtomicBoolean failed2 = new AtomicBoolean(false);
CountDownLatch latch2 = new CountDownLatch(1);
AtomicReference<Position> positionRef2 = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, null, position4, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef2.set(position);
latch2.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed2.set(true);
latch2.countDown();
}
}, null, true);

latch2.await();
assertFalse(failed2.get());
assertNotNull(positionRef2.get());
assertEquals(positionRef2.get(), position3);

// find the newest entry without start and end position
AtomicBoolean failed3 = new AtomicBoolean(false);
CountDownLatch latch3 = new CountDownLatch(1);
AtomicReference<Position> positionRef3 = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, null, null, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef3.set(position);
latch3.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed3.set(true);
latch3.countDown();
}
}, null, true);
latch3.await();
assertFalse(failed3.get());
assertNotNull(positionRef3.get());
assertEquals(positionRef3.get(), position3);

// find position4
AtomicBoolean failed4 = new AtomicBoolean(false);
CountDownLatch latch4 = new CountDownLatch(1);
AtomicReference<Position> positionRef4 = new AtomicReference<>();
managedCursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
try {
Position p = entry.getPosition();
return p.compareTo(position4) <= 0;
} finally {
entry.release();
}
}, position4, position4, new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
positionRef4.set(position);
latch4.countDown();
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
failed4.set(true);
latch4.countDown();
}
}, null, true);
latch4.await();
assertFalse(failed4.get());
assertNotNull(positionRef4.get());
assertEquals(positionRef4.get(), position4);
}

@Test
void testForceCursorRecovery() throws Exception {
TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);
Expand Down
Loading

0 comments on commit 2eb4eab

Please sign in to comment.