Skip to content

Commit

Permalink
Use local checkpoint to calculate min translog gen for recovery (elas…
Browse files Browse the repository at this point in the history
…tic#51905)

Today we use the translog_generation of the safe commit as the minimum
required translog generation for recovery. This approach has a
limitation, where we won't be able to clean up translog unless we flush.
Reopening an already recovered engine will create a new empty translog,
and we leave it there until we force flush.

This commit removes the translog_generation commit tag and uses the
local checkpoint of the safe commit to calculate the minimum required
translog generation for recovery instead.

Closes elastic#49970
  • Loading branch information
dnhatn committed Feb 26, 2020
1 parent 848d3bc commit a14d4bc
Show file tree
Hide file tree
Showing 24 changed files with 341 additions and 550 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@
cluster.health:
wait_for_no_initializing_shards: true
wait_for_events: languid
# Before 8.0, an empty shard has two empty translog files as we used the translog_generation commit tag as the minimum required
# translog generation for recovery. Here we force-flush to have a consistent translog stats for both old and new indices.
- do:
indices.flush:
index: test
force: true
wait_if_ongoing: true
- do:
indices.stats:
metric: [ translog ]
Expand Down Expand Up @@ -115,10 +122,9 @@
- do:
indices.stats:
metric: [ translog ]
# after flushing we have one empty translog file while an empty index before flushing has two empty translog files.
- lt: { indices.test.primaries.translog.size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.operations: 0 }
- lt: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }

---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,10 @@ private void updateRetentionPolicy() throws IOException {
assert Thread.holdsLock(this);
logger.debug("Safe commit [{}], last commit [{}]", commitDescription(safeCommit), commitDescription(lastCommit));
assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
final long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));

assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen";
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);

softDeletesPolicy.setLocalCheckpointOfSafeCommit(
Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
final long localCheckpointOfSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
softDeletesPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
}

protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecove
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
try (Translog.Snapshot snapshot = getTranslog().newSnapshot(localCheckpoint + 1, Long.MAX_VALUE)) {
return translogRecoveryRunner.run(this, snapshot);
}
}
Expand Down Expand Up @@ -473,23 +473,24 @@ public void skipTranslogRecovery() {
}

private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) {
opsRecovered = translogRecoveryRunner.run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
final long localCheckpoint = getProcessedLocalCheckpoint();
if (localCheckpoint < recoverUpToSeqNo) {
try (Translog.Snapshot snapshot = translog.newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) {
opsRecovered = translogRecoveryRunner.run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
}
} else {
opsRecovered = 0;
}
// flush if we recovered something or if we have references to older translogs
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
if (opsRecovered > 0) {
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
opsRecovered, translogGeneration == null ? null :
translogGeneration.translogFileGeneration, translog.currentFileGeneration());
logger.trace("flushing post recovery from translog: ops recovered [{}], current translog generation [{}]",
opsRecovered, translog.currentFileGeneration());
commitIndexWriter(indexWriter, translog, null);
refreshLastCommittedSegmentInfos();
refresh("translog_recovery");
Expand All @@ -501,7 +502,8 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy
LongSupplier globalCheckpointSupplier, LongConsumer persistedSequenceNumberConsumer) throws IOException {

final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final String translogUUID = loadTranslogUUIDFromLastCommit();
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier,
engineConfig.getPrimaryTermSupplier(), persistedSequenceNumberConsumer);
Expand Down Expand Up @@ -549,7 +551,7 @@ public Translog.Snapshot readHistoryOperations(String reason, HistorySource hist
ensureSoftDeletesEnabled();
return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
} else {
return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
return getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE);
}
}

Expand Down Expand Up @@ -598,18 +600,6 @@ public long getWritingBytes() {
return indexWriter.getFlushingBytes() + versionMap.getRefreshingBytes();
}

/**
* Reads the current stored translog ID from the last commit data.
*/
@Nullable
private String loadTranslogUUIDFromLastCommit() throws IOException {
final Map<String, String> commitUserData = store.readLastCommittedSegmentsInfo().getUserData();
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("commit doesn't contain translog generation id");
}
return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
}

/**
* Reads the current stored history ID from the IW commit data.
*/
Expand Down Expand Up @@ -1688,8 +1678,9 @@ final boolean tryRenewSyncCommit() {
ensureOpen();
ensureCanFlush();
String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
long translogGenOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
if (syncId != null && indexWriter.hasUncommittedChanges() && translog.totalOperationsByMinGen(translogGenOfLastCommit) == 0) {
long localCheckpointOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
if (syncId != null && indexWriter.hasUncommittedChanges() &&
translog.estimateTotalOperationsFromMinSeq(localCheckpointOfLastCommit + 1) == 0) {
logger.trace("start renewing sync commit [{}]", syncId);
commitIndexWriter(indexWriter, translog, syncId);
logger.debug("successfully sync committed. sync id [{}].", syncId);
Expand All @@ -1714,8 +1705,10 @@ public boolean shouldPeriodicallyFlush() {
if (shouldPeriodicallyFlushAfterBigMerge.get()) {
return true;
}
final long localCheckpointOfLastCommit =
Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final long translogGenerationOfLastCommit =
Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
translog.getMinGenerationForSeqNo(localCheckpointOfLastCommit + 1).translogFileGeneration;
final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes();
if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
return false;
Expand Down Expand Up @@ -2423,11 +2416,6 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
ensureCanFlush();
try {
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
final String localCheckpointValue = Long.toString(localCheckpoint);

writer.setLiveCommitData(() -> {
/*
* The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
Expand All @@ -2438,10 +2426,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(8);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
final Map<String, String> commitData = new HashMap<>(7);
commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
Expand Down Expand Up @@ -2657,7 +2644,7 @@ public boolean hasCompleteOperationHistory(String reason, HistorySource historyS
return true;
}
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
try (Translog.Snapshot snapshot = getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
Expand Down
31 changes: 12 additions & 19 deletions server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
Expand Down Expand Up @@ -137,31 +138,23 @@ public void trimUnreferencedTranslogFiles() {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
if (commits.size() == 1) {
if (commits.size() == 1 && translogStats.getTranslogSizeInBytes() > translogStats.getUncommittedSizeInBytes()) {
final Map<String, String> commitUserData = getLastCommittedSegmentInfos().getUserData();
final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY);
if (translogUuid == null) {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("commit doesn't contain translog generation id");
}
final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY));
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid);

if (minTranslogGeneration < lastCommitGeneration) {
// a translog deletion policy that retains nothing but the last translog generation from safe commit
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration);

try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) {
translog.trimUnreferencedReaders();
// refresh the translog stats
this.translogStats = translog.stats();
}
final long localCheckpoint = Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) {
translog.trimUnreferencedReaders();
// refresh the translog stats
this.translogStats = translog.stats();
assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed "
+ " current gen " + translog.currentFileGeneration() + " != min gen " + translog.getMinFileGeneration();
}
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,14 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
if (translogUuid == null) {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
final long translogGenOfLastCommit = Long.parseLong(infos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
final TranslogConfig translogConfig = config.getTranslogConfig();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
config.getIndexSettings().getTranslogRetentionSize().getBytes(),
config.getIndexSettings().getTranslogRetentionAge().getMillis(),
config.getIndexSettings().getTranslogRetentionTotalFiles()
);
translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit);

final long localCheckpoint = Long.parseLong(infos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(), seqNo -> {})
) {
Expand Down
8 changes: 3 additions & 5 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -1453,10 +1453,7 @@ public void associateIndexWithNewTranslog(final String translogUUID) throws IOEx
if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) {
throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. got [" + translogUUID + "]");
}
final Map<String, String> map = new HashMap<>();
map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
map.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
updateCommitData(writer, map);
updateCommitData(writer, Collections.singletonMap(Translog.TRANSLOG_UUID_KEY, translogUUID));
} finally {
metadataLock.writeLock().unlock();
}
Expand Down Expand Up @@ -1517,7 +1514,8 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long
if (indexVersionCreated.before(org.elasticsearch.Version.V_6_2_0)) {
final List<IndexCommit> recoverableCommits = new ArrayList<>();
for (IndexCommit commit : existingCommits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
final String translogGeneration = commit.getUserData().get("translog_generation");
if (translogGeneration == null || minRetainedTranslogGen <= Long.parseLong(translogGeneration)) {
recoverableCommits.add(commit);
}
}
Expand Down
Loading

0 comments on commit a14d4bc

Please sign in to comment.