diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c4c7c80e5410..d294eb68ee7c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -99,6 +99,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Remove LegacyESVersion.V_7_2_ and V_7_3_ Constants ([#4702](https://github.com/opensearch-project/OpenSearch/pull/4702)) - Always auto release the flood stage block ([#4703](https://github.com/opensearch-project/OpenSearch/pull/4703)) - Remove LegacyESVersion.V_7_4_ and V_7_5_ Constants ([#4704](https://github.com/opensearch-project/OpenSearch/pull/4704)) +- Remove Legacy Version support from Snapshot/Restore Service ([#4728](https://github.com/opensearch-project/OpenSearch/pull/4728)) + ### Fixed - `opensearch-service.bat start` and `opensearch-service.bat manager` failing to run ([#4289](https://github.com/opensearch-project/OpenSearch/pull/4289)) - PR reference to checkout code for changelog verifier ([#4296](https://github.com/opensearch-project/OpenSearch/pull/4296)) diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java index b35d4080a413a..3c3f2887469b3 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -36,8 +36,6 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import fixture.s3.S3HttpHandler; -import org.opensearch.action.ActionRunnable; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; @@ -45,29 +43,21 @@ import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; -import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.regex.Regex; import org.opensearch.common.settings.MockSecureSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeUnit; -import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.NamedXContentRegistry; -import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.plugins.Plugin; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.RepositoryData; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.OpenSearchMockAPIBasedRepositoryIntegTestCase; -import org.opensearch.snapshots.SnapshotId; -import org.opensearch.snapshots.SnapshotsService; import org.opensearch.snapshots.mockstore.BlobStoreWrapper; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -75,8 +65,6 @@ import java.util.Map; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.startsWith; @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint") @@ -84,8 +72,6 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) public class S3BlobStoreRepositoryTests extends OpenSearchMockAPIBasedRepositoryIntegTestCase { - private static final TimeValue TEST_COOLDOWN_PERIOD = TimeValue.timeValueSeconds(10L); - private String region; private String signerOverride; @@ -158,56 +144,6 @@ protected Settings nodeSettings(int nodeOrdinal) { return builder.build(); } - public void testEnforcedCooldownPeriod() throws IOException { - final String repoName = createRepository( - randomName(), - Settings.builder().put(repositorySettings()).put(S3Repository.COOLDOWN_PERIOD.getKey(), TEST_COOLDOWN_PERIOD).build() - ); - - final SnapshotId fakeOldSnapshot = client().admin() - .cluster() - .prepareCreateSnapshot(repoName, "snapshot-old") - .setWaitForCompletion(true) - .setIndices() - .get() - .getSnapshotInfo() - .snapshotId(); - final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class); - final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName); - final RepositoryData repositoryData = getRepositoryData(repository); - final RepositoryData modifiedRepositoryData = repositoryData.withVersions( - Collections.singletonMap(fakeOldSnapshot, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()) - ); - final BytesReference serialized = BytesReference.bytes( - modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), SnapshotsService.OLD_SNAPSHOT_FORMAT) - ); - PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> { - try (InputStream stream = serialized.streamInput()) { - repository.blobStore() - .blobContainer(repository.basePath()) - .writeBlobAtomic( - BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), - stream, - serialized.length(), - true - ); - } - }))); - - final String newSnapshotName = "snapshot-new"; - final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos(); - client().admin().cluster().prepareCreateSnapshot(repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get(); - assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledSnapshot, greaterThan(TEST_COOLDOWN_PERIOD.getNanos())); - - final long beforeThrottledDelete = repository.threadPool().relativeTimeInNanos(); - client().admin().cluster().prepareDeleteSnapshot(repoName, newSnapshotName).get(); - assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledDelete, greaterThan(TEST_COOLDOWN_PERIOD.getNanos())); - - final long beforeFastDelete = repository.threadPool().relativeTimeInNanos(); - client().admin().cluster().prepareDeleteSnapshot(repoName, fakeOldSnapshot.getName()).get(); - assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos())); - } - /** * S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload. */ diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index c8377949a6842..f80e5743edbc0 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -35,10 +35,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.action.ActionListener; -import org.opensearch.action.ActionRunnable; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoryMetadata; @@ -52,7 +50,6 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.ByteSizeValue; -import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.NamedXContentRegistry; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.monitor.jvm.JvmInfo; @@ -62,13 +59,10 @@ import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfo; -import org.opensearch.snapshots.SnapshotsService; import org.opensearch.threadpool.Scheduler; -import org.opensearch.threadpool.ThreadPool; import java.util.Collection; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -182,24 +176,6 @@ class S3Repository extends MeteredBlobStoreRepository { static final Setting CLIENT_NAME = new Setting<>("client", "default", Function.identity()); - /** - * Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the - * backwards compatible snapshot format from before - * {@link org.opensearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link LegacyESVersion#V_7_6_0}). - * This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when - * doing repository operations in rapid succession on a repository in the old metadata format. - * This setting should not be adjusted in production when working with an AWS S3 backed repository. Doing so risks the repository - * becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older than - * {@link LegacyESVersion#V_7_6_0} from the repository which will trigger an upgrade of the repository metadata to the new - * format and disable the cooldown period. - */ - static final Setting COOLDOWN_PERIOD = Setting.timeSetting( - "cooldown_period", - new TimeValue(3, TimeUnit.MINUTES), - new TimeValue(0, TimeUnit.MILLISECONDS), - Setting.Property.Dynamic - ); - /** * Specifies the path within bucket to repository data. Defaults to root directory. */ @@ -223,12 +199,6 @@ class S3Repository extends MeteredBlobStoreRepository { private final RepositoryMetadata repositoryMetadata; - /** - * Time period to delay repository operations by after finalizing or deleting a snapshot. - * See {@link #COOLDOWN_PERIOD} for details. - */ - private final TimeValue coolDown; - /** * Constructs an s3 backed repository */ @@ -296,8 +266,6 @@ class S3Repository extends MeteredBlobStoreRepository { ); } - coolDown = COOLDOWN_PERIOD.get(metadata.settings()); - logger.debug( "using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]", bucket, @@ -334,9 +302,6 @@ public void finalizeSnapshot( Function stateTransformer, ActionListener listener ) { - if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) { - listener = delayedListener(listener); - } super.finalizeSnapshot( shardGenerations, repositoryStateId, @@ -355,59 +320,9 @@ public void deleteSnapshots( Version repositoryMetaVersion, ActionListener listener ) { - if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) { - listener = delayedListener(listener); - } super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener); } - /** - * Wraps given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked. - * See {@link #COOLDOWN_PERIOD} for details. - */ - private ActionListener delayedListener(ActionListener listener) { - final ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { - final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null); - assert cancellable != null; - }); - return new ActionListener() { - @Override - public void onResponse(T response) { - logCooldownInfo(); - final Scheduler.Cancellable existing = finalizationFuture.getAndSet( - threadPool.schedule( - ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)), - coolDown, - ThreadPool.Names.SNAPSHOT - ) - ); - assert existing == null : "Already have an ongoing finalization " + finalizationFuture; - } - - @Override - public void onFailure(Exception e) { - logCooldownInfo(); - final Scheduler.Cancellable existing = finalizationFuture.getAndSet( - threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, ThreadPool.Names.SNAPSHOT) - ); - assert existing == null : "Already have an ongoing finalization " + finalizationFuture; - } - }; - } - - private void logCooldownInfo() { - logger.info( - "Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]" - + " and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid " - + "repository corruption. To get rid of this message and move to the new repository metadata format, either remove " - + "all snapshots older than version [{}] from the repository or create a new repository at an empty location.", - coolDown, - metadata.name(), - SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION, - SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION - ); - } - @Override protected S3BlobStore createBlobStore() { return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, repositoryMetadata); diff --git a/qa/repository-multi-version/src/test/java/org/opensearch/upgrades/MultiVersionRepositoryAccessIT.java b/qa/repository-multi-version/src/test/java/org/opensearch/upgrades/MultiVersionRepositoryAccessIT.java index c0b90626d7bad..49efce0516434 100644 --- a/qa/repository-multi-version/src/test/java/org/opensearch/upgrades/MultiVersionRepositoryAccessIT.java +++ b/qa/repository-multi-version/src/test/java/org/opensearch/upgrades/MultiVersionRepositoryAccessIT.java @@ -32,7 +32,6 @@ package org.opensearch.upgrades; -import org.opensearch.OpenSearchStatusException; import org.opensearch.action.admin.cluster.repositories.put.PutRepositoryRequest; import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus; @@ -42,20 +41,17 @@ import org.opensearch.client.Request; import org.opensearch.client.RequestOptions; import org.opensearch.client.Response; -import org.opensearch.client.ResponseException; import org.opensearch.client.RestClient; import org.opensearch.client.RestHighLevelClient; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.DeprecationHandler; import org.opensearch.common.xcontent.XContentParser; import org.opensearch.common.xcontent.json.JsonXContent; -import org.opensearch.snapshots.SnapshotsService; import org.opensearch.test.rest.OpenSearchRestTestCase; import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConnection; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -231,20 +227,10 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException { ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards); } } else { - if (SnapshotsService.useIndexGenerations(minimumNodeVersion()) == false) { - assertThat(TEST_STEP, is(TestStep.STEP3_OLD_CLUSTER)); - final List> expectedExceptions = - Arrays.asList(ResponseException.class, OpenSearchStatusException.class); - expectThrowsAnyOf(expectedExceptions, () -> listSnapshots(repoName)); - expectThrowsAnyOf(expectedExceptions, () -> deleteSnapshot(client, repoName, "snapshot-1")); - expectThrowsAnyOf(expectedExceptions, () -> deleteSnapshot(client, repoName, "snapshot-2")); - expectThrowsAnyOf(expectedExceptions, () -> createSnapshot(client, repoName, "snapshot-impossible", index)); - } else { - assertThat(listSnapshots(repoName), hasSize(2)); - if (TEST_STEP == TestStep.STEP4_NEW_CLUSTER) { - ensureSnapshotRestoreWorks(repoName, "snapshot-1", shards); - ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards); - } + assertThat(listSnapshots(repoName), hasSize(2)); + if (TEST_STEP == TestStep.STEP4_NEW_CLUSTER) { + ensureSnapshotRestoreWorks(repoName, "snapshot-1", shards); + ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards); } } } finally { diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java index 7b95bf93f8bf4..f659d827448a0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java @@ -78,14 +78,6 @@ public void testShardClone() throws Exception { final Path repoPath = randomRepoPath(); createRepository(repoName, "fs", repoPath); - final boolean useBwCFormat = randomBoolean(); - if (useBwCFormat) { - initWithSnapshotVersion(repoName, repoPath, SnapshotsService.OLD_SNAPSHOT_FORMAT); - // Re-create repo to clear repository data cache - assertAcked(clusterAdmin().prepareDeleteRepository(repoName).get()); - createRepository(repoName, "fs", repoPath); - } - final String indexName = "test-index"; createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); final String sourceSnapshot = "source-snapshot"; @@ -101,21 +93,11 @@ public void testShardClone() throws Exception { final SnapshotId targetSnapshotId = new SnapshotId("target-snapshot", UUIDs.randomBase64UUID(random())); - final String currentShardGen; - if (useBwCFormat) { - currentShardGen = null; - } else { - currentShardGen = repositoryData.shardGenerations().getShardGen(indexId, shardId); - } + final String currentShardGen = repositoryData.shardGenerations().getShardGen(indexId, shardId); final String newShardGeneration = PlainActionFuture.get( f -> repository.cloneShardSnapshot(sourceSnapshotInfo.snapshotId(), targetSnapshotId, repositoryShardId, currentShardGen, f) ); - if (useBwCFormat) { - final long gen = Long.parseLong(newShardGeneration); - assertEquals(gen, 1L); // Initial snapshot brought it to 0, clone increments it to 1 - } - final BlobStoreIndexShardSnapshot targetShardSnapshot = readShardSnapshot(repository, repositoryShardId, targetSnapshotId); final BlobStoreIndexShardSnapshot sourceShardSnapshot = readShardSnapshot( repository, diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsIT.java index b6d482cad8860..f483ed7fe6c5d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsIT.java @@ -56,7 +56,6 @@ import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoryData; import org.opensearch.repositories.RepositoryException; -import org.opensearch.repositories.ShardGenerations; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.snapshots.mockstore.MockRepository; import org.opensearch.test.OpenSearchIntegTestCase; @@ -1297,43 +1296,6 @@ public void testConcurrentOperationsLimit() throws Exception { } } - public void testConcurrentSnapshotWorksWithOldVersionRepo() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - final String dataNode = internalCluster().startDataOnlyNode(); - final String repoName = "test-repo"; - final Path repoPath = randomRepoPath(); - createRepository( - repoName, - "mock", - Settings.builder().put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false).put("location", repoPath) - ); - initWithSnapshotVersion(repoName, repoPath, SnapshotsService.OLD_SNAPSHOT_FORMAT); - - createIndexWithContent("index-slow"); - - final ActionFuture createSlowFuture = startFullSnapshotBlockedOnDataNode( - "slow-snapshot", - repoName, - dataNode - ); - - final String dataNode2 = internalCluster().startDataOnlyNode(); - ensureStableCluster(3); - final String indexFast = "index-fast"; - createIndexWithContent(indexFast, dataNode2, dataNode); - - final ActionFuture createFastSnapshot = startFullSnapshot(repoName, "fast-snapshot"); - - assertThat(createSlowFuture.isDone(), is(false)); - unblockNode(repoName, dataNode); - - assertSuccessful(createFastSnapshot); - assertSuccessful(createSlowFuture); - - final RepositoryData repositoryData = getRepositoryData(repoName); - assertThat(repositoryData.shardGenerations(), is(ShardGenerations.EMPTY)); - } - public void testQueuedDeleteAfterFinalizationFailure() throws Exception { final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); final String repoName = "test-repo"; diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/CorruptedBlobStoreRepositoryIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/CorruptedBlobStoreRepositoryIT.java index b806ee3e55a94..a07b245db2b21 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/CorruptedBlobStoreRepositoryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/CorruptedBlobStoreRepositoryIT.java @@ -32,48 +32,33 @@ package org.opensearch.snapshots; -import org.opensearch.Version; -import org.opensearch.action.ActionRunnable; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; import org.opensearch.action.index.IndexRequestBuilder; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.client.Client; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; -import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeUnit; -import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.repositories.IndexId; -import org.opensearch.repositories.IndexMetaDataGenerations; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryData; import org.opensearch.repositories.RepositoryException; -import org.opensearch.repositories.ShardGenerations; import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.threadpool.ThreadPool; -import java.io.IOException; import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.function.Function; -import java.util.stream.Collectors; import java.util.stream.Stream; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertFileExists; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertRequestBuilderThrows; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -310,101 +295,6 @@ public void testFindDanglingLatestGeneration() throws Exception { ); } - public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception { - Path repo = randomRepoPath(); - final String repoName = "test-repo"; - createRepository( - repoName, - "fs", - Settings.builder() - .put("location", repo) - .put("compress", false) - // Don't cache repository data because the test manually modifies the repository data - .put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false) - .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) - ); - - final String snapshotPrefix = "test-snap-"; - final int snapshots = randomIntBetween(1, 2); - logger.info("--> creating [{}] snapshots", snapshots); - for (int i = 0; i < snapshots; ++i) { - // Workaround to simulate BwC situation: taking a snapshot without indices here so that we don't create any new version shard - // generations (the existence of which would short-circuit checks for the repo containing old version snapshots) - CreateSnapshotResponse createSnapshotResponse = client().admin() - .cluster() - .prepareCreateSnapshot(repoName, snapshotPrefix + i) - .setIndices() - .setWaitForCompletion(true) - .get(); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), is(0)); - assertThat( - createSnapshotResponse.getSnapshotInfo().successfulShards(), - equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()) - ); - } - final RepositoryData repositoryData = getRepositoryData(repoName); - - final SnapshotId snapshotToCorrupt = randomFrom(repositoryData.getSnapshotIds()); - logger.info("--> delete root level snapshot metadata blob for snapshot [{}]", snapshotToCorrupt); - Files.delete(repo.resolve(String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotToCorrupt.getUUID()))); - - logger.info("--> strip version information from index-N blob"); - final RepositoryData withoutVersions = new RepositoryData( - repositoryData.getGenId(), - repositoryData.getSnapshotIds().stream().collect(Collectors.toMap(SnapshotId::getUUID, Function.identity())), - repositoryData.getSnapshotIds().stream().collect(Collectors.toMap(SnapshotId::getUUID, repositoryData::getSnapshotState)), - Collections.emptyMap(), - Collections.emptyMap(), - ShardGenerations.EMPTY, - IndexMetaDataGenerations.EMPTY - ); - - Files.write( - repo.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + withoutVersions.getGenId()), - BytesReference.toBytes( - BytesReference.bytes(withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT)) - ), - StandardOpenOption.TRUNCATE_EXISTING - ); - - logger.info("--> verify that repo is assumed in old metadata format"); - final SnapshotsService snapshotsService = internalCluster().getCurrentClusterManagerNodeInstance(SnapshotsService.class); - final ThreadPool threadPool = internalCluster().getCurrentClusterManagerNodeInstance(ThreadPool.class); - assertThat( - PlainActionFuture.get( - f -> threadPool.generic() - .execute( - ActionRunnable.supply( - f, - () -> snapshotsService.minCompatibleVersion(Version.CURRENT, getRepositoryData(repoName), null) - ) - ) - ), - is(SnapshotsService.OLD_SNAPSHOT_FORMAT) - ); - - logger.info("--> verify that snapshot with missing root level metadata can be deleted"); - assertAcked(startDeleteSnapshot(repoName, snapshotToCorrupt.getName()).get()); - - logger.info("--> verify that repository is assumed in new metadata format after removing corrupted snapshot"); - assertThat( - PlainActionFuture.get( - f -> threadPool.generic() - .execute( - ActionRunnable.supply( - f, - () -> snapshotsService.minCompatibleVersion(Version.CURRENT, getRepositoryData(repoName), null) - ) - ) - ), - is(Version.CURRENT) - ); - final RepositoryData finalRepositoryData = getRepositoryData(repoName); - for (SnapshotId snapshotId : finalRepositoryData.getSnapshotIds()) { - assertThat(finalRepositoryData.getVersion(snapshotId), is(Version.CURRENT)); - } - } - public void testMountCorruptedRepositoryData() throws Exception { disableRepoConsistencyCheck("This test intentionally corrupts the repository contents"); Client client = client(); @@ -453,87 +343,6 @@ public void testMountCorruptedRepositoryData() throws Exception { expectThrows(RepositoryException.class, () -> getRepositoryData(otherRepo)); } - public void testHandleSnapshotErrorWithBwCFormat() throws IOException, ExecutionException, InterruptedException { - final String repoName = "test-repo"; - final Path repoPath = randomRepoPath(); - createRepository(repoName, "fs", repoPath); - final String oldVersionSnapshot = initWithSnapshotVersion(repoName, repoPath, SnapshotsService.OLD_SNAPSHOT_FORMAT); - - logger.info("--> recreating repository to clear caches"); - client().admin().cluster().prepareDeleteRepository(repoName).get(); - createRepository(repoName, "fs", repoPath); - - final String indexName = "test-index"; - createIndex(indexName); - - createFullSnapshot(repoName, "snapshot-1"); - - // In the old metadata version the shard level metadata could be moved to the next generation for all sorts of reasons, this should - // not break subsequent repository operations - logger.info("--> move shard level metadata to new generation"); - final IndexId indexId = getRepositoryData(repoName).resolveIndexId(indexName); - final Path shardPath = repoPath.resolve("indices").resolve(indexId.getId()).resolve("0"); - final Path initialShardMetaPath = shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + "0"); - assertFileExists(initialShardMetaPath); - Files.move(initialShardMetaPath, shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + "1")); - - startDeleteSnapshot(repoName, oldVersionSnapshot).get(); - - createFullSnapshot(repoName, "snapshot-2"); - } - - public void testRepairBrokenShardGenerations() throws Exception { - final String repoName = "test-repo"; - final Path repoPath = randomRepoPath(); - createRepository(repoName, "fs", repoPath); - final String oldVersionSnapshot = initWithSnapshotVersion(repoName, repoPath, SnapshotsService.OLD_SNAPSHOT_FORMAT); - - logger.info("--> recreating repository to clear caches"); - client().admin().cluster().prepareDeleteRepository(repoName).get(); - createRepository(repoName, "fs", repoPath); - - final String indexName = "test-index"; - createIndex(indexName); - - createFullSnapshot(repoName, "snapshot-1"); - - startDeleteSnapshot(repoName, oldVersionSnapshot).get(); - - logger.info("--> move shard level metadata to new generation and make RepositoryData point at an older generation"); - final IndexId indexId = getRepositoryData(repoName).resolveIndexId(indexName); - final Path shardPath = repoPath.resolve("indices").resolve(indexId.getId()).resolve("0"); - final Path initialShardMetaPath = shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + "0"); - assertFileExists(initialShardMetaPath); - Files.move(initialShardMetaPath, shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + randomIntBetween(1, 1000))); - - final RepositoryData repositoryData1 = getRepositoryData(repoName); - final Map snapshotIds = repositoryData1.getSnapshotIds() - .stream() - .collect(Collectors.toMap(SnapshotId::getUUID, Function.identity())); - final RepositoryData brokenRepoData = new RepositoryData( - repositoryData1.getGenId(), - snapshotIds, - snapshotIds.values().stream().collect(Collectors.toMap(SnapshotId::getUUID, repositoryData1::getSnapshotState)), - snapshotIds.values().stream().collect(Collectors.toMap(SnapshotId::getUUID, repositoryData1::getVersion)), - repositoryData1.getIndices().values().stream().collect(Collectors.toMap(Function.identity(), repositoryData1::getSnapshots)), - ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build(), - repositoryData1.indexMetaDataGenerations() - ); - Files.write( - repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData1.getGenId()), - BytesReference.toBytes( - BytesReference.bytes(brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT)) - ), - StandardOpenOption.TRUNCATE_EXISTING - ); - - logger.info("--> recreating repository to clear caches"); - client().admin().cluster().prepareDeleteRepository(repoName).get(); - createRepository(repoName, "fs", repoPath); - - createFullSnapshot(repoName, "snapshot-2"); - } - /** * Tests that a shard snapshot with a corrupted shard index file can still be used for restore and incremental snapshots. */ diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/delete/DeleteSnapshotRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/delete/DeleteSnapshotRequest.java index 61c4fdb9d5c14..832b37050ffe6 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/delete/DeleteSnapshotRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/delete/DeleteSnapshotRequest.java @@ -36,7 +36,6 @@ import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.snapshots.SnapshotsService; import java.io.IOException; @@ -84,27 +83,14 @@ public DeleteSnapshotRequest(String repository) { public DeleteSnapshotRequest(StreamInput in) throws IOException { super(in); repository = in.readString(); - if (in.getVersion().onOrAfter(SnapshotsService.MULTI_DELETE_VERSION)) { - snapshots = in.readStringArray(); - } else { - snapshots = new String[] { in.readString() }; - } + snapshots = in.readStringArray(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(repository); - if (out.getVersion().onOrAfter(SnapshotsService.MULTI_DELETE_VERSION)) { - out.writeStringArray(snapshots); - } else { - if (snapshots.length != 1) { - throw new IllegalArgumentException( - "Can't write snapshot delete with more than one snapshot to version [" + out.getVersion() + "]" - ); - } - out.writeString(snapshots[0]); - } + out.writeStringArray(snapshots); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/SnapshotDeletionsInProgress.java b/server/src/main/java/org/opensearch/cluster/SnapshotDeletionsInProgress.java index 0f95890e56ec2..e4ce0a8f99e02 100644 --- a/server/src/main/java/org/opensearch/cluster/SnapshotDeletionsInProgress.java +++ b/server/src/main/java/org/opensearch/cluster/SnapshotDeletionsInProgress.java @@ -34,7 +34,6 @@ import org.opensearch.Version; import org.opensearch.cluster.ClusterState.Custom; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.UUIDs; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -42,9 +41,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.repositories.RepositoryOperation; -import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotId; -import org.opensearch.snapshots.SnapshotsService; import java.io.IOException; import java.util.ArrayList; @@ -245,23 +242,12 @@ private Entry(List snapshots, String repoName, long startTime, long } public Entry(StreamInput in) throws IOException { - if (in.getVersion().onOrAfter(SnapshotsService.MULTI_DELETE_VERSION)) { - this.repoName = in.readString(); - this.snapshots = in.readList(SnapshotId::new); - } else { - final Snapshot snapshot = new Snapshot(in); - this.snapshots = Collections.singletonList(snapshot.getSnapshotId()); - this.repoName = snapshot.getRepository(); - } + this.repoName = in.readString(); + this.snapshots = in.readList(SnapshotId::new); this.startTime = in.readVLong(); this.repositoryStateId = in.readLong(); - if (in.getVersion().onOrAfter(SnapshotsService.FULL_CONCURRENCY_VERSION)) { - this.state = State.readFrom(in); - this.uuid = in.readString(); - } else { - this.state = State.STARTED; - this.uuid = IndexMetadata.INDEX_UUID_NA_VALUE; - } + this.state = State.readFrom(in); + this.uuid = in.readString(); } public Entry started() { @@ -343,22 +329,12 @@ public int hashCode() { @Override public void writeTo(StreamOutput out) throws IOException { - if (out.getVersion().onOrAfter(SnapshotsService.MULTI_DELETE_VERSION)) { - out.writeString(repoName); - out.writeCollection(snapshots); - } else { - assert snapshots.size() == 1 : "Only single deletion allowed in mixed version cluster containing [" - + out.getVersion() - + "] but saw " - + snapshots; - new Snapshot(repoName, snapshots.get(0)).writeTo(out); - } + out.writeString(repoName); + out.writeCollection(snapshots); out.writeVLong(startTime); out.writeLong(repositoryStateId); - if (out.getVersion().onOrAfter(SnapshotsService.FULL_CONCURRENCY_VERSION)) { - state.writeTo(out); - out.writeString(uuid); - } + state.writeTo(out); + out.writeString(uuid); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java index 4cbf0cfe70adb..f77b3e3b2a904 100644 --- a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java @@ -35,7 +35,6 @@ import com.carrotsearch.hppc.ObjectContainer; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; -import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.cluster.ClusterState.Custom; import org.opensearch.common.Nullable; @@ -54,7 +53,6 @@ import org.opensearch.snapshots.InFlightShardSnapshotStates; import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotId; -import org.opensearch.snapshots.SnapshotsService; import java.io.IOException; import java.util.Collections; @@ -66,8 +64,6 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.opensearch.snapshots.SnapshotInfo.DATA_STREAMS_IN_SNAPSHOT; - /** * Meta data about snapshots that are currently executing * @@ -77,8 +73,6 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement public static final SnapshotsInProgress EMPTY = new SnapshotsInProgress(Collections.emptyList()); - private static final Version VERSION_IN_SNAPSHOT_VERSION = LegacyESVersion.V_7_7_0; - public static final String TYPE = "snapshots"; public static final String ABORTED_FAILURE_TEXT = "Snapshot was aborted by deletion"; @@ -296,28 +290,10 @@ private Entry(StreamInput in) throws IOException { repositoryStateId = in.readLong(); failure = in.readOptionalString(); userMetadata = in.readMap(); - if (in.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) { - version = Version.readVersion(in); - } else if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { - // If an older cluster-manager informs us that shard generations are supported - // we use the minimum shard generation compatible version. - // If shard generations are not supported yet we use a placeholder for a version that does not use shard generations. - version = in.readBoolean() ? SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION : SnapshotsService.OLD_SNAPSHOT_FORMAT; - } else { - version = SnapshotsService.OLD_SNAPSHOT_FORMAT; - } - if (in.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { - dataStreams = in.readStringList(); - } else { - dataStreams = Collections.emptyList(); - } - if (in.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) { - source = in.readOptionalWriteable(SnapshotId::new); - clones = in.readImmutableMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom); - } else { - source = null; - clones = ImmutableOpenMap.of(); - } + version = Version.readVersion(in); + dataStreams = in.readStringList(); + source = in.readOptionalWriteable(SnapshotId::new); + clones = in.readImmutableMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom); } private static boolean assertShardsConsistent( @@ -732,18 +708,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(repositoryStateId); out.writeOptionalString(failure); out.writeMap(userMetadata); - if (out.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) { - Version.writeVersion(version, out); - } else if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { - out.writeBoolean(SnapshotsService.useShardGenerations(version)); - } - if (out.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { - out.writeStringCollection(dataStreams); - } - if (out.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) { - out.writeOptionalWriteable(source); - out.writeMap(clones); - } + Version.writeVersion(version, out); + out.writeStringCollection(dataStreams); + out.writeOptionalWriteable(source); + out.writeMap(clones); } @Override @@ -840,12 +808,7 @@ private boolean assertConsistent() { public static ShardSnapshotStatus readFrom(StreamInput in) throws IOException { String nodeId = in.readOptionalString(); final ShardState state = ShardState.fromValue(in.readByte()); - final String generation; - if (SnapshotsService.useShardGenerations(in.getVersion())) { - generation = in.readOptionalString(); - } else { - generation = null; - } + final String generation = in.readOptionalString(); final String reason = in.readOptionalString(); if (state == ShardState.QUEUED) { return UNASSIGNED_QUEUED; @@ -884,9 +847,7 @@ public boolean isActive() { public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(nodeId); out.writeByte(state.value); - if (SnapshotsService.useShardGenerations(out.getVersion())) { - out.writeOptionalString(generation); - } + out.writeOptionalString(generation); out.writeOptionalString(reason); } diff --git a/server/src/main/java/org/opensearch/repositories/IndexMetaDataGenerations.java b/server/src/main/java/org/opensearch/repositories/IndexMetaDataGenerations.java index afabf4ebfdb58..e8f96bb313dd1 100644 --- a/server/src/main/java/org/opensearch/repositories/IndexMetaDataGenerations.java +++ b/server/src/main/java/org/opensearch/repositories/IndexMetaDataGenerations.java @@ -92,10 +92,7 @@ public String getIndexMetaBlobId(String metaIdentifier) { } /** - * Get the blob id by {@link SnapshotId} and {@link IndexId} and fall back to the value of {@link SnapshotId#getUUID()} if none is - * known to enable backwards compatibility with versions older than - * {@link org.opensearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} which used the snapshot uuid as index metadata - * blob uuid. + * Get the blob id by {@link SnapshotId} and {@link IndexId}. * * @param snapshotId Snapshot Id * @param indexId Index Id @@ -103,11 +100,7 @@ public String getIndexMetaBlobId(String metaIdentifier) { */ public String indexMetaBlobId(SnapshotId snapshotId, IndexId indexId) { final String identifier = lookup.getOrDefault(snapshotId, Collections.emptyMap()).get(indexId); - if (identifier == null) { - return snapshotId.getUUID(); - } else { - return identifiers.get(identifier); - } + return identifiers.get(identifier); } /** diff --git a/server/src/main/java/org/opensearch/repositories/RepositoryData.java b/server/src/main/java/org/opensearch/repositories/RepositoryData.java index e8132801e4238..524a978bcf1d1 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoryData.java @@ -42,7 +42,6 @@ import org.opensearch.common.xcontent.XContentParserUtils; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotState; -import org.opensearch.snapshots.SnapshotsService; import java.io.IOException; import java.util.ArrayList; @@ -238,7 +237,6 @@ public SnapshotState getSnapshotState(final SnapshotId snapshotId) { /** * Returns the {@link Version} for the given snapshot or {@code null} if unknown. */ - @Nullable public Version getVersion(SnapshotId snapshotId) { return snapshotVersions.get(snapshotId.getUUID()); } @@ -551,8 +549,6 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final builder.startObject(); // write the snapshots list builder.startArray(SNAPSHOTS); - final boolean shouldWriteIndexGens = SnapshotsService.useIndexGenerations(repoMetaVersion); - final boolean shouldWriteShardGens = SnapshotsService.useShardGenerations(repoMetaVersion); for (final SnapshotId snapshot : getSnapshotIds()) { builder.startObject(); builder.field(NAME, snapshot.getName()); @@ -562,14 +558,12 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final if (state != null) { builder.field(STATE, state.value()); } - if (shouldWriteIndexGens) { - builder.startObject(INDEX_METADATA_LOOKUP); - for (Map.Entry entry : indexMetaDataGenerations.lookup.getOrDefault(snapshot, Collections.emptyMap()) - .entrySet()) { - builder.field(entry.getKey().getId(), entry.getValue()); - } - builder.endObject(); + builder.startObject(INDEX_METADATA_LOOKUP); + for (Map.Entry entry : indexMetaDataGenerations.lookup.getOrDefault(snapshot, Collections.emptyMap()) + .entrySet()) { + builder.field(entry.getKey().getId(), entry.getValue()); } + builder.endObject(); final Version version = snapshotVersions.get(snapshotUUID); if (version != null) { builder.field(VERSION, version.toString()); @@ -589,23 +583,15 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final builder.value(snapshotId.getUUID()); } builder.endArray(); - if (shouldWriteShardGens) { - builder.startArray(SHARD_GENERATIONS); - for (String gen : shardGenerations.getGens(indexId)) { - builder.value(gen); - } - builder.endArray(); + builder.startArray(SHARD_GENERATIONS); + for (String gen : shardGenerations.getGens(indexId)) { + builder.value(gen); } + builder.endArray(); builder.endObject(); } builder.endObject(); - if (shouldWriteIndexGens) { - builder.field(MIN_VERSION, SnapshotsService.INDEX_GEN_IN_REPO_DATA_VERSION.toString()); - builder.field(INDEX_METADATA_IDENTIFIERS, indexMetaDataGenerations.identifiers); - } else if (shouldWriteShardGens) { - // Add min version field to make it impossible for older OpenSearch versions to deserialize this object - builder.field(MIN_VERSION, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.toString()); - } + builder.field(INDEX_METADATA_IDENTIFIERS, indexMetaDataGenerations.identifiers); builder.endObject(); return builder; } @@ -616,12 +602,8 @@ public IndexMetaDataGenerations indexMetaDataGenerations() { /** * Reads an instance of {@link RepositoryData} from x-content, loading the snapshots and indices metadata. - * - * @param fixBrokenShardGens set to {@code true} to filter out broken shard generations read from the {@code parser} via - * {@link ShardGenerations#fixShardGeneration}. Used to disable fixing broken generations when reading - * from cached bytes that we trust to not contain broken generations. */ - public static RepositoryData snapshotsFromXContent(XContentParser parser, long genId, boolean fixBrokenShardGens) throws IOException { + public static RepositoryData snapshotsFromXContent(XContentParser parser, long genId) throws IOException { XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); final Map snapshots = new HashMap<>(); @@ -639,16 +621,16 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g parseSnapshots(parser, snapshots, snapshotStates, snapshotVersions, indexMetaLookup); break; case INDICES: - parseIndices(parser, fixBrokenShardGens, snapshots, indexSnapshots, indexLookup, shardGenerations); + parseIndices(parser, snapshots, indexSnapshots, indexLookup, shardGenerations); break; case INDEX_METADATA_IDENTIFIERS: XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); indexMetaIdentifiers = parser.mapStrings(); break; case MIN_VERSION: - XContentParserUtils.ensureExpectedToken(XContentParser.Token.VALUE_STRING, parser.nextToken(), parser); - final Version version = Version.fromString(parser.text()); - assert SnapshotsService.useShardGenerations(version); + // ignore min_version + // todo: remove in next version + parser.nextToken(); break; default: XContentParserUtils.throwUnknownField(field, parser.getTokenLocation()); @@ -763,7 +745,6 @@ private static void parseSnapshots( * {@code shardGenerations}. * * @param parser x-content parser - * @param fixBrokenShardGens whether or not to fix broken shard generation (see {@link #snapshotsFromXContent} for details) * @param snapshots map of snapshot uuid to {@link SnapshotId} that was populated by {@link #parseSnapshots} * @param indexSnapshots map of {@link IndexId} to list of {@link SnapshotId} that contain the given index * @param indexLookup map of index uuid (as returned by {@link IndexId#getId}) to {@link IndexId} @@ -771,7 +752,6 @@ private static void parseSnapshots( */ private static void parseIndices( XContentParser parser, - boolean fixBrokenShardGens, Map snapshots, Map> indexSnapshots, Map indexLookup, @@ -835,9 +815,6 @@ private static void parseIndices( indexLookup.put(indexId.getId(), indexId); for (int i = 0; i < gens.size(); i++) { String parsedGen = gens.get(i); - if (fixBrokenShardGens) { - parsedGen = ShardGenerations.fixShardGeneration(parsedGen); - } if (parsedGen != null) { shardGenerations.put(indexId, i, parsedGen); } diff --git a/server/src/main/java/org/opensearch/repositories/ShardGenerations.java b/server/src/main/java/org/opensearch/repositories/ShardGenerations.java index 27eac4dfdd8c1..d918eb7e1e476 100644 --- a/server/src/main/java/org/opensearch/repositories/ShardGenerations.java +++ b/server/src/main/java/org/opensearch/repositories/ShardGenerations.java @@ -33,7 +33,6 @@ package org.opensearch.repositories; import org.opensearch.common.Nullable; -import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import java.util.Arrays; import java.util.Collection; @@ -44,7 +43,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.regex.Pattern; import java.util.stream.Collectors; /** @@ -74,24 +72,6 @@ private ShardGenerations(Map> shardGenerations) { this.shardGenerations = shardGenerations; } - private static final Pattern IS_NUMBER = Pattern.compile("^\\d+$"); - - /** - * Filters out unreliable numeric shard generations read from {@link RepositoryData} or {@link IndexShardSnapshotStatus}, returning - * {@code null} in their place. - * @see Issue #57988 - * - * @param shardGeneration shard generation to fix - * @return given shard generation or {@code null} if it was filtered out or {@code null} was passed - */ - @Nullable - public static String fixShardGeneration(@Nullable String shardGeneration) { - if (shardGeneration == null) { - return null; - } - return IS_NUMBER.matcher(shardGeneration).matches() ? null : shardGeneration; - } - /** * Returns the total number of shards tracked by this instance. */ @@ -145,8 +125,7 @@ public Map> obsoleteShardGenerations(ShardGenerati *
    *
  • {@link #DELETED_SHARD_GEN} a deleted shard that isn't referenced by any snapshot in the repository any longer
  • *
  • {@link #NEW_SHARD_GEN} a new shard that we know doesn't hold any valid data yet in the repository
  • - *
  • {@code null} unknown state. The shard either does not exist at all or it was created by a node older than - * {@link org.opensearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}. If a caller expects a shard to exist in the + *
  • {@code null} unknown state. The shard does not exist at all. If a caller expects a shard to exist in the * repository but sees a {@code null} return, it should try to recover the generation by falling back to listing the contents * of the respective shard directory.
  • *
diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index bf06191bdc8d3..1000fe4078b48 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -818,66 +818,46 @@ private void doDeleteShardSnapshots( Version repoMetaVersion, ActionListener listener ) { - - if (SnapshotsService.useShardGenerations(repoMetaVersion)) { - // First write the new shard state metadata (with the removed snapshot) and compute deletion targets - final StepListener> writeShardMetaDataAndComputeDeletesStep = new StepListener<>(); - writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, true, writeShardMetaDataAndComputeDeletesStep); - // Once we have put the new shard-level metadata into place, we can update the repository metadata as follows: - // 1. Remove the snapshots from the list of existing snapshots - // 2. Update the index shard generations of all updated shard folders - // - // Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created - // index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only - // written if all shard paths have been successfully updated. - final StepListener writeUpdatedRepoDataStep = new StepListener<>(); - writeShardMetaDataAndComputeDeletesStep.whenComplete(deleteResults -> { - final ShardGenerations.Builder builder = ShardGenerations.builder(); - for (ShardSnapshotMetaDeleteResult newGen : deleteResults) { - builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration); - } - final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, builder.build()); - writeIndexGen( - updatedRepoData, - repositoryStateId, - repoMetaVersion, - Function.identity(), - ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure) - ); - }, listener::onFailure); - // Once we have updated the repository, run the clean-ups - writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> { - // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion - final ActionListener afterCleanupsListener = new GroupedActionListener<>( - ActionListener.wrap(() -> listener.onResponse(updatedRepoData)), - 2 - ); - cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener); - asyncCleanupUnlinkedShardLevelBlobs( - repositoryData, - snapshotIds, - writeShardMetaDataAndComputeDeletesStep.result(), - afterCleanupsListener - ); - }, listener::onFailure); - } else { - // Write the new repository data first (with the removed snapshot), using no shard generations - final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY); - writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(newRepoData -> { - // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion - final ActionListener afterCleanupsListener = new GroupedActionListener<>( - ActionListener.wrap(() -> listener.onResponse(newRepoData)), - 2 - ); - cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, newRepoData, afterCleanupsListener); - final StepListener> writeMetaAndComputeDeletesStep = new StepListener<>(); - writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, false, writeMetaAndComputeDeletesStep); - writeMetaAndComputeDeletesStep.whenComplete( - deleteResults -> asyncCleanupUnlinkedShardLevelBlobs(repositoryData, snapshotIds, deleteResults, afterCleanupsListener), - afterCleanupsListener::onFailure - ); - }, listener::onFailure)); - } + // First write the new shard state metadata (with the removed snapshot) and compute deletion targets + final StepListener> writeShardMetaDataAndComputeDeletesStep = new StepListener<>(); + writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, true, writeShardMetaDataAndComputeDeletesStep); + // Once we have put the new shard-level metadata into place, we can update the repository metadata as follows: + // 1. Remove the snapshots from the list of existing snapshots + // 2. Update the index shard generations of all updated shard folders + // + // Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created + // index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only + // written if all shard paths have been successfully updated. + final StepListener writeUpdatedRepoDataStep = new StepListener<>(); + writeShardMetaDataAndComputeDeletesStep.whenComplete(deleteResults -> { + final ShardGenerations.Builder builder = ShardGenerations.builder(); + for (ShardSnapshotMetaDeleteResult newGen : deleteResults) { + builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration); + } + final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, builder.build()); + writeIndexGen( + updatedRepoData, + repositoryStateId, + repoMetaVersion, + Function.identity(), + ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure) + ); + }, listener::onFailure); + // Once we have updated the repository, run the clean-ups + writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> { + // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion + final ActionListener afterCleanupsListener = new GroupedActionListener<>( + ActionListener.wrap(() -> listener.onResponse(updatedRepoData)), + 2 + ); + cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener); + asyncCleanupUnlinkedShardLevelBlobs( + repositoryData, + snapshotIds, + writeShardMetaDataAndComputeDeletesStep.result(), + afterCleanupsListener + ); + }, listener::onFailure); } private void cleanupUnlinkedRootAndIndicesBlobs( @@ -1365,31 +1345,19 @@ public void finalizeSnapshot( final Collection indices = shardGenerations.indices(); final SnapshotId snapshotId = snapshotInfo.snapshotId(); // Once we are done writing the updated index-N blob we remove the now unreferenced index-${uuid} blobs in each shard - // directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION - // If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened - // when writing the index-${N} to each shard directory. - final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion); + // directory final Consumer onUpdateFailure = e -> listener.onFailure( new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e) ); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - final boolean writeIndexGens = SnapshotsService.useIndexGenerations(repositoryMetaVersion); - final StepListener repoDataListener = new StepListener<>(); getRepositoryData(repoDataListener); repoDataListener.whenComplete(existingRepositoryData -> { - final Map indexMetas; - final Map indexMetaIdentifiers; - if (writeIndexGens) { - indexMetaIdentifiers = ConcurrentCollections.newConcurrentMap(); - indexMetas = ConcurrentCollections.newConcurrentMap(); - } else { - indexMetas = null; - indexMetaIdentifiers = null; - } + final Map indexMetas = ConcurrentCollections.newConcurrentMap(); + final Map indexMetaIdentifiers = ConcurrentCollections.newConcurrentMap(); final ActionListener allMetaListener = new GroupedActionListener<>(ActionListener.wrap(v -> { final RepositoryData updatedRepositoryData = existingRepositoryData.addSnapshot( @@ -1406,9 +1374,7 @@ public void finalizeSnapshot( repositoryMetaVersion, stateTransformer, ActionListener.wrap(newRepoData -> { - if (writeShardGens) { - cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); - } + cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); listener.onResponse(newRepoData); }, onUpdateFailure) ); @@ -1432,24 +1398,15 @@ public void finalizeSnapshot( for (IndexId index : indices) { executor.execute(ActionRunnable.run(allMetaListener, () -> { final IndexMetadata indexMetaData = clusterMetadata.index(index.getName()); - if (writeIndexGens) { - final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData); - String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers); - if (metaUUID == null) { - // We don't yet have this version of the metadata so we write it - metaUUID = UUIDs.base64UUID(); - INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress); - indexMetaIdentifiers.put(identifiers, metaUUID); - } - indexMetas.put(index, identifiers); - } else { - INDEX_METADATA_FORMAT.write( - clusterMetadata.index(index.getName()), - indexContainer(index), - snapshotId.getUUID(), - compress - ); + final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData); + String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers); + if (metaUUID == null) { + // We don't yet have this version of the metadata so we write it + metaUUID = UUIDs.base64UUID(); + INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress); + indexMetaIdentifiers.put(identifiers, metaUUID); } + indexMetas.put(index, identifiers); })); } executor.execute( @@ -1774,8 +1731,7 @@ private RepositoryData repositoryDataFromCachedEntry(Tuple try (InputStream input = CompressorFactory.COMPRESSOR.threadLocalInputStream(cacheEntry.v2().streamInput())) { return RepositoryData.snapshotsFromXContent( XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, input), - cacheEntry.v1(), - false + cacheEntry.v1() ); } } @@ -1870,7 +1826,7 @@ private RepositoryData getRepositoryData(long indexGen) { XContentParser parser = XContentType.JSON.xContent() .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, blob) ) { - return RepositoryData.snapshotsFromXContent(parser, indexGen, true); + return RepositoryData.snapshotsFromXContent(parser, indexGen); } } catch (IOException ioe) { if (bestEffortConsistency) { @@ -2454,7 +2410,6 @@ public void snapshotShard( ); final String indexGeneration; - final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion); // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones List newSnapshotsList = new ArrayList<>(); newSnapshotsList.add(new SnapshotFiles(snapshotId.getName(), indexCommitPointFiles, shardStateIdentifier)); @@ -2463,71 +2418,24 @@ public void snapshotShard( } final BlobStoreIndexShardSnapshots updatedBlobStoreIndexShardSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList); final Runnable afterWriteSnapBlob; - if (writeShardGens) { - // When using shard generations we can safely write the index-${uuid} blob before writing out any of the actual data - // for this shard since the uuid named blob will simply not be referenced in case of error and thus we will never - // reference a generation that has not had all its files fully upload. - indexGeneration = UUIDs.randomBase64UUID(); - try { - INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedBlobStoreIndexShardSnapshots, shardContainer, indexGeneration, compress); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException( - shardId, - "Failed to write shard level snapshot metadata for [" - + snapshotId - + "] to [" - + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) - + "]", - e - ); - } - afterWriteSnapBlob = () -> {}; - } else { - // When not using shard generations we can only write the index-${N} blob after all other work for this shard has - // completed. - // Also, in case of numeric shard generations the data node has to take care of deleting old shard generations. - final long newGen = Long.parseLong(fileListGeneration) + 1; - indexGeneration = Long.toString(newGen); - // Delete all previous index-N blobs - final List blobsToDelete = blobs.stream() - .filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)) - .collect(Collectors.toList()); - assert blobsToDelete.stream() - .mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, ""))) - .max() - .orElse(-1L) < Long.parseLong(indexGeneration) : "Tried to delete an index-N blob newer than the current generation [" - + indexGeneration - + "] when deleting index-N blobs " - + blobsToDelete; - afterWriteSnapBlob = () -> { - try { - writeShardIndexBlobAtomic(shardContainer, newGen, updatedBlobStoreIndexShardSnapshots); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException( - shardId, - "Failed to finalize snapshot creation [" - + snapshotId - + "] with shard index [" - + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) - + "]", - e - ); - } - try { - deleteFromContainer(shardContainer, blobsToDelete); - } catch (IOException e) { - logger.warn( - () -> new ParameterizedMessage( - "[{}][{}] failed to delete old index-N blobs during finalization", - snapshotId, - shardId - ), - e - ); - } - }; + // When using shard generations we can safely write the index-${uuid} blob before writing out any of the actual data + // for this shard since the uuid named blob will simply not be referenced in case of error and thus we will never + // reference a generation that has not had all its files fully upload. + indexGeneration = UUIDs.randomBase64UUID(); + try { + INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedBlobStoreIndexShardSnapshots, shardContainer, indexGeneration, compress); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException( + shardId, + "Failed to write shard level snapshot metadata for [" + + snapshotId + + "] to [" + + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) + + "]", + e + ); } - + afterWriteSnapBlob = () -> {}; final StepListener> allFilesUploadedListener = new StepListener<>(); allFilesUploadedListener.whenComplete(v -> { final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration()); @@ -2970,9 +2878,7 @@ public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContaine * the given list of blobs in the shard container. * * @param blobs list of blobs in repository - * @param generation shard generation or {@code null} in case there was no shard generation tracked in the {@link RepositoryData} for - * this shard because its snapshot was created in a version older than - * {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}. + * @param generation shard generation * @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation */ private Tuple buildBlobStoreIndexShardSnapshots( diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java index 38d9df0e960e0..94ab875091caf 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java @@ -31,7 +31,6 @@ package org.opensearch.snapshots; -import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.action.ShardOperationFailedException; import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; @@ -69,8 +68,6 @@ */ public final class SnapshotInfo implements Comparable, ToXContent, Writeable { - public static final Version DATA_STREAMS_IN_SNAPSHOT = LegacyESVersion.V_7_9_0; - public static final String CONTEXT_MODE_PARAM = "context_mode"; public static final String CONTEXT_MODE_SNAPSHOT = "SNAPSHOT"; private static final DateFormatter DATE_TIME_FORMATTER = DateFormatter.forPattern("strict_date_optional_time"); @@ -401,11 +398,7 @@ public SnapshotInfo(final StreamInput in) throws IOException { version = in.readBoolean() ? Version.readVersion(in) : null; includeGlobalState = in.readOptionalBoolean(); userMetadata = in.readMap(); - if (in.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { - dataStreams = in.readStringList(); - } else { - dataStreams = Collections.emptyList(); - } + dataStreams = in.readStringList(); } /** @@ -836,9 +829,7 @@ public void writeTo(final StreamOutput out) throws IOException { } out.writeOptionalBoolean(includeGlobalState); out.writeMap(userMetadata); - if (out.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { - out.writeStringCollection(dataStreams); - } + out.writeStringCollection(dataStreams); } private static SnapshotState snapshotState(final String reason, final List shardFailures) { diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java index 22a85558e3b1d..86dbc31245b11 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java @@ -67,7 +67,6 @@ import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; -import org.opensearch.repositories.ShardGenerations; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportException; import org.opensearch.transport.TransportRequestDeduplicator; @@ -276,11 +275,6 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map() { @Override public void onResponse(String newGeneration) { diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index e53c2889f88e6..0123c839e4ad9 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -38,7 +38,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.ExceptionsHelper; -import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; @@ -141,18 +140,6 @@ */ public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier { - public static final Version FULL_CONCURRENCY_VERSION = LegacyESVersion.V_7_9_0; - - public static final Version CLONE_SNAPSHOT_VERSION = LegacyESVersion.V_7_10_0; - - public static final Version SHARD_GEN_IN_REPO_DATA_VERSION = LegacyESVersion.V_7_6_0; - - public static final Version INDEX_GEN_IN_REPO_DATA_VERSION = LegacyESVersion.V_7_9_0; - - public static final Version OLD_SNAPSHOT_FORMAT = LegacyESVersion.fromId(7050099); - - public static final Version MULTI_DELETE_VERSION = LegacyESVersion.V_7_8_0; - private static final Logger logger = LogManager.getLogger(SnapshotsService.class); public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status"; @@ -168,9 +155,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final Map>>> snapshotCompletionListeners = new ConcurrentHashMap<>(); - // Set of snapshots that are currently being initialized by this node - private final Set initializingSnapshots = Collections.synchronizedSet(new HashSet<>()); - /** * Listeners for snapshot deletion keyed by delete uuid as returned from {@link SnapshotDeletionsInProgress.Entry#uuid()} */ @@ -285,18 +269,10 @@ public ClusterState execute(ClusterState currentState) { final List runningSnapshots = snapshots.entries(); ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName); validate(repositoryName, snapshotName, currentState); - final boolean concurrentOperationsAllowed = currentState.nodes().getMinNodeVersion().onOrAfter(FULL_CONCURRENCY_VERSION); final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom( SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY ); - if (deletionsInProgress.hasDeletionsInProgress() && concurrentOperationsAllowed == false) { - throw new ConcurrentSnapshotExecutionException( - repositoryName, - snapshotName, - "cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]" - ); - } final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom( RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY @@ -308,13 +284,6 @@ public ClusterState execute(ClusterState currentState) { "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]" ); } - // Fail if there are any concurrently running snapshots. The only exception to this being a snapshot in INIT state from a - // previous cluster-manager that we can simply ignore and remove from the cluster state because we would clean it up from - // the - // cluster state anyway in #applyClusterState. - if (concurrentOperationsAllowed == false && runningSnapshots.stream().anyMatch(entry -> entry.state() != State.INIT)) { - throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); - } ensureNoCleanupInProgress(currentState, repositoryName, snapshotName); ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress); // Store newSnapshot here to be processed in clusterStateProcessed @@ -339,7 +308,6 @@ public ClusterState execute(ClusterState currentState) { currentState.metadata(), currentState.routingTable(), indexIds, - useShardGenerations(version), repositoryData, repositoryName ); @@ -1817,16 +1785,6 @@ public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionLis @Override public ClusterState execute(ClusterState currentState) throws Exception { - final Version minNodeVersion = currentState.nodes().getMinNodeVersion(); - if (snapshotNames.length > 1 && minNodeVersion.before(MULTI_DELETE_VERSION)) { - throw new IllegalArgumentException( - "Deleting multiple snapshots in a single request is only supported in version [ " - + MULTI_DELETE_VERSION - + "] but cluster contained node of version [" - + currentState.nodes().getMinNodeVersion() - + "]" - ); - } final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); final List snapshotEntries = findInProgressSnapshots(snapshots, snapshotNames, repoName); final List snapshotIds = matchingSnapshotIds( @@ -1835,76 +1793,8 @@ public ClusterState execute(ClusterState currentState) throws Exception { snapshotNames, repoName ); - if (snapshotEntries.isEmpty() || minNodeVersion.onOrAfter(SnapshotsService.FULL_CONCURRENCY_VERSION)) { - deleteFromRepoTask = createDeleteStateUpdate(snapshotIds, repoName, repositoryData, Priority.NORMAL, listener); - return deleteFromRepoTask.execute(currentState); - } - assert snapshotEntries.size() == 1 : "Expected just a single running snapshot but saw " + snapshotEntries; - final SnapshotsInProgress.Entry snapshotEntry = snapshotEntries.get(0); - runningSnapshot = snapshotEntry.snapshot(); - final ImmutableOpenMap shards; - - final State state = snapshotEntry.state(); - final String failure; - - outstandingDeletes = new ArrayList<>(snapshotIds); - if (state != State.INIT) { - // INIT state snapshots won't ever be physically written to the repository but all other states will end up in the repo - outstandingDeletes.add(runningSnapshot.getSnapshotId()); - } - if (state == State.INIT) { - // snapshot is still initializing, mark it as aborted - shards = snapshotEntry.shards(); - assert shards.isEmpty(); - failure = "Snapshot was aborted during initialization"; - abortedDuringInit = true; - } else if (state == State.STARTED) { - // snapshot is started - mark every non completed shard as aborted - final SnapshotsInProgress.Entry abortedEntry = snapshotEntry.abort(); - shards = abortedEntry.shards(); - failure = abortedEntry.failure(); - } else { - boolean hasUncompletedShards = false; - // Cleanup in case a node gone missing and snapshot wasn't updated for some reason - for (ObjectCursor shardStatus : snapshotEntry.shards().values()) { - // Check if we still have shard running on existing nodes - if (shardStatus.value.state().completed() == false - && shardStatus.value.nodeId() != null - && currentState.nodes().get(shardStatus.value.nodeId()) != null) { - hasUncompletedShards = true; - break; - } - } - if (hasUncompletedShards) { - // snapshot is being finalized - wait for shards to complete finalization process - logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes"); - return currentState; - } else { - // no shards to wait for but a node is gone - this is the only case - // where we force to finish the snapshot - logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately"); - shards = snapshotEntry.shards(); - } - failure = snapshotEntry.failure(); - } - return ClusterState.builder(currentState) - .putCustom( - SnapshotsInProgress.TYPE, - SnapshotsInProgress.of( - snapshots.entries() - .stream() - // remove init state snapshot we found from a previous cluster-manager if there was one - .filter(existing -> abortedDuringInit == false || existing.equals(snapshotEntry) == false) - .map(existing -> { - if (existing.equals(snapshotEntry)) { - return snapshotEntry.fail(shards, State.ABORTED, failure); - } - return existing; - }) - .collect(Collectors.toList()) - ) - ) - .build(); + deleteFromRepoTask = createDeleteStateUpdate(snapshotIds, repoName, repositoryData, Priority.NORMAL, listener); + return deleteFromRepoTask.execute(currentState); } @Override @@ -2053,14 +1943,6 @@ public ClusterState execute(ClusterState currentState) { SnapshotDeletionsInProgress.EMPTY ); final Version minNodeVersion = currentState.nodes().getMinNodeVersion(); - if (minNodeVersion.before(FULL_CONCURRENCY_VERSION)) { - if (deletionsInProgress.hasDeletionsInProgress()) { - throw new ConcurrentSnapshotExecutionException( - new Snapshot(repoName, snapshotIds.get(0)), - "cannot delete - another snapshot is currently being deleted in [" + deletionsInProgress + "]" - ); - } - } final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom( RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY @@ -2100,44 +1982,32 @@ public ClusterState execute(ClusterState currentState) { } // Snapshot ids that will have to be physically deleted from the repository final Set snapshotIdsRequiringCleanup = new HashSet<>(snapshotIds); - final SnapshotsInProgress updatedSnapshots; - if (minNodeVersion.onOrAfter(FULL_CONCURRENCY_VERSION)) { - updatedSnapshots = SnapshotsInProgress.of(snapshots.entries().stream().map(existing -> { - if (existing.state() == State.STARTED - && snapshotIdsRequiringCleanup.contains(existing.snapshot().getSnapshotId())) { - // snapshot is started - mark every non completed shard as aborted - final SnapshotsInProgress.Entry abortedEntry = existing.abort(); - if (abortedEntry == null) { - // No work has been done for this snapshot yet so we remove it from the cluster state directly - final Snapshot existingNotYetStartedSnapshot = existing.snapshot(); - // Adding the snapshot to #endingSnapshots since we still have to resolve its listeners to not trip - // any leaked listener assertions - if (endingSnapshots.add(existingNotYetStartedSnapshot)) { - completedNoCleanup.add(existingNotYetStartedSnapshot); - } - snapshotIdsRequiringCleanup.remove(existingNotYetStartedSnapshot.getSnapshotId()); - } else if (abortedEntry.state().completed()) { - completedWithCleanup.add(abortedEntry); + final SnapshotsInProgress updatedSnapshots = SnapshotsInProgress.of(snapshots.entries().stream().map(existing -> { + if (existing.state() == State.STARTED && snapshotIdsRequiringCleanup.contains(existing.snapshot().getSnapshotId())) { + // snapshot is started - mark every non completed shard as aborted + final SnapshotsInProgress.Entry abortedEntry = existing.abort(); + if (abortedEntry == null) { + // No work has been done for this snapshot yet so we remove it from the cluster state directly + final Snapshot existingNotYetStartedSnapshot = existing.snapshot(); + // Adding the snapshot to #endingSnapshots since we still have to resolve its listeners to not trip + // any leaked listener assertions + if (endingSnapshots.add(existingNotYetStartedSnapshot)) { + completedNoCleanup.add(existingNotYetStartedSnapshot); } - return abortedEntry; + snapshotIdsRequiringCleanup.remove(existingNotYetStartedSnapshot.getSnapshotId()); + } else if (abortedEntry.state().completed()) { + completedWithCleanup.add(abortedEntry); } - return existing; - }).filter(Objects::nonNull).collect(Collectors.toList())); - if (snapshotIdsRequiringCleanup.isEmpty()) { - // We only saw snapshots that could be removed from the cluster state right away, no need to update the deletions - return updateWithSnapshots(currentState, updatedSnapshots, null); + return abortedEntry; } - } else { - if (snapshots.entries().isEmpty() == false) { - // However other snapshots are running - cannot continue - throw new ConcurrentSnapshotExecutionException( - repoName, - snapshotIds.toString(), - "another snapshot is currently running cannot delete" - ); - } - updatedSnapshots = snapshots; + return existing; + }).filter(Objects::nonNull).collect(Collectors.toList())); + + if (snapshotIdsRequiringCleanup.isEmpty()) { + // We only saw snapshots that could be removed from the cluster state right away, no need to update the deletions + return updateWithSnapshots(currentState, updatedSnapshots, null); } + // add the snapshot deletion to the cluster state final SnapshotDeletionsInProgress.Entry replacedEntry = deletionsInProgress.getEntries() .stream() @@ -2266,41 +2136,11 @@ public Version minCompatibleVersion(Version minNodeVersion, RepositoryData repos .filter(excluded == null ? sn -> true : sn -> excluded.contains(sn) == false) .collect(Collectors.toList())) { final Version known = repositoryData.getVersion(snapshotId); - // If we don't have the version cached in the repository data yet we load it from the snapshot info blobs - if (known == null) { - assert repositoryData.shardGenerations().totalShards() == 0 : "Saw shard generations [" - + repositoryData.shardGenerations() - + "] but did not have versions tracked for snapshot [" - + snapshotId - + "]"; - return OLD_SNAPSHOT_FORMAT; - } else { - minCompatVersion = minCompatVersion.before(known) ? minCompatVersion : known; - } + minCompatVersion = minCompatVersion.before(known) ? minCompatVersion : known; } return minCompatVersion; } - /** - * Checks whether the metadata version supports writing {@link ShardGenerations} to the repository. - * - * @param repositoryMetaVersion version to check - * @return true if version supports {@link ShardGenerations} - */ - public static boolean useShardGenerations(Version repositoryMetaVersion) { - return repositoryMetaVersion.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION); - } - - /** - * Checks whether the metadata version supports writing {@link ShardGenerations} to the repository. - * - * @param repositoryMetaVersion version to check - * @return true if version supports {@link ShardGenerations} - */ - public static boolean useIndexGenerations(Version repositoryMetaVersion) { - return repositoryMetaVersion.onOrAfter(INDEX_GEN_IN_REPO_DATA_VERSION); - } - /** Deletes snapshot from repository * * @param deleteEntry delete entry in cluster state @@ -2578,7 +2418,6 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState currentState.metadata(), currentState.routingTable(), entry.indices(), - entry.version().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION), repositoryData, repoName ); @@ -2677,7 +2516,6 @@ private static void completeListenersIgnoringException(@Nullable List shards( @@ -2686,7 +2524,6 @@ private static ImmutableOpenMap indices, - boolean useShardGenerations, RepositoryData repositoryData, String repoName ) { @@ -2712,18 +2549,15 @@ private static ImmutableOpenMap RepositoryData.snapshotsFromXContent(xParser, corruptedRepositoryData.getGenId(), randomBoolean()) + () -> RepositoryData.snapshotsFromXContent(xParser, corruptedRepositoryData.getGenId()) ); assertThat( e.getMessage(), @@ -327,7 +327,7 @@ public void testIndexThatReferenceANullSnapshot() throws IOException { try (XContentParser xParser = createParser(builder)) { OpenSearchParseException e = expectThrows( OpenSearchParseException.class, - () -> RepositoryData.snapshotsFromXContent(xParser, randomNonNegativeLong(), randomBoolean()) + () -> RepositoryData.snapshotsFromXContent(xParser, randomNonNegativeLong()) ); assertThat( e.getMessage(), diff --git a/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java index f9bc0d1066d8e..0702866af35e3 100644 --- a/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java @@ -136,7 +136,7 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex XContentParser parser = XContentType.JSON.xContent() .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, blob) ) { - repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen, false); + repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen); } assertIndexUUIDs(repository, repositoryData); assertSnapshotUUIDs(repository, repositoryData); diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index bbac1ef591418..e5eac9af13de1 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -419,8 +419,7 @@ protected String initWithSnapshotVersion(String repoName, Path repoPath, Version DeprecationHandler.THROW_UNSUPPORTED_OPERATION, Strings.toString(jsonBuilder).replace(Version.CURRENT.toString(), version.toString()) ), - repositoryData.getGenId(), - randomBoolean() + repositoryData.getGenId() ); Files.write( repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData.getGenId()), @@ -512,7 +511,7 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Map