diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index e53f408ca7563..48a79a4ac529c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -43,6 +43,7 @@ protected EntryImpl newObject(Handle handle) { private long timestamp; private long ledgerId; private long entryId; + private PositionImpl position; ByteBuf data; private Runnable onDeallocate; @@ -152,7 +153,10 @@ public int getLength() { @Override public PositionImpl getPosition() { - return new PositionImpl(ledgerId, entryId); + if (position == null) { + position = PositionImpl.get(ledgerId, entryId); + } + return position; } @Override @@ -198,6 +202,7 @@ protected void deallocate() { timestamp = -1; ledgerId = -1; entryId = -1; + position = null; recyclerHandle.recycle(this); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 3451167affbb6..2df37a0b6ce9f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1506,10 +1506,7 @@ public Set asyncReplayEntries(Set positi Set alreadyAcknowledgedPositions = new HashSet<>(); lock.readLock().lock(); try { - positions.stream() - .filter(position -> ((PositionImpl) position).compareTo(markDeletePosition) <= 0 - || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())) - .forEach(alreadyAcknowledgedPositions::add); + positions.stream().filter(this::isMessageDeleted).forEach(alreadyAcknowledgedPositions::add); } finally { lock.readLock().unlock(); } @@ -2281,8 +2278,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb return; } - if (position.compareTo(markDeletePosition) <= 0 - || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())) { + if (isMessageDeleted(position)) { if (config.isDeletionAtBatchIndexLevelEnabled()) { BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); if (bitSetRecyclable != null) { @@ -3541,8 +3537,7 @@ public Range getLastIndividualDeletedRange() { @Override public void trimDeletedEntries(List entries) { entries.removeIf(entry -> { - boolean isDeleted = markDeletePosition.compareTo(entry.getLedgerId(), entry.getEntryId()) >= 0 - || individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId()); + boolean isDeleted = isMessageDeleted(entry.getPosition()); if (isDeleted) { entry.release(); }