From b642e69c85c1aa38e951589a635bdde8a5cf83f7 Mon Sep 17 00:00:00 2001 From: Harish Bhakuni Date: Wed, 14 Jun 2023 23:55:49 -0700 Subject: [PATCH 1/2] [Backport 2.x] [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. (#8071) Signed-off-by: Harish Bhakuni --- CHANGELOG.md | 1 + .../cluster/ClusterStateDiffIT.java | 3 +- .../cluster/SnapshotsInProgress.java | 59 ++- .../opensearch/index/shard/IndexShard.java | 40 ++ .../RemoteStoreShardShallowCopySnapshot.java | 411 ++++++++++++++++++ .../repositories/FilterRepository.java | 25 ++ .../repositories/RepositoriesService.java | 24 + .../opensearch/repositories/Repository.java | 58 ++- .../blobstore/BlobStoreRepository.java | 107 +++++ .../opensearch/snapshots/SnapshotInfo.java | 65 ++- .../snapshots/SnapshotShardsService.java | 143 ++++-- .../snapshots/SnapshotsService.java | 17 +- .../create/CreateSnapshotResponseTests.java | 3 +- .../get/GetSnapshotsResponseTests.java | 3 +- .../DeleteDataStreamRequestTests.java | 2 +- .../MetadataDeleteIndexServiceTests.java | 3 +- .../MetadataIndexStateServiceTests.java | 3 +- ...oteStoreShardShallowCopySnapshotTests.java | 228 ++++++++++ .../RepositoriesServiceTests.java | 15 + .../BlobStoreRepositoryRestoreTests.java | 3 +- .../blobstore/BlobStoreRepositoryTests.java | 304 ++++++++++++- .../snapshots/SnapshotInfoTests.java | 48 +- ...SnapshotsInProgressSerializationTests.java | 70 ++- .../snapshots/SnapshotsServiceTests.java | 3 +- ...ckEventuallyConsistentRepositoryTests.java | 9 +- .../AbstractSnapshotIntegTestCase.java | 3 +- 26 files changed, 1541 insertions(+), 109 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java create mode 100644 server/src/test/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshotTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 02c6193a02c5d..bfadf4f34d6ba 100644 --- a/CHANGELOG.md +++ 
b/CHANGELOG.md @@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add new query profile collector fields with concurrent search execution ([#7898](https://github.com/opensearch-project/OpenSearch/pull/7898)) - Align range and default value for deletes_pct_allowed in merge policy ([#7730](https://github.com/opensearch-project/OpenSearch/pull/7730)) - Rename QueryPhase actors like Suggest, Rescore to be processors rather than phase ([#8025](https://github.com/opensearch-project/OpenSearch/pull/8025)) +- [Snapshot Interop] Add Changes in Create Snapshot Flow for remote store interoperability. ([#8071](https://github.com/opensearch-project/OpenSearch/pull/8071)) ### Deprecated diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterStateDiffIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterStateDiffIT.java index 7d26a0a31833b..d7275598a2e06 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterStateDiffIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterStateDiffIT.java @@ -776,7 +776,8 @@ public ClusterState.Custom randomCreate(String name) { ImmutableOpenMap.of(), null, SnapshotInfoTests.randomUserMetadata(), - randomVersion(random()) + randomVersion(random()), + false ) ) ); diff --git a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java index 2ab429d4b996b..6b60eed9cb964 100644 --- a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java @@ -123,7 +123,8 @@ public static Entry startedEntry( long repositoryStateId, ImmutableOpenMap shards, Map userMetadata, - Version version + Version version, + boolean remoteStoreIndexShallowCopy ) { return new SnapshotsInProgress.Entry( snapshot, @@ -137,7 +138,8 @@ public static Entry startedEntry( shards, 
null, userMetadata, - version + version, + remoteStoreIndexShallowCopy ); } @@ -174,7 +176,8 @@ public static Entry startClone( Collections.emptyMap(), version, source, - ImmutableOpenMap.of() + ImmutableOpenMap.of(), + false // TODO: need to pull this value from the original snapshot, use whatever we set during snapshot create. ); } @@ -187,6 +190,7 @@ public static class Entry implements Writeable, ToXContent, RepositoryOperation private final State state; private final Snapshot snapshot; private final boolean includeGlobalState; + private final boolean remoteStoreIndexShallowCopy; private final boolean partial; /** * Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation. @@ -229,7 +233,8 @@ public Entry( ImmutableOpenMap shards, String failure, Map userMetadata, - Version version + Version version, + boolean remoteStoreIndexShallowCopy ) { this( snapshot, @@ -245,7 +250,8 @@ public Entry( userMetadata, version, null, - ImmutableOpenMap.of() + ImmutableOpenMap.of(), + remoteStoreIndexShallowCopy ); } @@ -263,7 +269,8 @@ private Entry( Map userMetadata, Version version, @Nullable SnapshotId source, - @Nullable ImmutableOpenMap clones + @Nullable ImmutableOpenMap clones, + boolean remoteStoreIndexShallowCopy ) { this.state = state; this.snapshot = snapshot; @@ -284,6 +291,7 @@ private Entry( } else { this.clones = clones; } + this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy; assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones); } @@ -324,6 +332,11 @@ private Entry(StreamInput in) throws IOException { source = null; clones = ImmutableOpenMap.of(); } + if (in.getVersion().onOrAfter(Version.V_2_9_0)) { + remoteStoreIndexShallowCopy = in.readBoolean(); + } else { + remoteStoreIndexShallowCopy = false; + } } private static boolean assertShardsConsistent( @@ -378,7 +391,8 @@ public Entry( long repositoryStateId, ImmutableOpenMap shards, Map userMetadata, - Version 
version + Version version, + boolean remoteStoreIndexShallowCopy ) { this( snapshot, @@ -392,7 +406,8 @@ public Entry( shards, null, userMetadata, - version + version, + remoteStoreIndexShallowCopy ); } @@ -417,7 +432,8 @@ public Entry( shards, failure, entry.userMetadata, - version + version, + entry.remoteStoreIndexShallowCopy ); } @@ -441,7 +457,8 @@ public Entry withRepoGen(long newRepoGen) { userMetadata, version, source, - clones + clones, + remoteStoreIndexShallowCopy ); } @@ -463,7 +480,8 @@ public Entry withClones(ImmutableOpenMap userMetadata, version, source, - updatedClones + updatedClones, + remoteStoreIndexShallowCopy ); } @@ -518,7 +536,8 @@ public Entry fail(ImmutableOpenMap shards, State s userMetadata, version, source, - clones + clones, + remoteStoreIndexShallowCopy ); } @@ -544,7 +563,8 @@ public Entry withShardStates(ImmutableOpenMap shar shards, failure, userMetadata, - version + version, + remoteStoreIndexShallowCopy ); } return withStartedShards(shards); @@ -567,7 +587,8 @@ public Entry withStartedShards(ImmutableOpenMap sh shards, failure, userMetadata, - version + version, + remoteStoreIndexShallowCopy ); assert updated.state().completed() == false && completed(updated.shards().values()) == false : "Only running snapshots allowed but saw [" + updated + "]"; @@ -599,6 +620,10 @@ public boolean includeGlobalState() { return includeGlobalState; } + public boolean remoteStoreIndexShallowCopy() { + return remoteStoreIndexShallowCopy; + } + public Map userMetadata() { return userMetadata; } @@ -662,7 +687,7 @@ public boolean equals(Object o) { if (version.equals(entry.version) == false) return false; if (Objects.equals(source, ((Entry) o).source) == false) return false; if (clones.equals(((Entry) o).clones) == false) return false; - + if (remoteStoreIndexShallowCopy != entry.remoteStoreIndexShallowCopy) return false; return true; } @@ -679,6 +704,7 @@ public int hashCode() { result = 31 * result + version.hashCode(); result = 31 * result + 
(source == null ? 0 : source.hashCode()); result = 31 * result + clones.hashCode(); + result = 31 * result + (remoteStoreIndexShallowCopy ? 1 : 0); return result; } @@ -752,6 +778,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(source); out.writeMap(clones); } + if (out.getVersion().onOrAfter(Version.V_2_9_0)) { + out.writeBoolean(remoteStoreIndexShallowCopy); + } } @Override diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 90e746636b8de..f5e349eb54b99 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1486,6 +1486,46 @@ public GatedCloseable acquireLastIndexCommit(boolean flushFirst) th } } + public GatedCloseable acquireLastIndexCommitAndRefresh(boolean flushFirst) throws EngineException { + GatedCloseable indexCommit = acquireLastIndexCommit(flushFirst); + getEngine().refresh("Snapshot for Remote Store based Shard"); + return indexCommit; + } + + /** + * + * @param snapshotId Snapshot UUID. + * @param primaryTerm current primary term. + * @param generation Snapshot Commit Generation. + * @throws IOException if there is some failure in acquiring lock in remote store. + */ + public void acquireLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException { + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteSegmentDirectoryForShard(); + remoteSegmentStoreDirectory.acquireLock(primaryTerm, generation, snapshotId); + } + + /** + * + * @param snapshotId Snapshot UUID. + * @param primaryTerm current primary term. + * @param generation Snapshot Commit Generation. + * @throws IOException if there is some failure in releasing lock in remote store. 
+ */ + public void releaseLockOnCommitData(String snapshotId, long primaryTerm, long generation) throws IOException { + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = getRemoteSegmentDirectoryForShard(); + remoteSegmentStoreDirectory.releaseLock(primaryTerm, generation, snapshotId); + } + + private RemoteSegmentStoreDirectory getRemoteSegmentDirectoryForShard() { + FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory(); + assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory + : "Store.directory is not enclosing an instance of FilterDirectory"; + FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); + final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate(); + assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory"; + return ((RemoteSegmentStoreDirectory) remoteDirectory); + } + public Optional getReplicationEngine() { if (getEngine() instanceof NRTReplicationEngine) { return Optional.of((NRTReplicationEngine) getEngine()); diff --git a/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java new file mode 100644 index 0000000000000..8e6ed870c904f --- /dev/null +++ b/server/src/main/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshot.java @@ -0,0 +1,411 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.snapshots.blobstore; + +import org.opensearch.OpenSearchParseException; +import org.opensearch.core.ParseField; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Remote Store based Shard snapshot metadata + * + * @opensearch.internal + */ +public class RemoteStoreShardShallowCopySnapshot implements ToXContentFragment { + + private final String snapshot; + private final String version; + private final long indexVersion; + private final long startTime; + private final long time; + private final int totalFileCount; + private final long totalSize; + private final long primaryTerm; + private final long commitGeneration; + private final String remoteStoreRepository; + private final String repositoryBasePath; + private final String indexUUID; + private final List fileNames; + + static final String DEFAULT_VERSION = "1"; + static final String NAME = "name"; + static final String VERSION = "version"; + static final String INDEX_VERSION = "index_version"; + static final String START_TIME = "start_time"; + static final String TIME = "time"; + + static final String INDEX_UUID = "index_uuid"; + + static final String REMOTE_STORE_REPOSITORY = "remote_store_repository"; + static final String REPOSITORY_BASE_PATH = "remote_store_repository_base_path"; + static final String FILE_NAMES = "file_names"; + + static final String PRIMARY_TERM = "primary_term"; + + static final String COMMIT_GENERATION = "commit_generation"; + + static final String TOTAL_FILE_COUNT = "number_of_files"; + static final String TOTAL_SIZE = "total_size"; + + private static final ParseField PARSE_NAME = new ParseField(NAME); + private static final ParseField PARSE_VERSION = new ParseField(VERSION); + private static final ParseField PARSE_PRIMARY_TERM = new 
ParseField(PRIMARY_TERM); + private static final ParseField PARSE_COMMIT_GENERATION = new ParseField(COMMIT_GENERATION); + private static final ParseField PARSE_INDEX_VERSION = new ParseField(INDEX_VERSION, "index-version"); + private static final ParseField PARSE_START_TIME = new ParseField(START_TIME); + private static final ParseField PARSE_TIME = new ParseField(TIME); + private static final ParseField PARSE_TOTAL_FILE_COUNT = new ParseField(TOTAL_FILE_COUNT); + private static final ParseField PARSE_TOTAL_SIZE = new ParseField(TOTAL_SIZE); + private static final ParseField PARSE_INDEX_UUID = new ParseField(INDEX_UUID); + private static final ParseField PARSE_REMOTE_STORE_REPOSITORY = new ParseField(REMOTE_STORE_REPOSITORY); + private static final ParseField PARSE_REPOSITORY_BASE_PATH = new ParseField(REPOSITORY_BASE_PATH); + private static final ParseField PARSE_FILE_NAMES = new ParseField(FILE_NAMES); + + /** + * Serializes shard snapshot metadata info into JSON + * + * @param builder XContent builder + * @param params parameters + */ + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(VERSION, version); + builder.field(NAME, snapshot); + builder.field(INDEX_VERSION, indexVersion); + builder.field(START_TIME, startTime); + builder.field(TIME, time); + builder.field(TOTAL_FILE_COUNT, totalFileCount); + builder.field(TOTAL_SIZE, totalSize); + builder.field(INDEX_UUID, indexUUID); + builder.field(REMOTE_STORE_REPOSITORY, remoteStoreRepository); + builder.field(COMMIT_GENERATION, commitGeneration); + builder.field(PRIMARY_TERM, primaryTerm); + builder.field(REPOSITORY_BASE_PATH, repositoryBasePath); + builder.startArray(FILE_NAMES); + for (String fileName : fileNames) { + builder.value(fileName); + } + builder.endArray(); + return builder; + } + + public RemoteStoreShardShallowCopySnapshot( + String snapshot, + long indexVersion, + long primaryTerm, + long commitGeneration, + long startTime, + 
long time, + int totalFileCount, + long totalSize, + String indexUUID, + String remoteStoreRepository, + String repositoryBasePath, + List fileNames + ) { + this.version = DEFAULT_VERSION; + verifyParameters( + version, + snapshot, + indexVersion, + primaryTerm, + commitGeneration, + indexUUID, + remoteStoreRepository, + repositoryBasePath + ); + this.snapshot = snapshot; + this.indexVersion = indexVersion; + this.primaryTerm = primaryTerm; + this.commitGeneration = commitGeneration; + this.startTime = startTime; + this.time = time; + this.totalFileCount = totalFileCount; + this.totalSize = totalSize; + this.indexUUID = indexUUID; + this.remoteStoreRepository = remoteStoreRepository; + this.repositoryBasePath = repositoryBasePath; + this.fileNames = fileNames; + } + + private RemoteStoreShardShallowCopySnapshot( + String version, + String snapshot, + long indexVersion, + long primaryTerm, + long commitGeneration, + long startTime, + long time, + int totalFileCount, + long totalSize, + String indexUUID, + String remoteStoreRepository, + String repositoryBasePath, + List fileNames + ) { + verifyParameters( + version, + snapshot, + indexVersion, + primaryTerm, + commitGeneration, + indexUUID, + remoteStoreRepository, + repositoryBasePath + ); + this.version = version; + this.snapshot = snapshot; + this.indexVersion = indexVersion; + this.primaryTerm = primaryTerm; + this.commitGeneration = commitGeneration; + this.startTime = startTime; + this.time = time; + this.totalFileCount = totalFileCount; + this.totalSize = totalSize; + this.indexUUID = indexUUID; + this.remoteStoreRepository = remoteStoreRepository; + this.repositoryBasePath = repositoryBasePath; + this.fileNames = fileNames; + } + + /** + * Parses shard snapshot metadata + * + * @param parser parser + * @return shard snapshot metadata + */ + public static RemoteStoreShardShallowCopySnapshot fromXContent(XContentParser parser) throws IOException { + String snapshot = null; + String version = null; + long 
indexVersion = -1; + long startTime = 0; + long time = 0; + int totalFileCount = 0; + long totalSize = 0; + String indexUUID = null; + String remoteStoreRepository = null; + String repositoryBasePath = null; + long primaryTerm = -1; + long commitGeneration = -1; + List fileNames = new ArrayList<>(); + + if (parser.currentToken() == null) { // fresh parser? move to the first token + parser.nextToken(); + } + XContentParser.Token token; + String currentFieldName = parser.currentName(); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if (PARSE_NAME.match(currentFieldName, parser.getDeprecationHandler())) { + snapshot = parser.text(); + } else if (PARSE_VERSION.match(currentFieldName, parser.getDeprecationHandler())) { + version = parser.text(); + } else if (PARSE_INDEX_VERSION.match(currentFieldName, parser.getDeprecationHandler())) { + indexVersion = parser.longValue(); + } else if (PARSE_PRIMARY_TERM.match(currentFieldName, parser.getDeprecationHandler())) { + primaryTerm = parser.longValue(); + } else if (PARSE_COMMIT_GENERATION.match(currentFieldName, parser.getDeprecationHandler())) { + commitGeneration = parser.longValue(); + } else if (PARSE_START_TIME.match(currentFieldName, parser.getDeprecationHandler())) { + startTime = parser.longValue(); + } else if (PARSE_TIME.match(currentFieldName, parser.getDeprecationHandler())) { + time = parser.longValue(); + } else if (PARSE_TOTAL_FILE_COUNT.match(currentFieldName, parser.getDeprecationHandler())) { + totalFileCount = parser.intValue(); + } else if (PARSE_TOTAL_SIZE.match(currentFieldName, parser.getDeprecationHandler())) { + totalSize = parser.longValue(); + } else if (PARSE_INDEX_UUID.match(currentFieldName, parser.getDeprecationHandler())) { + indexUUID = parser.text(); + } else if (PARSE_REMOTE_STORE_REPOSITORY.match(currentFieldName, 
parser.getDeprecationHandler())) { + remoteStoreRepository = parser.text(); + } else if (PARSE_REPOSITORY_BASE_PATH.match(currentFieldName, parser.getDeprecationHandler())) { + repositoryBasePath = parser.text(); + } else { + throw new OpenSearchParseException("unknown parameter [{}]", currentFieldName); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if (PARSE_FILE_NAMES.match(currentFieldName, parser.getDeprecationHandler())) { + while ((parser.nextToken()) != XContentParser.Token.END_ARRAY) { + fileNames.add(parser.text()); + } + } else { + parser.skipChildren(); + } + } else { + parser.skipChildren(); + } + } + + return new RemoteStoreShardShallowCopySnapshot( + version, + snapshot, + indexVersion, + primaryTerm, + commitGeneration, + startTime, + time, + totalFileCount, + totalSize, + indexUUID, + remoteStoreRepository, + repositoryBasePath, + fileNames + ); + } + + /** + * Returns shard primary Term during snapshot creation + * + * @return primary Term + */ + public long getPrimaryTerm() { + return primaryTerm; + } + + /** + * Returns snapshot commit generation + * + * @return commit Generation + */ + public long getCommitGeneration() { + return commitGeneration; + } + + /** + * Returns Index UUID + * + * @return index UUID + */ + public String getIndexUUID() { + return indexUUID; + } + + /** + * Returns Remote Store Repository Name + * + * @return remote store Repository Name + */ + public String getRemoteStoreRepository() { + return remoteStoreRepository; + } + + /** + * Returns Remote Store Repository Base Path + * + * @return repository base path + */ + public String getRepositoryBasePath() { + return repositoryBasePath; + } + + /** + * Returns snapshot name + * + * @return snapshot name + */ + public String snapshot() { + return snapshot; + } + + /** + * Returns list of files in the shard + * + * @return list of files + */ + + /** + * Returns snapshot start time + */ + public long startTime() { + return startTime; + } + + /** + * Returns 
snapshot running time + */ + public long time() { + return time; + } + + /** + * Returns incremental of files that were snapshotted + */ + public int incrementalFileCount() { + return 0; + } + + /** + * Returns total number of files that are referenced by this snapshot + */ + public int totalFileCount() { + return totalFileCount; + } + + /** + * Returns incremental of files size that were snapshotted + */ + public long incrementalSize() { + return 0; + } + + /** + * Returns total size of all files that where snapshotted + */ + public long totalSize() { + return totalSize; + } + + private void verifyParameters( + String version, + String snapshot, + long indexVersion, + long primaryTerm, + long commitGeneration, + String indexUUID, + String remoteStoreRepository, + String repositoryBasePath + ) { + String exceptionStr = null; + if (version == null) { + exceptionStr = "Invalid Version Provided"; + } + if (snapshot == null) { + exceptionStr = "Invalid/Missing Snapshot Name"; + } + if (indexVersion < 0) { + exceptionStr = "Invalid Index Version"; + } + if (primaryTerm < 0) { + exceptionStr = "Invalid Primary Term"; + } + if (commitGeneration < 0) { + exceptionStr = "Invalid Commit Generation"; + } + if (indexUUID == null) { + exceptionStr = "Invalid/Missing Index UUID"; + } + if (remoteStoreRepository == null) { + exceptionStr = "Invalid/Missing Remote Store Repository"; + } + if (repositoryBasePath == null) { + exceptionStr = "Invalid/Missing Repository Base Path"; + } + if (exceptionStr != null) { + throw new IllegalArgumentException(exceptionStr); + } + } +} diff --git a/server/src/main/java/org/opensearch/repositories/FilterRepository.java b/server/src/main/java/org/opensearch/repositories/FilterRepository.java index aaa021a0e8b93..cdc616cecec6c 100644 --- a/server/src/main/java/org/opensearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/opensearch/repositories/FilterRepository.java @@ -188,6 +188,31 @@ public void snapshotShard( ); } + 
@Override + public void snapshotRemoteStoreIndexShard( + Store store, + SnapshotId snapshotId, + IndexId indexId, + IndexCommit snapshotIndexCommit, + String shardStateIdentifier, + IndexShardSnapshotStatus snapshotStatus, + long primaryTerm, + long startTime, + ActionListener listener + ) { + in.snapshotRemoteStoreIndexShard( + store, + snapshotId, + indexId, + snapshotIndexCommit, + shardStateIdentifier, + snapshotStatus, + primaryTerm, + startTime, + listener + ); + } + @Override public void restoreShard( Store store, diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index c438abedfd37c..9502ee2962000 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; import org.opensearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; @@ -79,6 +80,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY; + /** * Service responsible for maintaining and providing access to snapshot repositories on nodes. 
* @@ -160,6 +163,7 @@ public void registerRepository(final PutRepositoryRequest request, final ActionL final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(request.name(), request.type(), request.settings()); validate(request.name()); + validateRepositoryMetadataSettings(clusterService, request.name(), request.settings()); final ActionListener registrationListener; if (request.verify()) { @@ -605,6 +609,26 @@ private static void validate(final String repositoryName) { } } + public static void validateRepositoryMetadataSettings( + ClusterService clusterService, + final String repositoryName, + final Settings repositoryMetadataSettings + ) { + // We can add more validations here for repository settings in the future. + Version minVersionInCluster = clusterService.state().getNodes().getMinNodeVersion(); + if (REMOTE_STORE_INDEX_SHALLOW_COPY.get(repositoryMetadataSettings) && !minVersionInCluster.onOrAfter(Version.V_2_9_0)) { + throw new RepositoryException( + repositoryName, + "setting " + + REMOTE_STORE_INDEX_SHALLOW_COPY.getKey() + + " cannot be enabled as some of the nodes in cluster are on version older than " + + Version.V_2_9_0 + + ". Minimum node version in cluster is: " + + minVersionInCluster + ); + } + } + private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) { if (isRepositoryInUse(clusterState, repository)) { throw new IllegalStateException("trying to modify or unregister repository that is currently used"); diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index a16e0e8d441bc..741237ce238a6 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -238,18 +238,18 @@ default RepositoryStats stats() { *

* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check * {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted. - * @param store store to be snapshotted - * @param mapperService the shards mapper service - * @param snapshotId snapshot id - * @param indexId id for the index being snapshotted - * @param snapshotIndexCommit commit point - * @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used - * to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier - * snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit} - * @param snapshotStatus snapshot status - * @param repositoryMetaVersion version of the updated repository metadata to write - * @param userMetadata user metadata of the snapshot found in {@link SnapshotsInProgress.Entry#userMetadata()} - * @param listener listener invoked on completion + * @param store store to be snapshotted + * @param mapperService the shards mapper service + * @param snapshotId snapshot id + * @param indexId id for the index being snapshotted + * @param snapshotIndexCommit commit point + * @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used + * to detect if the shard has changed between snapshots. 
If {@code null} is passed as the identifier + * snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit} + * @param snapshotStatus snapshot status + * @param repositoryMetaVersion version of the updated repository metadata to write + * @param userMetadata user metadata of the snapshot found in {@link SnapshotsInProgress.Entry#userMetadata()} + * @param listener listener invoked on completion */ void snapshotShard( Store store, @@ -264,6 +264,40 @@ void snapshotShard( ActionListener listener ); + /** + * Adds a reference to remote store data for an index commit point. + *

+ * The index commit point can be obtained by using {@link org.opensearch.index.engine.Engine#acquireLastIndexCommit} method. + * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller. + *

+ * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check + * {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted. + * @param store store to be snapshotted + * @param snapshotId snapshot id + * @param indexId id for the index being snapshotted + * @param snapshotIndexCommit commit point + * @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used + * to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier + * snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit} + * @param snapshotStatus snapshot status + * @param primaryTerm current Primary Term + * @param startTime start time of the snapshot commit; this will be used as the start time for the snapshot. + * @param listener listener invoked on completion + */ + default void snapshotRemoteStoreIndexShard( + Store store, + SnapshotId snapshotId, + IndexId indexId, + IndexCommit snapshotIndexCommit, + @Nullable String shardStateIdentifier, + IndexShardSnapshotStatus snapshotStatus, + long primaryTerm, + long startTime, + ActionListener listener + ) { + throw new UnsupportedOperationException(); + } + /** * Restores snapshot of the shard. *

diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 793eac94ec95d..8785271a2d52e 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -107,6 +107,7 @@ import org.opensearch.index.snapshots.IndexShardSnapshotFailedException; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot; import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots; import org.opensearch.index.snapshots.blobstore.RateLimitingInputStream; import org.opensearch.index.snapshots.blobstore.SlicedInputStream; @@ -184,6 +185,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp public static final String SNAPSHOT_PREFIX = "snap-"; + public static final String SHALLOW_SNAPSHOT_PREFIX = "shallow-snap-"; + public static final String INDEX_FILE_PREFIX = "index-"; public static final String INDEX_LATEST_BLOB = "index.latest"; @@ -196,6 +199,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp public static final String SNAPSHOT_NAME_FORMAT = SNAPSHOT_PREFIX + "%s.dat"; + public static final String SHALLOW_SNAPSHOT_NAME_FORMAT = SHALLOW_SNAPSHOT_PREFIX + "%s.dat"; + private static final String SNAPSHOT_INDEX_PREFIX = "index-"; private static final String SNAPSHOT_INDEX_NAME_FORMAT = SNAPSHOT_INDEX_PREFIX + "%s"; @@ -241,6 +246,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp Setting.Property.NodeScope ); + public static final Setting REMOTE_STORE_INDEX_SHALLOW_COPY = Setting.boolSetting("remote_store_index_shallow_copy", false); + /** * Setting to set 
batch size of stale snapshot shard blobs that will be deleted by snapshot workers as part of snapshot deletion. * For optimal performance the value of the setting should be equal to or close to repository's max # of keys that can be deleted in single operation @@ -314,6 +321,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp BlobStoreIndexShardSnapshot::fromXContent ); + public static final ChecksumBlobStoreFormat REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT = + new ChecksumBlobStoreFormat<>(SNAPSHOT_CODEC, SHALLOW_SNAPSHOT_NAME_FORMAT, RemoteStoreShardShallowCopySnapshot::fromXContent); + public static final ChecksumBlobStoreFormat INDEX_SHARD_SNAPSHOTS_FORMAT = new ChecksumBlobStoreFormat<>( "snapshots", SNAPSHOT_INDEX_NAME_FORMAT, @@ -2359,6 +2369,85 @@ private void writeAtomic(BlobContainer container, final String blobName, final B } } + @Override + public void snapshotRemoteStoreIndexShard( + Store store, + SnapshotId snapshotId, + IndexId indexId, + IndexCommit snapshotIndexCommit, + String shardStateIdentifier, + IndexShardSnapshotStatus snapshotStatus, + long primaryTerm, + long startTime, + ActionListener listener + ) { + if (isReadOnly()) { + listener.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository")); + return; + } + final ShardId shardId = store.shardId(); + try { + final String generation = snapshotStatus.generation(); + logger.info("[{}] [{}] snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation); + final BlobContainer shardContainer = shardContainer(indexId, shardId); + + long indexTotalFileSize = 0; + // local store is being used here to fetch the files metadata instead of remote store as currently + // remote store is mirroring the local store. 
+ List fileNames = new ArrayList<>(snapshotIndexCommit.getFileNames()); + Store.MetadataSnapshot commitSnapshotMetadata = store.getMetadata(snapshotIndexCommit); + for (String fileName : fileNames) { + indexTotalFileSize += commitSnapshotMetadata.get(fileName).length(); + } + int indexTotalNumberOfFiles = fileNames.size(); + + snapshotStatus.moveToStarted( + startTime, + 0, // incremental File Count is zero as we are storing the data as part of remote store. + indexTotalNumberOfFiles, + 0, // incremental File Size is zero as we are storing the data as part of remote store. + indexTotalFileSize + ); + + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration()); + + // now create and write the commit point + logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); + try { + REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.write( + new RemoteStoreShardShallowCopySnapshot( + snapshotId.getName(), + lastSnapshotStatus.getIndexVersion(), + primaryTerm, + snapshotIndexCommit.getGeneration(), + lastSnapshotStatus.getStartTime(), + threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(), + indexTotalNumberOfFiles, + indexTotalFileSize, + store.indexSettings().getUUID(), + store.indexSettings().getRemoteStoreRepository(), + this.basePath().toString(), + fileNames + ), + shardContainer, + snapshotId.getUUID(), + compressor + ); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException( + shardId, + "Failed to write commit point for snapshot " + snapshotId.getName() + "(" + snapshotId.getUUID() + ")", + e + ); + } + snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), generation); + listener.onResponse(generation); + + } catch (Exception e) { + listener.onFailure(e); + } + } + @Override public void snapshotShard( Store store, @@ -2990,6 +3079,24 @@ private static List unusedBlobs( .collect(Collectors.toList()); } + /** + * Loads information about remote 
store enabled shard snapshot for remote store interop enabled snapshots + */ + public RemoteStoreShardShallowCopySnapshot loadShallowCopyShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) { + try { + return REMOTE_STORE_SHARD_SHALLOW_COPY_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry); + } catch (NoSuchFileException ex) { + throw new SnapshotMissingException(metadata.name(), snapshotId, ex); + } catch (IOException ex) { + throw new SnapshotException( + metadata.name(), + snapshotId, + "failed to read shard snapshot file for [" + shardContainer.path() + ']', + ex + ); + } + } + /** * Loads information about shard snapshot */ diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java index f91a71e21ca00..e61154bd35e6b 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java @@ -98,6 +98,8 @@ public final class SnapshotInfo implements Comparable, ToXContent, private static final String TOTAL_SHARDS = "total_shards"; private static final String SUCCESSFUL_SHARDS = "successful_shards"; private static final String INCLUDE_GLOBAL_STATE = "include_global_state"; + + private static final String REMOTE_STORE_INDEX_SHALLOW_COPY = "remote_store_index_shallow_copy"; private static final String USER_METADATA = "metadata"; private static final Comparator COMPARATOR = Comparator.comparing(SnapshotInfo::startTime) @@ -119,6 +121,8 @@ public static final class SnapshotInfoBuilder { private long endTime = 0L; private ShardStatsBuilder shardStatsBuilder = null; private Boolean includeGlobalState = null; + + private Boolean remoteStoreIndexShallowCopy = null; private Map userMetadata = null; private int version = -1; private List shardFailures = null; @@ -171,6 +175,10 @@ private void setVersion(int version) { this.version = version; } + private void 
setRemoteStoreIndexShallowCopy(Boolean remoteStoreIndexShallowCopy) { + this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy; + } + private void setShardFailures(List shardFailures) { this.shardFailures = shardFailures; } @@ -209,7 +217,8 @@ public SnapshotInfo build() { successfulShards, shardFailures, includeGlobalState, - userMetadata + userMetadata, + remoteStoreIndexShallowCopy ); } } @@ -260,6 +269,10 @@ int getSuccessfulShards() { SNAPSHOT_INFO_PARSER.declareBoolean(SnapshotInfoBuilder::setIncludeGlobalState, new ParseField(INCLUDE_GLOBAL_STATE)); SNAPSHOT_INFO_PARSER.declareObject(SnapshotInfoBuilder::setUserMetadata, (p, c) -> p.map(), new ParseField(USER_METADATA)); SNAPSHOT_INFO_PARSER.declareInt(SnapshotInfoBuilder::setVersion, new ParseField(VERSION_ID)); + SNAPSHOT_INFO_PARSER.declareBoolean( + SnapshotInfoBuilder::setRemoteStoreIndexShallowCopy, + new ParseField(REMOTE_STORE_INDEX_SHALLOW_COPY) + ); SNAPSHOT_INFO_PARSER.declareObjectArray( SnapshotInfoBuilder::setShardFailures, SnapshotShardFailure.SNAPSHOT_SHARD_FAILURE_PARSER, @@ -293,6 +306,9 @@ int getSuccessfulShards() { @Nullable private Boolean includeGlobalState; + @Nullable + private Boolean remoteStoreIndexShallowCopy; + @Nullable private final Map userMetadata; @@ -302,11 +318,11 @@ int getSuccessfulShards() { private final List shardFailures; public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state) { - this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null); + this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null); } public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state, Version version) { - this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null); + this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, 
Collections.emptyList(), null, null, null); } public SnapshotInfo(SnapshotsInProgress.Entry entry) { @@ -323,7 +339,8 @@ public SnapshotInfo(SnapshotsInProgress.Entry entry) { 0, Collections.emptyList(), entry.includeGlobalState(), - entry.userMetadata() + entry.userMetadata(), + entry.remoteStoreIndexShallowCopy() ); } @@ -337,7 +354,8 @@ public SnapshotInfo( int totalShards, List shardFailures, Boolean includeGlobalState, - Map userMetadata + Map userMetadata, + Boolean remoteStoreIndexShallowCopy ) { this( snapshotId, @@ -352,7 +370,8 @@ public SnapshotInfo( totalShards - shardFailures.size(), shardFailures, includeGlobalState, - userMetadata + userMetadata, + remoteStoreIndexShallowCopy ); } @@ -369,7 +388,8 @@ public SnapshotInfo( int successfulShards, List shardFailures, Boolean includeGlobalState, - Map userMetadata + Map userMetadata, + Boolean remoteStoreIndexShallowCopy ) { this.snapshotId = Objects.requireNonNull(snapshotId); this.indices = Collections.unmodifiableList(Objects.requireNonNull(indices)); @@ -384,6 +404,7 @@ public SnapshotInfo( this.shardFailures = Objects.requireNonNull(shardFailures); this.includeGlobalState = includeGlobalState; this.userMetadata = userMetadata; + this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy; } /** @@ -411,6 +432,9 @@ public SnapshotInfo(final StreamInput in) throws IOException { } else { dataStreams = Collections.emptyList(); } + if (in.getVersion().onOrAfter(Version.V_2_9_0)) { + remoteStoreIndexShallowCopy = in.readOptionalBoolean(); + } } /** @@ -520,6 +544,10 @@ public Boolean includeGlobalState() { return includeGlobalState; } + public Boolean isRemoteStoreIndexShallowCopyEnabled() { + return remoteStoreIndexShallowCopy; + } + /** * Returns shard failures; an empty list will be returned if there were no shard * failures, or if {@link #state()} returns {@code null}. 
@@ -585,6 +613,8 @@ public String toString() { + version + ", shardFailures=" + shardFailures + + ", isRemoteStoreInteropEnabled=" + + remoteStoreIndexShallowCopy + '}'; } @@ -621,6 +651,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.field(VERSION_ID, version.id); builder.field(VERSION, version.toString()); } + if (remoteStoreIndexShallowCopy != null) { + builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy); + } builder.startArray(INDICES); for (String index : indices) { builder.value(index); @@ -676,6 +709,9 @@ private XContentBuilder toXContentInternal(final XContentBuilder builder, final builder.field(UUID, snapshotId.getUUID()); assert version != null : "version must always be known when writing a snapshot metadata blob"; builder.field(VERSION_ID, version.id); + if (remoteStoreIndexShallowCopy != null) { + builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy); + } builder.startArray(INDICES); for (String index : indices) { builder.value(index); @@ -725,6 +761,7 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr int totalShards = 0; int successfulShards = 0; Boolean includeGlobalState = null; + Boolean remoteStoreIndexShallowCopy = null; Map userMetadata = null; List shardFailures = Collections.emptyList(); if (parser.currentToken() == null) { // fresh parser? 
move to the first token @@ -762,6 +799,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr version = Version.fromId(parser.intValue()); } else if (INCLUDE_GLOBAL_STATE.equals(currentFieldName)) { includeGlobalState = parser.booleanValue(); + } else if (REMOTE_STORE_INDEX_SHALLOW_COPY.equals(currentFieldName)) { + remoteStoreIndexShallowCopy = parser.booleanValue(); } } else if (token == XContentParser.Token.START_ARRAY) { if (DATA_STREAMS.equals(currentFieldName)) { @@ -813,7 +852,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr successfulShards, shardFailures, includeGlobalState, - userMetadata + userMetadata, + remoteStoreIndexShallowCopy ); } @@ -846,6 +886,9 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeStringCollection(dataStreams); } } + if (out.getVersion().onOrAfter(Version.V_2_9_0)) { + out.writeOptionalBoolean(remoteStoreIndexShallowCopy); + } } private static SnapshotState snapshotState(final String reason, final List shardFailures) { @@ -877,7 +920,8 @@ public boolean equals(Object o) { && Objects.equals(includeGlobalState, that.includeGlobalState) && Objects.equals(version, that.version) && Objects.equals(shardFailures, that.shardFailures) - && Objects.equals(userMetadata, that.userMetadata); + && Objects.equals(userMetadata, that.userMetadata) + && Objects.equals(remoteStoreIndexShallowCopy, that.remoteStoreIndexShallowCopy); } @Override @@ -896,7 +940,8 @@ public int hashCode() { includeGlobalState, version, shardFailures, - userMetadata + userMetadata, + remoteStoreIndexShallowCopy ); } } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java index 83265dc8fc61b..15908b9ba8213 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java @@ 
-287,38 +287,47 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map() { - @Override - public void onResponse(String newGeneration) { - assert newGeneration != null; - assert newGeneration.equals(snapshotStatus.generation()); - if (logger.isDebugEnabled()) { - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); - logger.debug( - "snapshot [{}] completed to [{}] with [{}] at generation [{}]", - snapshot, - snapshot.getRepository(), - lastSnapshotStatus, - snapshotStatus.generation() - ); + snapshot( + shardId, + snapshot, + indexId, + entry.userMetadata(), + snapshotStatus, + entry.version(), + entry.remoteStoreIndexShallowCopy(), + new ActionListener<>() { + @Override + public void onResponse(String newGeneration) { + assert newGeneration != null; + assert newGeneration.equals(snapshotStatus.generation()); + if (logger.isDebugEnabled()) { + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); + logger.debug( + "snapshot [{}] completed to [{}] with [{}] at generation [{}]", + snapshot, + snapshot.getRepository(), + lastSnapshotStatus, + snapshotStatus.generation() + ); + } + notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration); } - notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration); - } - @Override - public void onFailure(Exception e) { - final String failure; - if (e instanceof AbortedSnapshotException) { - failure = "aborted"; - logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e); - } else { - failure = summarizeFailure(e); - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); + @Override + public void onFailure(Exception e) { + final String failure; + if (e instanceof AbortedSnapshotException) { + failure = "aborted"; + logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e); + } else { + failure = summarizeFailure(e); + 
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); + } + snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); + notifyFailedSnapshotShard(snapshot, shardId, failure); } - snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); - notifyFailedSnapshotShard(snapshot, shardId, failure); } - }); + ); } } }); @@ -370,6 +379,7 @@ private void snapshot( final Map userMetadata, final IndexShardSnapshotStatus snapshotStatus, Version version, + final boolean remoteStoreIndexShallowCopy, ActionListener listener ) { try { @@ -391,21 +401,68 @@ private void snapshot( final Repository repository = repositoriesService.repository(snapshot.getRepository()); GatedCloseable wrappedSnapshot = null; try { - // we flush first to make sure we get the latest writes snapshotted - wrappedSnapshot = indexShard.acquireLastIndexCommit(true); - final IndexCommit snapshotIndexCommit = wrappedSnapshot.get(); - repository.snapshotShard( - indexShard.store(), - indexShard.mapperService(), - snapshot.getSnapshotId(), - indexId, - wrappedSnapshot.get(), - getShardStateId(indexShard, snapshotIndexCommit), - snapshotStatus, - version, - userMetadata, - ActionListener.runBefore(listener, wrappedSnapshot::close) - ); + if (remoteStoreIndexShallowCopy && indexShard.indexSettings().isRemoteStoreEnabled()) { + long startTime = threadPool.relativeTimeInMillis(); + // we flush first to make sure we get the latest writes snapshotted + wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true); + long primaryTerm = indexShard.getOperationPrimaryTerm(); + final IndexCommit snapshotIndexCommit = wrappedSnapshot.get(); + long commitGeneration = snapshotIndexCommit.getGeneration(); + indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration); + try { + repository.snapshotRemoteStoreIndexShard( + indexShard.store(), + snapshot.getSnapshotId(), + indexId, + 
wrappedSnapshot.get(), + getShardStateId(indexShard, snapshotIndexCommit), + snapshotStatus, + primaryTerm, + startTime, + ActionListener.runBefore(listener, wrappedSnapshot::close) + ); + } catch (IndexShardSnapshotFailedException e) { + logger.error( + "Shallow Copy Snapshot Failed for Shard [" + + indexId.getName() + + "][" + + shardId.getId() + + "] for snapshot " + + snapshot.getSnapshotId() + + ", releasing acquired lock from remote store" + ); + indexShard.releaseLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration); + throw e; + } + long endTime = threadPool.relativeTimeInMillis(); + logger.debug( + "Time taken (in milliseconds) to complete shallow copy snapshot, " + + "for index " + + indexId.getName() + + ", shard " + + shardId.getId() + + " and snapshot " + + snapshot.getSnapshotId() + + " is " + + (endTime - startTime) + ); + } else { + // we flush first to make sure we get the latest writes snapshotted + wrappedSnapshot = indexShard.acquireLastIndexCommit(true); + final IndexCommit snapshotIndexCommit = wrappedSnapshot.get(); + repository.snapshotShard( + indexShard.store(), + indexShard.mapperService(), + snapshot.getSnapshotId(), + indexId, + wrappedSnapshot.get(), + getShardStateId(indexShard, snapshotIndexCommit), + snapshotStatus, + version, + userMetadata, + ActionListener.runBefore(listener, wrappedSnapshot::close) + ); + } } catch (Exception e) { IOUtils.close(wrappedSnapshot); throw e; diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index bd57b44fdbb47..f89260e35b38a 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -134,6 +134,7 @@ import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableList; import static org.opensearch.cluster.SnapshotsInProgress.completed; +import 
static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY; import static org.opensearch.snapshots.SnapshotUtils.validateSnapshotsBackingAnyIndex; /** @@ -337,6 +338,7 @@ public ClusterState execute(ClusterState currentState) { ); logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); + boolean remoteStoreIndexShallowCopy = REMOTE_STORE_INDEX_SHALLOW_COPY.get(repository.getMetadata().settings()); newEntry = new SnapshotsInProgress.Entry( new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), @@ -348,7 +350,8 @@ public ClusterState execute(ClusterState currentState) { RepositoryData.UNKNOWN_REPO_GEN, ImmutableOpenMap.of(), userMeta, - Version.CURRENT + Version.CURRENT, + remoteStoreIndexShallowCopy ); initializingSnapshots.add(newEntry.snapshot()); snapshots = SnapshotsInProgress.of(Collections.singletonList(newEntry)); @@ -425,6 +428,7 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList // retries final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot Repository repository = repositoriesService.repository(request.repository()); + if (repository.isReadOnly()) { listener.onFailure(new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository")); return; @@ -514,6 +518,8 @@ public ClusterState execute(ClusterState currentState) { ); } } + + boolean remoteStoreIndexShallowCopy = REMOTE_STORE_INDEX_SHALLOW_COPY.get(repository.getMetadata().settings()); newEntry = SnapshotsInProgress.startedEntry( new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), @@ -524,7 +530,8 @@ public ClusterState execute(ClusterState currentState) { repositoryData.getGenId(), shards, userMeta, - version + version, + remoteStoreIndexShallowCopy ); final List newEntries = new ArrayList<>(runningSnapshots); newEntries.add(newEntry); @@ -1034,7 
+1041,8 @@ protected void doRun() { repositoryData.getGenId(), ImmutableOpenMap.of(), snapshot.userMetadata(), - version + version, + snapshot.remoteStoreIndexShallowCopy() ), clusterState.metadata(), repositoryData @@ -1840,7 +1848,8 @@ private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata met entry.partial() ? shardGenerations.totalShards() : entry.shards().size(), shardFailures, entry.includeGlobalState(), - entry.userMetadata() + entry.userMetadata(), + entry.remoteStoreIndexShallowCopy() ); final StepListener metadataListener = new StepListener<>(); final Repository repo = repositoriesService.repository(snapshot.getRepository()); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java index 64d199a51493a..7a294094a21d8 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java @@ -94,7 +94,8 @@ protected CreateSnapshotResponse createTestInstance() { totalShards, shardFailures, globalState, - SnapshotInfoTests.randomUserMetadata() + SnapshotInfoTests.randomUserMetadata(), + false ) ); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java index 1116e156b28cc..02b8f47f0e1de 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java @@ -76,7 +76,8 @@ protected GetSnapshotsResponse createTestInstance() { randomIntBetween(2, 3), shardFailures, randomBoolean(), - 
SnapshotInfoTests.randomUserMetadata() + SnapshotInfoTests.randomUserMetadata(), + false ) ); } diff --git a/server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java b/server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java index e33b873a52f19..aff1af0d0a101 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java @@ -171,7 +171,7 @@ private SnapshotsInProgress.Entry createEntry(String dataStreamName, String repo ImmutableOpenMap.of(), null, null, - null + false ); } diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java index dbff833cfee60..c526dafd294ae 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java @@ -111,7 +111,8 @@ public void testDeleteSnapshotting() { ImmutableOpenMap.of(), null, SnapshotInfoTests.randomUserMetadata(), - VersionUtils.randomVersion(random()) + VersionUtils.randomVersion(random()), + false ) ) ); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java index 5d476b2dbdca5..8ad3a39daee0c 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexStateServiceTests.java @@ -530,7 +530,8 @@ private static ClusterState addSnapshotIndex(final String index, final int numSh shardsBuilder.build(), null, SnapshotInfoTests.randomUserMetadata(), - 
VersionUtils.randomVersion(random()) + VersionUtils.randomVersion(random()), + false ); return ClusterState.builder(newState) .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(Collections.singletonList(entry))) diff --git a/server/src/test/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshotTests.java b/server/src/test/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshotTests.java new file mode 100644 index 0000000000000..1cb6579c0f1f2 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/snapshots/blobstore/RemoteStoreShardShallowCopySnapshotTests.java @@ -0,0 +1,228 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.snapshots.blobstore; + +import org.opensearch.common.Strings; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import static org.hamcrest.Matchers.containsString; + +public class RemoteStoreShardShallowCopySnapshotTests extends OpenSearchTestCase { + + public void testToXContent() throws IOException { + String snapshot = "test-snapshot"; + long indexVersion = 1; + long primaryTerm = 3; + long commitGeneration = 5; + long startTime = 123; + long time = 123; + int totalFileCount = 5; + long totalSize = 5; + String indexUUID = "syzhajds-ashdlfj"; + String remoteStoreRepository = 
"test-rs-repository"; + String repositoryBasePath = "test-repo-basepath"; + List fileNames = new ArrayList<>(5); + fileNames.addAll(Arrays.asList("file1", "file2", "file3", "file4", "file5")); + RemoteStoreShardShallowCopySnapshot shardShallowCopySnapshot = new RemoteStoreShardShallowCopySnapshot( + snapshot, + indexVersion, + primaryTerm, + commitGeneration, + startTime, + time, + totalFileCount, + totalSize, + indexUUID, + remoteStoreRepository, + repositoryBasePath, + fileNames + ); + String actual; + try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + builder.startObject(); + shardShallowCopySnapshot.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + actual = Strings.toString(builder); + } + String expectedXContent = "{\"version\":\"1\",\"name\":\"test-snapshot\",\"index_version\":1,\"start_time\":123,\"time\":123," + + "\"number_of_files\":5,\"total_size\":5,\"index_uuid\":\"syzhajds-ashdlfj\",\"remote_store_repository\":" + + "\"test-rs-repository\",\"commit_generation\":5,\"primary_term\":3,\"remote_store_repository_base_path\":" + + "\"test-repo-basepath\",\"file_names\":[\"file1\",\"file2\",\"file3\",\"file4\",\"file5\"]}"; + assert Objects.equals(actual, expectedXContent) : "xContent is " + actual; + } + + public void testFromXContent() throws IOException { + String snapshot = "test-snapshot"; + long indexVersion = 1; + long primaryTerm = 3; + long commitGeneration = 5; + long startTime = 123; + long time = 123; + int totalFileCount = 5; + long totalSize = 5; + String indexUUID = "syzhajds-ashdlfj"; + String remoteStoreRepository = "test-rs-repository"; + String repositoryBasePath = "test-repo-basepath"; + List fileNames = new ArrayList<>(5); + fileNames.addAll(Arrays.asList("file1", "file2", "file3", "file4", "file5")); + RemoteStoreShardShallowCopySnapshot expectedShardShallowCopySnapshot = new RemoteStoreShardShallowCopySnapshot( + snapshot, + indexVersion, + primaryTerm, + commitGeneration, + 
startTime, + time, + totalFileCount, + totalSize, + indexUUID, + remoteStoreRepository, + repositoryBasePath, + fileNames + ); + String xContent = "{\"version\":\"1\",\"name\":\"test-snapshot\",\"index_version\":1,\"start_time\":123,\"time\":123," + + "\"number_of_files\":5,\"total_size\":5,\"index_uuid\":\"syzhajds-ashdlfj\",\"remote_store_repository\":" + + "\"test-rs-repository\",\"commit_generation\":5,\"primary_term\":3,\"remote_store_repository_base_path\":" + + "\"test-repo-basepath\",\"file_names\":[\"file1\",\"file2\",\"file3\",\"file4\",\"file5\"]}"; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, xContent)) { + RemoteStoreShardShallowCopySnapshot actualShardShallowCopySnapshot = RemoteStoreShardShallowCopySnapshot.fromXContent(parser); + assertEquals(actualShardShallowCopySnapshot.snapshot(), expectedShardShallowCopySnapshot.snapshot()); + assertEquals( + actualShardShallowCopySnapshot.getRemoteStoreRepository(), + expectedShardShallowCopySnapshot.getRemoteStoreRepository() + ); + assertEquals(actualShardShallowCopySnapshot.getCommitGeneration(), expectedShardShallowCopySnapshot.getCommitGeneration()); + assertEquals(actualShardShallowCopySnapshot.getPrimaryTerm(), expectedShardShallowCopySnapshot.getPrimaryTerm()); + assertEquals(actualShardShallowCopySnapshot.startTime(), expectedShardShallowCopySnapshot.startTime()); + assertEquals(actualShardShallowCopySnapshot.time(), expectedShardShallowCopySnapshot.time()); + assertEquals(actualShardShallowCopySnapshot.totalSize(), expectedShardShallowCopySnapshot.totalSize()); + assertEquals(actualShardShallowCopySnapshot.totalFileCount(), expectedShardShallowCopySnapshot.totalFileCount()); + } + } + + public void testFromXContentInvalid() throws IOException { + final int iters = scaledRandomIntBetween(1, 10); + for (int iter = 0; iter < iters; iter++) { + String snapshot = "test-snapshot"; + long indexVersion = 1; + long primaryTerm = 3; + long commitGeneration = 5; + long startTime = 123; 
+ long time = 123; + int totalFileCount = 5; + long totalSize = 5; + String indexUUID = "syzhajds-ashdlfj"; + String remoteStoreRepository = "test-rs-repository"; + String repositoryBasePath = "test-repo-basepath"; + List fileNames = new ArrayList<>(5); + fileNames.addAll(Arrays.asList("file1", "file2", "file3", "file4", "file5")); + String failure = null; + String version = RemoteStoreShardShallowCopySnapshot.DEFAULT_VERSION; + long length = Math.max(0, Math.abs(randomLong())); + // random corruption + switch (randomIntBetween(0, 8)) { + case 0: + snapshot = null; + failure = "Invalid/Missing Snapshot Name"; + break; + case 1: + indexVersion = -Math.abs(randomLong()); + failure = "Invalid Index Version"; + break; + case 2: + commitGeneration = -Math.abs(randomLong()); + failure = "Invalid Commit Generation"; + break; + case 3: + primaryTerm = -Math.abs(randomLong()); + failure = "Invalid Primary Term"; + break; + case 4: + indexUUID = null; + failure = "Invalid/Missing Index UUID"; + break; + case 5: + remoteStoreRepository = null; + failure = "Invalid/Missing Remote Store Repository"; + break; + case 6: + repositoryBasePath = null; + failure = "Invalid/Missing Repository Base Path"; + break; + case 7: + version = null; + failure = "Invalid Version Provided"; + break; + case 8: + break; + default: + fail("shouldn't be here"); + } + + XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); + builder.startObject(); + builder.field(RemoteStoreShardShallowCopySnapshot.VERSION, version); + builder.field(RemoteStoreShardShallowCopySnapshot.NAME, snapshot); + builder.field(RemoteStoreShardShallowCopySnapshot.INDEX_VERSION, indexVersion); + builder.field(RemoteStoreShardShallowCopySnapshot.START_TIME, startTime); + builder.field(RemoteStoreShardShallowCopySnapshot.TIME, time); + builder.field(RemoteStoreShardShallowCopySnapshot.TOTAL_FILE_COUNT, totalFileCount); + builder.field(RemoteStoreShardShallowCopySnapshot.TOTAL_SIZE, totalSize); + 
builder.field(RemoteStoreShardShallowCopySnapshot.INDEX_UUID, indexUUID); + builder.field(RemoteStoreShardShallowCopySnapshot.REMOTE_STORE_REPOSITORY, remoteStoreRepository); + builder.field(RemoteStoreShardShallowCopySnapshot.COMMIT_GENERATION, commitGeneration); + builder.field(RemoteStoreShardShallowCopySnapshot.PRIMARY_TERM, primaryTerm); + builder.field(RemoteStoreShardShallowCopySnapshot.REPOSITORY_BASE_PATH, repositoryBasePath); + builder.startArray(RemoteStoreShardShallowCopySnapshot.FILE_NAMES); + for (String fileName : fileNames) { + builder.value(fileName); + } + builder.endArray(); + builder.endObject(); + byte[] xContent = BytesReference.toBytes(BytesReference.bytes(builder)); + + if (failure == null) { + // No failures should read as usual + final RemoteStoreShardShallowCopySnapshot remoteStoreShardShallowCopySnapshot; + try (XContentParser parser = createParser(JsonXContent.jsonXContent, xContent)) { + parser.nextToken(); + remoteStoreShardShallowCopySnapshot = RemoteStoreShardShallowCopySnapshot.fromXContent(parser); + } + assertEquals(remoteStoreShardShallowCopySnapshot.snapshot(), snapshot); + assertEquals(remoteStoreShardShallowCopySnapshot.getRemoteStoreRepository(), remoteStoreRepository); + assertEquals(remoteStoreShardShallowCopySnapshot.getCommitGeneration(), commitGeneration); + assertEquals(remoteStoreShardShallowCopySnapshot.getPrimaryTerm(), primaryTerm); + assertEquals(remoteStoreShardShallowCopySnapshot.startTime(), startTime); + assertEquals(remoteStoreShardShallowCopySnapshot.time(), time); + assertEquals(remoteStoreShardShallowCopySnapshot.totalSize(), totalSize); + assertEquals(remoteStoreShardShallowCopySnapshot.totalFileCount(), totalFileCount); + } else { + try (XContentParser parser = createParser(JsonXContent.jsonXContent, xContent)) { + parser.nextToken(); + RemoteStoreShardShallowCopySnapshot.fromXContent(parser); + fail("Should have failed with [" + failure + "]"); + } catch (IllegalArgumentException ex) { + 
assertThat(ex.getMessage(), containsString(failure)); + } + } + } + } +} diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index aa88e3ab1061a..d89c0388209dd 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -320,6 +320,21 @@ public void snapshotShard( } + @Override + public void snapshotRemoteStoreIndexShard( + Store store, + SnapshotId snapshotId, + IndexId indexId, + IndexCommit snapshotIndexCommit, + String shardStateIdentifier, + IndexShardSnapshotStatus snapshotStatus, + long primaryTerm, + long startTime, + ActionListener listener + ) { + + } + @Override public void restoreShard( Store store, diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 63b144dae9c93..c3bd4dcaf530d 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -212,7 +212,8 @@ public void testSnapshotWithConflictingName() throws Exception { 6, Collections.emptyList(), true, - Collections.emptyMap() + Collections.emptyMap(), + false ), Version.CURRENT, Function.identity(), diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java index 45885acb22eac..c15f1069f0023 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -33,18 +33,26 @@ package 
org.opensearch.repositories.blobstore; import org.opensearch.Version; +import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeUnit; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; +import org.opensearch.index.IndexModule; +import org.opensearch.index.store.RemoteBufferedOutputDirectory; import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.RepositoryPlugin; import org.opensearch.repositories.IndexId; @@ -56,9 +64,11 @@ import org.opensearch.repositories.fs.FsRepository; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotState; +import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.OpenSearchSingleNodeTestCase; +import java.io.IOException; import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; @@ -68,9 +78,9 @@ import java.util.function.Function; import java.util.stream.Collectors; -import static org.opensearch.repositories.RepositoryDataTests.generateRandomRepoData; import static org.hamcrest.Matchers.equalTo; import static 
org.hamcrest.Matchers.nullValue; +import static org.opensearch.repositories.RepositoryDataTests.generateRandomRepoData; /** * Tests for the {@link BlobStoreRepository} and its subclasses. @@ -105,6 +115,15 @@ protected void assertSnapshotOrGenericThread() { } } + @Override + protected Settings nodeSettings() { + return Settings.builder() + .put(super.nodeSettings()) + .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") + .put(FeatureFlags.REMOTE_STORE, "true") + .build(); + } + public void testRetrieveSnapshots() throws Exception { final Client client = client(); final Path location = OpenSearchIntegTestCase.randomRepoPath(node().settings()); @@ -161,6 +180,289 @@ public void testRetrieveSnapshots() throws Exception { assertThat(snapshotIds, equalTo(originalSnapshots)); } + private void createRepository(Client client, String repoName) { + AcknowledgedResponse putRepositoryResponse = client.admin() + .cluster() + .preparePutRepository(repoName) + .setType(REPO_TYPE) + .setSettings( + Settings.builder().put(node().settings()).put("location", OpenSearchIntegTestCase.randomRepoPath(node().settings())) + ) + .get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + } + + private void createRepository(Client client, String repoName, Settings repoSettings) { + AcknowledgedResponse putRepositoryResponse = client.admin() + .cluster() + .preparePutRepository(repoName) + .setType(REPO_TYPE) + .setSettings(repoSettings) + .get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + } + + private void updateRepository(Client client, String repoName, Settings repoSettings) { + createRepository(client, repoName, repoSettings); + } + + private Settings getRemoteStoreBackedIndexSettings(String remoteStoreRepo) { + return Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1") + .put("index.refresh_interval", "300s") + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") + 
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey()) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, remoteStoreRepo) + .build(); + } + + private void indexDocuments(Client client, String indexName) { + int numDocs = randomIntBetween(10, 20); + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(i); + client.prepareIndex(indexName).setId(id).setSource("text", "sometext").get(); + } + client.admin().indices().prepareFlush(indexName).get(); + } + + private String[] getLockFilesInRemoteStore(String remoteStoreIndex, String remoteStoreRepository) throws IOException { + String indexUUID = client().admin() + .indices() + .prepareGetSettings(remoteStoreIndex) + .get() + .getSetting(remoteStoreIndex, IndexMetadata.SETTING_INDEX_UUID); + final RepositoriesService repositoriesService = getInstanceFromNode(RepositoriesService.class); + final BlobStoreRepository remoteStorerepository = (BlobStoreRepository) repositoriesService.repository(remoteStoreRepository); + BlobPath shardLevelBlobPath = remoteStorerepository.basePath().add(indexUUID).add("0").add("segments").add("lock_files"); + BlobContainer blobContainer = remoteStorerepository.blobStore().blobContainer(shardLevelBlobPath); + try (RemoteBufferedOutputDirectory lockDirectory = new RemoteBufferedOutputDirectory(blobContainer)) { + return Arrays.stream(lockDirectory.listAll()).filter(lock -> lock.endsWith(".lock")).toArray(String[]::new); + } + } + + // Validate Scenario Normal Snapshot -> remoteStoreShallowCopy Snapshot -> normal Snapshot + public void testRetrieveShallowCopySnapshotCase1() throws IOException { + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + final Client client = client(); + final String snapshotRepositoryName = "test-repo"; + final String 
remoteStoreRepositoryName = "test-rs-repo"; + + logger.info("--> creating snapshot repository"); + + Settings snapshotRepoSettings = Settings.builder() + .put(node().settings()) + .put("location", OpenSearchIntegTestCase.randomRepoPath(node().settings())) + .build(); + createRepository(client, snapshotRepositoryName, snapshotRepoSettings); + + logger.info("--> creating remote store repository"); + Settings remoteStoreRepoSettings = Settings.builder() + .put(node().settings()) + .put("location", OpenSearchIntegTestCase.randomRepoPath(node().settings())) + .build(); + createRepository(client, remoteStoreRepositoryName, remoteStoreRepoSettings); + + logger.info("--> creating an index and indexing documents"); + final String indexName = "test-idx"; + createIndex(indexName); + ensureGreen(); + indexDocuments(client, indexName); + + logger.info("--> creating a remote store enabled index and indexing documents"); + final String remoteStoreIndexName = "test-rs-idx"; + Settings indexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepositoryName); + createIndex(remoteStoreIndexName, indexSettings); + indexDocuments(client, remoteStoreIndexName); + + logger.info("--> create first snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(snapshotRepositoryName, "test-snap-1") + .setWaitForCompletion(true) + .setIndices(indexName, remoteStoreIndexName) + .get(); + final SnapshotId snapshotId1 = createSnapshotResponse.getSnapshotInfo().snapshotId(); + + String[] lockFiles = getLockFilesInRemoteStore(remoteStoreIndexName, remoteStoreRepositoryName); + assert (lockFiles.length == 0) : "there should be no lock files present in directory, but found " + Arrays.toString(lockFiles); + logger.info("--> create remote index shallow snapshot"); + Settings snapshotRepoSettingsForShallowCopy = Settings.builder() + .put(snapshotRepoSettings) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), Boolean.TRUE) + 
.build(); + updateRepository(client, snapshotRepositoryName, snapshotRepoSettingsForShallowCopy); + + createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(snapshotRepositoryName, "test-snap-2") + .setWaitForCompletion(true) + .setIndices(indexName, remoteStoreIndexName) + .get(); + final SnapshotId snapshotId2 = createSnapshotResponse.getSnapshotInfo().snapshotId(); + + lockFiles = getLockFilesInRemoteStore(remoteStoreIndexName, remoteStoreRepositoryName); + assert (lockFiles.length == 1) : "there should be only one lock file, but found " + Arrays.toString(lockFiles); + assert lockFiles[0].endsWith(snapshotId2.getUUID() + ".lock"); + + logger.info("--> create another normal snapshot"); + updateRepository(client, snapshotRepositoryName, snapshotRepoSettings); + createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(snapshotRepositoryName, "test-snap-3") + .setWaitForCompletion(true) + .setIndices(indexName, remoteStoreIndexName) + .get(); + final SnapshotId snapshotId3 = createSnapshotResponse.getSnapshotInfo().snapshotId(); + + lockFiles = getLockFilesInRemoteStore(remoteStoreIndexName, remoteStoreRepositoryName); + assert (lockFiles.length == 1) : "there should be only one lock file, but found " + Arrays.toString(lockFiles); + assert lockFiles[0].endsWith(snapshotId2.getUUID() + ".lock"); + + logger.info("--> make sure the node's repository can resolve the snapshots"); + final List originalSnapshots = Arrays.asList(snapshotId1, snapshotId2, snapshotId3); + + final RepositoriesService repositoriesService = getInstanceFromNode(RepositoriesService.class); + final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(snapshotRepositoryName); + List snapshotIds = OpenSearchBlobStoreRepositoryIntegTestCase.getRepositoryData(repository) + .getSnapshotIds() + .stream() + .sorted((s1, s2) -> s1.getName().compareTo(s2.getName())) + .collect(Collectors.toList()); + assertThat(snapshotIds, 
equalTo(originalSnapshots)); + } + + // Validate Scenario remoteStoreShallowCopy Snapshot -> remoteStoreShallowCopy Snapshot + // -> remoteStoreShallowCopy Snapshot -> normal snapshot + public void testRetrieveShallowCopySnapshotCase2() throws IOException { + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + final Client client = client(); + final String snapshotRepositoryName = "test-repo"; + final String remoteStoreRepositoryName = "test-rs-repo"; + + logger.info("--> creating snapshot repository"); + Settings snapshotRepoSettings = Settings.builder() + .put(node().settings()) + .put("location", OpenSearchIntegTestCase.randomRepoPath(node().settings())) + .build(); + createRepository(client, snapshotRepositoryName, snapshotRepoSettings); + + GetRepositoriesResponse updatedGetRepositoriesResponse = client.admin() + .cluster() + .prepareGetRepositories(snapshotRepositoryName) + .get(); + + RepositoryMetadata updatedRepositoryMetadata = updatedGetRepositoriesResponse.repositories().get(0); + + assertFalse(updatedRepositoryMetadata.settings().getAsBoolean(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), false)); + + logger.info("--> creating remote store repository"); + createRepository(client, remoteStoreRepositoryName); + + logger.info("--> creating an index and indexing documents"); + final String indexName = "test-idx"; + createIndex(indexName); + ensureGreen(); + indexDocuments(client, indexName); + + logger.info("--> creating a remote store enabled index and indexing documents"); + final String remoteStoreIndexName = "test-rs-idx"; + Settings indexSettings = getRemoteStoreBackedIndexSettings(remoteStoreRepositoryName); + createIndex(remoteStoreIndexName, indexSettings); + indexDocuments(client, remoteStoreIndexName); + + logger.info("--> create first remote index shallow snapshot"); + + Settings snapshotRepoSettingsForShallowCopy = Settings.builder() + .put(snapshotRepoSettings) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), 
true) + .build(); + updateRepository(client, snapshotRepositoryName, snapshotRepoSettingsForShallowCopy); + + updatedGetRepositoriesResponse = client.admin().cluster().prepareGetRepositories(snapshotRepositoryName).get(); + + updatedRepositoryMetadata = updatedGetRepositoriesResponse.repositories().get(0); + + assertTrue(updatedRepositoryMetadata.settings().getAsBoolean(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), false)); + + CreateSnapshotResponse createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(snapshotRepositoryName, "test-snap-1") + .setWaitForCompletion(true) + .setIndices(indexName, remoteStoreIndexName) + .get(); + final SnapshotId snapshotId1 = createSnapshotResponse.getSnapshotInfo().snapshotId(); + + String[] lockFiles = getLockFilesInRemoteStore(remoteStoreIndexName, remoteStoreRepositoryName); + assert (lockFiles.length == 1) : "lock files are " + Arrays.toString(lockFiles); + assert lockFiles[0].endsWith(snapshotId1.getUUID() + ".lock"); + + logger.info("--> create second remote index shallow snapshot"); + createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(snapshotRepositoryName, "test-snap-2") + .setWaitForCompletion(true) + .setIndices(indexName, remoteStoreIndexName) + .get(); + final SnapshotId snapshotId2 = createSnapshotResponse.getSnapshotInfo().snapshotId(); + + lockFiles = getLockFilesInRemoteStore(remoteStoreIndexName, remoteStoreRepositoryName); + assert (lockFiles.length == 2) : "lock files are " + Arrays.toString(lockFiles); + List shallowCopySnapshotIDs = Arrays.asList(snapshotId1, snapshotId2); + for (SnapshotId snapshotId : shallowCopySnapshotIDs) { + assert lockFiles[0].contains(snapshotId.getUUID()) || lockFiles[1].contains(snapshotId.getUUID()); + } + logger.info("--> create third remote index shallow snapshot"); + createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(snapshotRepositoryName, "test-snap-3") + .setWaitForCompletion(true) + 
.setIndices(indexName, remoteStoreIndexName) + .get(); + final SnapshotId snapshotId3 = createSnapshotResponse.getSnapshotInfo().snapshotId(); + + lockFiles = getLockFilesInRemoteStore(remoteStoreIndexName, remoteStoreRepositoryName); + assert (lockFiles.length == 3); + shallowCopySnapshotIDs = Arrays.asList(snapshotId1, snapshotId2, snapshotId3); + for (SnapshotId snapshotId : shallowCopySnapshotIDs) { + assert lockFiles[0].contains(snapshotId.getUUID()) + || lockFiles[1].contains(snapshotId.getUUID()) + || lockFiles[2].contains(snapshotId.getUUID()); + } + logger.info("--> create normal snapshot"); + createRepository(client, snapshotRepositoryName, snapshotRepoSettings); + createSnapshotResponse = client.admin() + .cluster() + .prepareCreateSnapshot(snapshotRepositoryName, "test-snap-4") + .setWaitForCompletion(true) + .setIndices(indexName, remoteStoreIndexName) + .get(); + final SnapshotId snapshotId4 = createSnapshotResponse.getSnapshotInfo().snapshotId(); + + lockFiles = getLockFilesInRemoteStore(remoteStoreIndexName, remoteStoreRepositoryName); + assert (lockFiles.length == 3) : "lock files are " + Arrays.toString(lockFiles); + shallowCopySnapshotIDs = Arrays.asList(snapshotId1, snapshotId2, snapshotId3); + for (SnapshotId snapshotId : shallowCopySnapshotIDs) { + assert lockFiles[0].contains(snapshotId.getUUID()) + || lockFiles[1].contains(snapshotId.getUUID()) + || lockFiles[2].contains(snapshotId.getUUID()); + } + + logger.info("--> make sure the node's repository can resolve the snapshots"); + final List originalSnapshots = Arrays.asList(snapshotId1, snapshotId2, snapshotId3, snapshotId4); + + final RepositoriesService repositoriesService = getInstanceFromNode(RepositoriesService.class); + final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(snapshotRepositoryName); + List snapshotIds = OpenSearchBlobStoreRepositoryIntegTestCase.getRepositoryData(repository) + .getSnapshotIds() + .stream() + .sorted((s1, s2) -> 
s1.getName().compareTo(s2.getName())) + .collect(Collectors.toList()); + assertThat(snapshotIds, equalTo(originalSnapshots)); + } + public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception { final BlobStoreRepository repository = setupRepo(); final long pendingGeneration = repository.metadata.pendingGeneration(); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java index 8320d73d396ea..9a70ec58697d9 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java @@ -73,6 +73,8 @@ protected SnapshotInfo createTestInstance() { Map userMetadata = randomUserMetadata(); + Boolean remoteStoreIndexShallowCopy = randomBoolean() ? null : randomBoolean(); + return new SnapshotInfo( snapshotId, indices, @@ -83,7 +85,8 @@ protected SnapshotInfo createTestInstance() { totalShards, shardFailures, includeGlobalState, - userMetadata + userMetadata, + remoteStoreIndexShallowCopy ); } @@ -94,7 +97,7 @@ protected Writeable.Reader instanceReader() { @Override protected SnapshotInfo mutateInstance(SnapshotInfo instance) { - switch (randomIntBetween(0, 8)) { + switch (randomIntBetween(0, 9)) { case 0: SnapshotId snapshotId = new SnapshotId( randomValueOtherThan(instance.snapshotId().getName(), () -> randomAlphaOfLength(5)), @@ -110,7 +113,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), - instance.userMetadata() + instance.userMetadata(), + instance.isRemoteStoreIndexShallowCopyEnabled() ); case 1: int indicesSize = randomValueOtherThan(instance.indices().size(), () -> randomIntBetween(1, 10)); @@ -127,7 +131,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), - instance.userMetadata() + 
instance.userMetadata(), + instance.isRemoteStoreIndexShallowCopyEnabled() ); case 2: return new SnapshotInfo( @@ -140,7 +145,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), - instance.userMetadata() + instance.userMetadata(), + instance.isRemoteStoreIndexShallowCopyEnabled() ); case 3: return new SnapshotInfo( @@ -153,7 +159,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), - instance.userMetadata() + instance.userMetadata(), + instance.isRemoteStoreIndexShallowCopyEnabled() ); case 4: return new SnapshotInfo( @@ -166,7 +173,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), - instance.userMetadata() + instance.userMetadata(), + instance.isRemoteStoreIndexShallowCopyEnabled() ); case 5: int totalShards = randomValueOtherThan(instance.totalShards(), () -> randomIntBetween(0, 100)); @@ -191,7 +199,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { totalShards, shardFailures, instance.includeGlobalState(), - instance.userMetadata() + instance.userMetadata(), + instance.isRemoteStoreIndexShallowCopyEnabled() ); case 6: return new SnapshotInfo( @@ -204,7 +213,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.totalShards(), instance.shardFailures(), Boolean.FALSE.equals(instance.includeGlobalState()), - instance.userMetadata() + instance.userMetadata(), + instance.isRemoteStoreIndexShallowCopyEnabled() ); case 7: return new SnapshotInfo( @@ -217,7 +227,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), - randomValueOtherThan(instance.userMetadata(), SnapshotInfoTests::randomUserMetadata) + randomValueOtherThan(instance.userMetadata(), 
SnapshotInfoTests::randomUserMetadata), + instance.isRemoteStoreIndexShallowCopyEnabled() ); case 8: List dataStreams = randomValueOtherThan( @@ -234,7 +245,22 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.totalShards(), instance.shardFailures(), instance.includeGlobalState(), - instance.userMetadata() + instance.userMetadata(), + instance.isRemoteStoreIndexShallowCopyEnabled() + ); + case 9: + return new SnapshotInfo( + instance.snapshotId(), + instance.indices(), + instance.dataStreams(), + instance.startTime(), + instance.reason(), + instance.endTime(), + instance.totalShards(), + instance.shardFailures(), + instance.includeGlobalState(), + instance.userMetadata(), + Boolean.FALSE.equals(instance.isRemoteStoreIndexShallowCopyEnabled()) ); default: throw new IllegalArgumentException("invalid randomization case"); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotsInProgressSerializationTests.java index dc5d869c2f6a2..a8dbb71d7afdd 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -32,6 +32,7 @@ package org.opensearch.snapshots; +import org.opensearch.Version; import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.ClusterState.Custom; import org.opensearch.cluster.Diff; @@ -40,7 +41,10 @@ import org.opensearch.cluster.SnapshotsInProgress.ShardState; import org.opensearch.cluster.SnapshotsInProgress.State; import org.opensearch.common.collect.ImmutableOpenMap; +import org.opensearch.common.UUIDs; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.Writeable; import org.opensearch.index.Index; import 
org.opensearch.index.shard.ShardId; @@ -48,11 +52,15 @@ import org.opensearch.test.AbstractDiffableWireSerializationTestCase; import org.opensearch.test.VersionUtils; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Collections; import java.util.stream.Collectors; +import static org.opensearch.test.VersionUtils.randomVersion; + public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireSerializationTestCase { @Override @@ -69,6 +77,7 @@ private Entry randomSnapshot() { Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(10), randomAlphaOfLength(10))); boolean includeGlobalState = randomBoolean(); boolean partial = randomBoolean(); + boolean remoteStoreIndexShallowCopy = randomBoolean(); int numberOfIndices = randomIntBetween(0, 10); List indices = new ArrayList<>(); for (int i = 0; i < numberOfIndices; i++) { @@ -113,7 +122,8 @@ private Entry randomSnapshot() { shards, null, SnapshotInfoTests.randomUserMetadata(), - VersionUtils.randomVersion(random()) + VersionUtils.randomVersion(random()), + remoteStoreIndexShallowCopy ); } @@ -172,9 +182,67 @@ protected Custom mutateInstance(Custom instance) { return SnapshotsInProgress.of(entries); } + public void testSerDeRemoteStoreIndexShallowCopy() throws IOException { + SnapshotsInProgress.Entry entry = new SnapshotsInProgress.Entry( + new Snapshot(randomName("repo"), new SnapshotId(randomName("snap"), UUIDs.randomBase64UUID())), + randomBoolean(), + randomBoolean(), + SnapshotsInProgressSerializationTests.randomState(ImmutableOpenMap.of()), + Collections.emptyList(), + Collections.emptyList(), + Math.abs(randomLong()), + randomIntBetween(0, 1000), + ImmutableOpenMap.of(), + null, + SnapshotInfoTests.randomUserMetadata(), + randomVersion(random()), + true + ); + final List newEntries = new ArrayList<>(); + newEntries.add(entry); + SnapshotsInProgress snapshotsInProgress = 
SnapshotsInProgress.of(newEntries); + + SnapshotsInProgress actualSnapshotsInProgress; + try (BytesStreamOutput out = new BytesStreamOutput()) { + snapshotsInProgress.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(Version.V_2_7_0); + actualSnapshotsInProgress = new SnapshotsInProgress(in); + assert in.available() != 0; + for (Entry curr_entry : actualSnapshotsInProgress.entries()) { + assert (curr_entry.remoteStoreIndexShallowCopy() == false); + } + } + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(Version.V_2_9_0); + actualSnapshotsInProgress = new SnapshotsInProgress(in); + assert in.available() == 0; + for (Entry curr_entry : actualSnapshotsInProgress.entries()) { + assert (curr_entry.remoteStoreIndexShallowCopy() == true); + } + } + } + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(Version.V_2_7_0); + snapshotsInProgress.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(Version.V_2_7_0); + actualSnapshotsInProgress = new SnapshotsInProgress(in); + assert in.available() == 0; + for (Entry curr_entry : actualSnapshotsInProgress.entries()) { + assert (curr_entry.remoteStoreIndexShallowCopy() == false); + } + } + } + } + public static State randomState(ImmutableOpenMap shards) { return SnapshotsInProgress.completed(shards.values()) ? 
randomFrom(State.SUCCESS, State.FAILED) : randomFrom(State.STARTED, State.INIT, State.ABORTED); } + + private String randomName(String prefix) { + return prefix + UUIDs.randomBase64UUID(random()); + } } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotsServiceTests.java index 7f96d4842e37d..f2be8933ad856 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotsServiceTests.java @@ -484,7 +484,8 @@ private static SnapshotsInProgress.Entry snapshotEntry( randomNonNegativeLong(), shards, Collections.emptyMap(), - Version.CURRENT + Version.CURRENT, + false ); } diff --git a/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 0dc8d435ec783..43dde7281fb2d 100644 --- a/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -232,7 +232,8 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { 5, Collections.emptyList(), true, - Collections.emptyMap() + Collections.emptyMap(), + false ), Version.CURRENT, Function.identity(), @@ -257,7 +258,8 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { 6, Collections.emptyList(), true, - Collections.emptyMap() + Collections.emptyMap(), + false ), Version.CURRENT, Function.identity(), @@ -284,7 +286,8 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { 5, Collections.emptyList(), true, - Collections.emptyMap() + Collections.emptyMap(), + false ), Version.CURRENT, Function.identity(), diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java 
b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index f25690e6eb50f..e373fa6010900 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -524,7 +524,8 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Mapget( f -> repo.finalizeSnapshot( From bcd99626287d72348b30773666bc6e22b165e616 Mon Sep 17 00:00:00 2001 From: Ankit Kala Date: Thu, 15 Jun 2023 16:02:44 +0530 Subject: [PATCH 2/2] Establish seed node connections in async during node bootstrap (#8038) (#8077) * Establish seed node connection setup in async Signed-off-by: Ankit Kala --- CHANGELOG.md | 1 + .../transport/RemoteClusterService.java | 32 +++++------------ .../transport/TransportService.java | 7 +++- .../transport/RemoteClusterClientTests.java | 2 +- .../transport/RemoteClusterServiceTests.java | 35 +++++++++++++------ 5 files changed, 42 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bfadf4f34d6ba..60f16de7c3623 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement concurrent aggregations support without profile option ([#7514](https://github.com/opensearch-project/OpenSearch/pull/7514)) - Add dynamic index and cluster setting for concurrent segment search ([#7956](https://github.com/opensearch-project/OpenSearch/pull/7956)) - Add descending order search optimization through reverse segment read. 
([#7967](https://github.com/opensearch-project/OpenSearch/pull/7967)) +- Make remote cluster connection setup in async ([#8038](https://github.com/opensearch-project/OpenSearch/pull/8038)) ### Dependencies - Bump `com.azure:azure-storage-common` from 12.21.0 to 12.21.1 (#7566, #7814) diff --git a/server/src/main/java/org/opensearch/transport/RemoteClusterService.java b/server/src/main/java/org/opensearch/transport/RemoteClusterService.java index 838367baf8a2e..60e166a4e300c 100644 --- a/server/src/main/java/org/opensearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/opensearch/transport/RemoteClusterService.java @@ -38,7 +38,6 @@ import org.opensearch.action.OriginalIndices; import org.opensearch.action.support.GroupedActionListener; import org.opensearch.action.support.IndicesOptions; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.client.Client; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; @@ -63,7 +62,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; @@ -328,35 +326,23 @@ synchronized void updateRemoteCluster(String clusterAlias, Settings newSettings, } /** - * Connects to all remote clusters in a blocking fashion. This should be called on node startup to establish an initial connection + * Connects to all remote clusters in a non-blocking fashion. This should be called on node startup to establish an initial connection * to all configured seed nodes. 
*/ - void initializeRemoteClusters() { - final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); - final PlainActionFuture<Collection<Void>> future = new PlainActionFuture<>(); + void initializeRemoteClusters(ActionListener<Void> listener) { Set<String> enabledClusters = RemoteClusterAware.getEnabledRemoteClusters(settings); - if (enabledClusters.isEmpty()) { + listener.onResponse(null); return; } - GroupedActionListener<Void> listener = new GroupedActionListener<>(future, enabledClusters.size()); - for (String clusterAlias : enabledClusters) { - updateRemoteCluster(clusterAlias, settings, listener); - } + GroupedActionListener<Void> groupListener = new GroupedActionListener<>( + ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure), + enabledClusters.size() + ); - if (enabledClusters.isEmpty()) { - future.onResponse(null); - } - - try { - future.get(timeValue.millis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (TimeoutException ex) { - logger.warn("failed to connect to remote clusters within {}", timeValue.toString()); - } catch (Exception e) { - throw new IllegalStateException("failed to connect to remote clusters", e); + for (String clusterAlias : enabledClusters) { + updateRemoteCluster(clusterAlias, settings, groupListener); } } diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 526728aea725c..13491609945a0 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -291,7 +291,12 @@ protected void doStart() { if (remoteClusterClient) { // here we start to connect to the remote clusters - remoteClusterService.initializeRemoteClusters(); + remoteClusterService.initializeRemoteClusters( + ActionListener.wrap( + r -> logger.info("Remote clusters initialized successfully."), + e -> 
logger.error("Remote clusters initialization failed partially", e) + ) + ); } } diff --git a/server/src/test/java/org/opensearch/transport/RemoteClusterClientTests.java b/server/src/test/java/org/opensearch/transport/RemoteClusterClientTests.java index 94c535a5e20f5..b89d652510850 100644 --- a/server/src/test/java/org/opensearch/transport/RemoteClusterClientTests.java +++ b/server/src/test/java/org/opensearch/transport/RemoteClusterClientTests.java @@ -86,7 +86,7 @@ public void testConnectAndExecuteRequest() throws Exception { service.acceptIncomingRequests(); logger.info("now accepting incoming requests on local transport"); RemoteClusterService remoteClusterService = service.getRemoteClusterService(); - assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); + assertBusy(() -> { assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); }, 10, TimeUnit.SECONDS); Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test"); ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get(); assertNotNull(clusterStateResponse); diff --git a/server/src/test/java/org/opensearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/opensearch/transport/RemoteClusterServiceTests.java index dea42684d2e21..8b26fa42e7b76 100644 --- a/server/src/test/java/org/opensearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/opensearch/transport/RemoteClusterServiceTests.java @@ -59,6 +59,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -92,6 +93,20 @@ private MockTransportService startTransport( return RemoteClusterConnectionTests.startTransport(id, knownNodes, version, threadPool, settings); } + void 
initializeRemoteClusters(RemoteClusterService service) { + final PlainActionFuture<Void> future = new PlainActionFuture<>(); + service.initializeRemoteClusters(future); + try { + future.get(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (TimeoutException ex) { + logger.warn("Timed out connecting to remote clusters"); + } catch (Exception e) { + throw new IllegalStateException("failed to connect to remote clusters", e); + } + } + public void testSettingsAreRegistered() { assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING)); @@ -157,7 +172,7 @@ public void testGroupClusterIndices() throws IOException { builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); @@ -228,7 +243,7 @@ public void testGroupIndices() throws IOException { builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); @@ -321,7 +336,7 @@ public void testIncrementallyAddClusters() throws IOException { 
builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(service.isCrossClusterSearchEnabled()); Settings cluster1Settings = createSettings( "cluster_1", @@ -384,7 +399,7 @@ public void testDefaultPingSchedule() throws IOException { transportService.acceptIncomingRequests(); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(service.isCrossClusterSearchEnabled()); service.validateAndUpdateRemoteCluster( "cluster_1", @@ -436,7 +451,7 @@ public void testCustomPingSchedule() throws IOException { TimeValue.timeValueSeconds(randomIntBetween(1, 10)); builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(service.isRemoteClusterRegistered("cluster_1")); RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval()); @@ -467,7 +482,7 @@ public void testChangeSettings() throws Exception { Settings.Builder builder = Settings.builder(); builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); RemoteClusterConnection remoteClusterConnection = 
service.getRemoteClusterConnection("cluster_1"); Settings.Builder settingsChange = Settings.builder(); TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8)); @@ -517,7 +532,7 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { builder.putList("cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(service.isCrossClusterSearchEnabled()); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -580,7 +595,7 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { builder.putList("cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(service.isCrossClusterSearchEnabled()); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -648,7 +663,7 @@ public void testCollectNodes() throws InterruptedException, IOException { builder.putList("cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(service.isCrossClusterSearchEnabled()); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -896,7 +911,7 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception { builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString())); try (RemoteClusterService service = new RemoteClusterService(builder.build(), 
transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(service.isCrossClusterSearchEnabled()); final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test");