From 912d9970c56635469d74725eaef1b15b2cce9cf6 Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Thu, 26 May 2022 11:57:28 -0700 Subject: [PATCH 01/16] Added CopyState class and relevant downstream methods Signed-off-by: Kartik Ganesh --- .../org/opensearch/index/engine/Engine.java | 18 ++++ .../index/engine/InternalEngine.java | 14 +++ .../opensearch/index/shard/IndexShard.java | 18 ++++ .../org/opensearch/index/store/Store.java | 91 ++++++++++++------- .../indices/replication/common/CopyState.java | 77 ++++++++++++++++ .../opensearch/index/store/StoreTests.java | 18 ++-- .../PeerRecoveryTargetServiceTests.java | 2 +- .../recovery/RecoverySourceHandlerTests.java | 8 +- 8 files changed, 197 insertions(+), 49 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/replication/common/CopyState.java diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index c242d98b4b65c..61f2c0072b6b1 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -175,6 +175,19 @@ public final EngineConfig config() { */ protected abstract SegmentInfos getLatestSegmentInfos(); + /** + * Fetch a snapshot of the latest SegmentInfos from the engine. Using this method + * ensures that segment files are retained in the directory until the reference is closed. + * + * @return {@link GatedCloseable} - A wrapper around a {@link SegmentInfos} instance that + * must be closed for segment files to be deleted. + * @throws EngineException - When segment infos cannot be safely retrieved + */ + public GatedCloseable getSegmentInfosSnapshot() { + // default implementation + return new GatedCloseable<>(getLatestSegmentInfos(), () -> {}); + } + public MergeStats getMergeStats() { return new MergeStats(); } @@ -846,6 +859,11 @@ public final CommitStats commitStats() { */ public abstract long getPersistedLocalCheckpoint(); + public long getProcessedLocalCheckpoint() { + // default implementation + return 0L; + } + /** * @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint */ diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index e60e650372ec4..5b28b29914b4e 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2305,6 +2305,19 @@ public SegmentInfos getLatestSegmentInfos() { } } + @Override + public GatedCloseable getSegmentInfosSnapshot() { + // this should never be called by read-only engines + assert (engineConfig.isReadOnlyReplica() == false); + final SegmentInfos segmentInfos = getLatestSegmentInfos(); + try { + indexWriter.incRefDeleter(segmentInfos); + } catch (IOException e) { + throw new EngineException(shardId, e.getMessage(), e); + } + return new GatedCloseable<>(segmentInfos, () -> indexWriter.decRefDeleter(segmentInfos)); + } + @Override protected final void writerSegmentStats(SegmentsStats stats) { stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed()); @@ -2724,6 +2737,7 @@ public long getLastSyncedGlobalCheckpoint() { return getTranslog().getLastSyncedGlobalCheckpoint(); } + @Override public long getProcessedLocalCheckpoint() { return localCheckpointTracker.getProcessedCheckpoint(); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java 
b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 995a92e94aeb3..5d11c34ca205c 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2638,6 +2638,14 @@ public long getLocalCheckpoint() { return getEngine().getPersistedLocalCheckpoint(); } + /** + * Fetch the latest checkpoint that has been processed but not necessarily persisted. + * Also see {@link #getLocalCheckpoint()}. + */ + public long getProcessedLocalCheckpoint() { + return getEngine().getProcessedLocalCheckpoint(); + } + /** * Returns the global checkpoint for the shard. * @@ -4005,4 +4013,14 @@ public void verifyShardBeforeIndexClosing() throws IllegalStateException { RetentionLeaseSyncer getRetentionLeaseSyncer() { return retentionLeaseSyncer; } + + /** + * Fetch the latest SegmentInfos held by the shard's underlying Engine, wrapped + * by a {@link GatedCloseable} to ensure files are not deleted/merged away. + * + * @throws EngineException - When segment infos cannot be safely retrieved + */ + public GatedCloseable getSegmentInfosSnapshot() { + return getEngine().getSegmentInfosSnapshot(); + } } diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 65c47f66b7654..3a121b5dcaff3 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -274,6 +274,13 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException { return getMetadata(commit, false); } + /** + * Convenience wrapper around the {@link #getMetadata(IndexCommit)} method for null input. + */ + public MetadataSnapshot getMetadata() throws IOException { + return getMetadata((IndexCommit) null, false); + } + /** * Returns a new MetadataSnapshot for the given commit. If the given commit is null * the latest commit point is used. @@ -315,6 +322,10 @@ public MetadataSnapshot getMetadata(IndexCommit commit, boolean lockDirectory) t } } + public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOException { + return new MetadataSnapshot(segmentInfos, directory, logger); + } + /** * Renames all the given files from the key of the map to the * value of the map. All successfully renamed files are removed from the map in-place. 
@@ -477,7 +488,7 @@ public static MetadataSnapshot readMetadataSnapshot( Directory dir = new NIOFSDirectory(indexLocation) ) { failIfCorrupted(dir); - return new MetadataSnapshot(null, dir, logger); + return new MetadataSnapshot((IndexCommit) null, dir, logger); } catch (IndexNotFoundException ex) { // that's fine - happens all the time no need to log } catch (FileNotFoundException | NoSuchFileException ex) { @@ -682,7 +693,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr } } directory.syncMetaData(); - final Store.MetadataSnapshot metadataOrEmpty = getMetadata(null); + final Store.MetadataSnapshot metadataOrEmpty = getMetadata(); verifyAfterCleanup(sourceMetadata, metadataOrEmpty); } finally { metadataLock.writeLock().unlock(); @@ -822,7 +833,14 @@ public MetadataSnapshot(Map metadata, Map builder = new HashMap<>(); - Map commitUserDataBuilder = new HashMap<>(); try { final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory); - numDocs = Lucene.getNumDocs(segmentCommitInfos); - commitUserDataBuilder.putAll(segmentCommitInfos.getUserData()); - // we don't know which version was used to write so we take the max version. - Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); - for (SegmentCommitInfo info : segmentCommitInfos) { - final Version version = info.info.getVersion(); - if (version == null) { - // version is written since 3.1+: we should have already hit IndexFormatTooOld. - throw new IllegalArgumentException("expected valid version value: " + info.info.toString()); - } - if (version.onOrAfter(maxVersion)) { - maxVersion = version; - } - for (String file : info.files()) { - checksumFromLuceneFile( - directory, - file, - builder, - logger, - version, - SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file)) - ); - } - } - if (maxVersion == null) { - maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion; - } - final String segmentsFile = segmentCommitInfos.getSegmentsFileName(); - checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true); + return loadMetadata(segmentCommitInfos, directory, logger); } catch (CorruptIndexException | IndexNotFoundException | IndexFormatTooOldException | IndexFormatTooNewException ex) { // we either know the index is corrupted or it's just not there throw ex; @@ -949,6 +936,40 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg } throw ex; } + } + + static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException { + long numDocs = Lucene.getNumDocs(segmentInfos); + Map commitUserDataBuilder = new HashMap<>(); + commitUserDataBuilder.putAll(segmentInfos.getUserData()); + Map builder = new HashMap<>(); + // we don't know which version was used to write so we take the max version. + Version maxVersion = segmentInfos.getMinSegmentLuceneVersion(); + for (SegmentCommitInfo info : segmentInfos) { + final Version version = info.info.getVersion(); + if (version == null) { + // version is written since 3.1+: we should have already hit IndexFormatTooOld. 
+ throw new IllegalArgumentException("expected valid version value: " + info.info.toString()); + } + if (version.onOrAfter(maxVersion)) { + maxVersion = version; + } + for (String file : info.files()) { + checksumFromLuceneFile( + directory, + file, + builder, + logger, + version, + SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file)) + ); + } + } + if (maxVersion == null) { + maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion; + } + final String segmentsFile = segmentInfos.getSegmentsFileName(); + checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true); return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs); } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java new file mode 100644 index 0000000000000..581b0e43d0693 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -0,0 +1,77 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.common; + +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.ByteBuffersDataOutput; +import org.apache.lucene.store.ByteBuffersIndexOutput; +import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.util.concurrent.AbstractRefCounted; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; + +import java.io.IOException; +import java.util.HashSet; + +/** + * An Opensearch-specific version of Lucene's CopyState class that + * holds incRef'd file level details for one point-in-time segment infos. + * + * @opensearch.internal + */ +public class CopyState extends AbstractRefCounted { + + private final GatedCloseable segmentInfosRef; + private final ReplicationCheckpoint replicationCheckpoint; + private final Store.MetadataSnapshot metadataSnapshot; + private final HashSet pendingDeleteFiles; + private final byte[] infosBytes; + private GatedCloseable commitRef; + + public CopyState(IndexShard shard) throws IOException { + super("CopyState-" + shard.shardId()); + this.segmentInfosRef = shard.getSegmentInfosSnapshot(); + SegmentInfos segmentInfos = this.segmentInfosRef.get(); + this.metadataSnapshot = shard.store().getMetadata(segmentInfos); + this.replicationCheckpoint = new ReplicationCheckpoint( + shard.shardId(), + shard.getOperationPrimaryTerm(), + segmentInfos.getGeneration(), + shard.getProcessedLocalCheckpoint(), + segmentInfos.getVersion() + ); + + // Send files that are merged away in the latest SegmentInfos but not in the latest on disk Segments_N. + // This ensures that the store on replicas is in sync with the store on primaries. 
+ this.commitRef = shard.acquireLastIndexCommit(false); + Store.MetadataSnapshot metadata = shard.store().getMetadata(this.commitRef.get()); + final Store.RecoveryDiff diff = metadata.recoveryDiff(this.metadataSnapshot); + this.pendingDeleteFiles = new HashSet<>(diff.missing); + if (this.pendingDeleteFiles.isEmpty()) { + // If there are no additional files we can release the last commit immediately. + this.commitRef.close(); + this.commitRef = null; + } + + ByteBuffersDataOutput buffer = new ByteBuffersDataOutput(); + // resource description and name are not used, but resource description cannot be null + try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) { + segmentInfos.write(indexOutput); + } + this.infosBytes = buffer.toArrayCopy(); + } + + @Override + protected void closeInternal() { + // TODO + } +} diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index fdec86e7912fd..5f1302c535437 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -364,14 +364,14 @@ public void testNewChecksums() throws IOException { Store.MetadataSnapshot metadata; // check before we committed try { - store.getMetadata(null); + store.getMetadata(); fail("no index present - expected exception"); } catch (IndexNotFoundException ex) { // expected } writer.commit(); writer.close(); - metadata = store.getMetadata(null); + metadata = store.getMetadata(); assertThat(metadata.asMap().isEmpty(), is(false)); for (StoreFileMetadata meta : metadata) { try (IndexInput input = store.directory().openInput(meta.name(), IOContext.DEFAULT)) { @@ -552,7 +552,7 @@ public void testRecoveryDiff() throws IOException, InterruptedException { } writer.commit(); writer.close(); - first = store.getMetadata(null); + first = store.getMetadata(); assertDeleteContent(store, store.directory()); store.close(); } @@ -581,7 +581,7 @@ public void testRecoveryDiff() throws IOException, InterruptedException { } writer.commit(); writer.close(); - second = store.getMetadata(null); + second = store.getMetadata(); } Store.RecoveryDiff diff = first.recoveryDiff(second); assertThat(first.size(), equalTo(second.size())); @@ -610,7 +610,7 @@ public void testRecoveryDiff() throws IOException, InterruptedException { writer.deleteDocuments(new Term("id", Integer.toString(random().nextInt(numDocs)))); writer.commit(); writer.close(); - Store.MetadataSnapshot metadata = store.getMetadata(null); + Store.MetadataSnapshot metadata = store.getMetadata(); StoreFileMetadata delFile = null; for (StoreFileMetadata md : metadata) { if (md.name().endsWith(".liv")) { @@ -645,7 +645,7 @@ public void testRecoveryDiff() throws IOException, InterruptedException { writer.addDocument(docs.get(0)); writer.close(); - Store.MetadataSnapshot newCommitMetadata = store.getMetadata(null); + Store.MetadataSnapshot newCommitMetadata = store.getMetadata(); Store.RecoveryDiff newCommitDiff = newCommitMetadata.recoveryDiff(metadata); if (delFile != null) { assertThat(newCommitDiff.identical.size(), equalTo(newCommitMetadata.size() - 5)); // segments_N, del file, cfs, cfe, si for the @@ -710,7 +710,7 @@ public void testCleanupFromSnapshot() throws IOException { writer.addDocument(doc); } - Store.MetadataSnapshot firstMeta = store.getMetadata(null); + Store.MetadataSnapshot firstMeta = store.getMetadata(); if (random().nextBoolean()) { for (int i = 0; i < docs; i++) { @@ 
-731,7 +731,7 @@ public void testCleanupFromSnapshot() throws IOException { writer.commit(); writer.close(); - Store.MetadataSnapshot secondMeta = store.getMetadata(null); + Store.MetadataSnapshot secondMeta = store.getMetadata(); if (randomBoolean()) { store.cleanupAndVerify("test", firstMeta); @@ -1000,7 +1000,7 @@ public void testMarkCorruptedOnTruncatedSegmentsFile() throws IOException { try { if (randomBoolean()) { - store.getMetadata(null); + store.getMetadata(); } else { store.readLastCommittedSegmentsInfo(); } diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java index bda2a910d922e..d85b2f1e22979 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -85,7 +85,7 @@ public void testWriteFileChunksConcurrently() throws Exception { indexDoc(sourceShard, "_doc", Integer.toString(i)); } sourceShard.flush(new FlushRequest()); - Store.MetadataSnapshot sourceSnapshot = sourceShard.store().getMetadata(null); + Store.MetadataSnapshot sourceSnapshot = sourceShard.store().getMetadata(); List mdFiles = new ArrayList<>(); for (StoreFileMetadata md : sourceSnapshot) { mdFiles.add(md); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java index 1739f546150d9..fc5c429d74b16 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java @@ -189,7 +189,7 @@ public void testSendFiles() throws Throwable { writer.commit(); writer.close(); - Store.MetadataSnapshot metadata = store.getMetadata(null); + Store.MetadataSnapshot metadata = store.getMetadata(); ReplicationLuceneIndex luceneIndex = new ReplicationLuceneIndex(); List metas = new ArrayList<>(); for (StoreFileMetadata md : metadata) { @@ -226,7 +226,7 @@ public void writeFileChunk( PlainActionFuture sendFilesFuture = new PlainActionFuture<>(); handler.sendFiles(store, metas.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture); sendFilesFuture.actionGet(); - Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(null); + Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(); Store.RecoveryDiff recoveryDiff = targetStoreMetadata.recoveryDiff(metadata); assertEquals(metas.size(), recoveryDiff.identical.size()); assertEquals(0, recoveryDiff.different.size()); @@ -512,7 +512,7 @@ public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable { writer.close(); ReplicationLuceneIndex luceneIndex = new ReplicationLuceneIndex(); - Store.MetadataSnapshot metadata = store.getMetadata(null); + Store.MetadataSnapshot metadata = store.getMetadata(); List metas = new ArrayList<>(); for (StoreFileMetadata md : metadata) { metas.add(md); @@ -594,7 +594,7 @@ public void testHandleExceptionOnSendFiles() throws Throwable { writer.commit(); writer.close(); - Store.MetadataSnapshot metadata = store.getMetadata(null); + Store.MetadataSnapshot metadata = store.getMetadata(); List metas = new ArrayList<>(); for (StoreFileMetadata md : metadata) { metas.add(md); From f362d44f03a22aab0728cdd5c68b30e8eed7ecdb Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Thu, 26 May 2022 17:22:36 -0700 
Subject: [PATCH 02/16] Add PrimaryShardReplicationSource as an implementation of SegmentReplicationSource Signed-off-by: Kartik Ganesh --- .../PrimaryShardReplicationSource.java | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java new file mode 100644 index 0000000000000..6b53572fc2353 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.recovery.RetryableTransportClient; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.transport.TransportService; + +import java.util.List; + +/** + * Implementation of a {@link SegmentReplicationSource} where the source is a primary node + * + * @opensearch.internal + */ +public class PrimaryShardReplicationSource implements SegmentReplicationSource { + + /** + * Internal actions used by the segment replication source service on the primary shard + * + * @opensearch.internal + */ + public static class Actions { + public static final String GET_CHECKPOINT_INFO = "internal:index/shard/replication/get_checkpoint_info"; + public static final String GET_SEGMENT_FILES = "internal:index/shard/replication/get_segment_files"; + } + + private final RetryableTransportClient transportClient; + private final RecoverySettings recoverySettings; + private final DiscoveryNode localNode; + private final String allocationId; + + public PrimaryShardReplicationSource( + TransportService transportService, + RecoverySettings recoverySettings, + DiscoveryNode targetNode, + DiscoveryNode localNode, + String allocationId + ) { + this.transportClient = new RetryableTransportClient(transportService, targetNode, recoverySettings.internalActionRetryTimeout()); + this.recoverySettings = recoverySettings; + this.localNode = localNode; + this.allocationId = allocationId; + } + + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + final Writeable.Reader reader = CheckpointInfoResponse::new; + final ActionListener responseListener = ActionListener.map(listener, r -> r); + // TODO CheckpointInfoRequest and execute action + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + final Writeable.Reader reader = GetSegmentFilesResponse::new; + final ActionListener responseListener = ActionListener.map(listener, r -> r); + // TODO GetSegmentFilesRequest and execute action + } +} From 36b4fb1435c439d6b7532ae6f5c337b1c9d2d867 Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Thu, 26 
May 2022 17:24:33 -0700 Subject: [PATCH 03/16] Change SegmentReplicationSourceFactory to create a PrimaryShardReplicationSource instance as the implementation Signed-off-by: Kartik Ganesh --- .../SegmentReplicationSourceFactory.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java index 3ca31503f176d..1e34868027909 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java @@ -8,8 +8,11 @@ package org.opensearch.indices.replication; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.transport.TransportService; @@ -35,7 +38,17 @@ public SegmentReplicationSourceFactory( } public SegmentReplicationSource get(IndexShard shard) { - // TODO: Default to an implementation that uses the primary shard. - return null; + return new PrimaryShardReplicationSource( + transportService, + recoverySettings, + clusterService.localNode(), + getPrimaryNode(shard.shardId()), + shard.routingEntry().allocationId().getId() + ); + } + + private DiscoveryNode getPrimaryNode(ShardId shardId) { + ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(shardId).primaryShard(); + return clusterService.state().nodes().get(primaryShard.currentNodeId()); } } From f72aa141f5c077072ea5188dde9bad0a05d4508c Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Fri, 27 May 2022 10:32:48 -0700 Subject: [PATCH 04/16] Added the SegmentReplicationSourceService service class CopyState has closeInternal now implemented and includes a new getter that is used by the service class. The Action definitions have been moved from the ReplicationSource class to the service class. 
Signed-off-by: Kartik Ganesh --- .../PrimaryShardReplicationSource.java | 10 --- .../SegmentReplicationSourceService.java | 89 +++++++++++++++++++ .../indices/replication/common/CopyState.java | 15 +++- 3 files changed, 103 insertions(+), 11 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java index 6b53572fc2353..f8bc2ea23dcae 100644 --- a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -27,16 +27,6 @@ */ public class PrimaryShardReplicationSource implements SegmentReplicationSource { - /** - * Internal actions used by the segment replication source service on the primary shard - * - * @opensearch.internal - */ - public static class Actions { - public static final String GET_CHECKPOINT_INFO = "internal:index/shard/replication/get_checkpoint_info"; - public static final String GET_SEGMENT_FILES = "internal:index/shard/replication/get_segment_files"; - } - private final RetryableTransportClient transportClient; private final RecoverySettings recoverySettings; private final DiscoveryNode localNode; diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java new file mode 100644 index 0000000000000..358fad5de54e3 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -0,0 +1,89 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.transport.TransportService; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Service class that handles segment replication requests from replica shards. + * Typically, the "source" is a primary shard. + * + * @opensearch.internal + */ +public class SegmentReplicationSourceService { + + /** + * Internal actions used by the segment replication source service on the primary shard + * + * @opensearch.internal + */ + public static class Actions { + public static final String GET_CHECKPOINT_INFO = "internal:index/shard/replication/get_checkpoint_info"; + public static final String GET_SEGMENT_FILES = "internal:index/shard/replication/get_segment_files"; + } + + private final Map copyStateMap; + private final TransportService transportService; + + public SegmentReplicationSourceService(TransportService transportService) { + copyStateMap = Collections.synchronizedMap(new HashMap<>()); + this.transportService = transportService; + // TODO register request handlers + } + + /** + * A synchronized method that checks {@link #copyStateMap} for the given {@link ReplicationCheckpoint} key + * and returns the cached value if one is present. If the key is not present, a {@link CopyState} + * object is constructed and stored in the map before being returned. 
+ */ + private synchronized CopyState getCachedCopyState(ReplicationCheckpoint replicationCheckpoint) { + if (isInCopyStateMap(replicationCheckpoint)) { + final CopyState copyState = fetchFromCopyStateMap(replicationCheckpoint); + copyState.incRef(); + return copyState; + } else { + // TODO fetch the shard object to build the CopyState + return null; + } + } + + /** + * Operations on the {@link #copyStateMap} member. + */ + + /** + * Adds the input {@link CopyState} object to {@link #copyStateMap}. + * The key is the CopyState's {@link ReplicationCheckpoint} object. + */ + private void addToCopyStateMap(CopyState copyState) { + copyStateMap.putIfAbsent(copyState.getReplicationCheckpoint(), copyState); + } + + /** + * Given a {@link ReplicationCheckpoint}, return the corresponding + * {@link CopyState} object, if any, from {@link #copyStateMap}. + */ + private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { + return copyStateMap.get(replicationCheckpoint); + } + + /** + * Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint} + * as a key by invoking {@link Map#containsKey(Object)}. + */ + private boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { + return copyStateMap.containsKey(replicationCheckpoint); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java index 581b0e43d0693..974a90b1a4099 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -20,6 +20,7 @@ import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.HashSet; /** @@ -72,6 +73,18 @@ public CopyState(IndexShard shard) throws IOException { @Override protected void closeInternal() { - // TODO + try { + segmentInfosRef.close(); + // commitRef may be null if there were no pending delete files + if (commitRef != null) { + commitRef.close(); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public ReplicationCheckpoint getReplicationCheckpoint() { + return replicationCheckpoint; } } From 2e3f243257c71af17aada608f9e63468560e26ff Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Fri, 27 May 2022 11:58:29 -0700 Subject: [PATCH 05/16] Added SegmentReplicationTransportRequest and its two concrete subclasses PrimaryShardReplicationSource has been updated to wire in these classes. 
Signed-off-by: Kartik Ganesh --- .../replication/CheckpointInfoRequest.java | 49 ++++++++++++++++++ .../replication/GetSegmentFilesRequest.java | 50 +++++++++++++++++++ .../PrimaryShardReplicationSource.java | 11 ++-- .../SegmentReplicationTransportRequest.java | 49 ++++++++++++++++++ 4 files changed, 155 insertions(+), 4 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/replication/CheckpointInfoRequest.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java diff --git a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoRequest.java b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoRequest.java new file mode 100644 index 0000000000000..55d532cf8fd7a --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoRequest.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest; + +import java.io.IOException; + +/** + * Request object for fetching segment metadata for a {@link ReplicationCheckpoint} from + * a {@link SegmentReplicationSource}. + * + * @opensearch.internal + */ +public class CheckpointInfoRequest extends SegmentReplicationTransportRequest { + + private final ReplicationCheckpoint checkpoint; + + public CheckpointInfoRequest(StreamInput in) throws IOException { + super(in); + checkpoint = new ReplicationCheckpoint(in); + } + + public CheckpointInfoRequest( + long replicationId, + String targetAllocationId, + DiscoveryNode targetNode, + ReplicationCheckpoint checkpoint + ) { + super(replicationId, targetAllocationId, targetNode); + this.checkpoint = checkpoint; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + checkpoint.writeTo(out); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java new file mode 100644 index 0000000000000..b440a37357b04 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.replication; + +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest; + +import java.io.IOException; +import java.util.List; + +public class GetSegmentFilesRequest extends SegmentReplicationTransportRequest { + + private final List filesToFetch; + private final ReplicationCheckpoint checkpoint; + + public GetSegmentFilesRequest(StreamInput in) throws IOException { + super(in); + this.filesToFetch = in.readList(StoreFileMetadata::new); + this.checkpoint = new ReplicationCheckpoint(in); + } + + public GetSegmentFilesRequest( + long replicationId, + String targetAllocationId, + DiscoveryNode targetNode, + List filesToFetch, + ReplicationCheckpoint checkpoint + ) { + super(replicationId, targetAllocationId, targetNode); + this.filesToFetch = filesToFetch; + this.checkpoint = checkpoint; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(filesToFetch); + checkpoint.writeTo(out); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java index f8bc2ea23dcae..959b541c3d06b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -20,6 +20,9 @@ import java.util.List; +import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO; +import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES; + /** * Implementation of a {@link SegmentReplicationSource} where the source is a primary node * @@ -28,7 +31,6 @@ public class PrimaryShardReplicationSource implements SegmentReplicationSource { private final RetryableTransportClient transportClient; - private final RecoverySettings recoverySettings; private final DiscoveryNode localNode; private final String allocationId; @@ -40,7 +42,6 @@ public PrimaryShardReplicationSource( String allocationId ) { this.transportClient = new RetryableTransportClient(transportService, targetNode, recoverySettings.internalActionRetryTimeout()); - this.recoverySettings = recoverySettings; this.localNode = localNode; this.allocationId = allocationId; } @@ -53,7 +54,8 @@ public void getCheckpointMetadata( ) { final Writeable.Reader reader = CheckpointInfoResponse::new; final ActionListener responseListener = ActionListener.map(listener, r -> r); - // TODO CheckpointInfoRequest and execute action + final CheckpointInfoRequest request = new CheckpointInfoRequest(replicationId, allocationId, localNode, checkpoint); + transportClient.executeRetryableAction(GET_CHECKPOINT_INFO, request, responseListener, reader); } @Override @@ -66,6 +68,7 @@ public void getSegmentFiles( ) { final Writeable.Reader reader = GetSegmentFilesResponse::new; final ActionListener responseListener = ActionListener.map(listener, r -> r); - // TODO GetSegmentFilesRequest and execute action + final GetSegmentFilesRequest request = new GetSegmentFilesRequest(replicationId, allocationId, localNode, filesToFetch, 
checkpoint); + transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, responseListener, reader); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java b/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java new file mode 100644 index 0000000000000..db8206d131c13 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.common; + +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportRequest; + +import java.io.IOException; + +/** + * Abstract base class for transport-layer requests related to segment replication. + * + * @opensearch.internal + */ +public abstract class SegmentReplicationTransportRequest extends TransportRequest { + + private final long replicationId; + private final String targetAllocationId; + private final DiscoveryNode targetNode; + + protected SegmentReplicationTransportRequest(long replicationId, String targetAllocationId, DiscoveryNode targetNode) { + this.replicationId = replicationId; + this.targetAllocationId = targetAllocationId; + this.targetNode = targetNode; + } + + protected SegmentReplicationTransportRequest(StreamInput in) throws IOException { + super(in); + this.replicationId = in.readLong(); + this.targetAllocationId = in.readString(); + this.targetNode = new DiscoveryNode(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(this.replicationId); + out.writeString(this.targetAllocationId); + targetNode.writeTo(out); + } +} From b874d0125581ef126b461beffe2952ee60896a1a Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Fri, 27 May 2022 12:39:56 -0700 Subject: [PATCH 06/16] Added unit tests for PrimaryShardReplicationSource Signed-off-by: Kartik Ganesh --- .../PrimaryShardReplicationSource.java | 11 +- .../PrimaryShardReplicationSourceTests.java | 139 ++++++++++++++++++ 2 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java index 959b541c3d06b..2b293a2cd916e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -8,6 +8,8 @@ package org.opensearch.indices.replication; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.io.stream.Writeable; @@ -30,6 +32,8 @@ */ public class PrimaryShardReplicationSource implements SegmentReplicationSource { + private static final Logger logger = LogManager.getLogger(PrimaryShardReplicationSource.class); + private final RetryableTransportClient 
transportClient; private final DiscoveryNode localNode; private final String allocationId; @@ -41,7 +45,12 @@ public PrimaryShardReplicationSource( DiscoveryNode localNode, String allocationId ) { - this.transportClient = new RetryableTransportClient(transportService, targetNode, recoverySettings.internalActionRetryTimeout()); + this.transportClient = new RetryableTransportClient( + transportService, + targetNode, + recoverySettings.internalActionRetryTimeout(), + logger + ); this.localNode = localNode; this.allocationId = allocationId; } diff --git a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java new file mode 100644 index 0000000000000..59653af95641d --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.lucene.util.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.test.ClusterServiceUtils; +import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.transport.TransportService; + +import java.util.Arrays; +import java.util.Collections; + +import static org.mockito.Mockito.mock; + +public class PrimaryShardReplicationSourceTests extends IndexShardTestCase { + + private static final long PRIMARY_TERM = 1L; + private static final long SEGMENTS_GEN = 2L; + private static final long SEQ_NO = 3L; + private static final long VERSION = 4L; + private static final long REPLICATION_ID = 123L; + + private CapturingTransport transport; + private ClusterService clusterService; + private TransportService transportService; + private PrimaryShardReplicationSource replicationSource; + private IndexShard indexShard; + private DiscoveryNode targetNode; + + @Override + public void setUp() throws Exception { + super.setUp(); + final Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); + transport = new CapturingTransport(); + targetNode = newDiscoveryNode("targetNode"); + final DiscoveryNode localNode = newDiscoveryNode("localNode"); + clusterService = ClusterServiceUtils.createClusterService(threadPool, localNode); + transportService = transport.createTransportService( + clusterService.getSettings(), + threadPool, + 
TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> clusterService.localNode(), + null, + Collections.emptySet() + ); + transportService.start(); + transportService.acceptIncomingRequests(); + + indexShard = newStartedShard(true); + + replicationSource = new PrimaryShardReplicationSource( + transportService, + recoverySettings, + targetNode, + localNode, + indexShard.routingEntry().allocationId().toString() + ); + } + + @Override + public void tearDown() throws Exception { + IOUtils.close(transportService, clusterService, transport); + closeShards(indexShard); + super.tearDown(); + } + + public void testGetCheckpointMetadata() { + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + indexShard.shardId(), + PRIMARY_TERM, + SEGMENTS_GEN, + SEQ_NO, + VERSION + ); + replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, mock(ActionListener.class)); + CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear(); + assertEquals(1, requestList.length); + CapturingTransport.CapturedRequest capturedRequest = requestList[0]; + assertEquals(SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO, capturedRequest.action); + assertEquals(targetNode, capturedRequest.node); + assertTrue(capturedRequest.request instanceof CheckpointInfoRequest); + } + + public void testGetSegmentFiles() { + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + indexShard.shardId(), + PRIMARY_TERM, + SEGMENTS_GEN, + SEQ_NO, + VERSION + ); + StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", 1L, "checksum", Version.LATEST); + replicationSource.getSegmentFiles( + REPLICATION_ID, + checkpoint, + Arrays.asList(testMetadata), + mock(Store.class), + mock(ActionListener.class) + ); + CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear(); + assertEquals(1, requestList.length); + CapturingTransport.CapturedRequest capturedRequest = requestList[0]; + assertEquals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES, capturedRequest.action); + assertEquals(targetNode, capturedRequest.node); + assertTrue(capturedRequest.request instanceof GetSegmentFilesRequest); + } + + private DiscoveryNode newDiscoveryNode(String nodeName) { + return new DiscoveryNode( + nodeName, + randomAlphaOfLength(10), + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + org.opensearch.Version.CURRENT + ); + } +} From 83a47a55378791758879dbbef9b639dc91e0b5f2 Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Fri, 27 May 2022 16:16:01 -0700 Subject: [PATCH 07/16] Added request handlers to SegmentReplicationSourceService Also added an IndicesService member since this is needed to resolve the IndexShard from incoming shardId. 
Signed-off-by: Kartik Ganesh --- .../replication/CheckpointInfoRequest.java | 4 + .../replication/GetSegmentFilesRequest.java | 4 + .../SegmentReplicationSourceService.java | 88 ++++++++++++++++--- .../indices/replication/common/CopyState.java | 15 +++- 4 files changed, 98 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoRequest.java b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoRequest.java index 55d532cf8fd7a..e7363ff44392d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoRequest.java +++ b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoRequest.java @@ -46,4 +46,8 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); checkpoint.writeTo(out); } + + public ReplicationCheckpoint getCheckpoint() { + return checkpoint; + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java index b440a37357b04..5da121bf0fa25 100644 --- a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java +++ b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java @@ -47,4 +47,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeList(filesToFetch); checkpoint.writeTo(out); } + + public ReplicationCheckpoint getCheckpoint() { + return checkpoint; + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 358fad5de54e3..6c61ddd85e968 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -8,10 +8,22 @@ package org.opensearch.indices.replication; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequestHandler; +import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -24,6 +36,8 @@ */ public class SegmentReplicationSourceService { + private static final Logger logger = LogManager.getLogger(SegmentReplicationSourceService.class); + /** * Internal actions used by the segment replication source service on the primary shard * @@ -36,39 +50,89 @@ public static class Actions { private final Map copyStateMap; private final TransportService transportService; + private final IndicesService indicesService; - public SegmentReplicationSourceService(TransportService transportService) { + // TODO mark this as injected and bind in Node + public SegmentReplicationSourceService(TransportService transportService, IndicesService indicesService) { copyStateMap = Collections.synchronizedMap(new 
HashMap<>()); this.transportService = transportService; - // TODO register request handlers + this.indicesService = indicesService; + + transportService.registerRequestHandler( + Actions.GET_CHECKPOINT_INFO, + ThreadPool.Names.GENERIC, + CheckpointInfoRequest::new, + new CheckpointInfoRequestHandler() + ); + transportService.registerRequestHandler( + Actions.GET_SEGMENT_FILES, + ThreadPool.Names.GENERIC, + GetSegmentFilesRequest::new, + new GetSegmentFilesRequestHandler() + ); + } + + private class CheckpointInfoRequestHandler implements TransportRequestHandler { + @Override + public void messageReceived(CheckpointInfoRequest request, TransportChannel channel, Task task) throws Exception { + // TODO is this the right checkpoint? + final ReplicationCheckpoint checkpoint = request.getCheckpoint(); + logger.trace("Received request for checkpoint {}", checkpoint); + final CopyState copyState = getCachedCopyState(checkpoint); + channel.sendResponse( + new CheckpointInfoResponse( + copyState.getCheckpoint(), + copyState.getMetadataSnapshot(), + copyState.getInfosBytes(), + copyState.getPendingDeleteFiles() + ) + ); + } + } + + class GetSegmentFilesRequestHandler implements TransportRequestHandler { + @Override + public void messageReceived(GetSegmentFilesRequest request, TransportChannel channel, Task task) throws Exception { + if (isInCopyStateMap(request.getCheckpoint())) { + // TODO send files + } else { + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + } } + /** + * Operations on the {@link #copyStateMap} member. + */ + /** * A synchronized method that checks {@link #copyStateMap} for the given {@link ReplicationCheckpoint} key * and returns the cached value if one is present. If the key is not present, a {@link CopyState} * object is constructed and stored in the map before being returned. */ - private synchronized CopyState getCachedCopyState(ReplicationCheckpoint replicationCheckpoint) { - if (isInCopyStateMap(replicationCheckpoint)) { - final CopyState copyState = fetchFromCopyStateMap(replicationCheckpoint); + private synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException { + if (isInCopyStateMap(checkpoint)) { + final CopyState copyState = fetchFromCopyStateMap(checkpoint); copyState.incRef(); return copyState; } else { - // TODO fetch the shard object to build the CopyState - return null; + // From the checkpoint's shard ID, fetch the IndexShard + ShardId shardId = checkpoint.getShardId(); + final IndexService indexService = indicesService.indexService(shardId.getIndex()); + final IndexShard indexShard = indexService.getShard(shardId.id()); + // build the CopyState object and cache it before returning + final CopyState copyState = new CopyState(indexShard); + addToCopyStateMap(copyState); + return copyState; } } - /** - * Operations on the {@link #copyStateMap} member. - */ - /** * Adds the input {@link CopyState} object to {@link #copyStateMap}. * The key is the CopyState's {@link ReplicationCheckpoint} object. 
*/ private void addToCopyStateMap(CopyState copyState) { - copyStateMap.putIfAbsent(copyState.getReplicationCheckpoint(), copyState); + copyStateMap.putIfAbsent(copyState.getCheckpoint(), copyState); } /** diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java index 974a90b1a4099..250df3481435a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.HashSet; +import java.util.Set; /** * An Opensearch-specific version of Lucene's CopyState class that @@ -84,7 +85,19 @@ protected void closeInternal() { } } - public ReplicationCheckpoint getReplicationCheckpoint() { + public ReplicationCheckpoint getCheckpoint() { return replicationCheckpoint; } + + public Store.MetadataSnapshot getMetadataSnapshot() { + return metadataSnapshot; + } + + public byte[] getInfosBytes() { + return infosBytes; + } + + public Set getPendingDeleteFiles() { + return pendingDeleteFiles; + } } From d5b85fec95a477fe3023b983925009af6a2add0c Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Fri, 27 May 2022 16:45:50 -0700 Subject: [PATCH 08/16] Refactoring argument order to avoid confusion between source and target DiscoveryNodes Also updated variable naming to try and be consistent. Signed-off-by: Kartik Ganesh --- .../PrimaryShardReplicationSource.java | 29 ++++++++++++------- .../SegmentReplicationSourceFactory.java | 6 ++-- .../SegmentReplicationSourceService.java | 4 +-- .../PrimaryShardReplicationSourceTests.java | 14 ++++----- 4 files changed, 30 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java index 2b293a2cd916e..08dc0b97b31d5 100644 --- a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -26,7 +26,8 @@ import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES; /** - * Implementation of a {@link SegmentReplicationSource} where the source is a primary node + * Implementation of a {@link SegmentReplicationSource} where the source is a primary node. + * This code executes on the target node. 
* * @opensearch.internal */ @@ -35,24 +36,24 @@ public class PrimaryShardReplicationSource implements SegmentReplicationSource { private static final Logger logger = LogManager.getLogger(PrimaryShardReplicationSource.class); private final RetryableTransportClient transportClient; - private final DiscoveryNode localNode; - private final String allocationId; + private final DiscoveryNode targetNode; + private final String targetAllocationId; public PrimaryShardReplicationSource( + DiscoveryNode targetNode, + String targetAllocationId, TransportService transportService, RecoverySettings recoverySettings, - DiscoveryNode targetNode, - DiscoveryNode localNode, - String allocationId + DiscoveryNode sourceNode ) { + this.targetAllocationId = targetAllocationId; this.transportClient = new RetryableTransportClient( transportService, - targetNode, + sourceNode, recoverySettings.internalActionRetryTimeout(), logger ); - this.localNode = localNode; - this.allocationId = allocationId; + this.targetNode = targetNode; } @Override @@ -63,7 +64,7 @@ public void getCheckpointMetadata( ) { final Writeable.Reader reader = CheckpointInfoResponse::new; final ActionListener responseListener = ActionListener.map(listener, r -> r); - final CheckpointInfoRequest request = new CheckpointInfoRequest(replicationId, allocationId, localNode, checkpoint); + final CheckpointInfoRequest request = new CheckpointInfoRequest(replicationId, targetAllocationId, targetNode, checkpoint); transportClient.executeRetryableAction(GET_CHECKPOINT_INFO, request, responseListener, reader); } @@ -77,7 +78,13 @@ public void getSegmentFiles( ) { final Writeable.Reader reader = GetSegmentFilesResponse::new; final ActionListener responseListener = ActionListener.map(listener, r -> r); - final GetSegmentFilesRequest request = new GetSegmentFilesRequest(replicationId, allocationId, localNode, filesToFetch, checkpoint); + final GetSegmentFilesRequest request = new GetSegmentFilesRequest( + replicationId, + targetAllocationId, + targetNode, + filesToFetch, + checkpoint + ); transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, responseListener, reader); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java index 1e34868027909..afbb80d263805 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java @@ -39,11 +39,11 @@ public SegmentReplicationSourceFactory( public SegmentReplicationSource get(IndexShard shard) { return new PrimaryShardReplicationSource( + clusterService.localNode(), + shard.routingEntry().allocationId().getId(), transportService, recoverySettings, - clusterService.localNode(), - getPrimaryNode(shard.shardId()), - shard.routingEntry().allocationId().getId() + getPrimaryNode(shard.shardId()) ); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 6c61ddd85e968..4ca15352a065b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -30,7 +30,7 @@ /** * Service class that handles segment replication requests from replica shards. 
- * Typically, the "source" is a primary shard. + * Typically, the "source" is a primary shard. This code executes on the source node. * * @opensearch.internal */ @@ -75,7 +75,6 @@ public SegmentReplicationSourceService(TransportService transportService, Indice private class CheckpointInfoRequestHandler implements TransportRequestHandler { @Override public void messageReceived(CheckpointInfoRequest request, TransportChannel channel, Task task) throws Exception { - // TODO is this the right checkpoint? final ReplicationCheckpoint checkpoint = request.getCheckpoint(); logger.trace("Received request for checkpoint {}", checkpoint); final CopyState copyState = getCachedCopyState(checkpoint); @@ -122,6 +121,7 @@ private synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoi final IndexShard indexShard = indexService.getShard(shardId.id()); // build the CopyState object and cache it before returning final CopyState copyState = new CopyState(indexShard); + // TODO This will add with the latest checkpoint, not the one from the request addToCopyStateMap(copyState); return copyState; } diff --git a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java index 59653af95641d..6bce74be569c3 100644 --- a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java @@ -44,7 +44,7 @@ public class PrimaryShardReplicationSourceTests extends IndexShardTestCase { private TransportService transportService; private PrimaryShardReplicationSource replicationSource; private IndexShard indexShard; - private DiscoveryNode targetNode; + private DiscoveryNode sourceNode; @Override public void setUp() throws Exception { @@ -53,7 +53,7 @@ public void setUp() throws Exception { final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); transport = new CapturingTransport(); - targetNode = newDiscoveryNode("targetNode"); + sourceNode = newDiscoveryNode("sourceNode"); final DiscoveryNode localNode = newDiscoveryNode("localNode"); clusterService = ClusterServiceUtils.createClusterService(threadPool, localNode); transportService = transport.createTransportService( @@ -70,11 +70,11 @@ public void setUp() throws Exception { indexShard = newStartedShard(true); replicationSource = new PrimaryShardReplicationSource( + localNode, + indexShard.routingEntry().allocationId().toString(), transportService, recoverySettings, - targetNode, - localNode, - indexShard.routingEntry().allocationId().toString() + sourceNode ); } @@ -98,7 +98,7 @@ public void testGetCheckpointMetadata() { assertEquals(1, requestList.length); CapturingTransport.CapturedRequest capturedRequest = requestList[0]; assertEquals(SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO, capturedRequest.action); - assertEquals(targetNode, capturedRequest.node); + assertEquals(sourceNode, capturedRequest.node); assertTrue(capturedRequest.request instanceof CheckpointInfoRequest); } @@ -122,7 +122,7 @@ public void testGetSegmentFiles() { assertEquals(1, requestList.length); CapturingTransport.CapturedRequest capturedRequest = requestList[0]; assertEquals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES, capturedRequest.action); - 
assertEquals(targetNode, capturedRequest.node); + assertEquals(sourceNode, capturedRequest.node); assertTrue(capturedRequest.request instanceof GetSegmentFilesRequest); } From 005e403137265f6fb9683b4801263302542f9b8a Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Tue, 31 May 2022 17:46:48 -0700 Subject: [PATCH 09/16] Added a subset of unit tests Classes updated: * InternalEngineTests * IndexShardTests * StoreTests Also added EngineConfigTests to test the read-only engine use-case with seg-rep Signed-off-by: Kartik Ganesh --- .../org/opensearch/index/engine/Engine.java | 4 + .../org/opensearch/index/store/Store.java | 2 +- .../index/engine/EngineConfigTests.java | 108 ++++++++++++++++++ .../index/engine/InternalEngineTests.java | 87 ++++++++++++++ .../opensearch/index/store/StoreTests.java | 11 ++ 5 files changed, 211 insertions(+), 1 deletion(-) create mode 100644 server/src/test/java/org/opensearch/index/engine/EngineConfigTests.java diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 61f2c0072b6b1..5dd4914770708 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -859,6 +859,10 @@ public final CommitStats commitStats() { */ public abstract long getPersistedLocalCheckpoint(); + /** + * @return the latest checkpoint that has been processed but not necessarily persisted. + * Also see {@link #getPersistedLocalCheckpoint()} + */ public long getProcessedLocalCheckpoint() { // default implementation return 0L; diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 3a121b5dcaff3..94a7e5dfd5bff 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -278,7 +278,7 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException { * Conveience wrapper around the {@link #getMetadata(IndexCommit)} method for null input. */ public MetadataSnapshot getMetadata() throws IOException { - return getMetadata((IndexCommit) null, false); + return getMetadata(null, false); } /** diff --git a/server/src/test/java/org/opensearch/index/engine/EngineConfigTests.java b/server/src/test/java/org/opensearch/index/engine/EngineConfigTests.java new file mode 100644 index 0000000000000..1c6d06e9bcc08 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/engine/EngineConfigTests.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.engine; + +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.seqno.RetentionLeases; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.OpenSearchTestCase; + +public class EngineConfigTests extends OpenSearchTestCase { + + private IndexSettings defaultIndexSettings; + + @Override + public void setUp() throws Exception { + super.setUp(); + final IndexMetadata defaultIndexMetadata = IndexMetadata.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + defaultIndexSettings = IndexSettingsModule.newIndexSettings("test", defaultIndexMetadata.getSettings()); + } + + public void testEngineConfig_DefaultValueForReadOnlyEngine() { + EngineConfig config = new EngineConfig( + null, + null, + defaultIndexSettings, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + () -> RetentionLeases.EMPTY, + null, + null + ); + assertFalse(config.isReadOnlyReplica()); + } + + public void testEngineConfig_ReadOnlyEngineWithSegRepDisabled() { + expectThrows(IllegalArgumentException.class, () -> createReadOnlyEngine(defaultIndexSettings)); + } + + public void testEngineConfig_ReadOnlyEngineWithSegRepEnabled() { + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + "test", + Settings.builder() + .put(defaultIndexSettings.getSettings()) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build() + ); + EngineConfig engineConfig = createReadOnlyEngine(indexSettings); + assertTrue(engineConfig.isReadOnlyReplica()); + } + + private EngineConfig createReadOnlyEngine(IndexSettings indexSettings) { + return new EngineConfig( + null, + null, + indexSettings, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + () -> RetentionLeases.EMPTY, + null, + null, + true + ); + } +} diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index cbae55a047a1e..eb92abf18e74e 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -147,6 +147,7 @@ import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogDeletionPolicyFactory; import org.opensearch.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPool; @@ -211,7 +212,9 @@ import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_RESET; @@ -7384,4 +7387,88 @@ public void testMaxDocsOnReplica() throws Exception { restoreIndexWriterMaxDocs(); } } + + public 
void testGetSegmentInfosSnapshot_OnReadReplica() throws IOException { + engine.close(); + Store store = createStore(); + // create an engine just so we can easily fetch the engine config constructor parameters + InternalEngine tempEngine = createEngine(store, createTempDir()); + EngineConfig tempConfig = tempEngine.config(); + // read-only engine config requires the replication type setting to be SEGMENT + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + "test", + Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build() + ); + // create the read-only engine config + EngineConfig readOnlyEngineConfig = new EngineConfig( + tempConfig.getShardId(), + tempConfig.getThreadPool(), + indexSettings, + tempConfig.getWarmer(), + store, + tempConfig.getMergePolicy(), + tempConfig.getAnalyzer(), + tempConfig.getSimilarity(), + new CodecService(null, logger), + tempConfig.getEventListener(), + tempConfig.getQueryCache(), + tempConfig.getQueryCachingPolicy(), + tempConfig.getTranslogConfig(), + null, + tempConfig.getFlushMergesAfter(), + tempConfig.getExternalRefreshListener(), + tempConfig.getInternalRefreshListener(), + tempConfig.getIndexSort(), + tempConfig.getCircuitBreakerService(), + tempConfig.getGlobalCheckpointSupplier(), + tempConfig.retentionLeasesSupplier(), + tempConfig.getPrimaryTermSupplier(), + tempConfig.getTombstoneDocSupplier(), + true + ); + // close engine now that it is no longer needed + tempEngine.close(); + + SetOnce indexWriterHolder = new SetOnce<>(); + IndexWriterFactory indexWriterFactory = (directory, iwc) -> { + indexWriterHolder.set(new IndexWriter(directory, iwc)); + return indexWriterHolder.get(); + }; + InternalEngine engine = createEngine(readOnlyEngineConfig); + expectThrows(AssertionError.class, engine::getSegmentInfosSnapshot); + engine.close(); + store.close(); + } + + public void testGetSegmentInfosSnapshot() throws IOException { + IOUtils.close(store, engine); + Store store = createStore(); + InternalEngine engine = spy(createEngine(store, createTempDir())); + GatedCloseable segmentInfosSnapshot = engine.getSegmentInfosSnapshot(); + assertNotNull(segmentInfosSnapshot); + assertNotNull(segmentInfosSnapshot.get()); + verify(engine, times(1)).getLatestSegmentInfos(); + store.close(); + engine.close(); + } + + public void testGetProcessedLocalCheckpoint() throws IOException { + final long expectedLocalCheckpoint = 1L; + IOUtils.close(store, engine); + // set up mock + final LocalCheckpointTracker mockCheckpointTracker = mock(LocalCheckpointTracker.class); + when(mockCheckpointTracker.getProcessedCheckpoint()).thenReturn(expectedLocalCheckpoint); + + Store store = createStore(); + InternalEngine engine = createEngine(store, createTempDir(), (a, b) -> mockCheckpointTracker); + + long actualLocalCheckpoint = engine.getProcessedLocalCheckpoint(); + assertEquals(expectedLocalCheckpoint, actualLocalCheckpoint); + verify(mockCheckpointTracker, atLeastOnce()).getProcessedCheckpoint(); + store.close(); + engine.close(); + } } diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index 5f1302c535437..d99bde4764adf 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -1138,4 +1138,15 @@ public void testGetPendingFiles() throws IOException { } } } + + public void 
testGetMetadataWithSegmentInfos() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 1); + Store store = new Store(shardId, INDEX_SETTINGS, new NIOFSDirectory(createTempDir()), new DummyShardLock(shardId)); + store.createEmpty(Version.LATEST); + SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory()); + Store.MetadataSnapshot metadataSnapshot = store.getMetadata(segmentInfos); + // loose check for equality + assertEquals(segmentInfos.getSegmentsFileName(), metadataSnapshot.getSegmentsFile().name()); + store.close(); + } } From 2227f455c5a9e9aa44cddec2b94a4c849cfe18cf Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Tue, 31 May 2022 21:34:27 -0700 Subject: [PATCH 10/16] Added and updated Javadocs Signed-off-by: Kartik Ganesh --- .../indices/replication/CheckpointInfoRequest.java | 3 ++- .../indices/replication/GetSegmentFilesRequest.java | 6 ++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoRequest.java b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoRequest.java index e7363ff44392d..188a4c1e40fa7 100644 --- a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoRequest.java +++ b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoRequest.java @@ -18,7 +18,8 @@ /** * Request object for fetching segment metadata for a {@link ReplicationCheckpoint} from - * a {@link SegmentReplicationSource}. + * a {@link SegmentReplicationSource}. This object is created by the target node and sent + * to the source node. * * @opensearch.internal */ diff --git a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java index 5da121bf0fa25..21749d3fe7d8a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java +++ b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java @@ -18,6 +18,12 @@ import java.io.IOException; import java.util.List; +/** + * Request object for fetching a list of segment files metadata from a {@link SegmentReplicationSource}. + * This object is created by the target node and sent to the source node. + * + * @opensearch.internal + */ public class GetSegmentFilesRequest extends SegmentReplicationTransportRequest { private final List filesToFetch; From 6ecb56069fa42892b5c959db56b0d814155fa731 Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Wed, 1 Jun 2022 14:04:09 -0700 Subject: [PATCH 11/16] Added a unit test for CopyState Signed-off-by: Kartik Ganesh --- .../replication/common/CopyStateTests.java | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java new file mode 100644 index 0000000000000..faf12d5f1025c --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.replication.common; + +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.util.Version; +import org.opensearch.common.collect.Map; +import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; + +import java.io.IOException; +import java.util.Set; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CopyStateTests extends IndexShardTestCase { + + public void testCopyStateCreation() throws IOException { + // dummy objects setup + final long expectedLongValue = 1L; + final ShardId testShardId = new ShardId("testIndex", "testUUID", 0); + final StoreFileMetadata segmentsFile = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST); + final StoreFileMetadata pendingDeleteFile = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST); + final Store.MetadataSnapshot commitMetadataSnapshot = new Store.MetadataSnapshot( + Map.of("segmentsFile", segmentsFile, "pendingDeleteFile", pendingDeleteFile), + null, + 0 + ); + final Store.MetadataSnapshot segmentInfosMetadataSnapshot = new Store.MetadataSnapshot( + Map.of("segmentsFile", segmentsFile), + null, + 0 + ); + + // Mock objects setup + IndexShard mockShard = mock(IndexShard.class); + when(mockShard.shardId()).thenReturn(testShardId); + when(mockShard.getOperationPrimaryTerm()).thenReturn(expectedLongValue); + when(mockShard.getProcessedLocalCheckpoint()).thenReturn(expectedLongValue); + + Store mockStore = mock(Store.class); + when(mockShard.store()).thenReturn(mockStore); + + SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major); + when(mockShard.getSegmentInfosSnapshot()).thenReturn(new GatedCloseable<>(testSegmentInfos, () -> {})); + + when(mockStore.getMetadata(testSegmentInfos)).thenReturn(segmentInfosMetadataSnapshot); + + IndexCommit mockIndexCommit = mock(IndexCommit.class); + when(mockShard.acquireLastIndexCommit(false)).thenReturn(new GatedCloseable<>(mockIndexCommit, () -> {})); + when(mockStore.getMetadata(mockIndexCommit)).thenReturn(commitMetadataSnapshot); + + // unit test + CopyState copyState = new CopyState(mockShard); + ReplicationCheckpoint checkpoint = copyState.getCheckpoint(); + assertEquals(testShardId, checkpoint.getShardId()); + // version was never set so this should be zero + assertEquals(0, checkpoint.getSegmentInfosVersion()); + assertEquals(expectedLongValue, checkpoint.getPrimaryTerm()); + + Set pendingDeleteFiles = copyState.getPendingDeleteFiles(); + assertEquals(1, pendingDeleteFiles.size()); + assertTrue(pendingDeleteFiles.contains(pendingDeleteFile)); + } +} From 1a2c70b66b32dab2460b87c1134b8e01a2af6122 Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Wed, 1 Jun 2022 14:46:36 -0700 Subject: [PATCH 12/16] Incorporating PR comments Signed-off-by: Kartik Ganesh --- .../index/engine/InternalEngine.java | 2 - .../SegmentReplicationSourceService.java | 15 +++-- .../index/engine/InternalEngineTests.java | 56 ------------------- 3 files changed, 11 insertions(+), 62 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java 
b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 5b28b29914b4e..d65c06cce1b69 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2307,8 +2307,6 @@ public SegmentInfos getLatestSegmentInfos() { @Override public GatedCloseable getSegmentInfosSnapshot() { - // this should never be called by read-only engines - assert (engineConfig.isReadOnlyReplica() == false); final SegmentInfos segmentInfos = getLatestSegmentInfos(); try { indexWriter.incRefDeleter(segmentInfos); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 4ca15352a065b..ebcf2d27e6933 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -121,8 +121,15 @@ private synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoi final IndexShard indexShard = indexService.getShard(shardId.id()); // build the CopyState object and cache it before returning final CopyState copyState = new CopyState(indexShard); - // TODO This will add with the latest checkpoint, not the one from the request - addToCopyStateMap(copyState); + + /** + * Use the checkpoint from the request as the key in the map, rather than + * the checkpoint from the created CopyState. This maximizes cache hits + * if replication targets make a request with an older checkpoint. + * Replication targets are expected to fetch the checkpoint in the response + * CopyState to bring themselves up to date. + */ + addToCopyStateMap(checkpoint, copyState); return copyState; } } @@ -131,8 +138,8 @@ private synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoi * Adds the input {@link CopyState} object to {@link #copyStateMap}. * The key is the CopyState's {@link ReplicationCheckpoint} object. 
*/ - private void addToCopyStateMap(CopyState copyState) { - copyStateMap.putIfAbsent(copyState.getCheckpoint(), copyState); + private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) { + copyStateMap.putIfAbsent(checkpoint, copyState); } /** diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index eb92abf18e74e..b14ad15070118 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -147,7 +147,6 @@ import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogDeletionPolicyFactory; import org.opensearch.indices.breaker.NoneCircuitBreakerService; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPool; @@ -7388,61 +7387,6 @@ public void testMaxDocsOnReplica() throws Exception { } } - public void testGetSegmentInfosSnapshot_OnReadReplica() throws IOException { - engine.close(); - Store store = createStore(); - // create an engine just so we can easily fetch the engine config constructor parameters - InternalEngine tempEngine = createEngine(store, createTempDir()); - EngineConfig tempConfig = tempEngine.config(); - // read-only engine config requires the replication type setting to be SEGMENT - final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( - "test", - Settings.builder() - .put(defaultSettings.getSettings()) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .build() - ); - // create the read-only engine config - EngineConfig readOnlyEngineConfig = new EngineConfig( - tempConfig.getShardId(), - tempConfig.getThreadPool(), - indexSettings, - tempConfig.getWarmer(), - store, - tempConfig.getMergePolicy(), - tempConfig.getAnalyzer(), - tempConfig.getSimilarity(), - new CodecService(null, logger), - tempConfig.getEventListener(), - tempConfig.getQueryCache(), - tempConfig.getQueryCachingPolicy(), - tempConfig.getTranslogConfig(), - null, - tempConfig.getFlushMergesAfter(), - tempConfig.getExternalRefreshListener(), - tempConfig.getInternalRefreshListener(), - tempConfig.getIndexSort(), - tempConfig.getCircuitBreakerService(), - tempConfig.getGlobalCheckpointSupplier(), - tempConfig.retentionLeasesSupplier(), - tempConfig.getPrimaryTermSupplier(), - tempConfig.getTombstoneDocSupplier(), - true - ); - // close engine now that it is no longer needed - tempEngine.close(); - - SetOnce indexWriterHolder = new SetOnce<>(); - IndexWriterFactory indexWriterFactory = (directory, iwc) -> { - indexWriterHolder.set(new IndexWriter(directory, iwc)); - return indexWriterHolder.get(); - }; - InternalEngine engine = createEngine(readOnlyEngineConfig); - expectThrows(AssertionError.class, engine::getSegmentInfosSnapshot); - engine.close(); - store.close(); - } - public void testGetSegmentInfosSnapshot() throws IOException { IOUtils.close(store, engine); Store store = createStore(); From caca5334c18e832434c8898f444e32b602e3b364 Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Wed, 1 Jun 2022 16:53:49 -0700 Subject: [PATCH 13/16] Added tests for SegmentReplicationSourceService This includes refactoring CopyStateTests for code reuse. Also fixed CopyStateTests since these were failing. 
Signed-off-by: Kartik Ganesh --- .../SegmentReplicationSourceService.java | 4 +- .../SegmentReplicationSourceServiceTests.java | 161 ++++++++++++++++++ .../replication/common/CopyStateTests.java | 69 ++++---- 3 files changed, 198 insertions(+), 36 deletions(-) create mode 100644 server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index ebcf2d27e6933..9f70120dedd6c 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -20,7 +20,6 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportChannel; import org.opensearch.transport.TransportRequestHandler; -import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; import java.io.IOException; @@ -95,7 +94,8 @@ public void messageReceived(GetSegmentFilesRequest request, TransportChannel cha if (isInCopyStateMap(request.getCheckpoint())) { // TODO send files } else { - channel.sendResponse(TransportResponse.Empty.INSTANCE); + // Return an empty list of files + channel.sendResponse(new GetSegmentFilesResponse(Collections.emptyList())); } } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java new file mode 100644 index 0000000000000..67c867d360e70 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java @@ -0,0 +1,161 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.replication; + +import org.opensearch.Version; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.CopyStateTests; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SegmentReplicationSourceServiceTests extends OpenSearchTestCase { + + private ShardId testShardId; + private ReplicationCheckpoint testCheckpoint; + private IndicesService mockIndicesService; + private IndexService mockIndexService; + private IndexShard mockIndexShard; + private TestThreadPool testThreadPool; + private CapturingTransport transport; + private TransportService transportService; + private DiscoveryNode localNode; + private SegmentReplicationSourceService segmentReplicationSourceService; + + @Override + public void setUp() throws Exception { + super.setUp(); + // setup mocks + mockIndexShard = CopyStateTests.createMockIndexShard(); + testShardId = mockIndexShard.shardId(); + mockIndicesService = mock(IndicesService.class); + mockIndexService = mock(IndexService.class); + when(mockIndicesService.indexService(testShardId.getIndex())).thenReturn(mockIndexService); + when(mockIndexService.getShard(testShardId.id())).thenReturn(mockIndexShard); + + // This mirrors the creation of the ReplicationCheckpoint inside CopyState + testCheckpoint = new ReplicationCheckpoint( + testShardId, + mockIndexShard.getOperationPrimaryTerm(), + 0L, + mockIndexShard.getProcessedLocalCheckpoint(), + 0L + ); + testThreadPool = new TestThreadPool("test", Settings.EMPTY); + transport = new CapturingTransport(); + localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + transportService = transport.createTransportService( + Settings.EMPTY, + testThreadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> localNode, + null, + Collections.emptySet() + ); + transportService.start(); + transportService.acceptIncomingRequests(); + segmentReplicationSourceService = new SegmentReplicationSourceService(transportService, mockIndicesService); + } + + @Override + public void tearDown() throws Exception { + ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS); + testThreadPool = null; + super.tearDown(); + } + + public void testGetSegmentFiles_EmptyResponse() { + final GetSegmentFilesRequest request = new GetSegmentFilesRequest( + 1, + "allocationId", + localNode, + Collections.emptyList(), + testCheckpoint + ); + transportService.sendRequest( + localNode, + SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES, + request, + new TransportResponseHandler() { + @Override + public void handleResponse(GetSegmentFilesResponse response) { + assertEquals(0, 
response.files.size()); + } + + @Override + public void handleException(TransportException e) { + fail("unexpected exception: " + e); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public GetSegmentFilesResponse read(StreamInput in) throws IOException { + return new GetSegmentFilesResponse(in); + } + } + ); + } + + public void testCheckpointInfo() { + final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, "testAllocationId", localNode, testCheckpoint); + transportService.sendRequest( + localNode, + SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO, + request, + new TransportResponseHandler() { + @Override + public void handleResponse(CheckpointInfoResponse response) { + assertEquals(testCheckpoint, response.getCheckpoint()); + assertNotNull(response.getInfosBytes()); + // CopyStateTests sets up one pending delete file and one committed segments file + assertEquals(1, response.getPendingDeleteFiles().size()); + assertEquals(1, response.getSnapshot().size()); + } + + @Override + public void handleException(TransportException e) { + fail("unexpected exception: " + e); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public CheckpointInfoResponse read(StreamInput in) throws IOException { + return new CheckpointInfoResponse(in); + } + } + ); + } + +} diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index faf12d5f1025c..afa38afb0cf2f 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -29,51 +29,52 @@ public class CopyStateTests extends IndexShardTestCase { + private static final long EXPECTED_LONG_VALUE = 1L; + private static final ShardId TEST_SHARD_ID = new ShardId("testIndex", "testUUID", 0); + private static final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST); + private static final StoreFileMetadata PENDING_DELETE_FILE = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST); + + private static final Store.MetadataSnapshot COMMIT_SNAPSHOT = new Store.MetadataSnapshot( + Map.of(SEGMENTS_FILE.name(), SEGMENTS_FILE, PENDING_DELETE_FILE.name(), PENDING_DELETE_FILE), + null, + 0 + ); + + private static final Store.MetadataSnapshot SI_SNAPSHOT = new Store.MetadataSnapshot( + Map.of(SEGMENTS_FILE.name(), SEGMENTS_FILE), + null, + 0 + ); + public void testCopyStateCreation() throws IOException { - // dummy objects setup - final long expectedLongValue = 1L; - final ShardId testShardId = new ShardId("testIndex", "testUUID", 0); - final StoreFileMetadata segmentsFile = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST); - final StoreFileMetadata pendingDeleteFile = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST); - final Store.MetadataSnapshot commitMetadataSnapshot = new Store.MetadataSnapshot( - Map.of("segmentsFile", segmentsFile, "pendingDeleteFile", pendingDeleteFile), - null, - 0 - ); - final Store.MetadataSnapshot segmentInfosMetadataSnapshot = new Store.MetadataSnapshot( - Map.of("segmentsFile", segmentsFile), - null, - 0 - ); + CopyState copyState = new CopyState(createMockIndexShard()); + ReplicationCheckpoint checkpoint = copyState.getCheckpoint(); + assertEquals(TEST_SHARD_ID, 
checkpoint.getShardId()); + // version was never set so this should be zero + assertEquals(0, checkpoint.getSegmentInfosVersion()); + assertEquals(EXPECTED_LONG_VALUE, checkpoint.getPrimaryTerm()); + + Set pendingDeleteFiles = copyState.getPendingDeleteFiles(); + assertEquals(1, pendingDeleteFiles.size()); + assertTrue(pendingDeleteFiles.contains(PENDING_DELETE_FILE)); + } - // Mock objects setup + public static IndexShard createMockIndexShard() throws IOException { IndexShard mockShard = mock(IndexShard.class); - when(mockShard.shardId()).thenReturn(testShardId); - when(mockShard.getOperationPrimaryTerm()).thenReturn(expectedLongValue); - when(mockShard.getProcessedLocalCheckpoint()).thenReturn(expectedLongValue); + when(mockShard.shardId()).thenReturn(TEST_SHARD_ID); + when(mockShard.getOperationPrimaryTerm()).thenReturn(EXPECTED_LONG_VALUE); + when(mockShard.getProcessedLocalCheckpoint()).thenReturn(EXPECTED_LONG_VALUE); Store mockStore = mock(Store.class); when(mockShard.store()).thenReturn(mockStore); SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major); when(mockShard.getSegmentInfosSnapshot()).thenReturn(new GatedCloseable<>(testSegmentInfos, () -> {})); - - when(mockStore.getMetadata(testSegmentInfos)).thenReturn(segmentInfosMetadataSnapshot); + when(mockStore.getMetadata(testSegmentInfos)).thenReturn(SI_SNAPSHOT); IndexCommit mockIndexCommit = mock(IndexCommit.class); when(mockShard.acquireLastIndexCommit(false)).thenReturn(new GatedCloseable<>(mockIndexCommit, () -> {})); - when(mockStore.getMetadata(mockIndexCommit)).thenReturn(commitMetadataSnapshot); - - // unit test - CopyState copyState = new CopyState(mockShard); - ReplicationCheckpoint checkpoint = copyState.getCheckpoint(); - assertEquals(testShardId, checkpoint.getShardId()); - // version was never set so this should be zero - assertEquals(0, checkpoint.getSegmentInfosVersion()); - assertEquals(expectedLongValue, checkpoint.getPrimaryTerm()); - - Set pendingDeleteFiles = copyState.getPendingDeleteFiles(); - assertEquals(1, pendingDeleteFiles.size()); - assertTrue(pendingDeleteFiles.contains(pendingDeleteFile)); + when(mockStore.getMetadata(mockIndexCommit)).thenReturn(COMMIT_SNAPSHOT); + return mockShard; } } From 91372c69d6893f89b4bedb7c442fb8f08bc17fab Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Wed, 1 Jun 2022 19:23:54 -0700 Subject: [PATCH 14/16] Removing default implementation of getProcessedLocalCheckpoint This now requires implementations in child classes. NRTReplicationEngine already has an implementation, so an @Override annotation has been added. For ReadOnlyEngine, where no processing occurs, the processed local checkpoint is expected to be equal to the persisted local checkpoint. Unit tests for ReadOnlyEngine have been updated.
Signed-off-by: Kartik Ganesh --- .../src/main/java/org/opensearch/index/engine/Engine.java | 5 +---- .../org/opensearch/index/engine/NRTReplicationEngine.java | 1 + .../java/org/opensearch/index/engine/ReadOnlyEngine.java | 7 +++++++ .../org/opensearch/index/engine/ReadOnlyEngineTests.java | 3 +++ 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 5dd4914770708..c6cdec328be85 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -863,10 +863,7 @@ public final CommitStats commitStats() { * @return the latest checkpoint that has been processed but not necessarily persisted. * Also see {@link #getPersistedLocalCheckpoint()} */ - public long getProcessedLocalCheckpoint() { - // default implementation - return 0L; - } + public abstract long getProcessedLocalCheckpoint(); /** * @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 106643198cc3b..e4f4bbbba8f16 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -248,6 +248,7 @@ public long getPersistedLocalCheckpoint() { return localCheckpointTracker.getPersistedCheckpoint(); } + @Override public long getProcessedLocalCheckpoint() { return localCheckpointTracker.getProcessedCheckpoint(); } diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index 23a86d8da5599..6262a9269c01c 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -374,6 +374,13 @@ public long getPersistedLocalCheckpoint() { return seqNoStats.getLocalCheckpoint(); } + @Override + public long getProcessedLocalCheckpoint() { + // the read-only engine does not process checkpoints, so its + // processed checkpoint is identical to its persisted one. 
+ return getPersistedLocalCheckpoint(); + } + @Override public SeqNoStats getSeqNoStats(long globalCheckpoint) { return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint); diff --git a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java index 2106c5e1067fb..da0db02ac402e 100644 --- a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java @@ -107,6 +107,7 @@ public void testReadOnlyEngine() throws Exception { lastSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); lastDocIds = getDocIds(engine, true); assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); + assertThat(readOnlyEngine.getProcessedLocalCheckpoint(), equalTo(readOnlyEngine.getPersistedLocalCheckpoint())); assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); for (int i = 0; i < numDocs; i++) { @@ -131,6 +132,7 @@ public void testReadOnlyEngine() throws Exception { IOUtils.close(external, internal); // the locked down engine should still point to the previous commit assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); + assertThat(readOnlyEngine.getProcessedLocalCheckpoint(), equalTo(readOnlyEngine.getPersistedLocalCheckpoint())); assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); try (Engine.GetResult getResult = readOnlyEngine.get(get, readOnlyEngine::acquireSearcher)) { @@ -142,6 +144,7 @@ public void testReadOnlyEngine() throws Exception { recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); // the locked down engine should still point to the previous commit assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); + assertThat(readOnlyEngine.getProcessedLocalCheckpoint(), equalTo(readOnlyEngine.getPersistedLocalCheckpoint())); assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); } From 7075793519a7fcf541a88b8ebbb3a9b4531a59e1 Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Wed, 1 Jun 2022 19:59:25 -0700 Subject: [PATCH 15/16] Made the javadoc for getSegmentInfosSnapshot() better Signed-off-by: Kartik Ganesh --- .../src/main/java/org/opensearch/index/engine/Engine.java | 8 +++++--- .../java/org/opensearch/index/engine/InternalEngine.java | 5 +++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index c6cdec328be85..4829148322b31 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -176,12 +176,14 @@ public final EngineConfig config() { protected abstract SegmentInfos getLatestSegmentInfos(); /** - * Fetch a snapshot of the latest SegmentInfos from the engine. Using this method - * ensures that segment files are retained in the directory until the reference is closed. 
+ * In contrast to {@link #getLatestSegmentInfos()}, which returns a {@link SegmentInfos} + * object directly, this method returns a {@link GatedCloseable} reference to the same object. + * This allows the engine to include a clean-up {@link org.opensearch.common.CheckedRunnable} + * which is run when the reference is closed. The default implementation of the clean-up + * procedure is a no-op. * * @return {@link GatedCloseable} - A wrapper around a {@link SegmentInfos} instance that * must be closed for segment files to be deleted. - * @throws EngineException - When segment infos cannot be safely retrieved */ public GatedCloseable getSegmentInfosSnapshot() { // default implementation diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index d65c06cce1b69..b63a39ebb1222 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2305,6 +2305,11 @@ public SegmentInfos getLatestSegmentInfos() { } } + /** + * Fetch the latest {@link SegmentInfos} object via {@link #getLatestSegmentInfos()} + * but also increment the ref-count to ensure that these segment files are retained + * until the reference is closed. On close, the ref-count is decremented. + */ @Override public GatedCloseable getSegmentInfosSnapshot() { final SegmentInfos segmentInfos = getLatestSegmentInfos(); From 71e66dee70cbcadc0488382508fedd08d53f36ad Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Fri, 3 Jun 2022 10:10:17 -0700 Subject: [PATCH 16/16] Incorporating PR feedback on javadocs Signed-off-by: Kartik Ganesh --- .../src/main/java/org/opensearch/index/store/Store.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 94a7e5dfd5bff..f818456c3a2c8 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -275,7 +275,7 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException { } /** - * Conveience wrapper around the {@link #getMetadata(IndexCommit)} method for null input. + * Convenience wrapper around the {@link #getMetadata(IndexCommit)} method for null input. */ public MetadataSnapshot getMetadata() throws IOException { return getMetadata(null, false); @@ -322,6 +322,12 @@ public MetadataSnapshot getMetadata(IndexCommit commit, boolean lockDirectory) t } } + /** + * Returns a new {@link MetadataSnapshot} for the given {@link SegmentInfos} object. + * In contrast to {@link #getMetadata(IndexCommit)}, this method is useful for scenarios + * where we need to construct a MetadataSnapshot from an in-memory SegmentInfos object that + * may not have an IndexCommit associated with it, such as with segment replication. + */ public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOException { return new MetadataSnapshot(segmentInfos, directory, logger); }
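The two additions above are intended to be used together: getSegmentInfosSnapshot() keeps the referenced segment files on disk for as long as the returned wrapper stays open, and Store#getMetadata(SegmentInfos) turns that pinned, in-memory SegmentInfos into a MetadataSnapshot without going through an IndexCommit. The sketch below is illustrative only; the class and method names in it (SegmentMetadataSketch, snapshotMetadata) are assumptions made for the example, not code from this patch series.

import org.apache.lucene.index.SegmentInfos;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;

import java.io.IOException;

final class SegmentMetadataSketch {
    // Hypothetical helper: builds segment metadata the way a replication source might.
    static Store.MetadataSnapshot snapshotMetadata(IndexShard indexShard) throws IOException {
        // getSegmentInfosSnapshot() pins the segment files referenced by the returned
        // SegmentInfos (InternalEngine does this via the IndexWriter deleter ref-count)
        // until the wrapper is closed.
        try (GatedCloseable<SegmentInfos> snapshot = indexShard.getSegmentInfosSnapshot()) {
            SegmentInfos segmentInfos = snapshot.get();
            // The new Store#getMetadata(SegmentInfos) builds a MetadataSnapshot directly
            // from the in-memory SegmentInfos; no IndexCommit is required.
            return indexShard.store().getMetadata(segmentInfos);
        }
        // A longer-lived consumer such as CopyState would instead hold the GatedCloseable
        // open while segment files are in use and close it once copying completes.
    }
}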