From fe6733791b37c5af4ca6ded83e2ebd0ae5576c19 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 3 Feb 2020 16:53:11 -0500 Subject: [PATCH 1/9] Use local checkpoint to calculate min translog gen for recovery --- .../index/engine/CombinedDeletionPolicy.java | 14 +- .../index/engine/InternalEngine.java | 52 +-- .../index/engine/NoOpEngine.java | 34 +- .../index/engine/ReadOnlyEngine.java | 5 +- .../org/elasticsearch/index/store/Store.java | 5 +- .../index/translog/Translog.java | 91 ++--- .../translog/TranslogDeletionPolicy.java | 60 ++-- .../translog/TruncateTranslogAction.java | 20 +- .../indices/stats/IndicesStatsTests.java | 1 - .../index/IndexServiceTests.java | 18 +- .../engine/CombinedDeletionPolicyTests.java | 45 +-- .../index/engine/InternalEngineTests.java | 65 ++-- .../index/engine/NoOpEngineTests.java | 39 +-- .../index/shard/IndexShardIT.java | 4 +- .../index/shard/IndexShardTests.java | 15 +- .../elasticsearch/index/store/StoreTests.java | 4 - .../index/translog/TestTranslog.java | 19 +- .../translog/TranslogDeletionPolicyTests.java | 21 +- .../index/translog/TranslogTests.java | 316 ++++++++---------- .../PeerRecoveryTargetServiceTests.java | 9 +- .../indices/recovery/RecoveryTests.java | 4 - 21 files changed, 324 insertions(+), 517 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 70507fd18e7a3..d8b524d5ff7bb 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -121,16 +121,12 @@ private void updateRetentionPolicy() throws IOException { assert Thread.holdsLock(this); logger.debug("Safe commit [{}], last commit [{}]", commitDescription(safeCommit), commitDescription(lastCommit)); assert safeCommit.isDeleted() == false : "The safe commit must not be deleted"; - final long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); assert lastCommit.isDeleted() == false : "The last commit must not be deleted"; - final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); - - assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen"; - translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen); - translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen); - - softDeletesPolicy.setLocalCheckpointOfSafeCommit( - Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY))); + final long localCheckpointOfSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + final long localCheckpointOfLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + softDeletesPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); + translogDeletionPolicy.setLocalCheckpointOfLastCommit(localCheckpointOfLastCommit); + translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); } protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index c73ecbc88144a..af066651322dc 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -385,7 +385,7 @@ public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecove try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); - try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) { + try (Translog.Snapshot snapshot = getTranslog().newSnapshot(localCheckpoint + 1, Long.MAX_VALUE)) { return translogRecoveryRunner.run(this, snapshot); } } @@ -458,23 +458,22 @@ public void skipTranslogRecovery() { } private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { - Translog.TranslogGeneration translogGeneration = translog.getGeneration(); final int opsRecovered; - final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) { - opsRecovered = translogRecoveryRunner.run(this, snapshot); - } catch (Exception e) { - throw new EngineException(shardId, "failed to recover from translog", e); + final long localCheckpoint = getProcessedLocalCheckpoint(); + if (localCheckpoint < recoverUpToSeqNo) { + try (Translog.Snapshot snapshot = translog.newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) { + opsRecovered = translogRecoveryRunner.run(this, snapshot); + } catch (Exception e) { + throw new EngineException(shardId, "failed to recover from translog", e); + } + } else { + opsRecovered = 0; } // flush if we recovered something or if we have references to older translogs // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length. assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; pendingTranslogRecovery.set(false); // we are good - now we can commit if (opsRecovered > 0) { - logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]", - opsRecovered, translogGeneration == null ? null : - translogGeneration.translogFileGeneration, translog.currentFileGeneration()); commitIndexWriter(indexWriter, translog); refreshLastCommittedSegmentInfos(); refresh("translog_recovery"); @@ -486,7 +485,8 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy LongSupplier globalCheckpointSupplier, LongConsumer persistedSequenceNumberConsumer) throws IOException { final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); - final String translogUUID = loadTranslogUUIDFromLastCommit(); + final Map userData = store.readLastCommittedSegmentsInfo().getUserData(); + final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY)); // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong! return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, engineConfig.getPrimaryTermSupplier(), persistedSequenceNumberConsumer); @@ -551,18 +551,6 @@ public long getWritingBytes() { return indexWriter.getFlushingBytes() + versionMap.getRefreshingBytes(); } - /** - * Reads the current stored translog ID from the last commit data. - */ - @Nullable - private String loadTranslogUUIDFromLastCommit() throws IOException { - final Map commitUserData = store.readLastCommittedSegmentsInfo().getUserData(); - if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) { - throw new IllegalStateException("commit doesn't contain translog generation id"); - } - return commitUserData.get(Translog.TRANSLOG_UUID_KEY); - } - /** * Reads the current stored history ID from the IW commit data. */ @@ -1588,8 +1576,10 @@ public boolean shouldPeriodicallyFlush() { if (shouldPeriodicallyFlushAfterBigMerge.get()) { return true; } + final long localCheckpointOfLastCommit = + Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); final long translogGenerationOfLastCommit = - Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); + translog.getMinGenerationForSeqNo(localCheckpointOfLastCommit + 1).translogFileGeneration; final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) { return false; @@ -2281,11 +2271,6 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl ensureCanFlush(); try { final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); - final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1); - final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration); - final String translogUUID = translogGeneration.translogUUID; - final String localCheckpointValue = Long.toString(localCheckpoint); - writer.setLiveCommitData(() -> { /* * The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes @@ -2296,10 +2281,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time * of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene). */ - final Map commitData = new HashMap<>(7); - commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration); - commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); - commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue); + final Map commitData = new HashMap<>(6); + commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID()); + commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo())); commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); commitData.put(HISTORY_UUID_KEY, historyUUID); diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java index 67163f53516e0..e17d274144616 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -28,6 +28,7 @@ import org.apache.lucene.store.Directory; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; @@ -137,31 +138,26 @@ public void trimUnreferencedTranslogFiles() { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); final List commits = DirectoryReader.listCommits(store.directory()); - if (commits.size() == 1) { + if (commits.size() == 1 && + translogStats.getUncommittedOperations() == 0 && + translogStats.getTranslogSizeInBytes() > translogStats.getUncommittedSizeInBytes()) { final Map commitUserData = getLastCommittedSegmentInfos().getUserData(); final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY); if (translogUuid == null) { throw new IllegalStateException("commit doesn't contain translog unique id"); } - if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) { - throw new IllegalStateException("commit doesn't contain translog generation id"); - } - final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY)); final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); - final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid); - - if (minTranslogGeneration < lastCommitGeneration) { - // a translog deletion policy that retains nothing but the last translog generation from safe commit - final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); - translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration); - translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration); - - try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, - engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) { - translog.trimUnreferencedReaders(); - // refresh the translog stats - this.translogStats = translog.stats(); - } + final long localCheckpoint = Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); + translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); + translogDeletionPolicy.setLocalCheckpointOfLastCommit(localCheckpoint); + try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, + engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) { + translog.trimUnreferencedReaders(); + // refresh the translog stats + this.translogStats = translog.stats(); + assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed " + + " current gen " + translog.currentFileGeneration() + " != min gen " + translog.getMinFileGeneration(); } } } catch (final Exception e) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index c943137c6d4dd..ac5b7826e7082 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -220,11 +220,10 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm if (translogUuid == null) { throw new IllegalStateException("commit doesn't contain translog unique id"); } - final long translogGenOfLastCommit = Long.parseLong(infos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); final TranslogConfig translogConfig = config.getTranslogConfig(); final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); - translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit); - + final long localCheckpoint = Long.parseLong(infos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + translogDeletionPolicy.setLocalCheckpointOfLastCommit(localCheckpoint); try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(), seqNo -> {}) ) { diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 88738fbd04e09..37876e3b13617 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1449,10 +1449,7 @@ public void associateIndexWithNewTranslog(final String translogUUID) throws IOEx if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) { throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. got [" + translogUUID + "]"); } - final Map map = new HashMap<>(); - map.put(Translog.TRANSLOG_GENERATION_KEY, "1"); - map.put(Translog.TRANSLOG_UUID_KEY, translogUUID); - updateCommitData(writer, map); + updateCommitData(writer, Map.of(Translog.TRANSLOG_UUID_KEY, translogUUID)); } finally { metadataLock.writeLock().unlock(); } diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 549caf9f8cb0a..eb8aeed905dc0 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -74,9 +74,7 @@ /** * A Translog is a per index shard component that records all non-committed index operations in a durable manner. - * In Elasticsearch there is one Translog instance per {@link org.elasticsearch.index.engine.InternalEngine}. The engine - * records the current translog generation {@link Translog#getGeneration()} in it's commit metadata using {@link #TRANSLOG_GENERATION_KEY} - * to reference the generation that contains all operations that have not yet successfully been committed to the engines lucene index. + * In Elasticsearch there is one Translog instance per {@link org.elasticsearch.index.engine.InternalEngine}. * Additionally, since Elasticsearch 2.0 the engine also records a {@link #TRANSLOG_UUID_KEY} with each commit to ensure a strong * association between the lucene index an the transaction log file. This UUID is used to prevent accidental recovery from a transaction * log that belongs to a @@ -107,7 +105,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * - we need to page align the last write before we sync, we can take advantage of ensureSynced for this since we might have already * fsynced far enough */ - public static final String TRANSLOG_GENERATION_KEY = "translog_generation"; public static final String TRANSLOG_UUID_KEY = "translog_uuid"; public static final String TRANSLOG_FILE_PREFIX = "translog-"; public static final String TRANSLOG_FILE_SUFFIX = ".tlog"; @@ -602,33 +599,28 @@ final Checkpoint getLastSyncedCheckpoint() { } } - /** - * Snapshots the current transaction log allowing to safely iterate over the snapshot. - * Snapshots are fixed in time and will not be updated with future operations. - */ + // for testing public Snapshot newSnapshot() throws IOException { - try (ReleasableLock ignored = readLock.acquire()) { - return newSnapshotFromGen(new TranslogGeneration(translogUUID, getMinFileGeneration()), Long.MAX_VALUE); - } + return newSnapshot(0, Long.MAX_VALUE); } - public Snapshot newSnapshotFromGen(TranslogGeneration fromGeneration, long upToSeqNo) throws IOException { + /** + * Creates a new translog snapshot containing operations from the given range. + * + * @param fromSeqNo the lower bound of the range (inclusive) + * @param toSeqNo the upper bound of the range (inclusive) + * @return the new snapshot + */ + public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException { + assert fromSeqNo <= toSeqNo : fromSeqNo + " > " + toSeqNo; + assert fromSeqNo >= 0 : "from_seq_no must be non-negative " + fromSeqNo; try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - final long fromFileGen = fromGeneration.translogFileGeneration; - if (fromFileGen < getMinFileGeneration()) { - throw new IllegalArgumentException("requested snapshot generation [" + fromFileGen + "] is not available. " + - "Min referenced generation is [" + getMinFileGeneration() + "]"); - } TranslogSnapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current)) - .filter(reader -> reader.getGeneration() >= fromFileGen && reader.getCheckpoint().minSeqNo <= upToSeqNo) + .filter(reader -> reader.getCheckpoint().minSeqNo <= toSeqNo && fromSeqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) .map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new); final Snapshot snapshot = newMultiSnapshot(snapshots); - if (upToSeqNo == Long.MAX_VALUE) { - return snapshot; - } else { - return new SeqNoFilterSnapshot(snapshot, Long.MIN_VALUE, upToSeqNo); - } + return new SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo); } } @@ -662,15 +654,6 @@ public Operation readOperation(Location location) throws IOException { return null; } - public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException { - try (ReleasableLock ignored = readLock.acquire()) { - ensureOpen(); - TranslogSnapshot[] snapshots = readersAboveMinSeqNo(minSeqNo).map(BaseTranslogReader::newSnapshot) - .toArray(TranslogSnapshot[]::new); - return newMultiSnapshot(snapshots); - } - } - private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOException { final Closeable onClose; if (snapshots.length == 0) { @@ -860,7 +843,7 @@ protected void closeOnTragicEvent(final Exception ex) { public TranslogStats stats() { // acquire lock to make the two numbers roughly consistent (no file change half way) try (ReleasableLock lock = readLock.acquire()) { - final long uncommittedGen = deletionPolicy.getTranslogGenerationOfLastCommit(); + final long uncommittedGen = minGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfLastCommit() + 1, current, readers); return new TranslogStats(totalOperations(), sizeInBytes(), totalOperationsByMinGen(uncommittedGen), sizeInBytesByMinGen(uncommittedGen), earliestLastModifiedAge()); } @@ -961,7 +944,7 @@ default int skippedOperations() { * shares the same underlying resources with the {@code delegate} snapshot, therefore we should not * use the {@code delegate} after passing it to this filtered snapshot. */ - static final class SeqNoFilterSnapshot implements Snapshot { + private static final class SeqNoFilterSnapshot implements Snapshot { private final Snapshot delegate; private int filteredOpsCount; private final long fromSeqNo; // inclusive @@ -1617,20 +1600,18 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl */ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) { try (ReleasableLock ignored = readLock.acquire()) { - /* - * When flushing, the engine will ask the translog for the minimum generation that could contain any sequence number after the - * local checkpoint. Immediately after flushing, there will be no such generation, so this minimum generation in this case will - * be the current translog generation as we do not need any prior generations to have a complete history up to the current local - * checkpoint. - */ - long minTranslogFileGeneration = this.currentFileGeneration(); - for (final TranslogReader reader : readers) { - if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) { - minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration()); - } + return new TranslogGeneration(translogUUID, minGenerationForSeqNo(seqNo, current, readers)); + } + } + + private static long minGenerationForSeqNo(long seqNo, TranslogWriter writer, List readers) { + long minGen = writer.generation; + for (final TranslogReader reader : readers) { + if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) { + minGen = Math.min(minGen, reader.getGeneration()); } - return new TranslogGeneration(translogUUID, minTranslogFileGeneration); } + return minGen; } /** @@ -1672,7 +1653,9 @@ public void trimUnreferencedReaders() throws IOException { // we're shutdown potentially on some tragic event, don't delete anything return; } - long minReferencedGen = deletionPolicy.minTranslogGenRequired(); + long minReferencedGen = Math.min( + deletionPolicy.getMinTranslogGenRequiredByLocks(), + minGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, current, readers)); assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is [" + getMinFileGeneration() + "]"; @@ -1803,20 +1786,6 @@ private static Checkpoint readCheckpoint(Path location, String expectedTranslogU return checkpoint; } - /** - * Returns the minimum translog generation retained by the translog at the given location. - * This ensures that the translogUUID from this translog matches with the provided translogUUID. - * - * @param location the location of the translog - * @return the minimum translog generation - * @throws IOException if an I/O exception occurred reading the checkpoint - * @throws TranslogCorruptedException if the translog is corrupted or mismatched with the given uuid - */ - public static long readMinTranslogGeneration(final Path location, final String expectedTranslogUUID) throws IOException { - final Checkpoint checkpoint = readCheckpoint(location, expectedTranslogUUID); - return checkpoint.minTranslogGeneration; - } - /** * Returns the translog uuid used to associate a lucene index with a translog. */ diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 24c3e75c4d381..c9b3cc8b1ba1d 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -22,6 +22,7 @@ import org.apache.lucene.util.Counter; import org.elasticsearch.Assertions; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.index.seqno.SequenceNumbers; import java.util.HashMap; import java.util.Map; @@ -45,17 +46,8 @@ public void assertNoOpenTranslogRefs() { * translog generation */ private final Map translogRefCounts = new HashMap<>(); - - /** - * the translog generation that is requires to properly recover from the oldest non deleted - * {@link org.apache.lucene.index.IndexCommit}. - */ - private long minTranslogGenerationForRecovery = 1; - - /** - * This translog generation is used to calculate the number of uncommitted operations since the last index commit. - */ - private long translogGenerationOfLastCommit = 1; + private long localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED; + private long localCheckpointOfLastCommit = SequenceNumbers.NO_OPS_PERFORMED; public TranslogDeletionPolicy() { @@ -66,23 +58,20 @@ public TranslogDeletionPolicy() { } } - public synchronized void setMinTranslogGenerationForRecovery(long newGen) { - if (newGen < minTranslogGenerationForRecovery || newGen > translogGenerationOfLastCommit) { - throw new IllegalArgumentException("Invalid minTranslogGenerationForRecovery can't go backwards; new [" + newGen + "]," + - "current [" + minTranslogGenerationForRecovery + "], lastGen [" + translogGenerationOfLastCommit + "]"); + public synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) { + if (newCheckpoint < this.localCheckpointOfSafeCommit) { + throw new IllegalArgumentException("local checkpoint of the safe commit can't go backwards: " + + "current [" + this.localCheckpointOfSafeCommit + "] new [" + newCheckpoint + "]"); } - minTranslogGenerationForRecovery = newGen; + this.localCheckpointOfSafeCommit = newCheckpoint; } - /** - * Sets the translog generation of the last index commit. - */ - public synchronized void setTranslogGenerationOfLastCommit(long lastGen) { - if (lastGen < translogGenerationOfLastCommit || lastGen < minTranslogGenerationForRecovery) { - throw new IllegalArgumentException("Invalid translogGenerationOfLastCommit; new [" + lastGen + "]," + - "current [" + translogGenerationOfLastCommit + "], minRequiredGen [" + minTranslogGenerationForRecovery + "]"); + public synchronized void setLocalCheckpointOfLastCommit(long newCheckpoint) { + if (newCheckpoint < this.localCheckpointOfLastCommit) { + throw new IllegalArgumentException("local checkpoint of the last commit can't go backwards: " + + "current [" + this.localCheckpointOfLastCommit + "] new [" + newCheckpoint + "]"); } - translogGenerationOfLastCommit = lastGen; + this.localCheckpointOfLastCommit = newCheckpoint; } /** @@ -132,27 +121,24 @@ private synchronized void releaseTranslogGen(long translogGen) { } /** - * returns the minimum translog generation that is still required by the system. Any generation below - * the returned value may be safely deleted + * Returns the minimum translog generation that is still required by the locks (via {@link #acquireTranslogGen(long)}. */ - synchronized long minTranslogGenRequired() { - return Math.min(getMinTranslogGenRequiredByLocks(), minTranslogGenerationForRecovery); - } - - private long getMinTranslogGenRequiredByLocks() { + synchronized long getMinTranslogGenRequiredByLocks() { return translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE); } - /** returns the translog generation that will be used as a basis of a future store/peer recovery */ - public synchronized long getMinTranslogGenerationForRecovery() { - return minTranslogGenerationForRecovery; + /** + * Returns the local checkpoint of the safe commit. This value is used to calculate the min required generation for recovery. + */ + public synchronized long getLocalCheckpointOfSafeCommit() { + return localCheckpointOfSafeCommit; } /** - * Returns a translog generation that will be used to calculate the number of uncommitted operations since the last index commit. + * Returns the local checkpoint of the last commit. This value is used to calculate the translog stats. */ - public synchronized long getTranslogGenerationOfLastCommit() { - return translogGenerationOfLastCommit; + public synchronized long getLocalCheckpointOfLastCommit() { + return localCheckpointOfLastCommit; } synchronized long getTranslogRefCount(long gen) { diff --git a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java index 76bb38876095d..3299b0da9b106 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java @@ -129,30 +129,25 @@ public void execute(Terminal terminal, ShardPath shardPath, Directory indexDirec // Retrieve the generation and UUID from the existing data commitData = commits.get(commits.size() - 1).getUserData(); - final String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY); final String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY); - if (translogGeneration == null || translogUUID == null) { - throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]", - translogGeneration, translogUUID); + if (translogUUID == null) { + throw new ElasticsearchException("shard must have a valid translog UUID"); } final long globalCheckpoint = commitData.containsKey(SequenceNumbers.MAX_SEQ_NO) ? Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO)) : SequenceNumbers.UNASSIGNED_SEQ_NO; - terminal.println("Translog Generation: " + translogGeneration); terminal.println("Translog UUID : " + translogUUID); terminal.println("History UUID : " + historyUUID); Path tempEmptyCheckpoint = translogPath.resolve("temp-" + Translog.CHECKPOINT_FILE_NAME); Path realEmptyCheckpoint = translogPath.resolve(Translog.CHECKPOINT_FILE_NAME); - Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX + - translogGeneration + Translog.TRANSLOG_FILE_SUFFIX); - Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX + - translogGeneration + Translog.TRANSLOG_FILE_SUFFIX); + final long gen = 1; + Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX + gen + Translog.TRANSLOG_FILE_SUFFIX); + Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX + gen + Translog.TRANSLOG_FILE_SUFFIX); // Write empty checkpoint and translog to empty files - long gen = Long.parseLong(translogGeneration); int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID); writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen, globalCheckpoint); @@ -181,13 +176,10 @@ private boolean isTranslogClean(ShardPath shardPath, ClusterState clusterState, final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); try (Translog translog = new Translog(translogConfig, translogUUID, translogDeletionPolicy, () -> translogGlobalCheckpoint, () -> primaryTerm, seqNo -> {}); - Translog.Snapshot snapshot = translog.newSnapshot()) { + Translog.Snapshot snapshot = translog.newSnapshot(0, Long.MAX_VALUE)) { //noinspection StatementWithEmptyBody we are just checking that we can iterate through the whole snapshot while (snapshot.next() != null) { } - // We open translog to check for corruption, do not clean anything. - translogDeletionPolicy.setTranslogGenerationOfLastCommit(translog.getMinFileGeneration()); - translogDeletionPolicy.setMinTranslogGenerationForRecovery(translog.getMinFileGeneration()); } return true; } catch (TranslogCorruptedException e) { diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java index bfb8fa0ea3705..8f11c2f07932a 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java @@ -124,7 +124,6 @@ public void testCommitStats() throws Exception { assertNotNull(commitStats); assertThat(commitStats.getGeneration(), greaterThan(0L)); assertThat(commitStats.getId(), notNullValue()); - assertThat(commitStats.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); assertThat(commitStats.getUserData(), hasKey(Translog.TRANSLOG_UUID_KEY)); } } diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 36a04a8c1020e..0bfe2c37def14 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -42,10 +42,8 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.nio.file.Path; import java.util.Collection; import java.util.Collections; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -386,8 +384,6 @@ public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception { .put(TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "100ms") .build()); Translog translog = IndexShardTestCase.getTranslog(indexService.getShard(0)); - final Path translogPath = translog.getConfig().getTranslogPath(); - final String translogUuid = translog.getTranslogUUID(); int translogOps = 0; final int numDocs = scaledRandomIntBetween(10, 100); @@ -408,15 +404,9 @@ public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception { indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index()); assertTrue(indexService.getTrimTranslogTask().mustReschedule()); - final long lastCommitedTranslogGeneration; - try (Engine.IndexCommitRef indexCommitRef = getEngine(indexService.getShard(0)).acquireLastIndexCommit(false)) { - Map lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData(); - lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY)); - } - assertBusy(() -> { - long minTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUuid); - assertThat(minTranslogGen, equalTo(lastCommitedTranslogGeneration)); - }); + final Engine readOnlyEngine = getEngine(indexService.getShard(0)); + assertBusy(() -> + assertThat(readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(), equalTo((long) Translog.DEFAULT_HEADER_SIZE_IN_BYTES))); assertAcked(client().admin().indices().prepareOpen("test")); @@ -424,6 +414,8 @@ public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception { translog = IndexShardTestCase.getTranslog(indexService.getShard(0)); assertThat(translog.totalOperations(), equalTo(0)); assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(0)); + assertThat(getEngine(indexService.getShard(0)).getTranslogStats().getTranslogSizeInBytes(), + equalTo((long) Translog.DEFAULT_HEADER_SIZE_IN_BYTES)); } public void testIllegalFsyncInterval() { diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 50441feec7e1c..60f83128804bc 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -57,20 +57,16 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); final LongArrayList maxSeqNoList = new LongArrayList(); - final LongArrayList translogGenList = new LongArrayList(); final List commitList = new ArrayList<>(); int totalCommits = between(2, 20); long lastMaxSeqNo = 0; long lastCheckpoint = lastMaxSeqNo; - long lastTranslogGen = 0; final UUID translogUUID = UUID.randomUUID(); for (int i = 0; i < totalCommits; i++) { lastMaxSeqNo += between(1, 10000); lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); - lastTranslogGen += between(1, 100); - commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen)); + commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID)); maxSeqNoList.add(lastMaxSeqNo); - translogGenList.add(lastTranslogGen); } int keptIndex = randomInt(commitList.size() - 1); @@ -87,8 +83,8 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { verify(commitList.get(i), never()).delete(); } } - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(translogGenList.get(keptIndex))); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); + assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(keptIndex)))); + assertThat(translogPolicy.getLocalCheckpointOfLastCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo(Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(commitList.get(keptIndex)) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)))); @@ -104,7 +100,6 @@ public void testAcquireIndexCommit() throws Exception { CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); long lastMaxSeqNo = between(1, 1000); long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo); - long lastTranslogGen = between(1, 20); int safeIndex = 0; List commitList = new ArrayList<>(); List snapshottingCommits = new ArrayList<>(); @@ -114,8 +109,7 @@ public void testAcquireIndexCommit() throws Exception { for (int n = 0; n < newCommits; n++) { lastMaxSeqNo += between(1, 1000); lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); - lastTranslogGen += between(1, 20); - commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen)); + commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID)); } // Advance the global checkpoint to between [safeIndex, safeIndex + 1) safeIndex = randomIntBetween(safeIndex, commitList.size() - 1); @@ -154,10 +148,8 @@ public void testAcquireIndexCommit() throws Exception { // Snapshotting commits must not be deleted. snapshottingCommits.forEach(snapshot -> assertThat(snapshot.isDeleted(), equalTo(false))); // We don't need to retain translog for snapshotting commits. - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), - equalTo(Long.parseLong(commitList.get(safeIndex).getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), - equalTo(Long.parseLong(commitList.get(commitList.size() - 1).getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); + assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(safeIndex)))); + assertThat(translogPolicy.getLocalCheckpointOfLastCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo( Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(commitList.get(safeIndex)) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)))); @@ -170,8 +162,8 @@ public void testAcquireIndexCommit() throws Exception { assertThat(commitList.get(i).isDeleted(), equalTo(true)); } assertThat(commitList.get(commitList.size() - 1).isDeleted(), equalTo(false)); - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen)); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); + assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); + assertThat(translogPolicy.getLocalCheckpointOfLastCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get()); assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo( Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(safeCommit) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)))); @@ -187,19 +179,17 @@ public void testDeleteInvalidCommits() throws Exception { final List commitList = new ArrayList<>(); for (int i = 0; i < invalidCommits; i++) { long maxSeqNo = randomNonNegativeLong(); - commitList.add(mockIndexCommit(randomLongBetween(-1, maxSeqNo), maxSeqNo, UUID.randomUUID(), randomNonNegativeLong())); + commitList.add(mockIndexCommit(randomLongBetween(-1, maxSeqNo), maxSeqNo, UUID.randomUUID())); } final UUID expectedTranslogUUID = UUID.randomUUID(); - long lastTranslogGen = 0; final int validCommits = between(1, 10); long lastMaxSeqNo = between(1, 1000); long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo); for (int i = 0; i < validCommits; i++) { - lastTranslogGen += between(1, 1000); lastMaxSeqNo += between(1, 1000); lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); - commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, expectedTranslogUUID, lastTranslogGen)); + commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, expectedTranslogUUID)); } // We should never keep invalid commits regardless of the value of the global checkpoint. @@ -221,12 +211,10 @@ public void testCheckUnreferencedCommits() throws Exception { int totalCommits = between(2, 20); long lastMaxSeqNo = between(1, 1000); long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo); - long lastTranslogGen = between(1, 50); for (int i = 0; i < totalCommits; i++) { lastMaxSeqNo += between(1, 10000); - lastTranslogGen += between(1, 100); lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); - commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen)); + commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID)); } int safeCommitIndex = randomIntBetween(0, commitList.size() - 1); globalCheckpoint.set(Long.parseLong(commitList.get(safeCommitIndex).getUserData().get(SequenceNumbers.MAX_SEQ_NO))); @@ -235,8 +223,8 @@ public void testCheckUnreferencedCommits() throws Exception { if (safeCommitIndex == commitList.size() - 1) { // Safe commit is the last commit - no need to clean up - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen)); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); + assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); + assertThat(translogPolicy.getLocalCheckpointOfLastCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); } else { // Advanced but not enough for any commit after the safe commit becomes safe @@ -253,8 +241,8 @@ public void testCheckUnreferencedCommits() throws Exception { commitList.forEach(this::resetDeletion); indexPolicy.onCommit(commitList); // Safe commit is the last commit - no need to clean up - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen)); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); + assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); + assertThat(translogPolicy.getLocalCheckpointOfLastCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); } } @@ -270,12 +258,11 @@ protected int getDocCountOfCommit(IndexCommit indexCommit) { }; } - IndexCommit mockIndexCommit(long localCheckpoint, long maxSeqNo, UUID translogUUID, long translogGen) throws IOException { + IndexCommit mockIndexCommit(long localCheckpoint, long maxSeqNo, UUID translogUUID) throws IOException { final Map userData = new HashMap<>(); userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString()); - userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen)); final IndexCommit commit = mock(IndexCommit.class); final Directory directory = mock(Directory.class); when(commit.getUserData()).thenReturn(userData); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f437e4d6664ca..ad6fdc28bf90a 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -496,7 +496,6 @@ public long getProcessedCheckpoint() { CommitStats stats1 = engine.commitStats(); assertThat(stats1.getGeneration(), greaterThan(0L)); assertThat(stats1.getId(), notNullValue()); - assertThat(stats1.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); assertThat(stats1.getUserData(), hasKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); assertThat( Long.parseLong(stats1.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), @@ -520,11 +519,7 @@ public long getProcessedCheckpoint() { assertThat(stats2.getGeneration(), greaterThan(stats1.getGeneration())); assertThat(stats2.getId(), notNullValue()); assertThat(stats2.getId(), not(equalTo(stats1.getId()))); - assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_UUID_KEY)); - assertThat( - stats2.getUserData().get(Translog.TRANSLOG_GENERATION_KEY), - not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY))); assertThat(Long.parseLong(stats2.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint.get())); @@ -965,24 +960,20 @@ public void testCommitAdvancesMinTranslogForRecovery() throws IOException { engine.flush(); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 1L)); - assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L)); + assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); engine.flush(); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 1L)); - assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L)); + assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); engine.flush(true, true); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(4L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 4L : 1L)); - assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(4L)); + assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 4L : 2L)); globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); engine.flush(true, true); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(5L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(5L)); - assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(5L)); + assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(5L)); } public void testSyncedFlushSurvivesEngineRestart() throws IOException { @@ -2486,7 +2477,7 @@ public void testSettings() { assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); } - public void testCurrentTranslogIDisCommitted() throws IOException { + public void testCurrentTranslogUUIIDIsCommitted() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, @@ -2511,7 +2502,6 @@ public void testCurrentTranslogIDisCommitted() throws IOException { globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE)); Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); } } @@ -2521,18 +2511,9 @@ public void testCurrentTranslogIDisCommitted() throws IOException { try (InternalEngine engine = new InternalEngine(config)) { expectThrows(IllegalStateException.class, engine::ensureCanFlush); Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - if (i == 0) { - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - } else { - // creating an empty index will create the first translog gen and commit it - // opening the empty index will make the second translog file but not commit it - // opening the engine again (i=0) will make the third translog file, which then be committed - assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - } assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); } } @@ -2545,7 +2526,6 @@ public void testCurrentTranslogIDisCommitted() throws IOException { store.associateIndexWithNewTranslog(translogUUID); try (InternalEngine engine = new InternalEngine(config)) { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(2, engine.getTranslog().currentFileGeneration()); @@ -2558,12 +2538,9 @@ public void testCurrentTranslogIDisCommitted() throws IOException { for (int i = 0; i < 2; i++) { try (InternalEngine engine = new InternalEngine(config)) { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("no changes - nothing to commit", "1", - userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); } } @@ -2672,8 +2649,9 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog) throws I globalCheckpointSupplier))) { engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertVisibleCount(engine, 1); - final long committedGen = Long.valueOf( - engine.getLastCommittedSegmentInfos().getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); + final long localCheckpoint = Long.parseLong( + engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + final long committedGen = engine.getTranslog().getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; for (int gen = 1; gen < committedGen; gen++) { final Path genFile = translogPath.resolve(Translog.getFilename(gen)); assertFalse(genFile + " wasn't cleaned up", Files.exists(genFile)); @@ -4168,7 +4146,6 @@ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierExcepti for (final Map.Entry entry : threads.entrySet()) { final Map userData = finalActualEngine.commitStats().getUserData(); assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(Long.toString(3 * i))); - assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo(Long.toString(i + generation))); entry.getValue().countDown(); entry.getKey().join(); finalActualEngine.flush(); @@ -4229,6 +4206,7 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException { final EngineConfig engineConfig; final SeqNoStats prevSeqNoStats; final List prevDocs; + final List existingTranslog; try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) { engineConfig = engine.config(); for (final long seqNo : seqNos) { @@ -4247,6 +4225,9 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException { engine.syncTranslog(); prevSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); prevDocs = getDocIds(engine, true); + try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) { + existingTranslog = TestTranslog.drainSnapshot(snapshot, false); + } } try (InternalEngine engine = new InternalEngine(engineConfig)) { final Translog.TranslogGeneration currrentTranslogGeneration = new Translog.TranslogGeneration( @@ -4257,8 +4238,10 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException { SeqNoStats seqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); assertThat(seqNoStats.getLocalCheckpoint(), equalTo(prevSeqNoStats.getLocalCheckpoint())); assertThat(seqNoStats.getMaxSeqNo(), equalTo(prevSeqNoStats.getMaxSeqNo())); - try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshotFromGen(currrentTranslogGeneration, Long.MAX_VALUE)) { - assertThat("restore from local translog must not add operations to translog", snapshot, SnapshotMatchers.size(0)); + try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) { + assertThat("restore from local translog must not add operations to translog", + snapshot.totalOperations(), equalTo(existingTranslog.size())); + assertThat(TestTranslog.drainSnapshot(snapshot, false), equalTo(existingTranslog)); } } assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService()); @@ -5664,8 +5647,8 @@ public void testRecoverFromLocalTranslog() throws Exception { for (Engine.Operation op : operations) { applyOperation(engine, op); if (randomBoolean()) { + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getProcessedLocalCheckpoint())); engine.syncTranslog(); - globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint())); } if (randomInt(100) < 10) { engine.refresh("test"); @@ -5677,11 +5660,21 @@ public void testRecoverFromLocalTranslog() throws Exception { engine.forceMerge(randomBoolean(), 1, false, false, false); } } + if (randomBoolean()) { + // engine is flushed properly before shutting down. + engine.syncTranslog(); + globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); + engine.flush(); + } docs = getDocIds(engine, true); } try (InternalEngine engine = new InternalEngine(config)) { engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertThat(getDocIds(engine, randomBoolean()), equalTo(docs)); + if (engine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo() == globalCheckpoint.get()) { + assertThat("engine should trim all unreferenced translog after recovery", + engine.getTranslog().getMinFileGeneration(), equalTo(engine.getTranslog().currentFileGeneration())); + } } } } @@ -5736,12 +5729,12 @@ public void testAlwaysRecordReplicaOrPeerRecoveryOperationsToTranslog() throws E engine.rollTranslogGeneration(); engine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) { - assertThat(snapshot.totalOperations(), equalTo(operations.size())); + assertThat(snapshot.totalOperations(), equalTo(0)); assertNull(snapshot.next()); } applyOperations(engine, operations); try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) { - assertThat(snapshot.totalOperations(), equalTo(operations.size() * 2)); + assertThat(snapshot.totalOperations(), equalTo(operations.size())); assertThat(TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), equalTo(seqNos)); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java index 309d6e4f36b0b..58f9f248ed356 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java @@ -36,14 +36,12 @@ import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.index.translog.TranslogDeletionPolicy; import org.elasticsearch.test.IndexSettingsModule; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Path; import java.util.Collections; -import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.Matchers.equalTo; @@ -130,7 +128,7 @@ public void testNoOpEngineStats() throws Exception { } } engine.getLocalCheckpointTracker().waitForProcessedOpsToComplete(numDocs + deletions - 1); - flushAndTrimTranslog(engine); + engine.flush(true, true); } final DocsStats expectedDocStats; @@ -168,7 +166,6 @@ public void testTrimUnreferencedTranslogFiles() throws Exception { tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table); tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); - boolean softDeleteEnabled = engine.config().getIndexSettings().isSoftDeleteEnabled(); final int numDocs = scaledRandomIntBetween(10, 3000); for (int i = 0; i < numDocs; i++) { engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null))); @@ -176,41 +173,21 @@ public void testTrimUnreferencedTranslogFiles() throws Exception { if (rarely()) { engine.flush(); } + if (randomBoolean()) { + engine.rollTranslogGeneration(); + } } + // prevent translog from trimming so we can test trimUnreferencedFiles in NoOpEngine. + final Translog.Snapshot snapshot = engine.getTranslog().newSnapshot(); engine.flush(true, true); - - final String translogUuid = engine.getTranslog().getTranslogUUID(); - final long minFileGeneration = engine.getTranslog().getMinFileGeneration(); - final long currentFileGeneration = engine.getTranslog().currentFileGeneration(); engine.close(); final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker)); - final Path translogPath = noOpEngine.config().getTranslogConfig().getTranslogPath(); - - final long lastCommitedTranslogGeneration; - try (Engine.IndexCommitRef indexCommitRef = noOpEngine.acquireLastIndexCommit(false)) { - Map lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData(); - lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertThat(lastCommitedTranslogGeneration, equalTo(currentFileGeneration)); - } - - assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(minFileGeneration)); - assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(softDeleteEnabled ? 0 : numDocs)); - assertThat(noOpEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); - noOpEngine.trimUnreferencedTranslogFiles(); - - assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(lastCommitedTranslogGeneration)); assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(0)); assertThat(noOpEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); - + assertThat(noOpEngine.getTranslogStats().getTranslogSizeInBytes(), equalTo((long)Translog.DEFAULT_HEADER_SIZE_IN_BYTES)); + snapshot.close(); noOpEngine.close(); } - - private void flushAndTrimTranslog(final InternalEngine engine) { - engine.flush(true, true); - final TranslogDeletionPolicy deletionPolicy = engine.getTranslog().getDeletionPolicy(); - deletionPolicy.setMinTranslogGenerationForRecovery(engine.getTranslog().getGeneration().translogFileGeneration); - engine.flush(true, true); - } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 5cc9bfd2b6993..73eb214a55c6f 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -333,7 +333,7 @@ public void testMaybeFlush() throws Exception { assertFalse(shard.shouldPeriodicallyFlush()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), - new ByteSizeValue(190 /* size of the operation + two generations header&footer*/, ByteSizeUnit.BYTES)).build()).get(); + new ByteSizeValue(135 /* size of the operation + one generation header&footer*/, ByteSizeUnit.BYTES)).build()).get(); client().prepareIndex("test").setId("0") .setSource("{}", XContentType.JSON).setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); assertFalse(shard.shouldPeriodicallyFlush()); @@ -416,7 +416,7 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { final Settings settings; if (flush) { // size of the operation plus two generations of overhead. - settings = Settings.builder().put("index.translog.flush_threshold_size", "180b").build(); + settings = Settings.builder().put("index.translog.flush_threshold_size", "125b").build(); } else { // size of the operation plus header and footer settings = Settings.builder().put("index.translog.generation_threshold_size", "117b").build(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 39f120b2fa3a3..ff06f3490a4ea 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -114,7 +114,6 @@ import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStats; -import org.elasticsearch.index.translog.TranslogTests; import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -1400,7 +1399,7 @@ public void run() { latch.await(); for (int i = 0; i < 10000; i++) { semaphore.acquire(); - shard.sync(TranslogTests.randomTranslogLocation(), (ex) -> semaphore.release()); + shard.sync(new Translog.Location(randomLong(), randomLong(), randomInt()), (ex) -> semaphore.release()); } } catch (Exception ex) { throw new RuntimeException(ex); @@ -2022,17 +2021,20 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { shard.sync(); // advance local checkpoint final int translogOps; + final int replayedOps; if (randomBoolean()) { // Advance the global checkpoint to remove the 1st commit; this shard will recover the 2nd commit. shard.updateGlobalCheckpointOnReplica(3, "test"); logger.info("--> flushing shard"); shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); translogOps = 4; // delete #1 won't be replayed. - } else if (randomBoolean()) { - shard.getEngine().rollTranslogGeneration(); - translogOps = 5; + replayedOps = 3; } else { + if (randomBoolean()) { + shard.getEngine().rollTranslogGeneration(); + } translogOps = 5; + replayedOps = 5; } final ShardRouting replicaRouting = shard.routingEntry(); @@ -2042,10 +2044,9 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); assertTrue(recoverFromStore(newShard)); - assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations()); + assertEquals(replayedOps, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); - assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); updateRoutingEntry(newShard, ShardRoutingHelper.moveToStarted(newShard.routingEntry())); assertDocCount(newShard, 3); closeShards(newShard); diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index c76c58a00d6e2..30903df58b43e 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -67,7 +67,6 @@ import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; @@ -842,9 +841,7 @@ public void testUserDataRead() throws IOException { writer.addDocument(doc); Map commitData = new HashMap<>(2); String syncId = "a sync id"; - String translogId = "a translog id"; commitData.put(Engine.SYNC_COMMIT_ID, syncId); - commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogId); writer.setLiveCommitData(commitData.entrySet()); writer.commit(); writer.close(); @@ -853,7 +850,6 @@ public void testUserDataRead() throws IOException { assertFalse(metadata.asMap().isEmpty()); // do not check for correct files, we have enough tests for that above assertThat(metadata.getCommitUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId)); - assertThat(metadata.getCommitUserData().get(Translog.TRANSLOG_GENERATION_KEY), equalTo(translogId)); TestUtil.checkIndex(store.directory()); assertDeleteContent(store, store.directory()); IOUtils.close(store); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java index d78a731674258..56de723ac50cf 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java @@ -22,12 +22,8 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.apache.logging.log4j.Logger; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.index.engine.CombinedDeletionPolicy; import java.io.IOException; import java.nio.ByteBuffer; @@ -69,7 +65,7 @@ public class TestTranslog { * See {@link TestTranslog#corruptFile(Logger, Random, Path, boolean)} for details of the corruption applied. */ public static void corruptRandomTranslogFile(Logger logger, Random random, Path translogDir) throws IOException { - corruptRandomTranslogFile(logger, random, translogDir, minTranslogGenUsedInRecovery(translogDir)); + corruptRandomTranslogFile(logger, random, translogDir, Translog.readCheckpoint(translogDir).minTranslogGeneration); } /** @@ -188,19 +184,6 @@ static void corruptFile(Logger logger, Random random, Path fileToCorrupt, boolea } } - /** - * Lists all existing commits in a given index path, then read the minimum translog generation that will be used in recoverFromTranslog. - */ - private static long minTranslogGenUsedInRecovery(Path translogPath) throws IOException { - try (NIOFSDirectory directory = new NIOFSDirectory(translogPath.getParent().resolve("index"))) { - List commits = DirectoryReader.listCommits(directory); - final String translogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY); - long globalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID); - IndexCommit recoveringCommit = CombinedDeletionPolicy.findSafeCommitPoint(commits, globalCheckpoint); - return Long.parseLong(recoveringCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); - } - } - /** * Returns the primary term associated with the current translog writer of the given translog. */ diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index adb0455def075..dc1a72b20120f 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -46,35 +46,26 @@ public void testMinRetainedGeneration() throws IOException { allGens.add(readersAndWriter.v2()); try { TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); - assertThat(deletionPolicy.minTranslogGenRequired(), equalTo(1L)); - final int committedReader = randomIntBetween(0, allGens.size() - 1); - final long committedGen = allGens.get(committedReader).generation; - deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(committedGen, Long.MAX_VALUE)); - deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); - assertThat(deletionPolicy.minTranslogGenRequired(), equalTo(committedGen)); + assertThat(deletionPolicy.getMinTranslogGenRequiredByLocks(), equalTo(Long.MAX_VALUE)); long gen1 = randomIntBetween(0, allGens.size() - 1); Releasable releaseGen1 = deletionPolicy.acquireTranslogGen(gen1); - assertThat(deletionPolicy.minTranslogGenRequired(), - equalTo(Math.min(gen1, committedGen))); + assertThat(deletionPolicy.getMinTranslogGenRequiredByLocks(), equalTo(gen1)); long gen2 = randomIntBetween(0, allGens.size() - 1); Releasable releaseGen2 = deletionPolicy.acquireTranslogGen(gen2); - assertThat(deletionPolicy.minTranslogGenRequired(), - equalTo(Math.min(Math.min(gen1, gen2), committedGen))); + assertThat(deletionPolicy.getMinTranslogGenRequiredByLocks(), equalTo(Math.min(gen1, gen2))); if (randomBoolean()) { releaseGen1.close(); - assertThat(deletionPolicy.minTranslogGenRequired(), - equalTo(Math.min(gen2, committedGen))); + assertThat(deletionPolicy.getMinTranslogGenRequiredByLocks(), equalTo(gen2)); releaseGen2.close(); } else { releaseGen2.close(); - assertThat(deletionPolicy.minTranslogGenRequired(), - equalTo(Math.min(gen1, committedGen))); + assertThat(deletionPolicy.getMinTranslogGenRequiredByLocks(), equalTo(gen1)); releaseGen1.close(); } - assertThat(deletionPolicy.minTranslogGenRequired(), equalTo(committedGen)); + assertThat(deletionPolicy.getMinTranslogGenRequiredByLocks(), equalTo(Long.MAX_VALUE)); } finally { IOUtils.close(readersAndWriter.v1()); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index b653a35e650fe..84ef3ea2e9123 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -165,7 +165,8 @@ protected void afterIfSuccessful() throws Exception { if (translog.isOpen()) { if (translog.currentFileGeneration() > 1) { - markCurrentGenAsCommitted(translog); + translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(Long.MAX_VALUE); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(Long.MAX_VALUE); translog.trimUnreferencedReaders(); assertFileDeleted(translog, translog.currentFileGeneration() - 1); } @@ -200,28 +201,6 @@ protected Translog openTranslog(TranslogConfig config, String translogUUID) thro } - private void markCurrentGenAsCommitted(Translog translog) throws IOException { - long genToCommit = translog.currentFileGeneration(); - long genToRetain = randomLongBetween(translog.getDeletionPolicy().getMinTranslogGenerationForRecovery(), genToCommit); - commit(translog, genToRetain, genToCommit); - } - - private void rollAndCommit(Translog translog) throws IOException { - translog.rollGeneration(); - markCurrentGenAsCommitted(translog); - } - - private long commit(Translog translog, long genToRetain, long genToCommit) throws IOException { - final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); - deletionPolicy.setTranslogGenerationOfLastCommit(genToCommit); - deletionPolicy.setMinTranslogGenerationForRecovery(genToRetain); - long minGenRequired = deletionPolicy.minTranslogGenRequired(); - translog.trimUnreferencedReaders(); - assertThat(minGenRequired, equalTo(translog.getMinFileGeneration())); - assertFilePresences(translog); - return minGenRequired; - } - @Override @Before public void setUp() throws Exception { @@ -348,7 +327,7 @@ public void testSimpleOperations() throws IOException { assertThat(snapshot.totalOperations(), equalTo(ops.size())); } - final long seqNo = randomNonNegativeLong(); + final long seqNo = randomLongBetween(0, Integer.MAX_VALUE); final String reason = randomAlphaOfLength(16); final long noopTerm = randomLongBetween(1, primaryTerm.get()); addToTranslogAndList(translog, ops, new Translog.NoOp(seqNo, noopTerm, reason)); @@ -381,9 +360,7 @@ public void testSimpleOperations() throws IOException { assertThat(snapshot.totalOperations(), equalTo(ops.size())); } - markCurrentGenAsCommitted(translog); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(translog.getTranslogUUID(), firstId + 1), randomNonNegativeLong())) { + try (Translog.Snapshot snapshot = translog.newSnapshot(seqNo + 1, randomLongBetween(seqNo + 1, Long.MAX_VALUE))) { assertThat(snapshot, SnapshotMatchers.size(0)); assertThat(snapshot.totalOperations(), equalTo(0)); } @@ -440,8 +417,8 @@ public void testStats() throws IOException { assertThat(stats.estimatedNumberOfOperations(), equalTo(1)); assertThat(stats.getTranslogSizeInBytes(), equalTo(157L)); assertThat(stats.getUncommittedOperations(), equalTo(1)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(157L)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(102L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } translog.add(new Translog.Delete("2", 1, primaryTerm.get(), newUid("2"))); @@ -450,8 +427,8 @@ public void testStats() throws IOException { assertThat(stats.estimatedNumberOfOperations(), equalTo(2)); assertThat(stats.getTranslogSizeInBytes(), equalTo(200L)); assertThat(stats.getUncommittedOperations(), equalTo(2)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(200L)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(145L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } translog.add(new Translog.Delete("3", 2, primaryTerm.get(), newUid("3"))); @@ -460,8 +437,8 @@ public void testStats() throws IOException { assertThat(stats.estimatedNumberOfOperations(), equalTo(3)); assertThat(stats.getTranslogSizeInBytes(), equalTo(243L)); assertThat(stats.getUncommittedOperations(), equalTo(3)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(243L)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(188L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } translog.add(new Translog.NoOp(3, 1, randomAlphaOfLength(16))); @@ -470,19 +447,18 @@ public void testStats() throws IOException { assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); assertThat(stats.getTranslogSizeInBytes(), equalTo(285L)); assertThat(stats.getUncommittedOperations(), equalTo(4)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(285L)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(230L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } - final long expectedSizeInBytes = 340L; translog.rollGeneration(); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(340L)); assertThat(stats.getUncommittedOperations(), equalTo(4)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(expectedSizeInBytes)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(285L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } { @@ -491,26 +467,27 @@ public void testStats() throws IOException { stats.writeTo(out); final TranslogStats copy = new TranslogStats(out.bytes().streamInput()); assertThat(copy.estimatedNumberOfOperations(), equalTo(4)); - assertThat(copy.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes)); + assertThat(copy.getTranslogSizeInBytes(), equalTo(340L)); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { builder.startObject(); copy.toXContent(builder, ToXContent.EMPTY_PARAMS); builder.endObject(); - assertThat(Strings.toString(builder), equalTo("{\"translog\":{\"operations\":4,\"size_in_bytes\":" + expectedSizeInBytes - + ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" + expectedSizeInBytes + assertThat(Strings.toString(builder), equalTo("{\"translog\":{\"operations\":4,\"size_in_bytes\":" + 340 + + ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" + 285 + ",\"earliest_last_modified_age\":" + stats.getEarliestLastModifiedAge() + "}}")); } } - - commit(translog, translog.currentFileGeneration(), translog.currentFileGeneration()); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(randomLongBetween(3, Long.MAX_VALUE)); + translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(randomLongBetween(3, Long.MAX_VALUE)); + translog.trimUnreferencedReaders(); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(0)); assertThat(stats.getTranslogSizeInBytes(), equalTo(firstOperationPosition)); assertThat(stats.getUncommittedOperations(), equalTo(0)); assertThat(stats.getUncommittedSizeInBytes(), equalTo(firstOperationPosition)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } } @@ -529,7 +506,8 @@ public void testUncommittedOperations() throws Exception { } assertThat(translog.stats().getUncommittedOperations(), equalTo(uncommittedOps)); if (frequently()) { - markCurrentGenAsCommitted(translog); + deletionPolicy.setLocalCheckpointOfSafeCommit(randomLongBetween(deletionPolicy.getLocalCheckpointOfSafeCommit(), i)); + deletionPolicy.setLocalCheckpointOfLastCommit(i); assertThat(translog.stats().getUncommittedOperations(), equalTo(operationsInLastGen)); uncommittedOps = operationsInLastGen; } @@ -590,7 +568,7 @@ public void testOldestEntryInSeconds() { assertThat(e, hasToString(containsString("earliestLastModifiedAge must be >= 0"))); } - public void testSnapshot() throws IOException { + public void testBasicSnapshot() throws IOException { ArrayList ops = new ArrayList<>(); try (Translog.Snapshot snapshot = translog.newSnapshot()) { assertThat(snapshot, SnapshotMatchers.size(0)); @@ -598,13 +576,13 @@ public void testSnapshot() throws IOException { addToTranslogAndList(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[]{1})); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = translog.newSnapshot(0, Long.MAX_VALUE)) { assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); assertThat(snapshot.totalOperations(), equalTo(1)); } - try (Translog.Snapshot snapshot = translog.newSnapshot(); - Translog.Snapshot snapshot1 = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = translog.newSnapshot(0, randomIntBetween(0, 10)); + Translog.Snapshot snapshot1 = translog.newSnapshot(0, randomIntBetween(0, 10))) { assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); assertThat(snapshot.totalOperations(), equalTo(1)); @@ -647,7 +625,7 @@ public void testSnapshotWithNewTranslog() throws IOException { Translog.Snapshot snapshot2 = translog.newSnapshot(); toClose.add(snapshot2); - markCurrentGenAsCommitted(translog); + translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(2); assertThat(snapshot2, containsOperationsInAnyOrder(ops)); assertThat(snapshot2.totalOperations(), equalTo(ops.size())); } finally { @@ -663,78 +641,62 @@ public void testSnapshotOnClosedTranslog() throws IOException { assertEquals(ex.getMessage(), "translog is already closed"); } - public void testSnapshotFromMinGen() throws Exception { - Map> operationsByGen = new HashMap<>(); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(translog.getTranslogUUID(), 1), randomNonNegativeLong())) { - assertThat(snapshot, SnapshotMatchers.size(0)); - } - int iters = between(1, 10); - for (int i = 0; i < iters; i++) { - long currentGeneration = translog.currentFileGeneration(); - operationsByGen.putIfAbsent(currentGeneration, new ArrayList<>()); - int numOps = between(0, 20); - for (int op = 0; op < numOps; op++) { - long seqNo = randomLongBetween(0, 1000); - addToTranslogAndList(translog, operationsByGen.get(currentGeneration), new Translog.Index( - Long.toString(seqNo), seqNo, primaryTerm.get(), new byte[]{1})); - } - long minGen = randomLongBetween(translog.getMinFileGeneration(), translog.currentFileGeneration()); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(translog.getTranslogUUID(), minGen), Long.MAX_VALUE)) { - List expectedOps = operationsByGen.entrySet().stream() - .filter(e -> e.getKey() >= minGen) - .flatMap(e -> e.getValue().stream()) - .collect(Collectors.toList()); - assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedOps)); - } - long upToSeqNo = randomLongBetween(0, 2000); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(translog.getTranslogUUID(), minGen), upToSeqNo)) { - List expectedOps = operationsByGen.entrySet().stream() - .filter(e -> e.getKey() >= minGen) - .flatMap(e -> e.getValue().stream().filter(op -> op.seqNo() <= upToSeqNo)) - .collect(Collectors.toList()); - assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedOps)); - } - translog.rollGeneration(); - } - } - - public void testSeqNoFilterSnapshot() throws Exception { + public void testRangeSnapshot() throws Exception { + long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED; + long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; final int generations = between(2, 20); + Map> operationsByGen = new HashMap<>(); for (int gen = 0; gen < generations; gen++) { - List batch = LongStream.rangeClosed(0, between(0, 100)).boxed().collect(Collectors.toList()); - Randomness.shuffle(batch); - for (long seqNo : batch) { - Translog.Index op = - new Translog.Index(randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{1}); + Set seqNos = new HashSet<>(); + int numOps = randomIntBetween(1, 100); + for (int i = 0; i < numOps; i++) { + final long seqNo = randomValueOtherThanMany(seqNos::contains, () -> randomLongBetween(0, 1000)); + minSeqNo = SequenceNumbers.min(minSeqNo, seqNo); + maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo); + seqNos.add(seqNo); + } + List ops = new ArrayList<>(seqNos.size()); + for (long seqNo : seqNos) { + Translog.Index op = new Translog.Index(randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{randomByte()}); translog.add(op); + ops.add(op); } + operationsByGen.put(translog.currentFileGeneration(), ops); translog.rollGeneration(); + if (rarely()) { + translog.rollGeneration(); // empty generation + } } - List operations = new ArrayList<>(); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - Translog.Operation op; - while ((op = snapshot.next()) != null) { - operations.add(op); + + if (minSeqNo > 0) { + long fromSeqNo = randomLongBetween(0, minSeqNo - 1); + long toSeqNo = randomLongBetween(fromSeqNo, minSeqNo - 1); + try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo)) { + assertThat(snapshot.totalOperations(), equalTo(0)); + assertNull(snapshot.next()); } } - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - Translog.Snapshot filter = new Translog.SeqNoFilterSnapshot(snapshot, between(200, 300), between(300, 400)); // out range - assertThat(filter, SnapshotMatchers.size(0)); - assertThat(filter.totalOperations(), equalTo(snapshot.totalOperations())); - assertThat(filter.skippedOperations(), equalTo(snapshot.totalOperations())); + + long fromSeqNo = randomLongBetween(maxSeqNo + 1, Long.MAX_VALUE); + long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE); + try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo)) { + assertThat(snapshot.totalOperations(), equalTo(0)); + assertNull(snapshot.next()); } - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - int fromSeqNo = between(-2, 500); - int toSeqNo = between(fromSeqNo, 500); - List selectedOps = operations.stream() - .filter(op -> fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo).collect(Collectors.toList()); - Translog.Snapshot filter = new Translog.SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo); - assertThat(filter, SnapshotMatchers.containsOperationsInAnyOrder(selectedOps)); - assertThat(filter.totalOperations(), equalTo(snapshot.totalOperations())); - assertThat(filter.skippedOperations(), equalTo(snapshot.skippedOperations() + operations.size() - selectedOps.size())); + + fromSeqNo = randomLongBetween(0, 2000); + toSeqNo = randomLongBetween(fromSeqNo, 2000); + try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo)) { + Set seenSeqNos = new HashSet<>(); + List expectedOps = new ArrayList<>(); + for (long gen = translog.currentFileGeneration(); gen > 0; gen--) { + for (Translog.Operation op : operationsByGen.getOrDefault(gen, Collections.emptyList())) { + if (fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && seenSeqNos.add(op.seqNo())) { + expectedOps.add(op); + } + } + } + assertThat(TestTranslog.drainSnapshot(snapshot, false), equalTo(expectedOps)); } } @@ -756,6 +718,7 @@ private void assertFilePresences(Translog translog) { for (long gen = 1; gen < translog.getMinFileGeneration(); gen++) { assertFileDeleted(translog, gen); } + } static class LocationOperation implements Comparable { @@ -936,8 +899,8 @@ public void testVerifyTranslogIsNotDeleted() throws IOException { assertThat(snapshot.totalOperations(), equalTo(1)); } translog.close(); - - assertFileIsPresent(translog, 1); + assertFileDeleted(translog, 1); + assertFileIsPresent(translog, 2); } /** @@ -1009,9 +972,8 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep translog.rollGeneration(); // expose the new checkpoint (simulating a commit), before we trim the translog lastCommittedLocalCheckpoint.set(localCheckpoint); - deletionPolicy.setTranslogGenerationOfLastCommit(translog.currentFileGeneration()); - deletionPolicy.setMinTranslogGenerationForRecovery( - translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration); + deletionPolicy.setLocalCheckpointOfLastCommit(localCheckpoint); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); translog.trimUnreferencedReaders(); } } @@ -1080,7 +1042,7 @@ protected void doRun() throws Exception { // these are what we expect the snapshot to return (and potentially some more). Set expectedOps = new HashSet<>(writtenOps.keySet()); expectedOps.removeIf(op -> op.seqNo() <= committedLocalCheckpointAtView); - try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(committedLocalCheckpointAtView + 1L)) { + try (Translog.Snapshot snapshot = translog.newSnapshot(committedLocalCheckpointAtView + 1L, Long.MAX_VALUE)) { Translog.Operation op; while ((op = snapshot.next()) != null) { expectedOps.remove(op); @@ -1158,7 +1120,7 @@ public void testSyncUpTo() throws IOException { assertTrue("we only synced a previous operation yet", translog.syncNeeded()); } if (rarely()) { - rollAndCommit(translog); + translog.rollGeneration(); assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now assertFalse("no sync needed since no operations in current translog", translog.syncNeeded()); } @@ -1178,7 +1140,7 @@ public void testSyncUpToStream() throws IOException { ArrayList locations = new ArrayList<>(); for (int op = 0; op < translogOperations; op++) { if (rarely()) { - rollAndCommit(translog); // do this first so that there is at least one pending tlog entry + translog.rollGeneration(); } final Translog.Location location = translog.add(new Translog.Index("" + op, op, primaryTerm.get(), @@ -1192,7 +1154,7 @@ public void testSyncUpToStream() throws IOException { // we are the last location so everything should be synced assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); } else if (rarely()) { - rollAndCommit(translog); + translog.rollGeneration(); // not syncing now assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream())); assertFalse("no sync needed since no operations in current translog", translog.syncNeeded()); @@ -1215,7 +1177,7 @@ public void testLocationComparison() throws IOException { translog.add(new Translog.Index("" + op, op, primaryTerm.get(), Integer.toString(++count).getBytes(Charset.forName("UTF-8"))))); if (rarely() && translogOperations > op + 1) { - rollAndCommit(translog); + translog.rollGeneration(); } } Collections.shuffle(locations, random()); @@ -1398,7 +1360,7 @@ public void testBasicRecovery() throws IOException { Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); final boolean commit = commitOften ? frequently() : rarely(); if (commit && op < translogOperations - 1) { - rollAndCommit(translog); + translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(op); minUncommittedOp = op + 1; translogGeneration = translog.getGeneration(); } @@ -1421,7 +1383,7 @@ public void testBasicRecovery() throws IOException { assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGeneration, Long.MAX_VALUE)) { + try (Translog.Snapshot snapshot = translog.newSnapshot(minUncommittedOp, Long.MAX_VALUE)) { for (int i = minUncommittedOp; i < translogOperations; i++) { assertEquals("expected operation" + i + " to be in the previous translog but wasn't", translog.currentFileGeneration() - 1, locations.get(i).generation); @@ -1852,7 +1814,10 @@ public void testOpenForeignTranslog() throws IOException { locations.add(translog.add(new Translog.Index("" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (randomBoolean()) { - rollAndCommit(translog); + translog.rollGeneration(); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(op); + translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(op); + translog.trimUnreferencedReaders(); firstUncommitted = op + 1; } } @@ -1873,7 +1838,7 @@ public void testOpenForeignTranslog() throws IOException { } this.translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}); - try (Translog.Snapshot snapshot = this.translog.newSnapshotFromGen(translogGeneration, Long.MAX_VALUE)) { + try (Translog.Snapshot snapshot = this.translog.newSnapshot(randomLongBetween(0, firstUncommitted), Long.MAX_VALUE)) { for (int i = firstUncommitted; i < translogOperations; i++) { Translog.Operation next = snapshot.next(); assertNotNull("" + i, next); @@ -2054,7 +2019,7 @@ public void testFailFlush() throws IOException { } try { - rollAndCommit(translog); + translog.rollGeneration(); fail("already closed"); } catch (AlreadyClosedException ex) { assertNotNull(ex.getCause()); @@ -2231,7 +2196,8 @@ protected void afterAdd() throws IOException { */ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { int translogOperations = randomIntBetween(10, 100); - for (int op = 0; op < translogOperations / 2; op++) { + int op = 0; + for (; op < translogOperations / 2; op++) { translog.add(new Translog.Index("" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))); if (rarely()) { @@ -2239,20 +2205,21 @@ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { } } translog.rollGeneration(); - long comittedGeneration = randomLongBetween(2, translog.currentFileGeneration()); - for (int op = translogOperations / 2; op < translogOperations; op++) { + long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, op); + for (op = translogOperations / 2; op < translogOperations; op++) { translog.add(new Translog.Index("" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))); if (rarely()) { translog.rollGeneration(); } } + long minRetainedGen = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; // engine blows up, after committing the above generation translog.close(); TranslogConfig config = translog.getConfig(); final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); - deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE)); - deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); + deletionPolicy.setLocalCheckpointOfLastCommit(randomLongBetween(localCheckpoint, Long.MAX_VALUE)); translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}); assertThat(translog.getMinFileGeneration(), equalTo(1L)); @@ -2261,7 +2228,7 @@ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { assertFileIsPresent(translog, gen); } translog.trimUnreferencedReaders(); - for (long gen = 1; gen < comittedGeneration; gen++) { + for (long gen = 1; gen < minRetainedGen; gen++) { assertFileDeleted(translog, gen); } } @@ -2275,14 +2242,16 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { final FailSwitch fail = new FailSwitch(); fail.failNever(); final TranslogConfig config = getTranslogConfig(tempDir); - final long comittedGeneration; + final long localCheckpoint; final String translogUUID; + long minGenForRecovery = 1L; try (Translog translog = getFailableTranslog(fail, config)) { final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); // disable retention so we trim things translogUUID = translog.getTranslogUUID(); int translogOperations = randomIntBetween(10, 100); - for (int op = 0; op < translogOperations / 2; op++) { + int op = 0; + for (; op < translogOperations / 2; op++) { translog.add(new Translog.Index("" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))); if (rarely()) { @@ -2290,16 +2259,17 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { } } translog.rollGeneration(); - comittedGeneration = randomLongBetween(2, translog.currentFileGeneration()); - for (int op = translogOperations / 2; op < translogOperations; op++) { + localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, op); + for (op = translogOperations / 2; op < translogOperations; op++) { translog.add(new Translog.Index("" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))); if (rarely()) { translog.rollGeneration(); } } - deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, translog.currentFileGeneration())); - deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); + deletionPolicy.setLocalCheckpointOfLastCommit(randomLongBetween(localCheckpoint, op)); + minGenForRecovery = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; fail.failRandomly(); try { translog.trimUnreferencedReaders(); @@ -2308,16 +2278,17 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { } } final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); - deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE)); - deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); + deletionPolicy.setLocalCheckpointOfLastCommit(randomLongBetween(localCheckpoint, Long.MAX_VALUE)); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {})) { // we don't know when things broke exactly assertThat(translog.getMinFileGeneration(), greaterThanOrEqualTo(1L)); - assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(comittedGeneration)); + assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(minGenForRecovery)); assertFilePresences(translog); + minGenForRecovery = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; translog.trimUnreferencedReaders(); - assertThat(translog.getMinFileGeneration(), equalTo(comittedGeneration)); + assertThat(translog.getMinFileGeneration(), equalTo(minGenForRecovery)); assertFilePresences(translog); } } @@ -2625,7 +2596,7 @@ public void testWithRandomException() throws IOException { fail.failRandomly(); TranslogConfig config = getTranslogConfig(tempDir); final int numOps = randomIntBetween(100, 200); - long minGenForRecovery = 1; + long localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED; List syncedDocs = new ArrayList<>(); List unsynced = new ArrayList<>(); if (randomBoolean()) { @@ -2655,8 +2626,8 @@ public void testWithRandomException() throws IOException { unsynced.clear(); failableTLog.rollGeneration(); committing = true; - failableTLog.getDeletionPolicy().setTranslogGenerationOfLastCommit(failableTLog.currentFileGeneration()); - failableTLog.getDeletionPolicy().setMinTranslogGenerationForRecovery(failableTLog.currentFileGeneration()); + failableTLog.getDeletionPolicy().setLocalCheckpointOfLastCommit(opsAdded); + failableTLog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(opsAdded); syncedDocs.clear(); failableTLog.trimUnreferencedReaders(); committing = false; @@ -2688,7 +2659,7 @@ public void testWithRandomException() throws IOException { assertThat(unsynced, empty()); } generationUUID = failableTLog.getTranslogUUID(); - minGenForRecovery = failableTLog.getDeletionPolicy().getMinTranslogGenerationForRecovery(); + localCheckpointOfSafeCommit = failableTLog.getDeletionPolicy().getLocalCheckpointOfSafeCommit(); IOUtils.closeWhileHandlingException(failableTLog); } } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) { @@ -2700,8 +2671,8 @@ public void testWithRandomException() throws IOException { if (randomBoolean()) { try { TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); - deletionPolicy.setTranslogGenerationOfLastCommit(minGenForRecovery); - deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); + deletionPolicy.setLocalCheckpointOfLastCommit(localCheckpointOfSafeCommit); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, deletionPolicy)); } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) { // failed - that's ok, we didn't even create it @@ -2712,8 +2683,8 @@ public void testWithRandomException() throws IOException { fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); - deletionPolicy.setTranslogGenerationOfLastCommit(minGenForRecovery); - deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); + deletionPolicy.setLocalCheckpointOfLastCommit(localCheckpointOfSafeCommit); if (generationUUID == null) { // we never managed to successfully create a translog, make it generationUUID = Translog.createEmptyTranslog(config.getTranslogPath(), @@ -2721,8 +2692,7 @@ public void testWithRandomException() throws IOException { } try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}); - Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(generationUUID, minGenForRecovery), Long.MAX_VALUE)) { + Translog.Snapshot snapshot = translog.newSnapshot(localCheckpointOfSafeCommit + 1, Long.MAX_VALUE)) { assertEquals(syncedDocs.size(), snapshot.totalOperations()); for (int i = 0; i < syncedDocs.size(); i++) { Translog.Operation next = snapshot.next(); @@ -2893,15 +2863,22 @@ public void testRollGeneration() throws Exception { .map(t -> t.getPrimaryTerm()).collect(Collectors.toList()); assertThat(storedPrimaryTerms, equalTo(primaryTerms)); } - long minGenForRecovery = randomLongBetween(generation, generation + rolls); - commit(translog, minGenForRecovery, generation + rolls); + + final BaseTranslogReader minRetainedReader = randomFrom( + Stream.concat(translog.getReaders().stream(), Stream.of(translog.getCurrent())) + .filter(r -> r.getCheckpoint().minSeqNo >= 0) + .collect(Collectors.toList())); + deletionPolicy.setLocalCheckpointOfSafeCommit( + randomLongBetween(minRetainedReader.getCheckpoint().minSeqNo, minRetainedReader.getCheckpoint().maxSeqNo) - 1); + deletionPolicy.setLocalCheckpointOfLastCommit(seqNo); + translog.trimUnreferencedReaders(); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); assertThat(translog.stats().getUncommittedOperations(), equalTo(0)); // immediate cleanup - for (long i = 0; i < minGenForRecovery; i++) { + for (long i = 0; i < minRetainedReader.generation; i++) { assertFileDeleted(translog, i); } - for (long i = minGenForRecovery; i < generation + rolls; i++) { + for (long i = minRetainedReader.generation; i < generation + rolls; i++) { assertFileIsPresent(translog, i); } } @@ -2960,7 +2937,7 @@ public void testMinSeqNoBasedAPI() throws IOException { } assertThat(translog.estimateTotalOperationsFromMinSeq(seqNo), equalTo(expectedSnapshotOps)); int readFromSnapshot = 0; - try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(seqNo)) { + try (Translog.Snapshot snapshot = translog.newSnapshot(seqNo, Long.MAX_VALUE)) { assertThat(snapshot.totalOperations(), equalTo(expectedSnapshotOps)); Translog.Operation op; while ((op = snapshot.next()) != null) { @@ -2989,8 +2966,8 @@ public void testSimpleCommit() throws IOException { translog.rollGeneration(); } } - long lastGen = randomLongBetween(1, translog.currentFileGeneration()); - commit(translog, randomLongBetween(1, lastGen), lastGen); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(randomLongBetween(0, operations)); + translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(randomLongBetween(operations, Long.MAX_VALUE)); } public void testAcquiredLockIsPassedToDeletionPolicy() throws IOException { @@ -3002,9 +2979,10 @@ public void testAcquiredLockIsPassedToDeletionPolicy() throws IOException { translog.rollGeneration(); } if (rarely()) { - long lastGen = randomLongBetween(deletionPolicy.getTranslogGenerationOfLastCommit(), translog.currentFileGeneration()); - long minGen = randomLongBetween(deletionPolicy.getMinTranslogGenerationForRecovery(), lastGen); - commit(translog, minGen, lastGen); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit( + randomLongBetween(deletionPolicy.getLocalCheckpointOfSafeCommit(), i)); + translog.getDeletionPolicy().setLocalCheckpointOfLastCommit( + randomLongBetween(deletionPolicy.getLocalCheckpointOfLastCommit(), Long.MAX_VALUE)); } if (frequently()) { long minGen; @@ -3027,7 +3005,7 @@ public void testReadGlobalCheckpoint() throws Exception { translog.rollGeneration(); } } - rollAndCommit(translog); + translog.rollGeneration(); translog.close(); assertThat(Translog.readGlobalCheckpoint(translogDir, translogUUID), equalTo(globalCheckpoint.get())); expectThrows(TranslogCorruptedException.class, () -> Translog.readGlobalCheckpoint(translogDir, UUIDs.randomBase64UUID())); @@ -3158,9 +3136,11 @@ public void testMaxSeqNo() throws Exception { translog.sync(); assertThat(translog.getMaxSeqNo(), equalTo(maxSeqNoPerGeneration.isEmpty() ? SequenceNumbers.NO_OPS_PERFORMED : Collections.max(maxSeqNoPerGeneration.values()))); - long minRetainedGen = commit(translog, randomLongBetween(1, translog.currentFileGeneration()), translog.currentFileGeneration()); + if (randomBoolean()) { + translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(randomFrom(maxSeqNoPerGeneration.values())); + } long expectedMaxSeqNo = maxSeqNoPerGeneration.entrySet().stream() - .filter(e -> e.getKey() >= minRetainedGen).mapToLong(e -> e.getValue()) + .filter(e -> e.getKey() >= translog.getMinFileGeneration()).mapToLong(e -> e.getValue()) .max().orElse(SequenceNumbers.NO_OPS_PERFORMED); assertThat(translog.getMaxSeqNo(), equalTo(expectedMaxSeqNo)); } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index dcc5457557daf..e44a8f5bb38b5 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -36,7 +36,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.NoOpEngine; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SeqNoStats; @@ -177,14 +176,8 @@ public void testPrepareIndexForPeerRecovery() throws Exception { long globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint(); Optional safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint); assertTrue(safeCommit.isPresent()); - final Translog.TranslogGeneration recoveringTranslogGeneration; - try (Engine.IndexCommitRef commitRef = shard.acquireSafeIndexCommit()) { - recoveringTranslogGeneration = new Translog.TranslogGeneration( - commitRef.getIndexCommit().getUserData().get(Translog.TRANSLOG_UUID_KEY), - Long.parseLong(commitRef.getIndexCommit().getUserData().get(Translog.TRANSLOG_GENERATION_KEY))); - } int expectedTotalLocal = 0; - try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshotFromGen(recoveringTranslogGeneration, globalCheckpoint)) { + try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot(safeCommit.get().localCheckpoint + 1, globalCheckpoint)) { Translog.Operation op; while ((op = snapshot.next()) != null) { if (op.seqNo() <= globalCheckpoint) { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index a47dd4b681fd4..876bb6eaafe85 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -202,21 +202,17 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { .setOpenMode(IndexWriterConfig.OpenMode.APPEND); Map userData = new HashMap<>(replica.store().readLastCommittedSegmentsInfo().getUserData()); final String translogUUIDtoUse; - final long translogGenToUse; final String historyUUIDtoUse = UUIDs.randomBase64UUID(random()); if (randomBoolean()) { // create a new translog translogUUIDtoUse = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), flushedDocs, replica.shardId(), replica.getPendingPrimaryTerm()); - translogGenToUse = 1; } else { translogUUIDtoUse = translogGeneration.translogUUID; - translogGenToUse = translogGeneration.translogFileGeneration; } try (IndexWriter writer = new IndexWriter(replica.store().directory(), iwc)) { userData.put(Engine.HISTORY_UUID_KEY, historyUUIDtoUse); userData.put(Translog.TRANSLOG_UUID_KEY, translogUUIDtoUse); - userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGenToUse)); writer.setLiveCommitData(userData.entrySet()); writer.commit(); } From 1eb0d5399e72449ff10af0417221394048b838cc Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 5 Feb 2020 10:22:34 -0500 Subject: [PATCH 2/9] force flush in translog yaml test --- .../rest-api-spec/test/indices.stats/20_translog.yml | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml index 9912db128b7de..5832ea2ec1228 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml @@ -10,6 +10,13 @@ cluster.health: wait_for_no_initializing_shards: true wait_for_events: languid + # Before 8.0, an empty shard has two empty translog files as we used the translog_generation commit tag as the minimum required + # translog generation for recovery. Here we force-flush to have a consistent translog stats for both old and new indices. + - do: + indices.flush: + index: test + force: true + wait_if_ongoing: true - do: indices.stats: metric: [ translog ] @@ -37,10 +44,9 @@ - do: indices.stats: metric: [ translog ] - # after flushing we have one empty translog file while an empty index before flushing has two empty translog files. - - lt: { indices.test.primaries.translog.size_in_bytes: $creation_size } + - match: { indices.test.primaries.translog.size_in_bytes: $creation_size } - match: { indices.test.primaries.translog.operations: 0 } - - lt: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size } + - match: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size } - match: { indices.test.primaries.translog.uncommitted_operations: 0 } --- From c4bcceafc01ddbb745063fe0cfbea246079c5e64 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 7 Feb 2020 09:19:44 -0500 Subject: [PATCH 3/9] Use local checkpoint of safe commit to calculate committed stats --- .../index/engine/CombinedDeletionPolicy.java | 2 -- .../index/engine/NoOpEngine.java | 1 - .../index/engine/ReadOnlyEngine.java | 2 +- .../index/translog/Translog.java | 2 +- .../translog/TranslogDeletionPolicy.java | 15 ---------- .../engine/CombinedDeletionPolicyTests.java | 5 ---- .../index/translog/TranslogTests.java | 30 +++++-------------- 7 files changed, 10 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index d8b524d5ff7bb..5fbf9e69b2400 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -123,9 +123,7 @@ private void updateRetentionPolicy() throws IOException { assert safeCommit.isDeleted() == false : "The safe commit must not be deleted"; assert lastCommit.isDeleted() == false : "The last commit must not be deleted"; final long localCheckpointOfSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); - final long localCheckpointOfLastCommit = Long.parseLong(lastCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); softDeletesPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); - translogDeletionPolicy.setLocalCheckpointOfLastCommit(localCheckpointOfLastCommit); translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java index e17d274144616..041d6e2cd875e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -150,7 +150,6 @@ public void trimUnreferencedTranslogFiles() { final long localCheckpoint = Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); - translogDeletionPolicy.setLocalCheckpointOfLastCommit(localCheckpoint); try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) { translog.trimUnreferencedReaders(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index ac5b7826e7082..4d20615cc2237 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -223,7 +223,7 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm final TranslogConfig translogConfig = config.getTranslogConfig(); final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); final long localCheckpoint = Long.parseLong(infos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); - translogDeletionPolicy.setLocalCheckpointOfLastCommit(localCheckpoint); + translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(), seqNo -> {}) ) { diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index eb8aeed905dc0..1427b070b9d43 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -843,7 +843,7 @@ protected void closeOnTragicEvent(final Exception ex) { public TranslogStats stats() { // acquire lock to make the two numbers roughly consistent (no file change half way) try (ReleasableLock lock = readLock.acquire()) { - final long uncommittedGen = minGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfLastCommit() + 1, current, readers); + final long uncommittedGen = minGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, current, readers); return new TranslogStats(totalOperations(), sizeInBytes(), totalOperationsByMinGen(uncommittedGen), sizeInBytesByMinGen(uncommittedGen), earliestLastModifiedAge()); } diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index c9b3cc8b1ba1d..8dafde2d2c9d9 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -47,7 +47,6 @@ public void assertNoOpenTranslogRefs() { */ private final Map translogRefCounts = new HashMap<>(); private long localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED; - private long localCheckpointOfLastCommit = SequenceNumbers.NO_OPS_PERFORMED; public TranslogDeletionPolicy() { @@ -66,14 +65,6 @@ public synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) { this.localCheckpointOfSafeCommit = newCheckpoint; } - public synchronized void setLocalCheckpointOfLastCommit(long newCheckpoint) { - if (newCheckpoint < this.localCheckpointOfLastCommit) { - throw new IllegalArgumentException("local checkpoint of the last commit can't go backwards: " + - "current [" + this.localCheckpointOfLastCommit + "] new [" + newCheckpoint + "]"); - } - this.localCheckpointOfLastCommit = newCheckpoint; - } - /** * acquires the basis generation for a new snapshot. Any translog generation above, and including, the returned generation * will not be deleted until the returned {@link Releasable} is closed. @@ -134,12 +125,6 @@ public synchronized long getLocalCheckpointOfSafeCommit() { return localCheckpointOfSafeCommit; } - /** - * Returns the local checkpoint of the last commit. This value is used to calculate the translog stats. - */ - public synchronized long getLocalCheckpointOfLastCommit() { - return localCheckpointOfLastCommit; - } synchronized long getTranslogRefCount(long gen) { final Counter counter = translogRefCounts.get(gen); diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 60f83128804bc..f7d68b092c012 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -84,7 +84,6 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { } } assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(keptIndex)))); - assertThat(translogPolicy.getLocalCheckpointOfLastCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo(Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(commitList.get(keptIndex)) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)))); @@ -149,7 +148,6 @@ public void testAcquireIndexCommit() throws Exception { snapshottingCommits.forEach(snapshot -> assertThat(snapshot.isDeleted(), equalTo(false))); // We don't need to retain translog for snapshotting commits. assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(safeIndex)))); - assertThat(translogPolicy.getLocalCheckpointOfLastCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo( Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(commitList.get(safeIndex)) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)))); @@ -163,7 +161,6 @@ public void testAcquireIndexCommit() throws Exception { } assertThat(commitList.get(commitList.size() - 1).isDeleted(), equalTo(false)); assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); - assertThat(translogPolicy.getLocalCheckpointOfLastCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get()); assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo( Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(safeCommit) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)))); @@ -224,7 +221,6 @@ public void testCheckUnreferencedCommits() throws Exception { if (safeCommitIndex == commitList.size() - 1) { // Safe commit is the last commit - no need to clean up assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); - assertThat(translogPolicy.getLocalCheckpointOfLastCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); } else { // Advanced but not enough for any commit after the safe commit becomes safe @@ -242,7 +238,6 @@ public void testCheckUnreferencedCommits() throws Exception { indexPolicy.onCommit(commitList); // Safe commit is the last commit - no need to clean up assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); - assertThat(translogPolicy.getLocalCheckpointOfLastCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); } } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 84ef3ea2e9123..8a7ab38da424d 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -165,7 +165,6 @@ protected void afterIfSuccessful() throws Exception { if (translog.isOpen()) { if (translog.currentFileGeneration() > 1) { - translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(Long.MAX_VALUE); translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(Long.MAX_VALUE); translog.trimUnreferencedReaders(); assertFileDeleted(translog, translog.currentFileGeneration() - 1); @@ -479,7 +478,6 @@ public void testStats() throws IOException { } } translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(randomLongBetween(3, Long.MAX_VALUE)); - translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(randomLongBetween(3, Long.MAX_VALUE)); translog.trimUnreferencedReaders(); { final TranslogStats stats = stats(); @@ -506,8 +504,7 @@ public void testUncommittedOperations() throws Exception { } assertThat(translog.stats().getUncommittedOperations(), equalTo(uncommittedOps)); if (frequently()) { - deletionPolicy.setLocalCheckpointOfSafeCommit(randomLongBetween(deletionPolicy.getLocalCheckpointOfSafeCommit(), i)); - deletionPolicy.setLocalCheckpointOfLastCommit(i); + deletionPolicy.setLocalCheckpointOfSafeCommit(i); assertThat(translog.stats().getUncommittedOperations(), equalTo(operationsInLastGen)); uncommittedOps = operationsInLastGen; } @@ -625,7 +622,7 @@ public void testSnapshotWithNewTranslog() throws IOException { Translog.Snapshot snapshot2 = translog.newSnapshot(); toClose.add(snapshot2); - translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(2); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(2); assertThat(snapshot2, containsOperationsInAnyOrder(ops)); assertThat(snapshot2.totalOperations(), equalTo(ops.size())); } finally { @@ -972,7 +969,6 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep translog.rollGeneration(); // expose the new checkpoint (simulating a commit), before we trim the translog lastCommittedLocalCheckpoint.set(localCheckpoint); - deletionPolicy.setLocalCheckpointOfLastCommit(localCheckpoint); deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); translog.trimUnreferencedReaders(); } @@ -1360,7 +1356,7 @@ public void testBasicRecovery() throws IOException { Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); final boolean commit = commitOften ? frequently() : rarely(); if (commit && op < translogOperations - 1) { - translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(op); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(op); minUncommittedOp = op + 1; translogGeneration = translog.getGeneration(); } @@ -1816,7 +1812,6 @@ public void testOpenForeignTranslog() throws IOException { if (randomBoolean()) { translog.rollGeneration(); translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(op); - translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(op); translog.trimUnreferencedReaders(); firstUncommitted = op + 1; } @@ -2219,7 +2214,6 @@ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { TranslogConfig config = translog.getConfig(); final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); - deletionPolicy.setLocalCheckpointOfLastCommit(randomLongBetween(localCheckpoint, Long.MAX_VALUE)); translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}); assertThat(translog.getMinFileGeneration(), equalTo(1L)); @@ -2268,7 +2262,6 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { } } deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); - deletionPolicy.setLocalCheckpointOfLastCommit(randomLongBetween(localCheckpoint, op)); minGenForRecovery = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; fail.failRandomly(); try { @@ -2278,7 +2271,6 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { } } final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); - deletionPolicy.setLocalCheckpointOfLastCommit(randomLongBetween(localCheckpoint, Long.MAX_VALUE)); deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {})) { @@ -2626,7 +2618,6 @@ public void testWithRandomException() throws IOException { unsynced.clear(); failableTLog.rollGeneration(); committing = true; - failableTLog.getDeletionPolicy().setLocalCheckpointOfLastCommit(opsAdded); failableTLog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(opsAdded); syncedDocs.clear(); failableTLog.trimUnreferencedReaders(); @@ -2671,7 +2662,6 @@ public void testWithRandomException() throws IOException { if (randomBoolean()) { try { TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); - deletionPolicy.setLocalCheckpointOfLastCommit(localCheckpointOfSafeCommit); deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, deletionPolicy)); } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) { @@ -2684,7 +2674,6 @@ public void testWithRandomException() throws IOException { fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); - deletionPolicy.setLocalCheckpointOfLastCommit(localCheckpointOfSafeCommit); if (generationUUID == null) { // we never managed to successfully create a translog, make it generationUUID = Translog.createEmptyTranslog(config.getTranslogPath(), @@ -2868,12 +2857,15 @@ public void testRollGeneration() throws Exception { Stream.concat(translog.getReaders().stream(), Stream.of(translog.getCurrent())) .filter(r -> r.getCheckpoint().minSeqNo >= 0) .collect(Collectors.toList())); + int retainedOps = Stream.concat(translog.getReaders().stream(), Stream.of(translog.getCurrent())) + .filter(r -> r.getCheckpoint().generation >= minRetainedReader.generation) + .mapToInt(r -> r.getCheckpoint().numOps) + .sum(); deletionPolicy.setLocalCheckpointOfSafeCommit( randomLongBetween(minRetainedReader.getCheckpoint().minSeqNo, minRetainedReader.getCheckpoint().maxSeqNo) - 1); - deletionPolicy.setLocalCheckpointOfLastCommit(seqNo); translog.trimUnreferencedReaders(); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); - assertThat(translog.stats().getUncommittedOperations(), equalTo(0)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(retainedOps)); // immediate cleanup for (long i = 0; i < minRetainedReader.generation; i++) { assertFileDeleted(translog, i); @@ -2967,7 +2959,6 @@ public void testSimpleCommit() throws IOException { } } translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(randomLongBetween(0, operations)); - translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(randomLongBetween(operations, Long.MAX_VALUE)); } public void testAcquiredLockIsPassedToDeletionPolicy() throws IOException { @@ -2981,8 +2972,6 @@ public void testAcquiredLockIsPassedToDeletionPolicy() throws IOException { if (rarely()) { translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit( randomLongBetween(deletionPolicy.getLocalCheckpointOfSafeCommit(), i)); - translog.getDeletionPolicy().setLocalCheckpointOfLastCommit( - randomLongBetween(deletionPolicy.getLocalCheckpointOfLastCommit(), Long.MAX_VALUE)); } if (frequently()) { long minGen; @@ -3136,9 +3125,6 @@ public void testMaxSeqNo() throws Exception { translog.sync(); assertThat(translog.getMaxSeqNo(), equalTo(maxSeqNoPerGeneration.isEmpty() ? SequenceNumbers.NO_OPS_PERFORMED : Collections.max(maxSeqNoPerGeneration.values()))); - if (randomBoolean()) { - translog.getDeletionPolicy().setLocalCheckpointOfLastCommit(randomFrom(maxSeqNoPerGeneration.values())); - } long expectedMaxSeqNo = maxSeqNoPerGeneration.entrySet().stream() .filter(e -> e.getKey() >= translog.getMinFileGeneration()).mapToLong(e -> e.getValue()) .max().orElse(SequenceNumbers.NO_OPS_PERFORMED); From 2c0b2573dc8ee7aa21c418b0319dad3e276f1c8a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 7 Feb 2020 09:23:04 -0500 Subject: [PATCH 4/9] restore log --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index af066651322dc..9656b091c2c6f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -474,6 +474,8 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; pendingTranslogRecovery.set(false); // we are good - now we can commit if (opsRecovered > 0) { + logger.trace("flushing post recovery from translog: ops recovered [{}], current translog generation [{}]", + opsRecovered, translog.currentFileGeneration()); commitIndexWriter(indexWriter, translog); refreshLastCommittedSegmentInfos(); refresh("translog_recovery"); From 5ea2cae112c15f1efd2ff13ddd22e2469e9762a3 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 7 Feb 2020 11:07:06 -0500 Subject: [PATCH 5/9] fix test --- .../index/engine/InternalEngineTests.java | 13 +++++++++---- .../replication/RecoveryDuringReplicationTests.java | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index ad6fdc28bf90a..5bbe611454385 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -172,6 +172,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.IntSupplier; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.function.ToLongBiFunction; @@ -4695,6 +4696,10 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false)); // A new engine may have more than one empty translog files - the test should account this extra. final Translog translog = engine.getTranslog(); + final IntSupplier uncommittedTranslogOperationsSinceLastCommit = () -> { + long localCheckpoint = Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + return translog.totalOperationsByMinGen(translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration); + }; final long extraTranslogSizeInNewEngine = engine.getTranslog().stats().getUncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; int numDocs = between(10, 100); @@ -4712,10 +4717,10 @@ public void testShouldPeriodicallyFlush() throws Exception { .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); indexSettings.updateIndexMetaData(indexMetaData); engine.onSettingsChanged(); - assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); + assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(); - assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); + assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(0)); // Stale operations skipped by Lucene but added to translog - still able to flush for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = @@ -4724,11 +4729,11 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat(result.isCreated(), equalTo(false)); } SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos(); - assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); + assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(false, false); assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); - assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); + assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(0)); // If the new index commit still points to the same translog generation as the current index commit, // we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes. generateNewSeqNo(engine); // create a gap here diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index d018cae78a467..5b9f9d3614b9d 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -734,7 +734,7 @@ public void testRollbackOnPromotion() throws Exception { shards.assertAllEqual(initDocs + inFlightOpsOnNewPrimary + moreDocsAfterRollback); done.set(true); thread.join(); - + shards.syncGlobalCheckpoint(); for (IndexShard shard : shards) { shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); assertThat(shard.translogStats().getUncommittedOperations(), equalTo(0)); From 63c25e2adc5802bd1bc128e3204b844e6a83a7bf Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 7 Feb 2020 15:06:05 -0500 Subject: [PATCH 6/9] remove uncommitted translog --- .../main/java/org/elasticsearch/index/engine/NoOpEngine.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java index 041d6e2cd875e..7c39a3f11c4d5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -138,9 +138,7 @@ public void trimUnreferencedTranslogFiles() { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); final List commits = DirectoryReader.listCommits(store.directory()); - if (commits.size() == 1 && - translogStats.getUncommittedOperations() == 0 && - translogStats.getTranslogSizeInBytes() > translogStats.getUncommittedSizeInBytes()) { + if (commits.size() == 1 && translogStats.getTranslogSizeInBytes() > translogStats.getUncommittedSizeInBytes()) { final Map commitUserData = getLastCommittedSegmentInfos().getUserData(); final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY); if (translogUuid == null) { From 630da78ad629bf1a9fb7af78c9dcee77cb7c2b66 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 7 Feb 2020 15:51:22 -0500 Subject: [PATCH 7/9] revert checkpoint sync --- .../org/elasticsearch/index/engine/InternalEngineTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5bbe611454385..c94b4967cb7cf 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5652,8 +5652,8 @@ public void testRecoverFromLocalTranslog() throws Exception { for (Engine.Operation op : operations) { applyOperation(engine, op); if (randomBoolean()) { - globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getProcessedLocalCheckpoint())); engine.syncTranslog(); + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint())); } if (randomInt(100) < 10) { engine.refresh("test"); From 02103b77d611ab3fc10d0d90299f28c7508b9cd8 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 7 Feb 2020 16:01:02 -0500 Subject: [PATCH 8/9] assert translog ops before trimming --- .../java/org/elasticsearch/index/engine/NoOpEngineTests.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java index 58f9f248ed356..b1eb9e6ded532 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java @@ -167,10 +167,13 @@ public void testTrimUnreferencedTranslogFiles() throws Exception { tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); final int numDocs = scaledRandomIntBetween(10, 3000); + int totalTranslogOps = 0; for (int i = 0; i < numDocs; i++) { + totalTranslogOps++; engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null))); tracker.updateLocalCheckpoint(allocationId.getId(), i); if (rarely()) { + totalTranslogOps = 0; engine.flush(); } if (randomBoolean()) { @@ -183,6 +186,7 @@ public void testTrimUnreferencedTranslogFiles() throws Exception { engine.close(); final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker)); + assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(totalTranslogOps)); noOpEngine.trimUnreferencedTranslogFiles(); assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(0)); assertThat(noOpEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); From 8c91dcfe43d55bef8c109885527cc7a3584f74a4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 7 Feb 2020 16:21:35 -0500 Subject: [PATCH 9/9] comment --- .../test/java/org/elasticsearch/index/shard/IndexShardIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 73eb214a55c6f..bcec724a72de4 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -415,7 +415,7 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { final boolean flush = randomBoolean(); final Settings settings; if (flush) { - // size of the operation plus two generations of overhead. + // size of the operation plus the overhead of one generation. settings = Settings.builder().put("index.translog.flush_threshold_size", "125b").build(); } else { // size of the operation plus header and footer