Skip to content

Commit

Permalink
fix style check
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <poojiraj@amazon.com>
  • Loading branch information
Poojita-Raj committed Jun 13, 2022
1 parent ed3fe28 commit f366918
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 21 deletions.
5 changes: 1 addition & 4 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -715,10 +715,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
* @param localSnapshot The local snapshot from in memory SegmentInfos.
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndPreserveLatestCommitPoint(
String reason,
MetadataSnapshot localSnapshot
) throws IOException {
public void cleanupAndPreserveLatestCommitPoint(String reason, MetadataSnapshot localSnapshot) throws IOException {
// fetch a snapshot from the latest on disk Segments_N file. This can be behind
// the passed in local in memory snapshot, so we want to ensure files it references are not removed.
metadataLock.writeLock().lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,12 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSeg
final Store.RecoveryDiff diff = snapshot.recoveryDiff(localMetadata);
logger.debug("Replication diff {}", diff);
if (diff.different.isEmpty() == false) {
getFilesListener.onFailure(new IllegalStateException(new ParameterizedMessage("Shard {} has local copies of segments that differ from the primary", indexShard.shardId()).getFormattedMessage()));
getFilesListener.onFailure(
new IllegalStateException(
new ParameterizedMessage("Shard {} has local copies of segments that differ from the primary", indexShard.shardId())
.getFormattedMessage()
)
);
}
final List<StoreFileMetadata> filesToFetch = Stream.concat(diff.missing.stream(), diff.different.stream())
.collect(Collectors.toList());
Expand Down Expand Up @@ -188,18 +193,14 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse,
try {
// Deserialize the new SegmentInfos object sent from the primary.
final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint();
System.out.println(responseCheckpoint.getSegmentsGen());
SegmentInfos infos = SegmentInfos.readCommit(
store.directory(),
toIndexInput(checkpointInfoResponse.getInfosBytes()),
responseCheckpoint.getSegmentsGen()
);
indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo());
store.cleanupAndPreserveLatestCommitPoint(
"finalize - clean with in memory infos",
store.getMetadata(infos)
);
//method/function that checks if some segment doesn't match with that of primary we
store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", store.getMetadata(infos));
// method/function that checks if some segment doesn't match with that of primary we
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not be checksummed and they are
Expand All @@ -216,11 +217,19 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse,
logger.debug("Failed to clean lucene index", e);
ex.addSuppressed(e);
}
ReplicationFailedException rfe = new ReplicationFailedException(indexShard.shardId(), "failed to clean after replication", ex);
ReplicationFailedException rfe = new ReplicationFailedException(
indexShard.shardId(),
"failed to clean after replication",
ex
);
fail(rfe, true);
throw rfe;
} catch (Exception ex) {
ReplicationFailedException rfe = new ReplicationFailedException(indexShard.shardId(), "failed to clean after replication", ex);
ReplicationFailedException rfe = new ReplicationFailedException(
indexShard.shardId(),
"failed to clean after replication",
ex
);
fail(rfe, true);
throw rfe;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase {
private ByteBuffersDataOutput buffer;

private static final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST);
private static final StoreFileMetadata SEGMENTS_FILE_DIFF = new StoreFileMetadata(IndexFileNames.SEGMENTS, 5L, "different", Version.LATEST);
private static final StoreFileMetadata SEGMENTS_FILE_DIFF = new StoreFileMetadata(
IndexFileNames.SEGMENTS,
5L,
"different",
Version.LATEST
);
private static final StoreFileMetadata PENDING_DELETE_FILE = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST);

private static final Store.MetadataSnapshot SI_SNAPSHOT = new Store.MetadataSnapshot(
Expand Down Expand Up @@ -78,13 +83,18 @@ public void setUp() throws Exception {
spyIndexShard = spy(indexShard);

Mockito.doNothing().when(spyIndexShard).finalizeReplication(any(SegmentInfos.class), anyLong());
testSegmentInfos = spyIndexShard.store().readLastCommittedSegmentsInfo();
testSegmentInfos = spyIndexShard.store().readLastCommittedSegmentsInfo();
buffer = new ByteBuffersDataOutput();
try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) {
testSegmentInfos.write(indexOutput);
}
repCheckpoint = new ReplicationCheckpoint(spyIndexShard.shardId(), spyIndexShard.getPendingPrimaryTerm(), testSegmentInfos.getGeneration(),
spyIndexShard.seqNoStats().getLocalCheckpoint(), testSegmentInfos.version);
repCheckpoint = new ReplicationCheckpoint(
spyIndexShard.shardId(),
spyIndexShard.getPendingPrimaryTerm(),
testSegmentInfos.getGeneration(),
spyIndexShard.seqNoStats().getLocalCheckpoint(),
testSegmentInfos.version
);
}

public void testSuccessfulResponse_startReplication() {
Expand All @@ -107,9 +117,9 @@ public void getSegmentFiles(
Store store,
ActionListener<GetSegmentFilesResponse> listener
) {
assertEquals(filesToFetch.size(),2);
assert(filesToFetch.contains(SEGMENTS_FILE));
assert(filesToFetch.contains(PENDING_DELETE_FILE));
assertEquals(filesToFetch.size(), 2);
assert (filesToFetch.contains(SEGMENTS_FILE));
assert (filesToFetch.contains(PENDING_DELETE_FILE));
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
}
};
Expand Down Expand Up @@ -302,7 +312,7 @@ public void onResponse(Void replicationResponse) {
@Override
public void onFailure(Exception e) {
logger.error(e);
assert(e instanceof IllegalStateException);
assert (e instanceof IllegalStateException);
}
});
}
Expand Down

0 comments on commit f366918

Please sign in to comment.