-
Notifications
You must be signed in to change notification settings - Fork 24.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid side-effect in VersionMap when assertion enabled #29585
Changes from 2 commits
81327ee
4107caa
32711aa
a7beda7
0346ba6
bec5ac3
e2b9f96
516156a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -102,6 +102,40 @@ public void updateMinDeletedTimestamp(DeleteVersionValue delete) { | |
|
||
} | ||
|
||
private static final class Tombstones { | ||
final Map<BytesRef, DeleteVersionValue> tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); | ||
final AtomicLong ramBytesUsedTombstones = new AtomicLong(); | ||
|
||
void putTombstone(BytesRef uid, DeleteVersionValue version) { | ||
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; | ||
// Also enroll the delete into tombstones, and account for its RAM too: | ||
final VersionValue prevTombstone = tombstones.put(uid, version); | ||
long accountRam = (BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed); | ||
// Deduct tombstones bytes used for the version we just removed or replaced: | ||
if (prevTombstone != null) { | ||
accountRam -= (BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed); | ||
} | ||
if (accountRam != 0) { | ||
long v = ramBytesUsedTombstones.addAndGet(accountRam); | ||
assert v >= 0 : "bytes=" + v; | ||
} | ||
} | ||
|
||
void removeTombstone(BytesRef uid) { | ||
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; | ||
final VersionValue prev = tombstones.remove(uid); | ||
if (prev != null) { | ||
assert prev.isDelete(); | ||
long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prev.ramBytesUsed() + uidRAMBytesUsed)); | ||
assert v >= 0 : "bytes=" + v; | ||
} | ||
} | ||
|
||
DeleteVersionValue getTombstone(BytesRef uid) { | ||
return tombstones.get(uid); | ||
} | ||
} | ||
|
||
private static final class Maps { | ||
|
||
// All writes (adds and deletes) go into here: | ||
|
@@ -115,15 +149,19 @@ private static final class Maps { | |
boolean needsSafeAccess; | ||
final boolean previousMapsNeededSafeAccess; | ||
|
||
// All deletes also go here, and delete "tombstones" are retained after refresh: | ||
private final Tombstones tombstones; | ||
|
||
Maps(VersionLookup current, VersionLookup old, boolean previousMapsNeededSafeAccess) { | ||
Maps(VersionLookup current, VersionLookup old, boolean previousMapsNeededSafeAccess, Tombstones tombstones) { | ||
this.current = current; | ||
this.old = old; | ||
this.previousMapsNeededSafeAccess = previousMapsNeededSafeAccess; | ||
this.tombstones = tombstones; // transfer the tombstone | ||
} | ||
|
||
Maps() { | ||
this(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency()), VersionLookup.EMPTY, false); | ||
this(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency()), VersionLookup.EMPTY, false, | ||
new Tombstones()); | ||
} | ||
|
||
boolean isSafeAccessMode() { | ||
|
@@ -142,14 +180,14 @@ boolean shouldInheritSafeAccess() { | |
*/ | ||
Maps buildTransitionMap() { | ||
return new Maps(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(current.size())), current, | ||
shouldInheritSafeAccess()); | ||
shouldInheritSafeAccess(), tombstones); | ||
} | ||
|
||
/** | ||
* builds a new map that invalidates the old map but maintains the current. This should be called in afterRefresh() | ||
*/ | ||
Maps invalidateOldMap() { | ||
return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess); | ||
return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess, tombstones); | ||
} | ||
|
||
void put(BytesRef uid, VersionValue version) { | ||
|
@@ -186,8 +224,6 @@ long getMinDeleteTimestamp() { | |
} | ||
} | ||
|
||
// All deletes also go here, and delete "tombstones" are retained after refresh: | ||
private final Map<BytesRef, DeleteVersionValue> tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); | ||
|
||
private volatile Maps maps = new Maps(); | ||
// we maintain a second map that only receives the updates that we skip on the actual map (unsafe ops) | ||
|
@@ -229,11 +265,6 @@ long getMinDeleteTimestamp() { | |
BASE_BYTES_PER_CHM_ENTRY = chmEntryShallowSize + 2 * RamUsageEstimator.NUM_BYTES_OBJECT_REF; | ||
} | ||
|
||
/** | ||
* Tracks bytes used by tombstones (deletes) | ||
*/ | ||
final AtomicLong ramBytesUsedTombstones = new AtomicLong(); | ||
|
||
@Override | ||
public void beforeRefresh() throws IOException { | ||
// Start sending all updates after this point to the new | ||
|
@@ -280,7 +311,7 @@ private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) { | |
return value; | ||
} | ||
|
||
return tombstones.get(uid); | ||
return maps.tombstones.getTombstone(uid); | ||
} | ||
|
||
VersionValue getVersionForAssert(final BytesRef uid) { | ||
|
@@ -313,8 +344,8 @@ void maybePutUnderLock(BytesRef uid, VersionValue version) { | |
putUnderLock(uid, version, maps); | ||
} else { | ||
maps.current.markAsUnsafe(); | ||
assert putAssertionMap(uid, version); | ||
} | ||
assert putAssertionMap(uid, version); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like that this has moved, but I think it changes things a little. The thing that's currently called |
||
} | ||
|
||
private boolean putAssertionMap(BytesRef uid, VersionValue version) { | ||
|
@@ -338,41 +369,18 @@ private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) { | |
assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length; | ||
if (version.isDelete() == false) { | ||
maps.put(uid, version); | ||
removeTombstoneUnderLock(uid); | ||
maps.tombstones.removeTombstone(uid); | ||
} else { | ||
DeleteVersionValue versionValue = (DeleteVersionValue) version; | ||
putTombstone(uid, versionValue); | ||
maps.tombstones.putTombstone(uid, versionValue); | ||
maps.remove(uid, versionValue); | ||
} | ||
} | ||
|
||
private void putTombstone(BytesRef uid, DeleteVersionValue version) { | ||
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; | ||
// Also enroll the delete into tombstones, and account for its RAM too: | ||
final VersionValue prevTombstone = tombstones.put(uid, version); | ||
long accountRam = (BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed); | ||
// Deduct tombstones bytes used for the version we just removed or replaced: | ||
if (prevTombstone != null) { | ||
accountRam -= (BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed); | ||
} | ||
if (accountRam != 0) { | ||
long v = ramBytesUsedTombstones.addAndGet(accountRam); | ||
assert v >= 0: "bytes=" + v; | ||
} | ||
} | ||
|
||
/** | ||
* Removes this uid from the pending deletes map. | ||
*/ | ||
// For testing | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This -- and the number of times we do |
||
void removeTombstoneUnderLock(BytesRef uid) { | ||
assert keyedLock.isHeldByCurrentThread(uid); | ||
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; | ||
final VersionValue prev = tombstones.remove(uid); | ||
if (prev != null) { | ||
assert prev.isDelete(); | ||
long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prev.ramBytesUsed() + uidRAMBytesUsed)); | ||
assert v >= 0 : "bytes=" + v; | ||
} | ||
assert keyedLock.isHeldByCurrentThread(uid) : Thread.currentThread().getName(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. while you're at it - can you add the uid? |
||
maps.tombstones.removeTombstone(uid); | ||
} | ||
|
||
private boolean canRemoveTombstone(long maxTimestampToPrune, long maxSeqNoToPrune, DeleteVersionValue versionValue) { | ||
|
@@ -389,7 +397,8 @@ private boolean canRemoveTombstone(long maxTimestampToPrune, long maxSeqNoToPrun | |
* Try to prune tombstones whose timestamp is less than maxTimestampToPrune and seqno at most the maxSeqNoToPrune. | ||
*/ | ||
void pruneTombstones(long maxTimestampToPrune, long maxSeqNoToPrune) { | ||
for (Map.Entry<BytesRef, DeleteVersionValue> entry : tombstones.entrySet()) { | ||
final Tombstones tombstones = maps.tombstones; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similarly - this feels like it could move onto |
||
for (Map.Entry<BytesRef, DeleteVersionValue> entry : tombstones.tombstones.entrySet()) { | ||
// we do check before we actually lock the key - this way we don't need to acquire the lock for tombstones that are not | ||
// prune-able. If the tombstone changes concurrently we will re-read and step out below since if we can't collect it now w | ||
// we won't collect the tombstone below since it must be newer than this one. | ||
|
@@ -401,10 +410,10 @@ void pruneTombstones(long maxTimestampToPrune, long maxSeqNoToPrune) { | |
// if we do use a blocking acquire. see #28714 | ||
if (lock != null) { // did we get the lock? | ||
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator: | ||
final DeleteVersionValue versionValue = tombstones.get(uid); | ||
final DeleteVersionValue versionValue = tombstones.getTombstone(uid); | ||
if (versionValue != null) { | ||
if (canRemoveTombstone(maxTimestampToPrune, maxSeqNoToPrune, versionValue)) { | ||
removeTombstoneUnderLock(uid); | ||
tombstones.removeTombstone(uid); | ||
} | ||
} | ||
} | ||
|
@@ -418,7 +427,6 @@ void pruneTombstones(long maxTimestampToPrune, long maxSeqNoToPrune) { | |
*/ | ||
synchronized void clear() { | ||
maps = new Maps(); | ||
tombstones.clear(); | ||
// NOTE: we can't zero this here, because a refresh thread could be calling InternalEngine.pruneDeletedTombstones at the same time, | ||
// and this will lead to an assert trip. Presumably it's fine if our ramBytesUsedTombstones is non-zero after clear since the index | ||
// is being closed: | ||
|
@@ -427,7 +435,7 @@ synchronized void clear() { | |
|
||
@Override | ||
public long ramBytesUsed() { | ||
return maps.current.ramBytesUsed.get() + ramBytesUsedTombstones.get(); | ||
return maps.current.ramBytesUsed.get() + maps.tombstones.ramBytesUsedTombstones.get(); | ||
} | ||
|
||
/** | ||
|
@@ -453,7 +461,7 @@ Map<BytesRef, VersionValue> getAllCurrent() { | |
|
||
/** Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd). */ | ||
Map<BytesRef, DeleteVersionValue> getAllTombstones() { | ||
return tombstones; | ||
return maps.tombstones.tombstones; | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this comment isn't needed, WDYT?