From 81327ee4a68bd7f5ae0947fa764464f222c6f264 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 18 Apr 2018 09:31:28 -0400 Subject: [PATCH 1/8] Avoid side-effect in VersionMap when assertion enabled Today when a version map does not require safe access, we will skip that document. However, if the assertion is enabled, we remove the delete tombstone of that document if existed. This may accidentally hide bugs in which stale delete tombstone can be accessed. This change folds a tombstone map into the version map so that each version map will have a separate tombstone map. --- .../index/engine/LiveVersionMap.java | 100 ++++++++++-------- 1 file changed, 54 insertions(+), 46 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index 7c5dcfa5c9050..9f3e3187b5262 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -102,6 +102,40 @@ public void updateMinDeletedTimestamp(DeleteVersionValue delete) { } + private static final class Tombstones { + final Map tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); + final AtomicLong ramBytesUsedTombstones = new AtomicLong(); + + void putTombstone(BytesRef uid, DeleteVersionValue version) { + long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; + // Also enroll the delete into tombstones, and account for its RAM too: + final VersionValue prevTombstone = tombstones.put(uid, version); + long accountRam = (BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed); + // Deduct tombstones bytes used for the version we just removed or replaced: + if (prevTombstone != null) { + accountRam -= (BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed); + } + if (accountRam != 0) { + long v = ramBytesUsedTombstones.addAndGet(accountRam); + assert v >= 0 : "bytes=" + v; + } + } + + void removeTombstone(BytesRef uid) { + long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; + final VersionValue prev = tombstones.remove(uid); + if (prev != null) { + assert prev.isDelete(); + long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prev.ramBytesUsed() + uidRAMBytesUsed)); + assert v >= 0 : "bytes=" + v; + } + } + + DeleteVersionValue getTombstone(BytesRef uid) { + return tombstones.get(uid); + } + } + private static final class Maps { // All writes (adds and deletes) go into here: @@ -115,15 +149,19 @@ private static final class Maps { boolean needsSafeAccess; final boolean previousMapsNeededSafeAccess; + // All deletes also go here, and delete "tombstones" are retained after refresh: + private final Tombstones tombstones; - Maps(VersionLookup current, VersionLookup old, boolean previousMapsNeededSafeAccess) { + Maps(VersionLookup current, VersionLookup old, boolean previousMapsNeededSafeAccess, Tombstones tombstones) { this.current = current; this.old = old; this.previousMapsNeededSafeAccess = previousMapsNeededSafeAccess; + this.tombstones = tombstones; // transfer the tombstone } Maps() { - this(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency()), VersionLookup.EMPTY, false); + this(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency()), VersionLookup.EMPTY, false, + new Tombstones()); } boolean isSafeAccessMode() { @@ -142,14 +180,14 @@ boolean shouldInheritSafeAccess() { */ Maps buildTransitionMap() { return new Maps(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(current.size())), current, - shouldInheritSafeAccess()); + shouldInheritSafeAccess(), tombstones); } /** * builds a new map that invalidates the old map but maintains the current. This should be called in afterRefresh() */ Maps invalidateOldMap() { - return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess); + return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess, tombstones); } void put(BytesRef uid, VersionValue version) { @@ -186,8 +224,6 @@ long getMinDeleteTimestamp() { } } - // All deletes also go here, and delete "tombstones" are retained after refresh: - private final Map tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); private volatile Maps maps = new Maps(); // we maintain a second map that only receives the updates that we skip on the actual map (unsafe ops) @@ -229,11 +265,6 @@ long getMinDeleteTimestamp() { BASE_BYTES_PER_CHM_ENTRY = chmEntryShallowSize + 2 * RamUsageEstimator.NUM_BYTES_OBJECT_REF; } - /** - * Tracks bytes used by tombstones (deletes) - */ - final AtomicLong ramBytesUsedTombstones = new AtomicLong(); - @Override public void beforeRefresh() throws IOException { // Start sending all updates after this point to the new @@ -280,7 +311,7 @@ private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) { return value; } - return tombstones.get(uid); + return maps.tombstones.getTombstone(uid); } VersionValue getVersionForAssert(final BytesRef uid) { @@ -338,41 +369,18 @@ private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) { assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length; if (version.isDelete() == false) { maps.put(uid, version); - removeTombstoneUnderLock(uid); + maps.tombstones.removeTombstone(uid); } else { DeleteVersionValue versionValue = (DeleteVersionValue) version; - putTombstone(uid, versionValue); + maps.tombstones.putTombstone(uid, versionValue); maps.remove(uid, versionValue); } } - private void putTombstone(BytesRef uid, DeleteVersionValue version) { - long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; - // Also enroll the delete into tombstones, and account for its RAM too: - final VersionValue prevTombstone = tombstones.put(uid, version); - long accountRam = (BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed); - // Deduct tombstones bytes used for the version we just removed or replaced: - if (prevTombstone != null) { - accountRam -= (BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed); - } - if (accountRam != 0) { - long v = ramBytesUsedTombstones.addAndGet(accountRam); - assert v >= 0: "bytes=" + v; - } - } - - /** - * Removes this uid from the pending deletes map. - */ + // For testing void removeTombstoneUnderLock(BytesRef uid) { - assert keyedLock.isHeldByCurrentThread(uid); - long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; - final VersionValue prev = tombstones.remove(uid); - if (prev != null) { - assert prev.isDelete(); - long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prev.ramBytesUsed() + uidRAMBytesUsed)); - assert v >= 0 : "bytes=" + v; - } + assert keyedLock.isHeldByCurrentThread(uid) : Thread.currentThread().getName(); + maps.tombstones.removeTombstone(uid); } private boolean canRemoveTombstone(long maxTimestampToPrune, long maxSeqNoToPrune, DeleteVersionValue versionValue) { @@ -389,7 +397,8 @@ private boolean canRemoveTombstone(long maxTimestampToPrune, long maxSeqNoToPrun * Try to prune tombstones whose timestamp is less than maxTimestampToPrune and seqno at most the maxSeqNoToPrune. */ void pruneTombstones(long maxTimestampToPrune, long maxSeqNoToPrune) { - for (Map.Entry entry : tombstones.entrySet()) { + final Tombstones tombstones = maps.tombstones; + for (Map.Entry entry : tombstones.tombstones.entrySet()) { // we do check before we actually lock the key - this way we don't need to acquire the lock for tombstones that are not // prune-able. If the tombstone changes concurrently we will re-read and step out below since if we can't collect it now w // we won't collect the tombstone below since it must be newer than this one. @@ -401,10 +410,10 @@ void pruneTombstones(long maxTimestampToPrune, long maxSeqNoToPrune) { // if we do use a blocking acquire. see #28714 if (lock != null) { // did we get the lock? // Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator: - final DeleteVersionValue versionValue = tombstones.get(uid); + final DeleteVersionValue versionValue = tombstones.getTombstone(uid); if (versionValue != null) { if (canRemoveTombstone(maxTimestampToPrune, maxSeqNoToPrune, versionValue)) { - removeTombstoneUnderLock(uid); + tombstones.removeTombstone(uid); } } } @@ -418,7 +427,6 @@ void pruneTombstones(long maxTimestampToPrune, long maxSeqNoToPrune) { */ synchronized void clear() { maps = new Maps(); - tombstones.clear(); // NOTE: we can't zero this here, because a refresh thread could be calling InternalEngine.pruneDeletedTombstones at the same time, // and this will lead to an assert trip. Presumably it's fine if our ramBytesUsedTombstones is non-zero after clear since the index // is being closed: @@ -427,7 +435,7 @@ synchronized void clear() { @Override public long ramBytesUsed() { - return maps.current.ramBytesUsed.get() + ramBytesUsedTombstones.get(); + return maps.current.ramBytesUsed.get() + maps.tombstones.ramBytesUsedTombstones.get(); } /** @@ -453,7 +461,7 @@ Map getAllCurrent() { /** Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd). */ Map getAllTombstones() { - return tombstones; + return maps.tombstones.tombstones; } /** From 4107caa5269b611247db911f71f985931739a31e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 19 Apr 2018 00:05:05 -0400 Subject: [PATCH 2/8] Always put into the assert map --- .../java/org/elasticsearch/index/engine/LiveVersionMap.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index 9f3e3187b5262..9098ec7379e94 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -344,8 +344,8 @@ void maybePutUnderLock(BytesRef uid, VersionValue version) { putUnderLock(uid, version, maps); } else { maps.current.markAsUnsafe(); - assert putAssertionMap(uid, version); } + assert putAssertionMap(uid, version); } private boolean putAssertionMap(BytesRef uid, VersionValue version) { From 32711aabba9382a1866f7ffcaa045402860cd8da Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 19 Apr 2018 07:04:39 -0400 Subject: [PATCH 3/8] Revert "Always put into the assert map" This reverts commit 4107caa5269b611247db911f71f985931739a31e. --- .../java/org/elasticsearch/index/engine/LiveVersionMap.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index 9098ec7379e94..9f3e3187b5262 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -344,8 +344,8 @@ void maybePutUnderLock(BytesRef uid, VersionValue version) { putUnderLock(uid, version, maps); } else { maps.current.markAsUnsafe(); + assert putAssertionMap(uid, version); } - assert putAssertionMap(uid, version); } private boolean putAssertionMap(BytesRef uid, VersionValue version) { From a7beda73f45ec5d3b53a8fc4a82e542c76bd652d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 19 Apr 2018 07:43:48 -0400 Subject: [PATCH 4/8] Revert "Avoid side-effect in VersionMap when assertion enabled" This reverts commit 81327ee4a68bd7f5ae0947fa764464f222c6f264. --- .../index/engine/LiveVersionMap.java | 100 ++++++++---------- 1 file changed, 46 insertions(+), 54 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index 9f3e3187b5262..7c5dcfa5c9050 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -102,40 +102,6 @@ public void updateMinDeletedTimestamp(DeleteVersionValue delete) { } - private static final class Tombstones { - final Map tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); - final AtomicLong ramBytesUsedTombstones = new AtomicLong(); - - void putTombstone(BytesRef uid, DeleteVersionValue version) { - long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; - // Also enroll the delete into tombstones, and account for its RAM too: - final VersionValue prevTombstone = tombstones.put(uid, version); - long accountRam = (BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed); - // Deduct tombstones bytes used for the version we just removed or replaced: - if (prevTombstone != null) { - accountRam -= (BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed); - } - if (accountRam != 0) { - long v = ramBytesUsedTombstones.addAndGet(accountRam); - assert v >= 0 : "bytes=" + v; - } - } - - void removeTombstone(BytesRef uid) { - long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; - final VersionValue prev = tombstones.remove(uid); - if (prev != null) { - assert prev.isDelete(); - long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prev.ramBytesUsed() + uidRAMBytesUsed)); - assert v >= 0 : "bytes=" + v; - } - } - - DeleteVersionValue getTombstone(BytesRef uid) { - return tombstones.get(uid); - } - } - private static final class Maps { // All writes (adds and deletes) go into here: @@ -149,19 +115,15 @@ private static final class Maps { boolean needsSafeAccess; final boolean previousMapsNeededSafeAccess; - // All deletes also go here, and delete "tombstones" are retained after refresh: - private final Tombstones tombstones; - Maps(VersionLookup current, VersionLookup old, boolean previousMapsNeededSafeAccess, Tombstones tombstones) { + Maps(VersionLookup current, VersionLookup old, boolean previousMapsNeededSafeAccess) { this.current = current; this.old = old; this.previousMapsNeededSafeAccess = previousMapsNeededSafeAccess; - this.tombstones = tombstones; // transfer the tombstone } Maps() { - this(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency()), VersionLookup.EMPTY, false, - new Tombstones()); + this(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency()), VersionLookup.EMPTY, false); } boolean isSafeAccessMode() { @@ -180,14 +142,14 @@ boolean shouldInheritSafeAccess() { */ Maps buildTransitionMap() { return new Maps(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(current.size())), current, - shouldInheritSafeAccess(), tombstones); + shouldInheritSafeAccess()); } /** * builds a new map that invalidates the old map but maintains the current. This should be called in afterRefresh() */ Maps invalidateOldMap() { - return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess, tombstones); + return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess); } void put(BytesRef uid, VersionValue version) { @@ -224,6 +186,8 @@ long getMinDeleteTimestamp() { } } + // All deletes also go here, and delete "tombstones" are retained after refresh: + private final Map tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); private volatile Maps maps = new Maps(); // we maintain a second map that only receives the updates that we skip on the actual map (unsafe ops) @@ -265,6 +229,11 @@ long getMinDeleteTimestamp() { BASE_BYTES_PER_CHM_ENTRY = chmEntryShallowSize + 2 * RamUsageEstimator.NUM_BYTES_OBJECT_REF; } + /** + * Tracks bytes used by tombstones (deletes) + */ + final AtomicLong ramBytesUsedTombstones = new AtomicLong(); + @Override public void beforeRefresh() throws IOException { // Start sending all updates after this point to the new @@ -311,7 +280,7 @@ private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) { return value; } - return maps.tombstones.getTombstone(uid); + return tombstones.get(uid); } VersionValue getVersionForAssert(final BytesRef uid) { @@ -369,18 +338,41 @@ private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) { assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length; if (version.isDelete() == false) { maps.put(uid, version); - maps.tombstones.removeTombstone(uid); + removeTombstoneUnderLock(uid); } else { DeleteVersionValue versionValue = (DeleteVersionValue) version; - maps.tombstones.putTombstone(uid, versionValue); + putTombstone(uid, versionValue); maps.remove(uid, versionValue); } } - // For testing + private void putTombstone(BytesRef uid, DeleteVersionValue version) { + long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; + // Also enroll the delete into tombstones, and account for its RAM too: + final VersionValue prevTombstone = tombstones.put(uid, version); + long accountRam = (BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed); + // Deduct tombstones bytes used for the version we just removed or replaced: + if (prevTombstone != null) { + accountRam -= (BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed); + } + if (accountRam != 0) { + long v = ramBytesUsedTombstones.addAndGet(accountRam); + assert v >= 0: "bytes=" + v; + } + } + + /** + * Removes this uid from the pending deletes map. + */ void removeTombstoneUnderLock(BytesRef uid) { - assert keyedLock.isHeldByCurrentThread(uid) : Thread.currentThread().getName(); - maps.tombstones.removeTombstone(uid); + assert keyedLock.isHeldByCurrentThread(uid); + long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; + final VersionValue prev = tombstones.remove(uid); + if (prev != null) { + assert prev.isDelete(); + long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prev.ramBytesUsed() + uidRAMBytesUsed)); + assert v >= 0 : "bytes=" + v; + } } private boolean canRemoveTombstone(long maxTimestampToPrune, long maxSeqNoToPrune, DeleteVersionValue versionValue) { @@ -397,8 +389,7 @@ private boolean canRemoveTombstone(long maxTimestampToPrune, long maxSeqNoToPrun * Try to prune tombstones whose timestamp is less than maxTimestampToPrune and seqno at most the maxSeqNoToPrune. */ void pruneTombstones(long maxTimestampToPrune, long maxSeqNoToPrune) { - final Tombstones tombstones = maps.tombstones; - for (Map.Entry entry : tombstones.tombstones.entrySet()) { + for (Map.Entry entry : tombstones.entrySet()) { // we do check before we actually lock the key - this way we don't need to acquire the lock for tombstones that are not // prune-able. If the tombstone changes concurrently we will re-read and step out below since if we can't collect it now w // we won't collect the tombstone below since it must be newer than this one. @@ -410,10 +401,10 @@ void pruneTombstones(long maxTimestampToPrune, long maxSeqNoToPrune) { // if we do use a blocking acquire. see #28714 if (lock != null) { // did we get the lock? // Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator: - final DeleteVersionValue versionValue = tombstones.getTombstone(uid); + final DeleteVersionValue versionValue = tombstones.get(uid); if (versionValue != null) { if (canRemoveTombstone(maxTimestampToPrune, maxSeqNoToPrune, versionValue)) { - tombstones.removeTombstone(uid); + removeTombstoneUnderLock(uid); } } } @@ -427,6 +418,7 @@ void pruneTombstones(long maxTimestampToPrune, long maxSeqNoToPrune) { */ synchronized void clear() { maps = new Maps(); + tombstones.clear(); // NOTE: we can't zero this here, because a refresh thread could be calling InternalEngine.pruneDeletedTombstones at the same time, // and this will lead to an assert trip. Presumably it's fine if our ramBytesUsedTombstones is non-zero after clear since the index // is being closed: @@ -435,7 +427,7 @@ synchronized void clear() { @Override public long ramBytesUsed() { - return maps.current.ramBytesUsed.get() + maps.tombstones.ramBytesUsedTombstones.get(); + return maps.current.ramBytesUsed.get() + ramBytesUsedTombstones.get(); } /** @@ -461,7 +453,7 @@ Map getAllCurrent() { /** Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd). */ Map getAllTombstones() { - return maps.tombstones.tombstones; + return tombstones; } /** From 0346ba61ef276851cfb3a182cff66f04d510a1e0 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 19 Apr 2018 10:11:46 -0400 Subject: [PATCH 5/8] =?UTF-8?q?Untangle=20put=20index=20and=20put=20delete?= =?UTF-8?q?=20-=20Boaz=E2=80=99s=20suggestion?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../index/engine/DeleteVersionValue.java | 2 +- ...rsionValue.java => IndexVersionValue.java} | 10 +-- .../index/engine/InternalEngine.java | 13 ++-- .../index/engine/LiveVersionMap.java | 53 +++++++-------- .../index/engine/VersionValue.java | 2 +- .../index/engine/LiveVersionMapTests.java | 64 +++++++++++++------ .../index/engine/VersionValueTests.java | 9 ++- 7 files changed, 89 insertions(+), 64 deletions(-) rename server/src/main/java/org/elasticsearch/index/engine/{TranslogVersionValue.java => IndexVersionValue.java} (86%) diff --git a/server/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java b/server/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java index d2b2e24c616a8..9f094197b8d9c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java +++ b/server/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java @@ -23,7 +23,7 @@ /** Holds a deleted version, which just adds a timestamp to {@link VersionValue} so we know when we can expire the deletion. */ -class DeleteVersionValue extends VersionValue { +final class DeleteVersionValue extends VersionValue { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(DeleteVersionValue.class); diff --git a/server/src/main/java/org/elasticsearch/index/engine/TranslogVersionValue.java b/server/src/main/java/org/elasticsearch/index/engine/IndexVersionValue.java similarity index 86% rename from server/src/main/java/org/elasticsearch/index/engine/TranslogVersionValue.java rename to server/src/main/java/org/elasticsearch/index/engine/IndexVersionValue.java index 67415ea6139a6..4f67372926712 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/TranslogVersionValue.java +++ b/server/src/main/java/org/elasticsearch/index/engine/IndexVersionValue.java @@ -24,13 +24,13 @@ import java.util.Objects; -final class TranslogVersionValue extends VersionValue { +final class IndexVersionValue extends VersionValue { - private static final long RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(TranslogVersionValue.class); + private static final long RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexVersionValue.class); private final Translog.Location translogLocation; - TranslogVersionValue(Translog.Location translogLocation, long version, long seqNo, long term) { + IndexVersionValue(Translog.Location translogLocation, long version, long seqNo, long term) { super(version, seqNo, term); this.translogLocation = translogLocation; } @@ -45,7 +45,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; if (!super.equals(o)) return false; - TranslogVersionValue that = (TranslogVersionValue) o; + IndexVersionValue that = (IndexVersionValue) o; return Objects.equals(translogLocation, that.translogLocation); } @@ -56,7 +56,7 @@ public int hashCode() { @Override public String toString() { - return "TranslogVersionValue{" + + return "IndexVersionValue{" + "version=" + version + ", seqNo=" + seqNo + ", term=" + term + diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index b28a5cd59e25b..c9a052ac86bee 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -623,7 +623,7 @@ private VersionValue resolveDocVersion(final Operation op) throws IOException { assert incrementIndexVersionLookup(); // used for asserting in tests final long currentVersion = loadCurrentVersionFromIndex(op.uid()); if (currentVersion != Versions.NOT_FOUND) { - versionValue = new VersionValue(currentVersion, SequenceNumbers.UNASSIGNED_SEQ_NO, 0L); + versionValue = new IndexVersionValue(null, currentVersion, SequenceNumbers.UNASSIGNED_SEQ_NO, 0L); } } else if (engineConfig.isEnableGcDeletes() && versionValue.isDelete() && (engineConfig.getThreadPool().relativeTimeInMillis() - ((DeleteVersionValue)versionValue).time) > getGcDeletesInMillis()) { @@ -785,7 +785,7 @@ public IndexResult index(Index index) throws IOException { indexResult.setTranslogLocation(location); } if (plan.indexIntoLucene && indexResult.hasFailure() == false) { - versionMap.maybePutUnderLock(index.uid().bytes(), + versionMap.maybePutIndexUnderLock(index.uid().bytes(), getVersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm(), indexResult.getTranslogLocation())); } if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { @@ -937,11 +937,8 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) } } - private VersionValue getVersionValue(long version, long seqNo, long term, Translog.Location location) { - if (location != null && trackTranslogLocation.get()) { - return new TranslogVersionValue(location, version, seqNo, term); - } - return new VersionValue(version, seqNo, term); + private IndexVersionValue getVersionValue(long version, long seqNo, long term, Translog.Location location) { + return new IndexVersionValue(trackTranslogLocation.get() ? location : null, version, seqNo, term); } /** @@ -1193,7 +1190,7 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) indexWriter.deleteDocuments(delete.uid()); numDocDeletes.inc(); } - versionMap.putUnderLock(delete.uid().bytes(), + versionMap.putDeleteUnderLock(delete.uid().bytes(), new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(), engineConfig.getThreadPool().relativeTimeInMillis())); return new DeleteResult( diff --git a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index 7c5dcfa5c9050..797f944195d6b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -268,7 +268,7 @@ VersionValue getUnderLock(final BytesRef uid) { } private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) { - assert keyedLock.isHeldByCurrentThread(uid); + assert assertKeyedLockHeldByCurrentThread(uid); // First try to get the "live" value: VersionValue value = currentMaps.current.get(uid); if (value != null) { @@ -306,44 +306,40 @@ boolean isSafeAccessRequired() { /** * Adds this uid/version to the pending adds map iff the map needs safe access. */ - void maybePutUnderLock(BytesRef uid, VersionValue version) { - assert keyedLock.isHeldByCurrentThread(uid); + void maybePutIndexUnderLock(BytesRef uid, IndexVersionValue version) { + assert assertKeyedLockHeldByCurrentThread(uid); Maps maps = this.maps; if (maps.isSafeAccessMode()) { - putUnderLock(uid, version, maps); + putIndexUnderLock(uid, version); } else { maps.current.markAsUnsafe(); assert putAssertionMap(uid, version); } } - private boolean putAssertionMap(BytesRef uid, VersionValue version) { - putUnderLock(uid, version, unsafeKeysMap); + // For testing + void putIndexUnderLock(BytesRef uid, IndexVersionValue version) { + assert assertKeyedLockHeldByCurrentThread(uid); + putIndexUnderLock(uid, version, maps); + } + + private boolean putAssertionMap(BytesRef uid, IndexVersionValue version) { + putIndexUnderLock(uid, version, unsafeKeysMap); return true; } - /** - * Adds this uid/version to the pending adds map. - */ - void putUnderLock(BytesRef uid, VersionValue version) { - Maps maps = this.maps; - putUnderLock(uid, version, maps); + void putDeleteUnderLock(BytesRef uid, DeleteVersionValue version) { + assert assertKeyedLockHeldByCurrentThread(uid); + assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length; + putTombstone(uid, version); + maps.remove(uid, version); } - /** - * Adds this uid/version to the pending adds map. - */ - private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) { - assert keyedLock.isHeldByCurrentThread(uid); + private void putIndexUnderLock(BytesRef uid, IndexVersionValue version, Maps maps) { + assert assertKeyedLockHeldByCurrentThread(uid); assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length; - if (version.isDelete() == false) { - maps.put(uid, version); - removeTombstoneUnderLock(uid); - } else { - DeleteVersionValue versionValue = (DeleteVersionValue) version; - putTombstone(uid, versionValue); - maps.remove(uid, versionValue); - } + maps.put(uid, version); + removeTombstoneUnderLock(uid); } private void putTombstone(BytesRef uid, DeleteVersionValue version) { @@ -365,7 +361,7 @@ private void putTombstone(BytesRef uid, DeleteVersionValue version) { * Removes this uid from the pending deletes map. */ void removeTombstoneUnderLock(BytesRef uid) { - assert keyedLock.isHeldByCurrentThread(uid); + assert assertKeyedLockHeldByCurrentThread(uid); long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; final VersionValue prev = tombstones.remove(uid); if (prev != null) { @@ -465,4 +461,9 @@ Map getAllTombstones() { Releasable acquireLock(BytesRef uid) { return keyedLock.acquire(uid); } + + private boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) { + assert keyedLock.isHeldByCurrentThread(uid) : "Thread [" + Thread.currentThread().getName() + "], uid [" + uid.utf8ToString() + "]"; + return true; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/VersionValue.java b/server/src/main/java/org/elasticsearch/index/engine/VersionValue.java index d63306486732e..567a7964186ad 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/VersionValue.java +++ b/server/src/main/java/org/elasticsearch/index/engine/VersionValue.java @@ -27,7 +27,7 @@ import java.util.Collection; import java.util.Collections; -class VersionValue implements Accountable { +abstract class VersionValue implements Accountable { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(VersionValue.class); diff --git a/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java b/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java index ce3ddff00dade..6025d5f917463 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java @@ -25,6 +25,7 @@ import org.apache.lucene.util.RamUsageTester; import org.apache.lucene.util.TestUtil; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -47,9 +48,8 @@ public void testRamBytesUsed() throws Exception { for (int i = 0; i < 100000; ++i) { BytesRefBuilder uid = new BytesRefBuilder(); uid.copyChars(TestUtil.randomSimpleString(random(), 10, 20)); - VersionValue version = new VersionValue(randomLong(), randomLong(), randomLong()); try (Releasable r = map.acquireLock(uid.toBytesRef())) { - map.putUnderLock(uid.toBytesRef(), version); + map.putIndexUnderLock(uid.toBytesRef(), randomIndexVersionValue()); } } long actualRamBytesUsed = RamUsageTester.sizeOf(map); @@ -64,9 +64,8 @@ public void testRamBytesUsed() throws Exception { for (int i = 0; i < 100000; ++i) { BytesRefBuilder uid = new BytesRefBuilder(); uid.copyChars(TestUtil.randomSimpleString(random(), 10, 20)); - VersionValue version = new VersionValue(randomLong(), randomLong(), randomLong()); try (Releasable r = map.acquireLock(uid.toBytesRef())) { - map.putUnderLock(uid.toBytesRef(), version); + map.putIndexUnderLock(uid.toBytesRef(), randomIndexVersionValue()); } } actualRamBytesUsed = RamUsageTester.sizeOf(map); @@ -100,14 +99,15 @@ private BytesRef uid(String string) { public void testBasics() throws IOException { LiveVersionMap map = new LiveVersionMap(); try (Releasable r = map.acquireLock(uid("test"))) { - map.putUnderLock(uid("test"), new VersionValue(1,1,1)); - assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test"))); + Translog.Location tlogLoc = randomTranslogLocation(); + map.putIndexUnderLock(uid("test"), new IndexVersionValue(tlogLoc, 1, 1, 1)); + assertEquals(new IndexVersionValue(tlogLoc, 1, 1, 1), map.getUnderLock(uid("test"))); map.beforeRefresh(); - assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test"))); + assertEquals(new IndexVersionValue(tlogLoc, 1, 1, 1), map.getUnderLock(uid("test"))); map.afterRefresh(randomBoolean()); assertNull(map.getUnderLock(uid("test"))); - map.putUnderLock(uid("test"), new DeleteVersionValue(1,1,1,1)); + map.putDeleteUnderLock(uid("test"), new DeleteVersionValue(1,1,1,1)); assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test"))); map.beforeRefresh(); assertEquals(new DeleteVersionValue(1,1,1,1), map.getUnderLock(uid("test"))); @@ -154,7 +154,8 @@ public void testConcurrently() throws IOException, InterruptedException { BytesRef bytesRef = randomFrom(random(), keyList); try (Releasable r = map.acquireLock(bytesRef)) { VersionValue versionValue = values.computeIfAbsent(bytesRef, - v -> new VersionValue(randomLong(), maxSeqNo.incrementAndGet(), randomLong())); + v -> new IndexVersionValue( + randomTranslogLocation(), randomLong(), maxSeqNo.incrementAndGet(), randomLong())); boolean isDelete = versionValue instanceof DeleteVersionValue; if (isDelete) { map.removeTombstoneUnderLock(bytesRef); @@ -165,10 +166,11 @@ public void testConcurrently() throws IOException, InterruptedException { versionValue.term, clock.getAndIncrement()); deletes.put(bytesRef, (DeleteVersionValue) versionValue); } else { - versionValue = new VersionValue(versionValue.version + 1, maxSeqNo.incrementAndGet(), versionValue.term); + versionValue = new IndexVersionValue(randomTranslogLocation(), versionValue.version + 1, + maxSeqNo.incrementAndGet(), versionValue.term); } values.put(bytesRef, versionValue); - map.putUnderLock(bytesRef, versionValue); + putUnderLock(map, bytesRef, versionValue); } if (rarely()) { final long pruneSeqNo = randomLongBetween(0, maxSeqNo.get()); @@ -268,7 +270,7 @@ public void testCarryOnSafeAccess() throws IOException { } try (Releasable r = map.acquireLock(uid(""))) { - map.maybePutUnderLock(new BytesRef(""), new VersionValue(randomLong(), randomLong(), randomLong())); + map.maybePutIndexUnderLock(new BytesRef(""), randomIndexVersionValue()); } assertFalse(map.isUnsafe()); assertEquals(1, map.getAllCurrent().size()); @@ -278,7 +280,7 @@ public void testCarryOnSafeAccess() throws IOException { assertFalse(map.isUnsafe()); assertFalse(map.isSafeAccessRequired()); try (Releasable r = map.acquireLock(uid(""))) { - map.maybePutUnderLock(new BytesRef(""), new VersionValue(randomLong(), randomLong(), randomLong())); + map.maybePutIndexUnderLock(new BytesRef(""), randomIndexVersionValue()); } assertTrue(map.isUnsafe()); assertFalse(map.isSafeAccessRequired()); @@ -288,7 +290,7 @@ public void testCarryOnSafeAccess() throws IOException { public void testRefreshTransition() throws IOException { LiveVersionMap map = new LiveVersionMap(); try (Releasable r = map.acquireLock(uid("1"))) { - map.maybePutUnderLock(uid("1"), new VersionValue(randomLong(), randomLong(), randomLong())); + map.maybePutIndexUnderLock(uid("1"), randomIndexVersionValue()); assertTrue(map.isUnsafe()); assertNull(map.getUnderLock(uid("1"))); map.beforeRefresh(); @@ -299,7 +301,7 @@ public void testRefreshTransition() throws IOException { assertFalse(map.isUnsafe()); map.enforceSafeAccess(); - map.maybePutUnderLock(uid("1"), new VersionValue(randomLong(), randomLong(), randomLong())); + map.maybePutIndexUnderLock(uid("1"), randomIndexVersionValue()); assertFalse(map.isUnsafe()); assertNotNull(map.getUnderLock(uid("1"))); map.beforeRefresh(); @@ -320,9 +322,9 @@ public void testAddAndDeleteRefreshConcurrently() throws IOException, Interrupte AtomicLong version = new AtomicLong(); CountDownLatch start = new CountDownLatch(2); BytesRef uid = uid("1"); - VersionValue initialVersion = new VersionValue(version.incrementAndGet(), 1, 1); + VersionValue initialVersion = new IndexVersionValue(randomTranslogLocation(), version.incrementAndGet(), 1, 1); try (Releasable ignore = map.acquireLock(uid)) { - map.putUnderLock(uid, initialVersion); + putUnderLock(map, uid, initialVersion); } Thread t = new Thread(() -> { start.countDown(); @@ -338,13 +340,13 @@ public void testAddAndDeleteRefreshConcurrently() throws IOException, Interrupte underLock = nextVersionValue; } if (underLock.isDelete()) { - nextVersionValue = new VersionValue(version.incrementAndGet(), 1, 1); + nextVersionValue = new IndexVersionValue(randomTranslogLocation(), version.incrementAndGet(), 1, 1); } else if (randomBoolean()) { - nextVersionValue = new VersionValue(version.incrementAndGet(), 1, 1); + nextVersionValue = new IndexVersionValue(randomTranslogLocation(), version.incrementAndGet(), 1, 1); } else { nextVersionValue = new DeleteVersionValue(version.incrementAndGet(), 1, 1, 0); } - map.putUnderLock(uid, nextVersionValue); + putUnderLock(map, uid, nextVersionValue); } } } catch (Exception e) { @@ -375,7 +377,7 @@ public void testPruneTombstonesWhileLocked() throws InterruptedException, IOExce BytesRef uid = uid("1"); ; try (Releasable ignore = map.acquireLock(uid)) { - map.putUnderLock(uid, new DeleteVersionValue(0, 0, 0, 0)); + map.putDeleteUnderLock(uid, new DeleteVersionValue(0, 0, 0, 0)); map.beforeRefresh(); // refresh otherwise we won't prune since it's tracked by the current map map.afterRefresh(false); Thread thread = new Thread(() -> { @@ -392,4 +394,24 @@ public void testPruneTombstonesWhileLocked() throws InterruptedException, IOExce thread.join(); assertEquals(0, map.getAllTombstones().size()); } + + void putUnderLock(LiveVersionMap maps, BytesRef uid, VersionValue version) { + if (version instanceof IndexVersionValue) { + maps.putIndexUnderLock(uid, (IndexVersionValue) version); + } else { + maps.putDeleteUnderLock(uid, (DeleteVersionValue) version); + } + } + + IndexVersionValue randomIndexVersionValue() { + return new IndexVersionValue(randomTranslogLocation(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); + } + + Translog.Location randomTranslogLocation() { + if (randomBoolean()) { + return null; + } else { + return new Translog.Location(randomNonNegativeLong(), randomNonNegativeLong(), randomInt()); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java b/server/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java index 3b953edece1b4..242a568295dd6 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/VersionValueTests.java @@ -20,12 +20,17 @@ package org.elasticsearch.index.engine; import org.apache.lucene.util.RamUsageTester; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESTestCase; public class VersionValueTests extends ESTestCase { - public void testRamBytesUsed() { - VersionValue versionValue = new VersionValue(randomLong(), randomLong(), randomLong()); + public void testIndexRamBytesUsed() { + Translog.Location translogLoc = null; + if (randomBoolean()) { + translogLoc = new Translog.Location(randomNonNegativeLong(), randomNonNegativeLong(), randomInt()); + } + IndexVersionValue versionValue = new IndexVersionValue(translogLoc, randomLong(), randomLong(), randomLong()); assertEquals(RamUsageTester.sizeOf(versionValue), versionValue.ramBytesUsed()); } From bec5ac39d6bf794b8f7a3bf52cba115426742abd Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 19 Apr 2018 10:18:07 -0400 Subject: [PATCH 6/8] inline getVersionValue method --- .../org/elasticsearch/index/engine/InternalEngine.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index c9a052ac86bee..ab13639ce2fd2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -785,8 +785,9 @@ public IndexResult index(Index index) throws IOException { indexResult.setTranslogLocation(location); } if (plan.indexIntoLucene && indexResult.hasFailure() == false) { + final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null; versionMap.maybePutIndexUnderLock(index.uid().bytes(), - getVersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm(), indexResult.getTranslogLocation())); + new IndexVersionValue(translogLocation, plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm())); } if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo()); @@ -937,10 +938,6 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) } } - private IndexVersionValue getVersionValue(long version, long seqNo, long term, Translog.Location location) { - return new IndexVersionValue(trackTranslogLocation.get() ? location : null, version, seqNo, term); - } - /** * returns true if the indexing operation may have already be processed by this engine. * Note that it is OK to rarely return true even if this is not the case. However a `false` From e2b9f9694bfeab3f876f1f6015fab7efb09f8737 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 19 Apr 2018 10:36:31 -0400 Subject: [PATCH 7/8] Inline another test util method --- .../index/engine/LiveVersionMapTests.java | 31 +++++++------------ 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java b/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java index 6025d5f917463..e0efcf9f0f73f 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java @@ -162,15 +162,16 @@ public void testConcurrently() throws IOException, InterruptedException { deletes.remove(bytesRef); } if (isDelete == false && rarely()) { - versionValue = new DeleteVersionValue(versionValue.version + 1, maxSeqNo.incrementAndGet(), - versionValue.term, clock.getAndIncrement()); + versionValue = new DeleteVersionValue(versionValue.version + 1, + maxSeqNo.incrementAndGet(), versionValue.term, clock.getAndIncrement()); deletes.put(bytesRef, (DeleteVersionValue) versionValue); + map.putDeleteUnderLock(bytesRef, (DeleteVersionValue) versionValue); } else { - versionValue = new IndexVersionValue(randomTranslogLocation(), versionValue.version + 1, - maxSeqNo.incrementAndGet(), versionValue.term); + versionValue = new IndexVersionValue(randomTranslogLocation(), + versionValue.version + 1, maxSeqNo.incrementAndGet(), versionValue.term); + map.putIndexUnderLock(bytesRef, (IndexVersionValue) versionValue); } values.put(bytesRef, versionValue); - putUnderLock(map, bytesRef, versionValue); } if (rarely()) { final long pruneSeqNo = randomLongBetween(0, maxSeqNo.get()); @@ -322,9 +323,10 @@ public void testAddAndDeleteRefreshConcurrently() throws IOException, Interrupte AtomicLong version = new AtomicLong(); CountDownLatch start = new CountDownLatch(2); BytesRef uid = uid("1"); - VersionValue initialVersion = new IndexVersionValue(randomTranslogLocation(), version.incrementAndGet(), 1, 1); + VersionValue initialVersion; try (Releasable ignore = map.acquireLock(uid)) { - putUnderLock(map, uid, initialVersion); + initialVersion = new IndexVersionValue(randomTranslogLocation(), version.incrementAndGet(), 1, 1); + map.putIndexUnderLock(uid, (IndexVersionValue) initialVersion); } Thread t = new Thread(() -> { start.countDown(); @@ -339,14 +341,13 @@ public void testAddAndDeleteRefreshConcurrently() throws IOException, Interrupte } else { underLock = nextVersionValue; } - if (underLock.isDelete()) { - nextVersionValue = new IndexVersionValue(randomTranslogLocation(), version.incrementAndGet(), 1, 1); - } else if (randomBoolean()) { + if (underLock.isDelete() || randomBoolean()) { nextVersionValue = new IndexVersionValue(randomTranslogLocation(), version.incrementAndGet(), 1, 1); + map.putIndexUnderLock(uid, (IndexVersionValue) nextVersionValue); } else { nextVersionValue = new DeleteVersionValue(version.incrementAndGet(), 1, 1, 0); + map.putDeleteUnderLock(uid, (DeleteVersionValue) nextVersionValue); } - putUnderLock(map, uid, nextVersionValue); } } } catch (Exception e) { @@ -395,14 +396,6 @@ public void testPruneTombstonesWhileLocked() throws InterruptedException, IOExce assertEquals(0, map.getAllTombstones().size()); } - void putUnderLock(LiveVersionMap maps, BytesRef uid, VersionValue version) { - if (version instanceof IndexVersionValue) { - maps.putIndexUnderLock(uid, (IndexVersionValue) version); - } else { - maps.putDeleteUnderLock(uid, (DeleteVersionValue) version); - } - } - IndexVersionValue randomIndexVersionValue() { return new IndexVersionValue(randomTranslogLocation(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); } From 516156a88c09bef019b5aafc1c6bb2cfc68526b7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 19 Apr 2018 11:07:08 -0400 Subject: [PATCH 8/8] Do no remove tombstone when safe-access not required --- .../index/engine/LiveVersionMap.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index 797f944195d6b..6d9dc4a38974c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -317,14 +317,17 @@ void maybePutIndexUnderLock(BytesRef uid, IndexVersionValue version) { } } - // For testing void putIndexUnderLock(BytesRef uid, IndexVersionValue version) { assert assertKeyedLockHeldByCurrentThread(uid); - putIndexUnderLock(uid, version, maps); + assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length; + maps.put(uid, version); + removeTombstoneUnderLock(uid); } private boolean putAssertionMap(BytesRef uid, IndexVersionValue version) { - putIndexUnderLock(uid, version, unsafeKeysMap); + assert assertKeyedLockHeldByCurrentThread(uid); + assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length; + unsafeKeysMap.put(uid, version); return true; } @@ -335,13 +338,6 @@ void putDeleteUnderLock(BytesRef uid, DeleteVersionValue version) { maps.remove(uid, version); } - private void putIndexUnderLock(BytesRef uid, IndexVersionValue version, Maps maps) { - assert assertKeyedLockHeldByCurrentThread(uid); - assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length; - maps.put(uid, version); - removeTombstoneUnderLock(uid); - } - private void putTombstone(BytesRef uid, DeleteVersionValue version) { long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; // Also enroll the delete into tombstones, and account for its RAM too: