Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate Engine with decoupled Translog interfaces #3671

Merged
merged 7 commits into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 21 additions & 78 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.search.suggest.completion.CompletionStats;

import java.io.Closeable;
Expand All @@ -107,7 +109,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
Expand Down Expand Up @@ -167,6 +168,8 @@ public final EngineConfig config() {
return engineConfig;
}

public abstract TranslogManager translogManager();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉 Great to see this decoupling


protected abstract SegmentInfos getLastCommittedSegmentInfos();

/**
Expand Down Expand Up @@ -346,12 +349,6 @@ boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and te
*/
public abstract boolean isThrottled();

/**
* Trims translog for terms below <code>belowTerm</code> and seq# above <code>aboveSeqNo</code>
* @see Translog#trimOperations(long, long)
*/
public abstract void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException;

/**
* A Lock implementation that always allows the lock to be acquired
*
Expand Down Expand Up @@ -784,18 +781,6 @@ public enum SearcherScope {
INTERNAL
}

/**
* Checks if the underlying storage sync is required.
*/
public abstract boolean isTranslogSyncNeeded();

/**
* Ensures that all locations in the given stream have been written to the underlying storage.
*/
public abstract boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException;

public abstract void syncTranslog() throws IOException;

/**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
Expand Down Expand Up @@ -831,13 +816,6 @@ public abstract Translog.Snapshot newChangesSnapshot(
*/
public abstract long getMinRetainedSeqNo();

public abstract TranslogStats getTranslogStats();

/**
* Returns the last location that the translog of this engine has written into.
*/
public abstract Translog.Location getTranslogLastWriteLocation();

protected final void ensureOpen(Exception suppressed) {
if (isClosed.get()) {
AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
Expand Down Expand Up @@ -905,6 +883,22 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl
return stats;
}

protected TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineConfig) {
TranslogDeletionPolicy customTranslogDeletionPolicy = null;
if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) {
customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory()
.create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier());
}
return Objects.requireNonNullElseGet(
customTranslogDeletionPolicy,
() -> new DefaultTranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
)
);
}

protected void fillSegmentStats(SegmentReader segmentReader, boolean includeSegmentFileSizes, SegmentsStats stats) {
stats.add(1);
if (includeSegmentFileSizes) {
Expand Down Expand Up @@ -1152,25 +1146,6 @@ public final void flush() throws EngineException {
flush(false, false);
}

/**
* checks and removes translog files that no longer need to be retained. See
* {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details
*/
public abstract void trimUnreferencedTranslogFiles() throws EngineException;

/**
* Tests whether or not the translog generation should be rolled to a new generation.
* This test is based on the size of the current generation compared to the configured generation threshold size.
*
* @return {@code true} if the current generation should be rolled to a new generation
*/
public abstract boolean shouldRollTranslogGeneration();

/**
* Rolls the translog generation and cleans unneeded.
*/
public abstract void rollTranslogGeneration() throws EngineException;

/**
* Triggers a forced merge on this engine
*/
Expand Down Expand Up @@ -1982,14 +1957,6 @@ public interface Warmer {
*/
public abstract void deactivateThrottling();

/**
* This method replays translog to restore the Lucene index which might be reverted previously.
* This ensures that all acknowledged writes are restored correctly when this engine is promoted.
*
* @return the number of translog operations have been recovered
*/
public abstract int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException;

/**
* Fills up the local checkpoints history with no-ops until the local checkpoint
* and the max seen sequence ID are identical.
Expand All @@ -1998,20 +1965,6 @@ public interface Warmer {
*/
public abstract int fillSeqNoGaps(long primaryTerm) throws IOException;

/**
* Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
* This operation will close the engine if the recovery fails.
*
* @param translogRecoveryRunner the translog recovery runner
* @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
*/
public abstract Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException;

/**
* Do not replay translog operations, but make the engine be ready.
*/
public abstract void skipTranslogRecovery();

/**
* Tries to prune buffered deletes from the version map.
*/
Expand All @@ -2032,16 +1985,6 @@ public long getMaxSeenAutoIdTimestamp() {
*/
public abstract void updateMaxUnsafeAutoIdTimestamp(long newTimestamp);

/**
* The runner for translog recovery
*
* @opensearch.internal
*/
@FunctionalInterface
public interface TranslogRecoveryRunner {
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
}

/**
* Returns the maximum sequence number of either update or delete operations have been processed in this engine
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
Expand Down
Loading