Skip to content

Commit

Permalink
Pass TranslogRecoveryRunner to engine from outside (#33449)
Browse files Browse the repository at this point in the history
This commit allows us to use different TranslogRecoveryRunner when
recovering an engine from its local translog. This change is a
prerequisite for the commit-based rollback PR.

Relates #32867
  • Loading branch information
dnhatn committed Sep 6, 2018
1 parent 03878a1 commit 008c59d
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 78 deletions.
10 changes: 8 additions & 2 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1648,9 +1648,10 @@ public interface Warmer {
* Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
* This operation will close the engine if the recovery fails.
*
* @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
* @param translogRecoveryRunner the translog recovery runner
* @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
*/
public abstract Engine recoverFromTranslog(long recoverUpToSeqNo) throws IOException;
public abstract Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException;

/**
* Do not replay translog operations, but make the engine be ready.
Expand All @@ -1668,4 +1669,9 @@ public boolean isRecovering() {
* Tries to prune buffered deletes from the version map.
*/
public abstract void maybePruneDeletes();

@FunctionalInterface
public interface TranslogRecoveryRunner {
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,11 @@
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.List;
import java.util.function.LongSupplier;

Expand Down Expand Up @@ -76,7 +74,6 @@ public final class EngineConfig {
private final List<ReferenceManager.RefreshListener> internalRefreshListener;
@Nullable
private final Sort indexSort;
private final TranslogRecoveryRunner translogRecoveryRunner;
@Nullable
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
Expand Down Expand Up @@ -127,9 +124,8 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
TranslogConfig translogConfig, TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier) {
CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier) {
this.shardId = shardId;
this.allocationId = allocationId;
this.indexSettings = indexSettings;
Expand Down Expand Up @@ -163,7 +159,6 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
this.externalRefreshListener = externalRefreshListener;
this.internalRefreshListener = internalRefreshListener;
this.indexSort = indexSort;
this.translogRecoveryRunner = translogRecoveryRunner;
this.circuitBreakerService = circuitBreakerService;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.primaryTermSupplier = primaryTermSupplier;
Expand Down Expand Up @@ -324,18 +319,6 @@ public TranslogConfig getTranslogConfig() {
*/
public TimeValue getFlushMergesAfter() { return flushMergesAfter; }

@FunctionalInterface
public interface TranslogRecoveryRunner {
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
}

/**
* Returns a runner that implements the translog recovery from the given snapshot
*/
public TranslogRecoveryRunner getTranslogRecoveryRunner() {
return translogRecoveryRunner;
}

/**
* The refresh listeners to add to Lucene for externally visible refreshes
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,15 +398,15 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
}

@Override
public InternalEngine recoverFromTranslog(long recoverUpToSeqNo) throws IOException {
public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
flushLock.lock();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (pendingTranslogRecovery.get() == false) {
throw new IllegalStateException("Engine has already been recovered");
}
try {
recoverFromTranslogInternal(recoverUpToSeqNo);
recoverFromTranslogInternal(translogRecoveryRunner, recoverUpToSeqNo);
} catch (Exception e) {
try {
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
Expand All @@ -428,13 +428,13 @@ public void skipTranslogRecovery() {
pendingTranslogRecovery.set(false); // we are good - now we can commit
}

private void recoverFromTranslogInternal(long recoverUpToSeqNo) throws IOException {
private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) {
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
opsRecovered = translogRecoveryRunner.run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1347,7 +1347,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOExce
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
innerOpenEngineAndTranslog();
getEngine().recoverFromTranslog(Long.MAX_VALUE);
getEngine().recoverFromTranslog(this::runTranslogRecovery, Long.MAX_VALUE);
}

/**
Expand Down Expand Up @@ -2270,7 +2270,7 @@ private EngineConfig newEngineConfig() {
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, () -> operationPrimaryTerm, tombstoneDocSupplier());
indexSort, circuitBreakerService, replicationTracker, () -> operationPrimaryTerm, tombstoneDocSupplier());
}

/**
Expand Down
Loading

0 comments on commit 008c59d

Please sign in to comment.