Skip to content

Commit

Permalink
addressing comments + fix gradle check
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <poojiraj@amazon.com>
  • Loading branch information
Poojita-Raj committed Jun 13, 2022
1 parent f366918 commit ebb2b5e
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 4 deletions.
6 changes: 6 additions & 0 deletions server/src/main/java/org/opensearch/OpenSearchException.java
Original file line number Diff line number Diff line change
Expand Up @@ -1594,6 +1594,12 @@ private enum OpenSearchExceptionHandle {
org.opensearch.transport.NoSeedNodeLeftException::new,
160,
LegacyESVersion.V_7_10_0
),
REPLICATION_FAILED_EXCEPTION(
org.opensearch.indices.replication.common.ReplicationFailedException.class,
org.opensearch.indices.replication.common.ReplicationFailedException::new,
161,
UNKNOWN_VERSION_ADDED
);

final Class<? extends OpenSearchException> exceptionClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ public NRTReplicationEngine(EngineConfig engineConfig) {

public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
// Update the current infos reference on the Engine's reader.
assert engineConfig.isReadOnlyReplica() : "Only replicas should update Infos";
readerManager.updateSegments(infos);

// only update the persistedSeqNo and "lastCommitted" infos reference if the incoming segments have a higher
Expand All @@ -95,7 +94,6 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
rollTranslogGeneration();
}
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
readerManager.maybeRefresh();
}

@Override
Expand Down
13 changes: 11 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1364,9 +1364,18 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) th
}
}

public Optional<NRTReplicationEngine> getReplicationEngine() {
if (getEngine() instanceof NRTReplicationEngine) {
return Optional.of((NRTReplicationEngine) getEngine());
} else {
return Optional.empty();
}
}

public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException {
assert getEngine() instanceof NRTReplicationEngine;
((NRTReplicationEngine) getEngine()).updateSegments(infos, seqNo);
if (getReplicationEngine().isPresent()) {
getReplicationEngine().get().updateSegments(infos, seqNo);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSeg
Store.MetadataSnapshot localMetadata = getMetadataSnapshot();
final Store.RecoveryDiff diff = snapshot.recoveryDiff(localMetadata);
logger.debug("Replication diff {}", diff);
// Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming snapshot
// from
// source that means the local copy of the segment has been corrupted/changed in some way and we throw an IllegalStateException to
// fail the shard
if (diff.different.isEmpty() == false) {
getFilesListener.onFailure(
new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@

import java.io.IOException;

/**
* Exception thrown if replication fails
*
* @opensearch.internal
*/
public class ReplicationFailedException extends OpenSearchException {

public ReplicationFailedException(IndexShard shard, Throwable cause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.opensearch.indices.InvalidIndexTemplateException;
import org.opensearch.indices.recovery.PeerRecoveryNotFound;
import org.opensearch.indices.recovery.RecoverFilesRecoveryException;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.ingest.IngestProcessorException;
import org.opensearch.cluster.coordination.NodeHealthCheckFailureException;
import org.opensearch.repositories.RepositoryException;
Expand Down Expand Up @@ -849,6 +850,7 @@ public void testIds() {
ids.put(158, PeerRecoveryNotFound.class);
ids.put(159, NodeHealthCheckFailureException.class);
ids.put(160, NoSeedNodeLeftException.class);
ids.put(161, ReplicationFailedException.class);

Map<Class<? extends OpenSearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends OpenSearchException>> entry : ids.entrySet()) {
Expand Down

0 comments on commit ebb2b5e

Please sign in to comment.