From 2bf259081c2d93e4b99bb203dbd82d7f91267296 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 6 Sep 2022 10:06:24 -0700 Subject: [PATCH] Segment Replication - Fix NoSuchFileException errors caused when computing metadata snapshot on primary shards. (#4366) * Segment Replication - Fix NoSuchFileException errors caused when computing metadata snapshot on primary shards. This change fixes the errors that occur when computing metadata snapshots on primary shards from the latest in-memory SegmentInfos. The error occurs when a segments_N file that is referenced by the in-memory infos is deleted as part of a concurrent commit. The segments themselves are incref'd by IndexWriter.incRefDeleter but the commit file (Segments_N) is not. This change resolves this by ignoring the segments_N file when computing metadata for CopyState and only sending incref'd segment files to replicas. Signed-off-by: Marc Handalian * Fix spotless. Signed-off-by: Marc Handalian * Update StoreTests.testCleanupAndPreserveLatestCommitPoint to assert additional segments are deleted. Signed-off-by: Marc Handalian * Rename snapshot to metadataMap in CheckpointInfoResponse. Signed-off-by: Marc Handalian * Refactor segmentReplicationDiff method to compute off two maps instead of MetadataSnapshots. Signed-off-by: Marc Handalian * Fix spotless. Signed-off-by: Marc Handalian * Revert catchall in SegmentReplicationSourceService. Signed-off-by: Marc Handalian * Revert log lvl change. Signed-off-by: Marc Handalian * Fix SegmentReplicationTargetTests Signed-off-by: Marc Handalian * Cleanup unused logger. Signed-off-by: Marc Handalian Signed-off-by: Marc Handalian Co-authored-by: Suraj Singh --- CHANGELOG.md | 1 + .../replication/SegmentReplicationIT.java | 50 ++++++- .../org/opensearch/index/store/Store.java | 133 +++++++++--------- .../replication/CheckpointInfoResponse.java | 30 ++-- .../SegmentReplicationSourceService.java | 7 +- .../replication/SegmentReplicationTarget.java | 36 ++--- .../indices/replication/common/CopyState.java | 28 +--- .../SegmentReplicationIndexShardTests.java | 7 +- .../opensearch/index/store/StoreTests.java | 131 ++++++++++++++--- .../OngoingSegmentReplicationsTests.java | 28 ++-- .../SegmentReplicationSourceHandlerTests.java | 8 +- .../SegmentReplicationSourceServiceTests.java | 4 +- .../SegmentReplicationTargetServiceTests.java | 2 +- .../SegmentReplicationTargetTests.java | 48 ++----- .../replication/common/CopyStateTests.java | 10 +- .../index/shard/IndexShardTestCase.java | 8 +- 16 files changed, 301 insertions(+), 230 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cb48b3aedeea5..a2b6528783a39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fix flaky random test `NRTReplicationEngineTests.testUpdateSegments` ([#4352](https://github.com/opensearch-project/OpenSearch/pull/4352)) - [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386)) - [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363)) +- Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https://github.com/opensearch-project/OpenSearch/pull/4366)) - [Segment Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test ([#4414](https://github.com/opensearch-project/OpenSearch/pull/4414)) - Fixed the `_cat/shards/10_basic.yml` test cases fix. diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 16e9d78b17826..9b2ab753832d3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -9,7 +9,6 @@ package org.opensearch.indices.replication; import com.carrotsearch.randomizedtesting.RandomizedTest; -import org.apache.lucene.index.SegmentInfos; import org.junit.BeforeClass; import org.opensearch.action.admin.indices.segments.IndexShardSegments; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; @@ -586,13 +585,56 @@ private void assertSegmentStats(int numberOfReplicas) throws IOException { ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState(); final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId()); IndexShard indexShard = getIndexShard(replicaNode.getName()); - final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(indexShard.store().directory()); // calls to readCommit will fail if a valid commit point and all its segments are not in the store. - SegmentInfos.readCommit(indexShard.store().directory(), lastCommitSegmentsFileName); + indexShard.store().readLastCommittedSegmentsInfo(); } } } + public void testDropPrimaryDuringReplication() throws Exception { + final Settings settings = Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); + final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY); + createIndex(INDEX_NAME, settings); + internalCluster().startDataOnlyNodes(6); + ensureGreen(INDEX_NAME); + + int initialDocCount = scaledRandomIntBetween(100, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + // don't wait for replication to complete, stop the primary immediately. + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); + ensureYellow(INDEX_NAME); + + // start another replica. + internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + // index another doc and refresh - without this the new replica won't catch up. + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get(); + + flushAndRefresh(INDEX_NAME); + waitForReplicaUpdate(); + assertSegmentStats(6); + } + } + /** * Waits until the replica is caught up to the latest primary segments gen. * @throws Exception if assertion fails @@ -611,10 +653,12 @@ private void waitForReplicaUpdate() throws Exception { final List replicaShardSegments = segmentListMap.get(false); // if we don't have any segments yet, proceed. final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); + logger.debug("Primary Segments: {}", primaryShardSegments.getSegments()); if (primaryShardSegments.getSegments().isEmpty() == false) { final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get(); for (ShardSegments shardSegments : replicaShardSegments) { + logger.debug("Replica {} Segments: {}", shardSegments.getShardRouting(), shardSegments.getSegments()); final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments() .stream() .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen); diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 58598ab2d08f4..9122c950a6ab6 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -105,6 +105,7 @@ import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -122,6 +123,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; +import static org.opensearch.index.store.Store.MetadataSnapshot.loadMetadata; /** * A Store provides plain access to files written by an opensearch index shard. Each shard @@ -334,6 +336,51 @@ public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOExceptio return new MetadataSnapshot(segmentInfos, directory, logger); } + /** + * Segment Replication method - Fetch a map of StoreFileMetadata for segments, ignoring Segment_N files. + * @param segmentInfos {@link SegmentInfos} from which to compute metadata. + * @return {@link Map} map file name to {@link StoreFileMetadata}. + */ + public Map getSegmentMetadataMap(SegmentInfos segmentInfos) throws IOException { + assert indexSettings.isSegRepEnabled(); + return loadMetadata(segmentInfos, directory, logger, true).fileMetadata; + } + + /** + * Segment Replication method + * Returns a diff between the Maps of StoreFileMetadata that can be used for getting list of files to copy over to a replica for segment replication. The returned diff will hold a list of files that are: + *
    + *
  • identical: they exist in both maps and they can be considered the same ie. they don't need to be recovered
  • + *
  • different: they exist in both maps but their they are not identical
  • + *
  • missing: files that exist in the source but not in the target
  • + *
+ */ + public static RecoveryDiff segmentReplicationDiff(Map source, Map target) { + final List identical = new ArrayList<>(); + final List different = new ArrayList<>(); + final List missing = new ArrayList<>(); + for (StoreFileMetadata value : source.values()) { + if (value.name().startsWith(IndexFileNames.SEGMENTS)) { + continue; + } + if (target.containsKey(value.name()) == false) { + missing.add(value); + } else { + final StoreFileMetadata fileMetadata = target.get(value.name()); + if (fileMetadata.isSame(value)) { + identical.add(value); + } else { + different.add(value); + } + } + } + return new RecoveryDiff( + Collections.unmodifiableList(identical), + Collections.unmodifiableList(different), + Collections.unmodifiableList(missing) + ); + } + /** * Renames all the given files from the key of the map to the * value of the map. All successfully renamed files are removed from the map in-place. @@ -709,31 +756,34 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr } /** - * This method deletes every file in this store that is not contained in either the remote or local metadata snapshots. + * Segment Replication method - + * This method deletes every file in this store that is not referenced by the passed in SegmentInfos or + * part of the latest on-disk commit point. * This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file. * In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk. * @param reason the reason for this cleanup operation logged for each deleted file - * @param localSnapshot The local snapshot from in memory SegmentInfos. + * @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present. * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. */ - public void cleanupAndPreserveLatestCommitPoint(String reason, MetadataSnapshot localSnapshot) throws IOException { + public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException { + assert indexSettings.isSegRepEnabled(); // fetch a snapshot from the latest on disk Segments_N file. This can be behind // the passed in local in memory snapshot, so we want to ensure files it references are not removed. metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(reason, localSnapshot, getMetadata(readLastCommittedSegmentsInfo())); + cleanupFiles(reason, getMetadata(readLastCommittedSegmentsInfo()), infos.files(true)); } finally { metadataLock.writeLock().unlock(); } } - private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable MetadataSnapshot additionalSnapshot) + private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable Collection additionalFiles) throws IOException { assert metadataLock.isWriteLockedByCurrentThread(); for (String existingFile : directory.listAll()) { if (Store.isAutogenerated(existingFile) || localSnapshot.contains(existingFile) - || (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) { + || (additionalFiles != null && additionalFiles.contains(existingFile))) { // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete // checksum) continue; @@ -825,17 +875,9 @@ public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, l userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); latestSegmentInfos.setUserData(userData, true); latestSegmentInfos.commit(directory()); - - // similar to TrimUnsafeCommits, create a commit with an appending IW, this will delete old commits and ensure all files - // associated with the SegmentInfos.commit are fsynced. - final List existingCommits = DirectoryReader.listCommits(directory); - assert existingCommits.isEmpty() == false : "Expected at least one commit but none found"; - final IndexCommit lastIndexCommit = existingCommits.get(existingCommits.size() - 1); - assert latestSegmentInfos.getSegmentsFileName().equals(lastIndexCommit.getSegmentsFileName()); - try (IndexWriter writer = newAppendingIndexWriter(directory, lastIndexCommit)) { - writer.setLiveCommitData(lastIndexCommit.getUserData().entrySet()); - writer.commit(); - } + directory.sync(latestSegmentInfos.files(true)); + directory.syncMetaData(); + cleanupAndPreserveLatestCommitPoint("After commit", latestSegmentInfos); } finally { metadataLock.writeLock().unlock(); } @@ -1033,6 +1075,11 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg } static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException { + return loadMetadata(segmentInfos, directory, logger, false); + } + + static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger, boolean ignoreSegmentsFile) + throws IOException { long numDocs = Lucene.getNumDocs(segmentInfos); Map commitUserDataBuilder = new HashMap<>(); commitUserDataBuilder.putAll(segmentInfos.getUserData()); @@ -1067,8 +1114,10 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory director if (maxVersion == null) { maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion; } - final String segmentsFile = segmentInfos.getSegmentsFileName(); - checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true); + if (ignoreSegmentsFile == false) { + final String segmentsFile = segmentInfos.getSegmentsFileName(); + checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true); + } return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs); } @@ -1148,7 +1197,6 @@ public Map asMap() { * Helper method used to group store files according to segment and commit. * * @see MetadataSnapshot#recoveryDiff(MetadataSnapshot) - * @see MetadataSnapshot#segmentReplicationDiff(MetadataSnapshot) */ private Iterable> getGroupedFilesIterable() { final Map> perSegment = new HashMap<>(); @@ -1241,51 +1289,6 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) { return recoveryDiff; } - /** - * Segment Replication method - * Returns a diff between the two snapshots that can be used for getting list of files to copy over to a replica for segment replication. The given snapshot is treated as the - * target and this snapshot as the source. The returned diff will hold a list of files that are: - *
    - *
  • identical: they exist in both snapshots and they can be considered the same ie. they don't need to be recovered
  • - *
  • different: they exist in both snapshots but their they are not identical
  • - *
  • missing: files that exist in the source but not in the target
  • - *
- */ - public RecoveryDiff segmentReplicationDiff(MetadataSnapshot recoveryTargetSnapshot) { - final List identical = new ArrayList<>(); - final List different = new ArrayList<>(); - final List missing = new ArrayList<>(); - final ArrayList identicalFiles = new ArrayList<>(); - for (List segmentFiles : getGroupedFilesIterable()) { - identicalFiles.clear(); - boolean consistent = true; - for (StoreFileMetadata meta : segmentFiles) { - StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name()); - if (storeFileMetadata == null) { - // Do not consider missing files as inconsistent in SegRep as replicas may lag while primary updates - // documents and generate new files specific to a segment - missing.add(meta); - } else if (storeFileMetadata.isSame(meta) == false) { - consistent = false; - different.add(meta); - } else { - identicalFiles.add(meta); - } - } - if (consistent) { - identical.addAll(identicalFiles); - } else { - different.addAll(identicalFiles); - } - } - RecoveryDiff recoveryDiff = new RecoveryDiff( - Collections.unmodifiableList(identical), - Collections.unmodifiableList(different), - Collections.unmodifiableList(missing) - ); - return recoveryDiff; - } - /** * Returns the number of files in this snapshot */ diff --git a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java index a73a3b54184da..48c2dfd30f589 100644 --- a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java +++ b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java @@ -10,13 +10,12 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.transport.TransportResponse; import java.io.IOException; -import java.util.Set; +import java.util.Map; /** * Response returned from a {@link SegmentReplicationSource} that includes the file metadata, and SegmentInfos @@ -28,52 +27,41 @@ public class CheckpointInfoResponse extends TransportResponse { private final ReplicationCheckpoint checkpoint; - private final Store.MetadataSnapshot snapshot; + private final Map metadataMap; private final byte[] infosBytes; - // pendingDeleteFiles are segments that have been merged away in the latest in memory SegmentInfos - // but are still referenced by the latest commit point (Segments_N). - private final Set pendingDeleteFiles; public CheckpointInfoResponse( final ReplicationCheckpoint checkpoint, - final Store.MetadataSnapshot snapshot, - final byte[] infosBytes, - final Set additionalFiles + final Map metadataMap, + final byte[] infosBytes ) { this.checkpoint = checkpoint; - this.snapshot = snapshot; + this.metadataMap = metadataMap; this.infosBytes = infosBytes; - this.pendingDeleteFiles = additionalFiles; } public CheckpointInfoResponse(StreamInput in) throws IOException { this.checkpoint = new ReplicationCheckpoint(in); - this.snapshot = new Store.MetadataSnapshot(in); + this.metadataMap = in.readMap(StreamInput::readString, StoreFileMetadata::new); this.infosBytes = in.readByteArray(); - this.pendingDeleteFiles = in.readSet(StoreFileMetadata::new); } @Override public void writeTo(StreamOutput out) throws IOException { checkpoint.writeTo(out); - snapshot.writeTo(out); + out.writeMap(metadataMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut)); out.writeByteArray(infosBytes); - out.writeCollection(pendingDeleteFiles); } public ReplicationCheckpoint getCheckpoint() { return checkpoint; } - public Store.MetadataSnapshot getSnapshot() { - return snapshot; + public Map getMetadataMap() { + return metadataMap; } public byte[] getInfosBytes() { return infosBytes; } - - public Set getPendingDeleteFiles() { - return pendingDeleteFiles; - } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index db3f87201b774..91b8243440ac5 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -133,12 +133,7 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan ); final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); channel.sendResponse( - new CheckpointInfoResponse( - copyState.getCheckpoint(), - copyState.getMetadataSnapshot(), - copyState.getInfosBytes(), - copyState.getPendingDeleteFiles() - ) + new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); timer.stop(); logger.trace( diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 6a9406aca13b9..26bec2203c599 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -23,6 +23,7 @@ import org.opensearch.action.StepListener; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.shard.IndexShard; @@ -37,12 +38,9 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; +import java.util.Collections; +import java.util.Map; /** * Represents the target of a replication event. @@ -178,9 +176,7 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener filesToFetch = new ArrayList(diff.missing); - Set storeFiles = new HashSet<>(Arrays.asList(store.directory().listAll())); - final Set pendingDeleteFiles = checkpointInfo.getPendingDeleteFiles() - .stream() - .filter(f -> storeFiles.contains(f.name()) == false) - .collect(Collectors.toSet()); - - filesToFetch.addAll(pendingDeleteFiles); - logger.trace("Files to fetch {}", filesToFetch); - - for (StoreFileMetadata file : filesToFetch) { + for (StoreFileMetadata file : diff.missing) { state.getIndex().addFileDetail(file.name(), file.length(), false); } // always send a req even if not fetching files so the primary can clear the copyState for this shard. state.setStage(SegmentReplicationState.Stage.GET_FILES); cancellableThreads.checkForCancel(); - source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, store, getFilesListener); + source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), diff.missing, store, getFilesListener); } private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ActionListener listener) { @@ -231,7 +217,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, responseCheckpoint.getSegmentsGen() ); indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); - store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", store.getMetadata(infos)); + store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are @@ -280,11 +266,13 @@ private ChecksumIndexInput toIndexInput(byte[] input) { ); } - Store.MetadataSnapshot getMetadataSnapshot() throws IOException { + Map getMetadataMap() throws IOException { if (indexShard.getSegmentInfosSnapshot() == null) { - return Store.MetadataSnapshot.EMPTY; + return Collections.emptyMap(); + } + try (final GatedCloseable snapshot = indexShard.getSegmentInfosSnapshot()) { + return store.getSegmentMetadataMap(snapshot.get()); } - return store.getMetadata(indexShard.getSegmentInfosSnapshot().get()); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java index c0e0b4dee2b3f..1dd0886fd2f36 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -15,14 +15,12 @@ import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.util.concurrent.AbstractRefCounted; import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.HashSet; -import java.util.Set; +import java.util.Map; /** * An Opensearch-specific version of Lucene's CopyState class that @@ -37,8 +35,7 @@ public class CopyState extends AbstractRefCounted { private final ReplicationCheckpoint requestedReplicationCheckpoint; /** Actual ReplicationCheckpoint returned by the shard */ private final ReplicationCheckpoint replicationCheckpoint; - private final Store.MetadataSnapshot metadataSnapshot; - private final HashSet pendingDeleteFiles; + private final Map metadataMap; private final byte[] infosBytes; private GatedCloseable commitRef; private final IndexShard shard; @@ -49,7 +46,7 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar this.shard = shard; this.segmentInfosRef = shard.getSegmentInfosSnapshot(); SegmentInfos segmentInfos = this.segmentInfosRef.get(); - this.metadataSnapshot = shard.store().getMetadata(segmentInfos); + this.metadataMap = shard.store().getSegmentMetadataMap(segmentInfos); this.replicationCheckpoint = new ReplicationCheckpoint( shard.shardId(), shard.getOperationPrimaryTerm(), @@ -57,18 +54,7 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar shard.getProcessedLocalCheckpoint(), segmentInfos.getVersion() ); - - // Send files that are merged away in the latest SegmentInfos but not in the latest on disk Segments_N. - // This ensures that the store on replicas is in sync with the store on primaries. this.commitRef = shard.acquireLastIndexCommit(false); - Store.MetadataSnapshot metadata = shard.store().getMetadata(this.commitRef.get()); - final Store.RecoveryDiff diff = metadata.recoveryDiff(this.metadataSnapshot); - this.pendingDeleteFiles = new HashSet<>(diff.missing); - if (this.pendingDeleteFiles.isEmpty()) { - // If there are no additional files we can release the last commit immediately. - this.commitRef.close(); - this.commitRef = null; - } ByteBuffersDataOutput buffer = new ByteBuffersDataOutput(); // resource description and name are not used, but resource description cannot be null @@ -95,18 +81,14 @@ public ReplicationCheckpoint getCheckpoint() { return replicationCheckpoint; } - public Store.MetadataSnapshot getMetadataSnapshot() { - return metadataSnapshot; + public Map getMetadataMap() { + return metadataMap; } public byte[] getInfosBytes() { return infosBytes; } - public Set getPendingDeleteFiles() { - return pendingDeleteFiles; - } - public IndexShard getShard() { return shard; } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 88a3bdad53d0c..3af882a8087ec 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -489,12 +489,7 @@ private void resolveCheckpointInfoResponseListener(ActionListener additionalSegments = new ArrayList<>(); + for (String file : store.directory().listAll()) { + if (commitMetadata.contains(file) == false) { + additionalSegments.add(file); + } + } + assertFalse(additionalSegments.isEmpty()); + + // clean up everything not in the latest commit point. + store.cleanupAndPreserveLatestCommitPoint("test", store.readLastCommittedSegmentsInfo()); + + // we want to ensure commitMetadata files are preserved after calling cleanup + for (String existingFile : store.directory().listAll()) { + assertTrue(commitMetadata.contains(existingFile)); + assertFalse(additionalSegments.contains(existingFile)); + } + deleteContent(store.directory()); + IOUtils.close(store); + } + + public void testGetSegmentMetadataMap() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 1); + Store store = new Store( + shardId, + SEGMENT_REPLICATION_INDEX_SETTINGS, + new NIOFSDirectory(createTempDir()), + new DummyShardLock(shardId) + ); + store.createEmpty(Version.LATEST); + final Map metadataSnapshot = store.getSegmentMetadataMap(store.readLastCommittedSegmentsInfo()); + // no docs indexed only _N file exists. + assertTrue(metadataSnapshot.isEmpty()); + + // commit some docs to create a commit point. + commitRandomDocs(store); + + final Map snapshotAfterCommit = store.getSegmentMetadataMap(store.readLastCommittedSegmentsInfo()); + assertFalse(snapshotAfterCommit.isEmpty()); + assertFalse(snapshotAfterCommit.keySet().stream().anyMatch((name) -> name.startsWith(IndexFileNames.SEGMENTS))); + store.close(); + } + + public void testSegmentReplicationDiff() { + final String segmentName = "_0.si"; + final StoreFileMetadata SEGMENT_FILE = new StoreFileMetadata(segmentName, 1L, "0", Version.LATEST); + // source has file target is missing. + Store.RecoveryDiff diff = Store.segmentReplicationDiff(Map.of(segmentName, SEGMENT_FILE), Collections.emptyMap()); + assertEquals(List.of(SEGMENT_FILE), diff.missing); + assertTrue(diff.different.isEmpty()); + assertTrue(diff.identical.isEmpty()); + + // target has file not on source. + diff = Store.segmentReplicationDiff(Collections.emptyMap(), Map.of(segmentName, SEGMENT_FILE)); + assertTrue(diff.missing.isEmpty()); + assertTrue(diff.different.isEmpty()); + assertTrue(diff.identical.isEmpty()); + + // source and target have identical file. + diff = Store.segmentReplicationDiff(Map.of(segmentName, SEGMENT_FILE), Map.of(segmentName, SEGMENT_FILE)); + assertTrue(diff.missing.isEmpty()); + assertTrue(diff.different.isEmpty()); + assertEquals(List.of(SEGMENT_FILE), diff.identical); + + // source has diff copy of same file as target. + StoreFileMetadata SOURCE_DIFF_FILE = new StoreFileMetadata(segmentName, 1L, "abc", Version.LATEST); + diff = Store.segmentReplicationDiff(Map.of(segmentName, SOURCE_DIFF_FILE), Map.of(segmentName, SEGMENT_FILE)); + assertTrue(diff.missing.isEmpty()); + assertEquals(List.of(SOURCE_DIFF_FILE), diff.different); + assertTrue(diff.identical.isEmpty()); + + // ignore _N files if included in source map. + final String segmentsFile = IndexFileNames.SEGMENTS.concat("_2"); + StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata(segmentsFile, 1L, "abc", Version.LATEST); + diff = Store.segmentReplicationDiff(Map.of(segmentsFile, SEGMENTS_FILE), Collections.emptyMap()); + assertTrue(diff.missing.isEmpty()); + assertTrue(diff.different.isEmpty()); + assertTrue(diff.identical.isEmpty()); + } + + private void commitRandomDocs(Store store) throws IOException { + IndexWriter writer = indexRandomDocs(store); + writer.commit(); + writer.close(); + } + + private IndexWriter indexRandomDocs(Store store) throws IOException { IndexWriterConfig indexWriterConfig = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec( TestUtil.getDefaultCodec() ); + indexWriterConfig.setCommitOnClose(false); indexWriterConfig.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE); IndexWriter writer = new IndexWriter(store.directory(), indexWriterConfig); int docs = 1 + random().nextInt(100); @@ -1171,21 +1281,6 @@ public void testcleanupAndPreserveLatestCommitPoint() throws IOException { ); doc.add(new SortedDocValuesField("dv", new BytesRef(TestUtil.randomRealisticUnicodeString(random())))); writer.addDocument(doc); - writer.commit(); - writer.close(); - - Store.MetadataSnapshot commitMetadata = store.getMetadata(); - - Store.MetadataSnapshot refreshMetadata = Store.MetadataSnapshot.EMPTY; - - store.cleanupAndPreserveLatestCommitPoint("test", refreshMetadata); - - // we want to ensure commitMetadata files are preserved after calling cleanup - for (String existingFile : store.directory().listAll()) { - assert (commitMetadata.contains(existingFile) == true); - } - - deleteContent(store.directory()); - IOUtils.close(store); + return writer; } } diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index f49ee0471b5e8..bd3106454f49b 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -11,28 +11,30 @@ import org.junit.Assert; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.IndexService; +import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.shard.ShardId; -import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -55,15 +57,18 @@ public class OngoingSegmentReplicationsTests extends IndexShardTestCase { private GetSegmentFilesRequest getSegmentFilesRequest; - final Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build(); + final Settings settings = Settings.builder() + .put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); @Override public void setUp() throws Exception { super.setUp(); - primary = newStartedShard(true); - replica = newShard(primary.shardId(), false); + primary = newStartedShard(true, settings); + replica = newShard(false, settings, new NRTReplicationEngineFactory()); recoverReplica(replica, primary, true); replicaDiscoveryNode = replica.recoveryState().getTargetNode(); primaryDiscoveryNode = replica.recoveryState().getSourceNode(); @@ -93,6 +98,8 @@ public void tearDown() throws Exception { } public void testPrepareAndSendSegments() throws IOException { + indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); + primary.refresh("Test"); OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings)); final CheckpointInfoRequest request = new CheckpointInfoRequest( 1L, @@ -112,17 +119,14 @@ public void testPrepareAndSendSegments() throws IOException { 1L, replica.routingEntry().allocationId().getId(), replicaDiscoveryNode, - new ArrayList<>(copyState.getMetadataSnapshot().asMap().values()), + new ArrayList<>(copyState.getMetadataMap().values()), testCheckpoint ); - final Collection expectedFiles = List.copyOf(primary.store().getMetadata().asMap().values()); replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { @Override public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { - assertEquals(1, getSegmentFilesResponse.files.size()); - assertEquals(1, expectedFiles.size()); - assertTrue(expectedFiles.stream().findFirst().get().isSame(getSegmentFilesResponse.files.get(0))); + assertEquals(copyState.getMetadataMap().size(), getSegmentFilesResponse.files.size()); assertEquals(0, copyState.refCount()); assertFalse(replications.isInCopyStateMap(request.getCheckpoint())); assertEquals(0, replications.size()); @@ -181,7 +185,7 @@ public void testCancelReplication_AfterSendFilesStarts() throws IOException, Int 1L, replica.routingEntry().allocationId().getId(), replicaDiscoveryNode, - new ArrayList<>(copyState.getMetadataSnapshot().asMap().values()), + new ArrayList<>(copyState.getMetadataMap().values()), testCheckpoint ); replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index 5f6ec7e505805..cde5cd980a91d 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -19,6 +19,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.StoreFileMetadata; @@ -76,7 +77,7 @@ public void testSendFiles() throws IOException { 1 ); - final List expectedFiles = List.copyOf(copyState.getMetadataSnapshot().asMap().values()); + final List expectedFiles = List.copyOf(copyState.getMetadataMap().values()); final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, @@ -137,6 +138,9 @@ public void onFailure(Exception e) { } public void testSendFileFails() throws IOException { + // index some docs on the primary so a segment is created. + indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); + primary.refresh("Test"); chunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> listener.onFailure( new OpenSearchException("Test") ); @@ -153,7 +157,7 @@ public void testSendFileFails() throws IOException { 1 ); - final List expectedFiles = List.copyOf(copyState.getMetadataSnapshot().asMap().values()); + final List expectedFiles = List.copyOf(copyState.getMetadataMap().values()); final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java index 4bfdd81d50a1e..6183f1e5d9dfb 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java @@ -121,9 +121,7 @@ public void testCheckpointInfo() { public void onResponse(CheckpointInfoResponse response) { assertEquals(testCheckpoint, response.getCheckpoint()); assertNotNull(response.getInfosBytes()); - // CopyStateTests sets up one pending delete file and one committed segments file - assertEquals(1, response.getPendingDeleteFiles().size()); - assertEquals(1, response.getSnapshot().size()); + assertEquals(1, response.getMetadataMap().size()); } @Override diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index f2eb635f24bbf..7437cb22e44d1 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -62,7 +62,7 @@ public void setUp() throws Exception { .put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()) .build(); final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - primaryShard = newStartedShard(true); + primaryShard = newStartedShard(true, settings); replicaShard = newShard(false, settings, new NRTReplicationEngineFactory()); recoverReplica(replicaShard, primaryShard, true); checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L, 0L); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 11217a46b3c69..f8341573770a6 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -18,7 +18,6 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.Term; import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.store.ByteBuffersDataOutput; import org.apache.lucene.store.ByteBuffersIndexOutput; import org.apache.lucene.store.Directory; @@ -51,7 +50,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.Random; import java.util.Arrays; @@ -71,26 +69,13 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase { private ReplicationCheckpoint repCheckpoint; private ByteBuffersDataOutput buffer; - private static final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST); - private static final StoreFileMetadata SEGMENTS_FILE_DIFF = new StoreFileMetadata( - IndexFileNames.SEGMENTS, - 5L, - "different", - Version.LATEST - ); - private static final StoreFileMetadata PENDING_DELETE_FILE = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST); + private static final String SEGMENT_NAME = "_0.si"; + private static final StoreFileMetadata SEGMENT_FILE = new StoreFileMetadata(SEGMENT_NAME, 1L, "0", Version.LATEST); + private static final StoreFileMetadata SEGMENT_FILE_DIFF = new StoreFileMetadata(SEGMENT_NAME, 5L, "different", Version.LATEST); - private static final Store.MetadataSnapshot SI_SNAPSHOT = new Store.MetadataSnapshot( - Map.of(SEGMENTS_FILE.name(), SEGMENTS_FILE), - null, - 0 - ); + private static final Map SI_SNAPSHOT = Map.of(SEGMENT_FILE.name(), SEGMENT_FILE); - private static final Store.MetadataSnapshot SI_SNAPSHOT_DIFFERENT = new Store.MetadataSnapshot( - Map.of(SEGMENTS_FILE_DIFF.name(), SEGMENTS_FILE_DIFF), - null, - 0 - ); + private static final Map SI_SNAPSHOT_DIFFERENT = Map.of(SEGMENT_FILE_DIFF.name(), SEGMENT_FILE_DIFF); private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings( "index", @@ -135,7 +120,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy())); } @Override @@ -146,9 +131,8 @@ public void getSegmentFiles( Store store, ActionListener listener ) { - assertEquals(filesToFetch.size(), 2); - assert (filesToFetch.contains(SEGMENTS_FILE)); - assert (filesToFetch.contains(PENDING_DELETE_FILE)); + assertEquals(1, filesToFetch.size()); + assert (filesToFetch.contains(SEGMENT_FILE)); listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } }; @@ -230,7 +214,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy())); } @Override @@ -273,7 +257,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy())); } @Override @@ -318,7 +302,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy())); } @Override @@ -362,7 +346,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy())); } @Override @@ -380,7 +364,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener.class ); segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener)); - when(segrepTarget.getMetadataSnapshot()).thenReturn(SI_SNAPSHOT_DIFFERENT); + when(segrepTarget.getMetadataMap()).thenReturn(SI_SNAPSHOT_DIFFERENT); segrepTarget.startReplication(new ActionListener() { @Override public void onResponse(Void replicationResponse) { @@ -413,9 +397,7 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - listener.onResponse( - new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1), buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE)) - ); + listener.onResponse(new CheckpointInfoResponse(checkpoint, storeMetadataSnapshots.get(1).asMap(), buffer.toArrayCopy())); } @Override @@ -434,7 +416,7 @@ public void getSegmentFiles( ); segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener)); - when(segrepTarget.getMetadataSnapshot()).thenReturn(storeMetadataSnapshots.get(0)); + when(segrepTarget.getMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap()); segrepTarget.startReplication(new ActionListener() { @Override public void onResponse(Void replicationResponse) { diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index a6f0cf7e98411..77a4a6d22039e 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -22,7 +22,6 @@ import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.io.IOException; -import java.util.Set; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -32,6 +31,7 @@ public class CopyStateTests extends IndexShardTestCase { private static final long EXPECTED_LONG_VALUE = 1L; private static final ShardId TEST_SHARD_ID = new ShardId("testIndex", "testUUID", 0); private static final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST); + private static final StoreFileMetadata SEGMENT_FILE = new StoreFileMetadata("_0.si", 1L, "0", Version.LATEST); private static final StoreFileMetadata PENDING_DELETE_FILE = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST); private static final Store.MetadataSnapshot COMMIT_SNAPSHOT = new Store.MetadataSnapshot( @@ -41,7 +41,7 @@ public class CopyStateTests extends IndexShardTestCase { ); private static final Store.MetadataSnapshot SI_SNAPSHOT = new Store.MetadataSnapshot( - Map.of(SEGMENTS_FILE.name(), SEGMENTS_FILE), + Map.of(SEGMENT_FILE.name(), SEGMENT_FILE), null, 0 ); @@ -61,10 +61,6 @@ public void testCopyStateCreation() throws IOException { // version was never set so this should be zero assertEquals(0, checkpoint.getSegmentInfosVersion()); assertEquals(EXPECTED_LONG_VALUE, checkpoint.getPrimaryTerm()); - - Set pendingDeleteFiles = copyState.getPendingDeleteFiles(); - assertEquals(1, pendingDeleteFiles.size()); - assertTrue(pendingDeleteFiles.contains(PENDING_DELETE_FILE)); } public static IndexShard createMockIndexShard() throws IOException { @@ -78,7 +74,7 @@ public static IndexShard createMockIndexShard() throws IOException { SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major); when(mockShard.getSegmentInfosSnapshot()).thenReturn(new GatedCloseable<>(testSegmentInfos, () -> {})); - when(mockStore.getMetadata(testSegmentInfos)).thenReturn(SI_SNAPSHOT); + when(mockStore.getSegmentMetadataMap(testSegmentInfos)).thenReturn(SI_SNAPSHOT.asMap()); IndexCommit mockIndexCommit = mock(IndexCommit.class); when(mockShard.acquireLastIndexCommit(false)).thenReturn(new GatedCloseable<>(mockIndexCommit, () -> {})); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 0838a1fe87aa4..073dc4b84472e 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -134,6 +134,7 @@ import java.io.IOException; import java.util.ArrayList; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -1200,12 +1201,7 @@ public void getCheckpointMetadata( try { final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard); listener.onResponse( - new CheckpointInfoResponse( - copyState.getCheckpoint(), - copyState.getMetadataSnapshot(), - copyState.getInfosBytes(), - copyState.getPendingDeleteFiles() - ) + new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); copyState.decRef(); } catch (IOException e) {