Skip to content

Commit

Permalink
Segment Replication - Commit SegmentInfos on replicas when new genera…
Browse files Browse the repository at this point in the history
…tion received.

This change updates NRTReplicationEngine to trigger a commit when a new segment generation is received from
the primary.  It also updates the engine to commit when a replica is closed so that it can restart from the same
segments.

Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Sep 5, 2022
1 parent a0ab4d7 commit e18fec6
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class NRTReplicationEngine extends Engine {
private final LocalCheckpointTracker localCheckpointTracker;
private final WriteOnlyTranslogManager translogManager;

private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;

private static final int SI_COUNTER_INCREMENT = 10;

public NRTReplicationEngine(EngineConfig engineConfig) {
Expand Down Expand Up @@ -120,14 +122,16 @@ public TranslogManager translogManager() {

public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
// Update the current infos reference on the Engine's reader.
long incomingGeneration = infos.getGeneration();
readerManager.updateSegments(infos);

// only update the persistedSeqNo and "lastCommitted" infos reference if the incoming segments have a higher
// generation. We can still refresh with incoming SegmentInfos that are not part of a commit point.
if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) {
this.lastCommittedSegmentInfos = infos;
// Commit and roll the translog when the incoming generation differs from the last one received.
// Lower or higher generations are both possible from a new primary that was just elected.
if (incomingGeneration != lastReceivedGen) {
commitSegmentInfos();
translogManager.rollTranslogGeneration();
}
lastReceivedGen = incomingGeneration;
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}

Expand All @@ -141,20 +145,16 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
*
* @throws IOException - When there is an IO error committing the SegmentInfos.
*/
public void commitSegmentInfos() throws IOException {
// TODO: This method should wait for replication events to finalize.
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround that decreases the chance of conflicts on replica nodes when the same file is copied
from two different primaries during failover. Increasing the counter helps avoid this conflict because the counter is
used to generate new segment file names. The ideal solution is to identify the counter from the previous primary.
*/
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
store.commitSegmentInfos(latestSegmentInfos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
private void commitSegmentInfos(SegmentInfos infos) throws IOException {
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
translogManager.syncTranslog();
}

/**
 * Commits the engine's latest SegmentInfos, as returned by {@code getLatestSegmentInfos()},
 * by delegating to {@code commitSegmentInfos(SegmentInfos)}.
 *
 * @throws IOException - When the delegated commit fails with an IO error.
 */
protected void commitSegmentInfos() throws IOException {
    final SegmentInfos latestInfos = getLatestSegmentInfos();
    commitSegmentInfos(latestInfos);
}

@Override
public String getHistoryUUID() {
return loadHistoryUUID(lastCommittedSegmentInfos.userData);
Expand Down Expand Up @@ -354,6 +354,15 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround that decreases the chance of conflicts on replica nodes when the same file is copied
from two different primaries during failover. Increasing the counter helps avoid this conflict because the counter is
used to generate new segment file names. The ideal solution is to identify the counter from the previous primary.
*/
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
commitSegmentInfos(latestSegmentInfos);
IOUtils.close(readerManager, translogManager, store::decRef);
} catch (Exception e) {
logger.warn("failed to close engine", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
* @throws IOException - When Refresh fails with an IOException.
*/
public synchronized void updateSegments(SegmentInfos infos) throws IOException {
// Carry the generation of currentInfos over onto the incoming infos before swapping references.
// Per the original author's note, this ensures the on-disk gen is always increased.
infos.updateGeneration(currentInfos);
// Make the incoming infos the current reference, then request a refresh
// (presumably so readers pick up the new infos - confirm against refreshIfNeeded).
currentInfos = infos;
maybeRefresh();
}
Expand Down
28 changes: 4 additions & 24 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ public void updateShardState(
if (indexSettings.isSegRepEnabled()) {
// this Shard's engine was read only, we need to update its engine before restoring local history from xlog.
assert newRouting.primary() && currentRouting.primary() == false;
promoteNRTReplicaToPrimary();
resetEngineToGlobalCheckpoint();
}
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
ensurePeerRecoveryRetentionLeasesExist();
Expand Down Expand Up @@ -3557,7 +3557,9 @@ private void innerAcquireReplicaOperationPermit(
currentGlobalCheckpoint,
maxSeqNo
);
if (currentGlobalCheckpoint < maxSeqNo) {
// With Segment Replication enabled, we never want to reset a replica's engine unless
// it is promoted to primary.
if (currentGlobalCheckpoint < maxSeqNo && indexSettings.isSegRepEnabled() == false) {
resetEngineToGlobalCheckpoint();
} else {
getEngine().translogManager().rollTranslogGeneration();
Expand Down Expand Up @@ -4120,26 +4122,4 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() {
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
    // Pure delegation: the handle produced by the current engine is returned unchanged.
    final GatedCloseable<SegmentInfos> snapshot = getEngine().getSegmentInfosSnapshot();
    return snapshot;
}

/**
* With segment replication enabled - prepare the shard's engine to be promoted as the new primary.
*
* If this shard is currently using a replication engine, this method:
* 1. Invokes {@link NRTReplicationEngine#commitSegmentInfos()} to ensure the engine can be reopened as writeable from the latest refresh point.
* InternalEngine opens its IndexWriter from an on-disk commit point, but this replica may have recently synced from a primary's refresh point, meaning it has documents searchable in its in-memory SegmentInfos
* that are not part of a commit point. This ensures that those documents are made part of a commit and do not need to be reindexed after promotion.
* 2. Invokes resetEngineToGlobalCheckpoint - This call performs the engine swap, opening up as a writeable engine and replays any operations in the xlog. The operations indexed from xlog here will be
* any ack'd writes that were not copied to this replica before promotion.
*/
private void promoteNRTReplicaToPrimary() {
    assert shardRouting.primary() && indexSettings.isSegRepEnabled();
    getReplicationEngine().ifPresentOrElse(
        replicationEngine -> {
            try {
                // Commit first, then swap engines via the global-checkpoint reset.
                replicationEngine.commitSegmentInfos();
                resetEngineToGlobalCheckpoint();
            } catch (IOException ioe) {
                // Surface IO failures as an EngineException, keeping the original cause.
                throw new EngineException(shardId, "Unable to update replica to writeable engine, failing shard", ioe);
            }
        },
        () -> {
            // No replication engine present: fail with an EngineException.
            throw new EngineException(shardId, "Expected replica engine to be of type NRTReplicationEngine");
        }
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position,
+ temporaryFileName
+ "] in "
+ Arrays.toString(store.directory().listAll());
store.directory().sync(Collections.singleton(temporaryFileName));
// With Segment Replication, we will fsync after a full commit has been received.
if (store.indexSettings().isSegRepEnabled() == false) {
store.directory().sync(Collections.singleton(temporaryFileName));
}
IndexOutput remove = removeOpenIndexOutputs(name);
assert remove == null || remove == indexOutput; // remove maybe null if we got finished
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.indices.replication.common;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.ByteBuffersDataOutput;
Expand Down Expand Up @@ -41,7 +39,6 @@ public class CopyState extends AbstractRefCounted {
private final byte[] infosBytes;
private GatedCloseable<IndexCommit> commitRef;
private final IndexShard shard;
public static final Logger logger = LogManager.getLogger(CopyState.class);

public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShard shard) throws IOException {
super("CopyState-" + shard.shardId());
Expand Down
Loading

0 comments on commit e18fec6

Please sign in to comment.