Skip to content

Commit

Permalink
Avoid side-effect in VersionMap when assertion enabled
Browse files Browse the repository at this point in the history
Today when a version map does not require safe access, we will skip that
document. However, if the assertion is enabled, we remove the delete
tombstone of that document if existed. This may accidentally hide bugs
in which stale delete tombstone can be accessed.

This change folds a tombstone map into the version map so that each
version map will have a separate tombstone map.
  • Loading branch information
dnhatn committed Apr 18, 2018
1 parent 2b47d67 commit 81327ee
Showing 1 changed file with 54 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,40 @@ public void updateMinDeletedTimestamp(DeleteVersionValue delete) {

}

private static final class Tombstones {
final Map<BytesRef, DeleteVersionValue> tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
final AtomicLong ramBytesUsedTombstones = new AtomicLong();

void putTombstone(BytesRef uid, DeleteVersionValue version) {
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
// Also enroll the delete into tombstones, and account for its RAM too:
final VersionValue prevTombstone = tombstones.put(uid, version);
long accountRam = (BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed);
// Deduct tombstones bytes used for the version we just removed or replaced:
if (prevTombstone != null) {
accountRam -= (BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed);
}
if (accountRam != 0) {
long v = ramBytesUsedTombstones.addAndGet(accountRam);
assert v >= 0 : "bytes=" + v;
}
}

void removeTombstone(BytesRef uid) {
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
final VersionValue prev = tombstones.remove(uid);
if (prev != null) {
assert prev.isDelete();
long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prev.ramBytesUsed() + uidRAMBytesUsed));
assert v >= 0 : "bytes=" + v;
}
}

DeleteVersionValue getTombstone(BytesRef uid) {
return tombstones.get(uid);
}
}

private static final class Maps {

// All writes (adds and deletes) go into here:
Expand All @@ -115,15 +149,19 @@ private static final class Maps {
boolean needsSafeAccess;
final boolean previousMapsNeededSafeAccess;

// All deletes also go here, and delete "tombstones" are retained after refresh:
private final Tombstones tombstones;

Maps(VersionLookup current, VersionLookup old, boolean previousMapsNeededSafeAccess) {
Maps(VersionLookup current, VersionLookup old, boolean previousMapsNeededSafeAccess, Tombstones tombstones) {
this.current = current;
this.old = old;
this.previousMapsNeededSafeAccess = previousMapsNeededSafeAccess;
this.tombstones = tombstones; // transfer the tombstone
}

Maps() {
this(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency()), VersionLookup.EMPTY, false);
this(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency()), VersionLookup.EMPTY, false,
new Tombstones());
}

boolean isSafeAccessMode() {
Expand All @@ -142,14 +180,14 @@ boolean shouldInheritSafeAccess() {
*/
Maps buildTransitionMap() {
return new Maps(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(current.size())), current,
shouldInheritSafeAccess());
shouldInheritSafeAccess(), tombstones);
}

/**
* builds a new map that invalidates the old map but maintains the current. This should be called in afterRefresh()
*/
Maps invalidateOldMap() {
return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess);
return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess, tombstones);
}

void put(BytesRef uid, VersionValue version) {
Expand Down Expand Up @@ -186,8 +224,6 @@ long getMinDeleteTimestamp() {
}
}

// All deletes also go here, and delete "tombstones" are retained after refresh:
private final Map<BytesRef, DeleteVersionValue> tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

private volatile Maps maps = new Maps();
// we maintain a second map that only receives the updates that we skip on the actual map (unsafe ops)
Expand Down Expand Up @@ -229,11 +265,6 @@ long getMinDeleteTimestamp() {
BASE_BYTES_PER_CHM_ENTRY = chmEntryShallowSize + 2 * RamUsageEstimator.NUM_BYTES_OBJECT_REF;
}

/**
* Tracks bytes used by tombstones (deletes)
*/
final AtomicLong ramBytesUsedTombstones = new AtomicLong();

@Override
public void beforeRefresh() throws IOException {
// Start sending all updates after this point to the new
Expand Down Expand Up @@ -280,7 +311,7 @@ private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) {
return value;
}

return tombstones.get(uid);
return maps.tombstones.getTombstone(uid);
}

VersionValue getVersionForAssert(final BytesRef uid) {
Expand Down Expand Up @@ -338,41 +369,18 @@ private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) {
assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length;
if (version.isDelete() == false) {
maps.put(uid, version);
removeTombstoneUnderLock(uid);
maps.tombstones.removeTombstone(uid);
} else {
DeleteVersionValue versionValue = (DeleteVersionValue) version;
putTombstone(uid, versionValue);
maps.tombstones.putTombstone(uid, versionValue);
maps.remove(uid, versionValue);
}
}

private void putTombstone(BytesRef uid, DeleteVersionValue version) {
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
// Also enroll the delete into tombstones, and account for its RAM too:
final VersionValue prevTombstone = tombstones.put(uid, version);
long accountRam = (BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed);
// Deduct tombstones bytes used for the version we just removed or replaced:
if (prevTombstone != null) {
accountRam -= (BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed);
}
if (accountRam != 0) {
long v = ramBytesUsedTombstones.addAndGet(accountRam);
assert v >= 0: "bytes=" + v;
}
}

/**
* Removes this uid from the pending deletes map.
*/
// For testing
void removeTombstoneUnderLock(BytesRef uid) {
assert keyedLock.isHeldByCurrentThread(uid);
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
final VersionValue prev = tombstones.remove(uid);
if (prev != null) {
assert prev.isDelete();
long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prev.ramBytesUsed() + uidRAMBytesUsed));
assert v >= 0 : "bytes=" + v;
}
assert keyedLock.isHeldByCurrentThread(uid) : Thread.currentThread().getName();
maps.tombstones.removeTombstone(uid);
}

private boolean canRemoveTombstone(long maxTimestampToPrune, long maxSeqNoToPrune, DeleteVersionValue versionValue) {
Expand All @@ -389,7 +397,8 @@ private boolean canRemoveTombstone(long maxTimestampToPrune, long maxSeqNoToPrun
* Try to prune tombstones whose timestamp is less than maxTimestampToPrune and seqno at most the maxSeqNoToPrune.
*/
void pruneTombstones(long maxTimestampToPrune, long maxSeqNoToPrune) {
for (Map.Entry<BytesRef, DeleteVersionValue> entry : tombstones.entrySet()) {
final Tombstones tombstones = maps.tombstones;
for (Map.Entry<BytesRef, DeleteVersionValue> entry : tombstones.tombstones.entrySet()) {
// we do check before we actually lock the key - this way we don't need to acquire the lock for tombstones that are not
// prune-able. If the tombstone changes concurrently we will re-read and step out below since if we can't collect it now w
// we won't collect the tombstone below since it must be newer than this one.
Expand All @@ -401,10 +410,10 @@ void pruneTombstones(long maxTimestampToPrune, long maxSeqNoToPrune) {
// if we do use a blocking acquire. see #28714
if (lock != null) { // did we get the lock?
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
final DeleteVersionValue versionValue = tombstones.get(uid);
final DeleteVersionValue versionValue = tombstones.getTombstone(uid);
if (versionValue != null) {
if (canRemoveTombstone(maxTimestampToPrune, maxSeqNoToPrune, versionValue)) {
removeTombstoneUnderLock(uid);
tombstones.removeTombstone(uid);
}
}
}
Expand All @@ -418,7 +427,6 @@ void pruneTombstones(long maxTimestampToPrune, long maxSeqNoToPrune) {
*/
synchronized void clear() {
maps = new Maps();
tombstones.clear();
// NOTE: we can't zero this here, because a refresh thread could be calling InternalEngine.pruneDeletedTombstones at the same time,
// and this will lead to an assert trip. Presumably it's fine if our ramBytesUsedTombstones is non-zero after clear since the index
// is being closed:
Expand All @@ -427,7 +435,7 @@ synchronized void clear() {

@Override
public long ramBytesUsed() {
return maps.current.ramBytesUsed.get() + ramBytesUsedTombstones.get();
return maps.current.ramBytesUsed.get() + maps.tombstones.ramBytesUsedTombstones.get();
}

/**
Expand All @@ -453,7 +461,7 @@ Map<BytesRef, VersionValue> getAllCurrent() {

/** Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd). */
Map<BytesRef, DeleteVersionValue> getAllTombstones() {
return tombstones;
return maps.tombstones.tombstones;
}

/**
Expand Down

0 comments on commit 81327ee

Please sign in to comment.