Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Segment Replication] Added source-side classes for orchestrating replication events #4096

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,21 @@ protected SegmentInfos getLatestSegmentInfos() {
return null;
};

/**
* In contrast to {@link #getLatestSegmentInfos()}, which returns a {@link SegmentInfos}
* object directly, this method returns a {@link GatedCloseable} reference to the same object.
* This allows the engine to include a clean-up {@link org.opensearch.common.CheckedRunnable}
* which is run when the reference is closed. The default implementation of the clean-up
* procedure is a no-op.
*
* @return {@link GatedCloseable} - A wrapper around a {@link SegmentInfos} instance that
* must be closed for segment files to be deleted.
*/
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
// default implementation
return new GatedCloseable<>(getLatestSegmentInfos(), () -> {});
}

public MergeStats getMergeStats() {
return new MergeStats();
}
Expand Down Expand Up @@ -865,6 +880,12 @@ public final CommitStats commitStats() {
*/
public abstract long getPersistedLocalCheckpoint();

/**
* @return the latest checkpoint that has been processed but not necessarily persisted.
* Also see {@link #getPersistedLocalCheckpoint()}
*/
public abstract long getProcessedLocalCheckpoint();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaking change, need to remove abstract and provide default implementation


/**
* @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2333,6 +2333,22 @@ public SegmentInfos getLatestSegmentInfos() {
}
}

/**
* Fetch the latest {@link SegmentInfos} object via {@link #getLatestSegmentInfos()}
* but also increment the ref-count to ensure that these segment files are retained
* until the reference is closed. On close, the ref-count is decremented.
*/
@Override
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
final SegmentInfos segmentInfos = getLatestSegmentInfos();
try {
indexWriter.incRefDeleter(segmentInfos);
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
return new GatedCloseable<>(segmentInfos, () -> indexWriter.decRefDeleter(segmentInfos));
}

@Override
protected final void writerSegmentStats(SegmentsStats stats) {
stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed());
Expand Down Expand Up @@ -2742,6 +2758,7 @@ public long getLastSyncedGlobalCheckpoint() {
return translogManager.getTranslog().getLastSyncedGlobalCheckpoint();
}

@Override
public long getProcessedLocalCheckpoint() {
return localCheckpointTracker.getProcessedCheckpoint();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ public long getPersistedLocalCheckpoint() {
return localCheckpointTracker.getPersistedCheckpoint();
}

@Override
public long getProcessedLocalCheckpoint() {
return localCheckpointTracker.getProcessedCheckpoint();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,13 @@ public long getProcessedLocalCheckpoint() {
return getPersistedLocalCheckpoint();
}

@Override
public long getProcessedLocalCheckpoint() {
// the read-only engine does not process checkpoints, so its
// processed checkpoint is identical to its persisted one.
return getPersistedLocalCheckpoint();
}

@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint);
Expand Down
18 changes: 18 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2650,6 +2650,14 @@ public long getLocalCheckpoint() {
return getEngine().getPersistedLocalCheckpoint();
}

/**
* Fetch the latest checkpoint that has been processed but not necessarily persisted.
* Also see {@link #getLocalCheckpoint()}.
*/
public long getProcessedLocalCheckpoint() {
return getEngine().getProcessedLocalCheckpoint();
}

/**
* Returns the global checkpoint for the shard.
*
Expand Down Expand Up @@ -4017,4 +4025,14 @@ public void verifyShardBeforeIndexClosing() throws IllegalStateException {
RetentionLeaseSyncer getRetentionLeaseSyncer() {
return retentionLeaseSyncer;
}

/**
* Fetch the latest SegmentInfos held by the shard's underlying Engine, wrapped
* by a a {@link GatedCloseable} to ensure files are not deleted/merged away.
*
* @throws EngineException - When segment infos cannot be safely retrieved
*/
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
return getEngine().getSegmentInfosSnapshot();
}
}
97 changes: 62 additions & 35 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
return getMetadata(commit, false);
}

/**
* Convenience wrapper around the {@link #getMetadata(IndexCommit)} method for null input.
*/
public MetadataSnapshot getMetadata() throws IOException {
return getMetadata(null, false);
}

/**
* Returns a new MetadataSnapshot for the given commit. If the given commit is <code>null</code>
* the latest commit point is used.
Expand Down Expand Up @@ -315,6 +322,16 @@ public MetadataSnapshot getMetadata(IndexCommit commit, boolean lockDirectory) t
}
}

/**
* Returns a new {@link MetadataSnapshot} for the given {@link SegmentInfos} object.
* In contrast to {@link #getMetadata(IndexCommit)}, this method is useful for scenarios
* where we need to construct a MetadataSnapshot from an in-memory SegmentInfos object that
* may not have a IndexCommit associated with it, such as with segment replication.
*/
public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOException {
return new MetadataSnapshot(segmentInfos, directory, logger);
}

/**
* Renames all the given files from the key of the map to the
* value of the map. All successfully renamed files are removed from the map in-place.
Expand Down Expand Up @@ -477,7 +494,7 @@ public static MetadataSnapshot readMetadataSnapshot(
Directory dir = new NIOFSDirectory(indexLocation)
) {
failIfCorrupted(dir);
return new MetadataSnapshot(null, dir, logger);
return new MetadataSnapshot((IndexCommit) null, dir, logger);
} catch (IndexNotFoundException ex) {
// that's fine - happens all the time no need to log
} catch (FileNotFoundException | NoSuchFileException ex) {
Expand Down Expand Up @@ -682,7 +699,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
}
}
directory.syncMetaData();
final Store.MetadataSnapshot metadataOrEmpty = getMetadata(null);
final Store.MetadataSnapshot metadataOrEmpty = getMetadata();
verifyAfterCleanup(sourceMetadata, metadataOrEmpty);
} finally {
metadataLock.writeLock().unlock();
Expand Down Expand Up @@ -822,7 +839,14 @@ public MetadataSnapshot(Map<String, StoreFileMetadata> metadata, Map<String, Str
}

MetadataSnapshot(IndexCommit commit, Directory directory, Logger logger) throws IOException {
LoadedMetadata loadedMetadata = loadMetadata(commit, directory, logger);
this(loadMetadata(commit, directory, logger));
}

MetadataSnapshot(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
this(loadMetadata(segmentInfos, directory, logger));
}

private MetadataSnapshot(LoadedMetadata loadedMetadata) {
metadata = loadedMetadata.fileMetadata;
commitUserData = loadedMetadata.userData;
numDocs = loadedMetadata.numDocs;
Expand Down Expand Up @@ -890,40 +914,9 @@ static class LoadedMetadata {
}

static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logger logger) throws IOException {
long numDocs;
Map<String, StoreFileMetadata> builder = new HashMap<>();
Map<String, String> commitUserDataBuilder = new HashMap<>();
try {
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
numDocs = Lucene.getNumDocs(segmentCommitInfos);
commitUserDataBuilder.putAll(segmentCommitInfos.getUserData());
// we don't know which version was used to write so we take the max version.
Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion();
for (SegmentCommitInfo info : segmentCommitInfos) {
final Version version = info.info.getVersion();
if (version == null) {
// version is written since 3.1+: we should have already hit IndexFormatTooOld.
throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
}
if (version.onOrAfter(maxVersion)) {
maxVersion = version;
}
for (String file : info.files()) {
checksumFromLuceneFile(
directory,
file,
builder,
logger,
version,
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file))
);
}
}
if (maxVersion == null) {
maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentCommitInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
return loadMetadata(segmentCommitInfos, directory, logger);
} catch (CorruptIndexException | IndexNotFoundException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
// we either know the index is corrupted or it's just not there
throw ex;
Expand All @@ -949,6 +942,40 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
}
throw ex;
}
}

static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
long numDocs = Lucene.getNumDocs(segmentInfos);
Map<String, String> commitUserDataBuilder = new HashMap<>();
commitUserDataBuilder.putAll(segmentInfos.getUserData());
Map<String, StoreFileMetadata> builder = new HashMap<>();
// we don't know which version was used to write so we take the max version.
Version maxVersion = segmentInfos.getMinSegmentLuceneVersion();
for (SegmentCommitInfo info : segmentInfos) {
final Version version = info.info.getVersion();
if (version == null) {
// version is written since 3.1+: we should have already hit IndexFormatTooOld.
throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
}
if (version.onOrAfter(maxVersion)) {
maxVersion = version;
}
for (String file : info.files()) {
checksumFromLuceneFile(
directory,
file,
builder,
logger,
version,
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file))
);
}
}
if (maxVersion == null) {
maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest;

import java.io.IOException;

/**
* Request object for fetching segment metadata for a {@link ReplicationCheckpoint} from
* a {@link SegmentReplicationSource}. This object is created by the target node and sent
* to the source node.
*
* @opensearch.internal
*/
public class CheckpointInfoRequest extends SegmentReplicationTransportRequest {

private final ReplicationCheckpoint checkpoint;

public CheckpointInfoRequest(StreamInput in) throws IOException {
super(in);
checkpoint = new ReplicationCheckpoint(in);
}

public CheckpointInfoRequest(
long replicationId,
String targetAllocationId,
DiscoveryNode targetNode,
ReplicationCheckpoint checkpoint
) {
super(replicationId, targetAllocationId, targetNode);
this.checkpoint = checkpoint;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
checkpoint.writeTo(out);
}

public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest;

import java.io.IOException;
import java.util.List;

/**
* Request object for fetching a list of segment files metadata from a {@link SegmentReplicationSource}.
* This object is created by the target node and sent to the source node.
*
* @opensearch.internal
*/
public class GetSegmentFilesRequest extends SegmentReplicationTransportRequest {

private final List<StoreFileMetadata> filesToFetch;
private final ReplicationCheckpoint checkpoint;

public GetSegmentFilesRequest(StreamInput in) throws IOException {
super(in);
this.filesToFetch = in.readList(StoreFileMetadata::new);
this.checkpoint = new ReplicationCheckpoint(in);
}

public GetSegmentFilesRequest(
long replicationId,
String targetAllocationId,
DiscoveryNode targetNode,
List<StoreFileMetadata> filesToFetch,
ReplicationCheckpoint checkpoint
) {
super(replicationId, targetAllocationId, targetNode);
this.filesToFetch = filesToFetch;
this.checkpoint = checkpoint;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(filesToFetch);
checkpoint.writeTo(out);
}

public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}
}
Loading