From 532b0d9063474bd1c7ae8ac7cf5bd2d56b002164 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=81=93=E5=90=9B?= Date: Tue, 12 Mar 2024 23:36:59 +0800 Subject: [PATCH] [cleanup][ml] ManagedCursor clean up. (#22246) --- .../org/apache/bookkeeper/mledger/impl/EntryImpl.java | 7 ++++++- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 +++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index 6512399173f0a..803979313575a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -42,6 +42,7 @@ protected EntryImpl newObject(Handle handle) { private long timestamp; private long ledgerId; private long entryId; + private PositionImpl position; ByteBuf data; private Runnable onDeallocate; @@ -151,7 +152,10 @@ public int getLength() { @Override public PositionImpl getPosition() { - return new PositionImpl(ledgerId, entryId); + if (position == null) { + position = PositionImpl.get(ledgerId, entryId); + } + return position; } @Override @@ -197,6 +201,7 @@ protected void deallocate() { timestamp = -1; ledgerId = -1; entryId = -1; + position = null; recyclerHandle.recycle(this); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 0b9a9c3e9fc94..9c3598f46ef24 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1502,10 +1502,7 @@ public Set asyncReplayEntries(Set positi Set alreadyAcknowledgedPositions = new HashSet<>(); lock.readLock().lock(); try { - positions.stream() - .filter(position -> ((PositionImpl) position).compareTo(markDeletePosition) <= 0 - || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())) - .forEach(alreadyAcknowledgedPositions::add); + positions.stream().filter(this::isMessageDeleted).forEach(alreadyAcknowledgedPositions::add); } finally { lock.readLock().unlock(); } @@ -2278,8 +2275,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb return; } - if (position.compareTo(markDeletePosition) <= 0 - || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())) { + if (isMessageDeleted(position)) { if (config.isDeletionAtBatchIndexLevelEnabled()) { BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); if (bitSetRecyclable != null) { @@ -3504,8 +3500,7 @@ public Range getLastIndividualDeletedRange() { @Override public void trimDeletedEntries(List entries) { entries.removeIf(entry -> { - boolean isDeleted = markDeletePosition.compareTo(entry.getLedgerId(), entry.getEntryId()) >= 0 - || individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId()); + boolean isDeleted = isMessageDeleted(entry.getPosition()); if (isDeleted) { entry.release(); }