Add a new Engine implementation for replicas with segment replication enabled. (#3240)

* Change the fastForwardProcessedSeqNo method in LocalCheckpointTracker to fast forward the persisted checkpoint.

This change converts fastForwardProcessedSeqNo into fastForwardPersistedSeqNo for use in
Segment Replication. This lets a segment replication engine match the logic of InternalEngine,
where the seqNo is incremented with each operation but only persisted in the tracker on a flush.
With Segment Replication we bump the processed checkpoint with each operation received
(index/delete/noOp) and invoke this method when a new set of segments arrives to bump the
persisted seqNo, as sketched below.

Signed-off-by: Marc Handalian <handalm@amazon.com>
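
To illustrate the split described above, here is a minimal sketch using a simplified stand-in for LocalCheckpointTracker. This is not the real tracker (which also handles out-of-order sequence numbers and synchronization); the method names only loosely mirror the commit, and fastForwardPersistedSeqNo is simplified to a plain max.

// Simplified stand-in for LocalCheckpointTracker, for illustration only.
class CheckpointTrackerSketch {
    private long processedCheckpoint = -1; // i.e. no ops performed yet
    private long persistedCheckpoint = -1;

    // Called on the replica for every replicated operation (index/delete/noOp).
    void markSeqNoAsProcessed(long seqNo) {
        processedCheckpoint = Math.max(processedCheckpoint, seqNo);
    }

    // Called when the replica receives a new set of segments from the primary;
    // the persisted checkpoint fast forwards to the checkpoint those segments carry.
    void fastForwardPersistedSeqNo(long seqNo) {
        persistedCheckpoint = Math.max(persistedCheckpoint, seqNo);
    }

    long getProcessedCheckpoint() { return processedCheckpoint; }
    long getPersistedCheckpoint() { return persistedCheckpoint; }
}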

* Extract Translog specific engine methods into an abstract class.

This change extracts translog-specific methods into an abstract engine class so that other engine
implementations can reuse the translog logic (a rough sketch follows below).

Signed-off-by: Marc Handalian <handalm@amazon.com>
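
A rough sketch of the shape of that abstraction (named TranslogAwareEngine; note that a later commit in this PR removes it again and builds the translog directly inside NRTReplicationEngine). The method names below are illustrative, not the actual API.

// Hypothetical outline of the intermediate TranslogAwareEngine abstraction:
// translog plumbing lives in the abstract parent so both the primary engine
// and a translog-only replica engine can reuse it.
abstract class TranslogAwareEngineSketch {

    // Shared translog handling reused by any subclass (body elided).
    protected void appendToTranslog(long seqNo, String operation) {
        // write the operation to the shard's translog
    }

    // Subclasses decide what indexing means: a Lucene write on the primary,
    // or a translog-only write on a segment replication replica.
    protected abstract void doIndex(long seqNo, String source);
}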

* Add a separate Engine implementation for replicas with segment replication enabled.

This change adds a new engine intended to be used on replicas with segment replication enabled.
This engine does not wire up an IndexWriter, but still writes all operations to a translog.
The engine uses a new ReaderManager that refreshes from an externally provided SegmentInfos (a simplified sketch follows below).

Signed-off-by: Marc Handalian <handalm@amazon.com>
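
To make that concrete, below is a heavily simplified sketch of the replica engine's shape. The nested interfaces are stand-ins for the translog, the new NRTReplicationReaderManager, and the checkpoint tracker; every method name here is illustrative rather than the actual API.

import org.apache.lucene.index.SegmentInfos;

// Illustrative shape of a replica-side engine for segment replication: no
// IndexWriter, every operation still goes to the translog, and readers are
// refreshed from SegmentInfos pushed by the primary.
final class ReplicaEngineSketch {

    interface OperationLog {                 // stand-in for the translog
        void append(long seqNo, String operation);
        void rollGeneration();
    }

    interface SegmentReaderManager {         // stand-in for NRTReplicationReaderManager
        void updateSegments(SegmentInfos infos);
    }

    interface CheckpointTracker {            // stand-in for LocalCheckpointTracker
        void markSeqNoAsProcessed(long seqNo);
        void fastForwardPersistedSeqNo(long seqNo);
    }

    private final OperationLog translog;
    private final SegmentReaderManager readerManager;
    private final CheckpointTracker tracker;

    ReplicaEngineSketch(OperationLog translog, SegmentReaderManager readerManager, CheckpointTracker tracker) {
        this.translog = translog;
        this.readerManager = readerManager;
        this.tracker = tracker;
    }

    // Replica write path: record the operation durably and advance the processed checkpoint.
    void index(long seqNo, String source) {
        translog.append(seqNo, source);
        tracker.markSeqNoAsProcessed(seqNo);
    }

    // Called when the primary publishes a new set of segments.
    void onNewSegments(SegmentInfos infos, long checkpointInSegments) {
        readerManager.updateSegments(infos);            // make the new segments searchable
        tracker.fastForwardPersistedSeqNo(checkpointInSegments);
        translog.rollGeneration();                      // see the final commit bullet below
    }
}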

* Fix spotless checks.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix :server:compileInternalClusterTestJava compilation.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix failing test naming convention check.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* PR feedback.

- Removed isReadOnlyReplica from the overloaded constructor and added feature flag checks.
- Updated the log message in NRTReplicationReaderManager.
- Cleaned up store ref counting in NRTReplicationEngine.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix spotless check.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Remove TranslogAwareEngine and build translog in NRTReplicationEngine.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix formatting

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Add missing translog methods to NRTEngine.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Remove persistent seqNo check from fastForwardProcessedSeqNo.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* PR feedback.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Add test specific to translog trimming.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Javadoc check.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Add failEngine calls to translog methods in NRTReplicationEngine.
Roll the translog generation on the replica when a new commit point is received (see the sketch below).

Signed-off-by: Marc Handalian <handalm@amazon.com>
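
As an illustration of the failure-handling half of this change, here is a sketch of the pattern with hypothetical names: an I/O failure while touching the translog fails the engine and is then rethrown, rather than letting the shard silently diverge.

import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative failure-handling pattern for translog methods on the replica engine.
final class TranslogFailureSketch {

    interface FailEngineHook {                // stand-in for the engine's failEngine hook
        void failEngine(String reason, Exception cause);
    }

    interface TranslogAction {
        void run() throws IOException;
    }

    private final FailEngineHook engine;

    TranslogFailureSketch(FailEngineHook engine) {
        this.engine = engine;
    }

    void withTranslog(TranslogAction action) {
        try {
            action.run();
        } catch (IOException e) {
            engine.failEngine("translog operation failed", e); // surface the failure
            throw new UncheckedIOException(e);
        }
    }
}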
mch2 authored May 24, 2022
1 parent fd5a38d commit a0030df
Showing 16 changed files with 1,007 additions and 70 deletions.
17 changes: 17 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
@@ -169,13 +169,30 @@ public final EngineConfig config() {

    protected abstract SegmentInfos getLastCommittedSegmentInfos();

    /**
     * Return the latest active SegmentInfos from the engine.
     * @return {@link SegmentInfos}
     */
    protected abstract SegmentInfos getLatestSegmentInfos();

    public MergeStats getMergeStats() {
        return new MergeStats();
    }

    /** returns the history uuid for the engine */
    public abstract String getHistoryUUID();

    /**
     * Reads the current stored history ID from commit data.
     */
    String loadHistoryUUID(Map<String, String> commitData) {
        final String uuid = commitData.get(HISTORY_UUID_KEY);
        if (uuid == null) {
            throw new IllegalStateException("commit doesn't contain history uuid");
        }
        return uuid;
    }

    /** Returns how many bytes we are currently moving from heap to disk */
    public abstract long getWritingBytes();
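
For context, a hypothetical example of a caller of the new accessor (not part of this commit): on a segment replication primary, the latest SegmentInfos describe the segment set replicas need before their readers can refresh on it. Since getLatestSegmentInfos() is declared protected, the sketch sits in the same package.

// Hypothetical helper, illustration only.
package org.opensearch.index.engine;

import org.apache.lucene.index.SegmentInfos;

final class LatestSegmentInfosSketch {
    static SegmentInfos latest(Engine engine) {
        // New abstract method added in this diff; replication machinery would
        // use the returned SegmentInfos to decide which segment files to ship.
        return engine.getLatestSegmentInfos();
    }
}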
72 changes: 72 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/EngineConfig.java
@@ -97,6 +97,7 @@ public final class EngineConfig {
    private final CircuitBreakerService circuitBreakerService;
    private final LongSupplier globalCheckpointSupplier;
    private final Supplier<RetentionLeases> retentionLeasesSupplier;
    private final boolean isReadOnlyReplica;

    /**
     * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -228,6 +229,66 @@ public EngineConfig(
        LongSupplier primaryTermSupplier,
        TombstoneDocSupplier tombstoneDocSupplier
    ) {
        this(
            shardId,
            threadPool,
            indexSettings,
            warmer,
            store,
            mergePolicy,
            analyzer,
            similarity,
            codecService,
            eventListener,
            queryCache,
            queryCachingPolicy,
            translogConfig,
            translogDeletionPolicyFactory,
            flushMergesAfter,
            externalRefreshListener,
            internalRefreshListener,
            indexSort,
            circuitBreakerService,
            globalCheckpointSupplier,
            retentionLeasesSupplier,
            primaryTermSupplier,
            tombstoneDocSupplier,
            false
        );
    }

    /**
     * Creates a new {@link org.opensearch.index.engine.EngineConfig}
     */
    EngineConfig(
        ShardId shardId,
        ThreadPool threadPool,
        IndexSettings indexSettings,
        Engine.Warmer warmer,
        Store store,
        MergePolicy mergePolicy,
        Analyzer analyzer,
        Similarity similarity,
        CodecService codecService,
        Engine.EventListener eventListener,
        QueryCache queryCache,
        QueryCachingPolicy queryCachingPolicy,
        TranslogConfig translogConfig,
        TranslogDeletionPolicyFactory translogDeletionPolicyFactory,
        TimeValue flushMergesAfter,
        List<ReferenceManager.RefreshListener> externalRefreshListener,
        List<ReferenceManager.RefreshListener> internalRefreshListener,
        Sort indexSort,
        CircuitBreakerService circuitBreakerService,
        LongSupplier globalCheckpointSupplier,
        Supplier<RetentionLeases> retentionLeasesSupplier,
        LongSupplier primaryTermSupplier,
        TombstoneDocSupplier tombstoneDocSupplier,
        boolean isReadOnlyReplica
    ) {
        if (isReadOnlyReplica && indexSettings.isSegRepEnabled() == false) {
            throw new IllegalArgumentException("Shard can only be wired as a read only replica with Segment Replication enabled");
        }
        this.shardId = shardId;
        this.indexSettings = indexSettings;
        this.threadPool = threadPool;
@@ -266,6 +327,7 @@ public EngineConfig(
        this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
        this.primaryTermSupplier = primaryTermSupplier;
        this.tombstoneDocSupplier = tombstoneDocSupplier;
        this.isReadOnlyReplica = isReadOnlyReplica;
    }

    /**
@@ -460,6 +522,16 @@ public LongSupplier getPrimaryTermSupplier() {
        return primaryTermSupplier;
    }

    /**
     * Returns if this replica should be wired as a read only.
     * This is used for Segment Replication where the engine implementation used is dependent on
     * if the shard is a primary/replica.
     * @return true if this engine should be wired as read only.
     */
    public boolean isReadOnlyReplica() {
        return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
    }

    /**
     * A supplier supplies tombstone documents which will be used in soft-update methods.
     * The returned document consists only _uid, _seqno, _term and _version fields; other metadata fields are excluded.
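
For context, a hypothetical sketch of how the new flag could drive engine selection. The actual wiring is not part of this file's diff, and the NRTReplicationEngine constructor below is assumed to take an EngineConfig, mirroring InternalEngine.

// Hypothetical selection logic, illustration only.
package org.opensearch.index.engine;

final class EngineSelectionSketch {
    static Engine create(EngineConfig config) {
        if (config.isReadOnlyReplica()) {
            // Segment replication replica: no IndexWriter, segments arrive from the primary.
            return new NRTReplicationEngine(config);  // assumed constructor
        }
        // Primaries (and document replication replicas) keep using InternalEngine.
        return new InternalEngine(config);
    }
}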
@@ -146,7 +146,8 @@ public EngineConfig newEngineConfig(
        LongSupplier globalCheckpointSupplier,
        Supplier<RetentionLeases> retentionLeasesSupplier,
        LongSupplier primaryTermSupplier,
        EngineConfig.TombstoneDocSupplier tombstoneDocSupplier
        EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
        boolean isReadOnlyReplica
    ) {
        CodecService codecServiceToUse = codecService;
        if (codecService == null && this.codecServiceFactory != null) {
@@ -176,7 +177,8 @@
            globalCheckpointSupplier,
            retentionLeasesSupplier,
            primaryTermSupplier,
            tombstoneDocSupplier
            tombstoneDocSupplier,
            isReadOnlyReplica
        );
    }

@@ -49,6 +49,7 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.ShuffleForcedMergePolicy;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
import org.apache.lucene.search.BooleanClause;
@@ -648,17 +649,6 @@ public long getWritingBytes() {
        return indexWriter.getFlushingBytes() + versionMap.getRefreshingBytes();
    }

    /**
     * Reads the current stored history ID from the IW commit data.
     */
    private String loadHistoryUUID(Map<String, String> commitData) {
        final String uuid = commitData.get(HISTORY_UUID_KEY);
        if (uuid == null) {
            throw new IllegalStateException("commit doesn't contain history uuid");
        }
        return uuid;
    }

    private ExternalReaderManager createReaderManager(RefreshWarmerListener externalRefreshListener) throws EngineException {
        boolean success = false;
        OpenSearchReaderManager internalReaderManager = null;
@@ -2298,6 +2288,23 @@ protected SegmentInfos getLastCommittedSegmentInfos() {
        return lastCommittedSegmentInfos;
    }

    @Override
    public SegmentInfos getLatestSegmentInfos() {
        OpenSearchDirectoryReader reader = null;
        try {
            reader = internalReaderManager.acquire();
            return ((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos();
        } catch (IOException e) {
            throw new EngineException(shardId, e.getMessage(), e);
        } finally {
            try {
                internalReaderManager.release(reader);
            } catch (IOException e) {
                throw new EngineException(shardId, e.getMessage(), e);
            }
        }
    }

    @Override
    protected final void writerSegmentStats(SegmentsStats stats) {
        stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed());