From 2b916680b9745d7e75ed00ec5bde38bfb3f6f055 Mon Sep 17 00:00:00 2001 From: Nick Knize Date: Wed, 19 Oct 2022 09:10:24 -0500 Subject: [PATCH] [Remove] Legacy Version support from Snapshot/Restore Service (#4728) Removes all stale snapshot / restore code for legacy versions prior to OpenSearch 2.0. This includes handling null ShardGenerations, partial concurrency, null index generations, etc. which are no longer supported in snapshot versions after legacy 7.5. Signed-off-by: Nicholas Walter Knize --- CHANGELOG.md | 2 + .../s3/S3BlobStoreRepositoryTests.java | 64 ----- .../repositories/s3/S3Repository.java | 85 ------- .../MultiVersionRepositoryAccessIT.java | 22 +- .../opensearch/snapshots/CloneSnapshotIT.java | 20 +- .../snapshots/ConcurrentSnapshotsIT.java | 38 --- .../CorruptedBlobStoreRepositoryIT.java | 191 -------------- .../delete/DeleteSnapshotRequest.java | 18 +- .../cluster/SnapshotDeletionsInProgress.java | 40 +-- .../cluster/SnapshotsInProgress.java | 59 +---- .../IndexMetaDataGenerations.java | 11 +- .../repositories/RepositoryData.java | 53 ++-- .../repositories/ShardGenerations.java | 23 +- .../blobstore/BlobStoreRepository.java | 238 ++++++------------ .../opensearch/snapshots/SnapshotInfo.java | 13 +- .../snapshots/SnapshotShardsService.java | 6 - .../snapshots/SnapshotsService.java | 232 +++-------------- .../repositories/RepositoryDataTests.java | 8 +- .../blobstore/BlobStoreTestUtil.java | 2 +- .../AbstractSnapshotIntegTestCase.java | 7 +- 20 files changed, 160 insertions(+), 972 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c4c7c80e5410..d294eb68ee7c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -99,6 +99,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Remove LegacyESVersion.V_7_2_ and V_7_3_ Constants ([#4702](https://github.com/opensearch-project/OpenSearch/pull/4702)) - Always auto release the flood stage block ([#4703](https://github.com/opensearch-project/OpenSearch/pull/4703)) - Remove LegacyESVersion.V_7_4_ and V_7_5_ Constants ([#4704](https://github.com/opensearch-project/OpenSearch/pull/4704)) +- Remove Legacy Version support from Snapshot/Restore Service ([#4728](https://github.com/opensearch-project/OpenSearch/pull/4728)) + ### Fixed - `opensearch-service.bat start` and `opensearch-service.bat manager` failing to run ([#4289](https://github.com/opensearch-project/OpenSearch/pull/4289)) - PR reference to checkout code for changelog verifier ([#4296](https://github.com/opensearch-project/OpenSearch/pull/4296)) diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java index b35d4080a413a..3c3f2887469b3 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -36,8 +36,6 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import fixture.s3.S3HttpHandler; -import org.opensearch.action.ActionRunnable; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; @@ -45,29 +43,21 @@ import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import 
org.opensearch.common.blobstore.BlobStore; -import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.regex.Regex; import org.opensearch.common.settings.MockSecureSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeUnit; -import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.NamedXContentRegistry; -import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.plugins.Plugin; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.RepositoryData; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.OpenSearchMockAPIBasedRepositoryIntegTestCase; -import org.opensearch.snapshots.SnapshotId; -import org.opensearch.snapshots.SnapshotsService; import org.opensearch.snapshots.mockstore.BlobStoreWrapper; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -75,8 +65,6 @@ import java.util.Map; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.startsWith; @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint") @@ -84,8 +72,6 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) public class S3BlobStoreRepositoryTests extends OpenSearchMockAPIBasedRepositoryIntegTestCase { - private static final TimeValue TEST_COOLDOWN_PERIOD = TimeValue.timeValueSeconds(10L); - private String region; private String signerOverride; @@ -158,56 +144,6 @@ protected Settings nodeSettings(int nodeOrdinal) { return builder.build(); } - public void testEnforcedCooldownPeriod() throws IOException { - final String repoName = createRepository( - randomName(), - Settings.builder().put(repositorySettings()).put(S3Repository.COOLDOWN_PERIOD.getKey(), TEST_COOLDOWN_PERIOD).build() - ); - - final SnapshotId fakeOldSnapshot = client().admin() - .cluster() - .prepareCreateSnapshot(repoName, "snapshot-old") - .setWaitForCompletion(true) - .setIndices() - .get() - .getSnapshotInfo() - .snapshotId(); - final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class); - final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName); - final RepositoryData repositoryData = getRepositoryData(repository); - final RepositoryData modifiedRepositoryData = repositoryData.withVersions( - Collections.singletonMap(fakeOldSnapshot, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()) - ); - final BytesReference serialized = BytesReference.bytes( - modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), SnapshotsService.OLD_SNAPSHOT_FORMAT) - ); - PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> { - try (InputStream stream = serialized.streamInput()) { - repository.blobStore() - .blobContainer(repository.basePath()) - .writeBlobAtomic( - BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), - stream, - serialized.length(), - true - ); - } - }))); - - 
final String newSnapshotName = "snapshot-new"; - final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos(); - client().admin().cluster().prepareCreateSnapshot(repoName, newSnapshotName).setWaitForCompletion(true).setIndices().get(); - assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledSnapshot, greaterThan(TEST_COOLDOWN_PERIOD.getNanos())); - - final long beforeThrottledDelete = repository.threadPool().relativeTimeInNanos(); - client().admin().cluster().prepareDeleteSnapshot(repoName, newSnapshotName).get(); - assertThat(repository.threadPool().relativeTimeInNanos() - beforeThrottledDelete, greaterThan(TEST_COOLDOWN_PERIOD.getNanos())); - - final long beforeFastDelete = repository.threadPool().relativeTimeInNanos(); - client().admin().cluster().prepareDeleteSnapshot(repoName, fakeOldSnapshot.getName()).get(); - assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos())); - } - /** * S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload. */ diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index c8377949a6842..f80e5743edbc0 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -35,10 +35,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.action.ActionListener; -import org.opensearch.action.ActionRunnable; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoryMetadata; @@ -52,7 +50,6 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.ByteSizeValue; -import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.NamedXContentRegistry; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.monitor.jvm.JvmInfo; @@ -62,13 +59,10 @@ import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfo; -import org.opensearch.snapshots.SnapshotsService; import org.opensearch.threadpool.Scheduler; -import org.opensearch.threadpool.ThreadPool; import java.util.Collection; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -182,24 +176,6 @@ class S3Repository extends MeteredBlobStoreRepository { static final Setting CLIENT_NAME = new Setting<>("client", "default", Function.identity()); - /** - * Artificial delay to introduce after a snapshot finalization or delete has finished so long as the repository is still using the - * backwards compatible snapshot format from before - * {@link org.opensearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} ({@link LegacyESVersion#V_7_6_0}). - * This delay is necessary so that the eventually consistent nature of AWS S3 does not randomly result in repository corruption when - * doing repository operations in rapid succession on a repository in the old metadata format. 
- * This setting should not be adjusted in production when working with an AWS S3 backed repository. Doing so risks the repository - * becoming silently corrupted. To get rid of this waiting period, either create a new S3 repository or remove all snapshots older than - * {@link LegacyESVersion#V_7_6_0} from the repository which will trigger an upgrade of the repository metadata to the new - * format and disable the cooldown period. - */ - static final Setting COOLDOWN_PERIOD = Setting.timeSetting( - "cooldown_period", - new TimeValue(3, TimeUnit.MINUTES), - new TimeValue(0, TimeUnit.MILLISECONDS), - Setting.Property.Dynamic - ); - /** * Specifies the path within bucket to repository data. Defaults to root directory. */ @@ -223,12 +199,6 @@ class S3Repository extends MeteredBlobStoreRepository { private final RepositoryMetadata repositoryMetadata; - /** - * Time period to delay repository operations by after finalizing or deleting a snapshot. - * See {@link #COOLDOWN_PERIOD} for details. - */ - private final TimeValue coolDown; - /** * Constructs an s3 backed repository */ @@ -296,8 +266,6 @@ class S3Repository extends MeteredBlobStoreRepository { ); } - coolDown = COOLDOWN_PERIOD.get(metadata.settings()); - logger.debug( "using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]", bucket, @@ -334,9 +302,6 @@ public void finalizeSnapshot( Function stateTransformer, ActionListener listener ) { - if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) { - listener = delayedListener(listener); - } super.finalizeSnapshot( shardGenerations, repositoryStateId, @@ -355,59 +320,9 @@ public void deleteSnapshots( Version repositoryMetaVersion, ActionListener listener ) { - if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) { - listener = delayedListener(listener); - } super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener); } - /** - * Wraps given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked. - * See {@link #COOLDOWN_PERIOD} for details. - */ - private ActionListener delayedListener(ActionListener listener) { - final ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { - final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null); - assert cancellable != null; - }); - return new ActionListener() { - @Override - public void onResponse(T response) { - logCooldownInfo(); - final Scheduler.Cancellable existing = finalizationFuture.getAndSet( - threadPool.schedule( - ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)), - coolDown, - ThreadPool.Names.SNAPSHOT - ) - ); - assert existing == null : "Already have an ongoing finalization " + finalizationFuture; - } - - @Override - public void onFailure(Exception e) { - logCooldownInfo(); - final Scheduler.Cancellable existing = finalizationFuture.getAndSet( - threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onFailure(e)), coolDown, ThreadPool.Names.SNAPSHOT) - ); - assert existing == null : "Already have an ongoing finalization " + finalizationFuture; - } - }; - } - - private void logCooldownInfo() { - logger.info( - "Sleeping for [{}] after modifying repository [{}] because it contains snapshots older than version [{}]" - + " and therefore is using a backwards compatible metadata format that requires this cooldown period to avoid " - + "repository corruption. 
To get rid of this message and move to the new repository metadata format, either remove " - + "all snapshots older than version [{}] from the repository or create a new repository at an empty location.", - coolDown, - metadata.name(), - SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION, - SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION - ); - } - @Override protected S3BlobStore createBlobStore() { return new S3BlobStore(service, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass, repositoryMetadata); diff --git a/qa/repository-multi-version/src/test/java/org/opensearch/upgrades/MultiVersionRepositoryAccessIT.java b/qa/repository-multi-version/src/test/java/org/opensearch/upgrades/MultiVersionRepositoryAccessIT.java index c0b90626d7bad..49efce0516434 100644 --- a/qa/repository-multi-version/src/test/java/org/opensearch/upgrades/MultiVersionRepositoryAccessIT.java +++ b/qa/repository-multi-version/src/test/java/org/opensearch/upgrades/MultiVersionRepositoryAccessIT.java @@ -32,7 +32,6 @@ package org.opensearch.upgrades; -import org.opensearch.OpenSearchStatusException; import org.opensearch.action.admin.cluster.repositories.put.PutRepositoryRequest; import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus; @@ -42,20 +41,17 @@ import org.opensearch.client.Request; import org.opensearch.client.RequestOptions; import org.opensearch.client.Response; -import org.opensearch.client.ResponseException; import org.opensearch.client.RestClient; import org.opensearch.client.RestHighLevelClient; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.DeprecationHandler; import org.opensearch.common.xcontent.XContentParser; import org.opensearch.common.xcontent.json.JsonXContent; -import org.opensearch.snapshots.SnapshotsService; import org.opensearch.test.rest.OpenSearchRestTestCase; import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConnection; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -231,20 +227,10 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException { ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards); } } else { - if (SnapshotsService.useIndexGenerations(minimumNodeVersion()) == false) { - assertThat(TEST_STEP, is(TestStep.STEP3_OLD_CLUSTER)); - final List> expectedExceptions = - Arrays.asList(ResponseException.class, OpenSearchStatusException.class); - expectThrowsAnyOf(expectedExceptions, () -> listSnapshots(repoName)); - expectThrowsAnyOf(expectedExceptions, () -> deleteSnapshot(client, repoName, "snapshot-1")); - expectThrowsAnyOf(expectedExceptions, () -> deleteSnapshot(client, repoName, "snapshot-2")); - expectThrowsAnyOf(expectedExceptions, () -> createSnapshot(client, repoName, "snapshot-impossible", index)); - } else { - assertThat(listSnapshots(repoName), hasSize(2)); - if (TEST_STEP == TestStep.STEP4_NEW_CLUSTER) { - ensureSnapshotRestoreWorks(repoName, "snapshot-1", shards); - ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards); - } + assertThat(listSnapshots(repoName), hasSize(2)); + if (TEST_STEP == TestStep.STEP4_NEW_CLUSTER) { + ensureSnapshotRestoreWorks(repoName, "snapshot-1", shards); + ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards); } } } finally { diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java 
b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java index 7b95bf93f8bf4..f659d827448a0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/CloneSnapshotIT.java @@ -78,14 +78,6 @@ public void testShardClone() throws Exception { final Path repoPath = randomRepoPath(); createRepository(repoName, "fs", repoPath); - final boolean useBwCFormat = randomBoolean(); - if (useBwCFormat) { - initWithSnapshotVersion(repoName, repoPath, SnapshotsService.OLD_SNAPSHOT_FORMAT); - // Re-create repo to clear repository data cache - assertAcked(clusterAdmin().prepareDeleteRepository(repoName).get()); - createRepository(repoName, "fs", repoPath); - } - final String indexName = "test-index"; createIndexWithRandomDocs(indexName, randomIntBetween(5, 10)); final String sourceSnapshot = "source-snapshot"; @@ -101,21 +93,11 @@ public void testShardClone() throws Exception { final SnapshotId targetSnapshotId = new SnapshotId("target-snapshot", UUIDs.randomBase64UUID(random())); - final String currentShardGen; - if (useBwCFormat) { - currentShardGen = null; - } else { - currentShardGen = repositoryData.shardGenerations().getShardGen(indexId, shardId); - } + final String currentShardGen = repositoryData.shardGenerations().getShardGen(indexId, shardId); final String newShardGeneration = PlainActionFuture.get( f -> repository.cloneShardSnapshot(sourceSnapshotInfo.snapshotId(), targetSnapshotId, repositoryShardId, currentShardGen, f) ); - if (useBwCFormat) { - final long gen = Long.parseLong(newShardGeneration); - assertEquals(gen, 1L); // Initial snapshot brought it to 0, clone increments it to 1 - } - final BlobStoreIndexShardSnapshot targetShardSnapshot = readShardSnapshot(repository, repositoryShardId, targetSnapshotId); final BlobStoreIndexShardSnapshot sourceShardSnapshot = readShardSnapshot( repository, diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsIT.java index b6d482cad8860..f483ed7fe6c5d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/ConcurrentSnapshotsIT.java @@ -56,7 +56,6 @@ import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoryData; import org.opensearch.repositories.RepositoryException; -import org.opensearch.repositories.ShardGenerations; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.snapshots.mockstore.MockRepository; import org.opensearch.test.OpenSearchIntegTestCase; @@ -1297,43 +1296,6 @@ public void testConcurrentOperationsLimit() throws Exception { } } - public void testConcurrentSnapshotWorksWithOldVersionRepo() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - final String dataNode = internalCluster().startDataOnlyNode(); - final String repoName = "test-repo"; - final Path repoPath = randomRepoPath(); - createRepository( - repoName, - "mock", - Settings.builder().put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false).put("location", repoPath) - ); - initWithSnapshotVersion(repoName, repoPath, SnapshotsService.OLD_SNAPSHOT_FORMAT); - - createIndexWithContent("index-slow"); - - final ActionFuture createSlowFuture = startFullSnapshotBlockedOnDataNode( - "slow-snapshot", - repoName, - dataNode - ); - - final 
String dataNode2 = internalCluster().startDataOnlyNode(); - ensureStableCluster(3); - final String indexFast = "index-fast"; - createIndexWithContent(indexFast, dataNode2, dataNode); - - final ActionFuture createFastSnapshot = startFullSnapshot(repoName, "fast-snapshot"); - - assertThat(createSlowFuture.isDone(), is(false)); - unblockNode(repoName, dataNode); - - assertSuccessful(createFastSnapshot); - assertSuccessful(createSlowFuture); - - final RepositoryData repositoryData = getRepositoryData(repoName); - assertThat(repositoryData.shardGenerations(), is(ShardGenerations.EMPTY)); - } - public void testQueuedDeleteAfterFinalizationFailure() throws Exception { final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); final String repoName = "test-repo"; diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/CorruptedBlobStoreRepositoryIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/CorruptedBlobStoreRepositoryIT.java index b806ee3e55a94..a07b245db2b21 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/CorruptedBlobStoreRepositoryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/CorruptedBlobStoreRepositoryIT.java @@ -32,48 +32,33 @@ package org.opensearch.snapshots; -import org.opensearch.Version; -import org.opensearch.action.ActionRunnable; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; import org.opensearch.action.index.IndexRequestBuilder; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.client.Client; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; -import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeUnit; -import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.repositories.IndexId; -import org.opensearch.repositories.IndexMetaDataGenerations; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryData; import org.opensearch.repositories.RepositoryException; -import org.opensearch.repositories.ShardGenerations; import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.threadpool.ThreadPool; -import java.io.IOException; import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.function.Function; -import java.util.stream.Collectors; import java.util.stream.Stream; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertFileExists; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertRequestBuilderThrows; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -310,101 +295,6 @@ public void testFindDanglingLatestGeneration() throws Exception { ); } - public 
void testHandlingMissingRootLevelSnapshotMetadata() throws Exception { - Path repo = randomRepoPath(); - final String repoName = "test-repo"; - createRepository( - repoName, - "fs", - Settings.builder() - .put("location", repo) - .put("compress", false) - // Don't cache repository data because the test manually modifies the repository data - .put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false) - .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) - ); - - final String snapshotPrefix = "test-snap-"; - final int snapshots = randomIntBetween(1, 2); - logger.info("--> creating [{}] snapshots", snapshots); - for (int i = 0; i < snapshots; ++i) { - // Workaround to simulate BwC situation: taking a snapshot without indices here so that we don't create any new version shard - // generations (the existence of which would short-circuit checks for the repo containing old version snapshots) - CreateSnapshotResponse createSnapshotResponse = client().admin() - .cluster() - .prepareCreateSnapshot(repoName, snapshotPrefix + i) - .setIndices() - .setWaitForCompletion(true) - .get(); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), is(0)); - assertThat( - createSnapshotResponse.getSnapshotInfo().successfulShards(), - equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()) - ); - } - final RepositoryData repositoryData = getRepositoryData(repoName); - - final SnapshotId snapshotToCorrupt = randomFrom(repositoryData.getSnapshotIds()); - logger.info("--> delete root level snapshot metadata blob for snapshot [{}]", snapshotToCorrupt); - Files.delete(repo.resolve(String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotToCorrupt.getUUID()))); - - logger.info("--> strip version information from index-N blob"); - final RepositoryData withoutVersions = new RepositoryData( - repositoryData.getGenId(), - repositoryData.getSnapshotIds().stream().collect(Collectors.toMap(SnapshotId::getUUID, Function.identity())), - repositoryData.getSnapshotIds().stream().collect(Collectors.toMap(SnapshotId::getUUID, repositoryData::getSnapshotState)), - Collections.emptyMap(), - Collections.emptyMap(), - ShardGenerations.EMPTY, - IndexMetaDataGenerations.EMPTY - ); - - Files.write( - repo.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + withoutVersions.getGenId()), - BytesReference.toBytes( - BytesReference.bytes(withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT)) - ), - StandardOpenOption.TRUNCATE_EXISTING - ); - - logger.info("--> verify that repo is assumed in old metadata format"); - final SnapshotsService snapshotsService = internalCluster().getCurrentClusterManagerNodeInstance(SnapshotsService.class); - final ThreadPool threadPool = internalCluster().getCurrentClusterManagerNodeInstance(ThreadPool.class); - assertThat( - PlainActionFuture.get( - f -> threadPool.generic() - .execute( - ActionRunnable.supply( - f, - () -> snapshotsService.minCompatibleVersion(Version.CURRENT, getRepositoryData(repoName), null) - ) - ) - ), - is(SnapshotsService.OLD_SNAPSHOT_FORMAT) - ); - - logger.info("--> verify that snapshot with missing root level metadata can be deleted"); - assertAcked(startDeleteSnapshot(repoName, snapshotToCorrupt.getName()).get()); - - logger.info("--> verify that repository is assumed in new metadata format after removing corrupted snapshot"); - assertThat( - PlainActionFuture.get( - f -> threadPool.generic() - .execute( - ActionRunnable.supply( - f, - () -> 
snapshotsService.minCompatibleVersion(Version.CURRENT, getRepositoryData(repoName), null) - ) - ) - ), - is(Version.CURRENT) - ); - final RepositoryData finalRepositoryData = getRepositoryData(repoName); - for (SnapshotId snapshotId : finalRepositoryData.getSnapshotIds()) { - assertThat(finalRepositoryData.getVersion(snapshotId), is(Version.CURRENT)); - } - } - public void testMountCorruptedRepositoryData() throws Exception { disableRepoConsistencyCheck("This test intentionally corrupts the repository contents"); Client client = client(); @@ -453,87 +343,6 @@ public void testMountCorruptedRepositoryData() throws Exception { expectThrows(RepositoryException.class, () -> getRepositoryData(otherRepo)); } - public void testHandleSnapshotErrorWithBwCFormat() throws IOException, ExecutionException, InterruptedException { - final String repoName = "test-repo"; - final Path repoPath = randomRepoPath(); - createRepository(repoName, "fs", repoPath); - final String oldVersionSnapshot = initWithSnapshotVersion(repoName, repoPath, SnapshotsService.OLD_SNAPSHOT_FORMAT); - - logger.info("--> recreating repository to clear caches"); - client().admin().cluster().prepareDeleteRepository(repoName).get(); - createRepository(repoName, "fs", repoPath); - - final String indexName = "test-index"; - createIndex(indexName); - - createFullSnapshot(repoName, "snapshot-1"); - - // In the old metadata version the shard level metadata could be moved to the next generation for all sorts of reasons, this should - // not break subsequent repository operations - logger.info("--> move shard level metadata to new generation"); - final IndexId indexId = getRepositoryData(repoName).resolveIndexId(indexName); - final Path shardPath = repoPath.resolve("indices").resolve(indexId.getId()).resolve("0"); - final Path initialShardMetaPath = shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + "0"); - assertFileExists(initialShardMetaPath); - Files.move(initialShardMetaPath, shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + "1")); - - startDeleteSnapshot(repoName, oldVersionSnapshot).get(); - - createFullSnapshot(repoName, "snapshot-2"); - } - - public void testRepairBrokenShardGenerations() throws Exception { - final String repoName = "test-repo"; - final Path repoPath = randomRepoPath(); - createRepository(repoName, "fs", repoPath); - final String oldVersionSnapshot = initWithSnapshotVersion(repoName, repoPath, SnapshotsService.OLD_SNAPSHOT_FORMAT); - - logger.info("--> recreating repository to clear caches"); - client().admin().cluster().prepareDeleteRepository(repoName).get(); - createRepository(repoName, "fs", repoPath); - - final String indexName = "test-index"; - createIndex(indexName); - - createFullSnapshot(repoName, "snapshot-1"); - - startDeleteSnapshot(repoName, oldVersionSnapshot).get(); - - logger.info("--> move shard level metadata to new generation and make RepositoryData point at an older generation"); - final IndexId indexId = getRepositoryData(repoName).resolveIndexId(indexName); - final Path shardPath = repoPath.resolve("indices").resolve(indexId.getId()).resolve("0"); - final Path initialShardMetaPath = shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + "0"); - assertFileExists(initialShardMetaPath); - Files.move(initialShardMetaPath, shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + randomIntBetween(1, 1000))); - - final RepositoryData repositoryData1 = getRepositoryData(repoName); - final Map snapshotIds = repositoryData1.getSnapshotIds() - .stream() - 
.collect(Collectors.toMap(SnapshotId::getUUID, Function.identity())); - final RepositoryData brokenRepoData = new RepositoryData( - repositoryData1.getGenId(), - snapshotIds, - snapshotIds.values().stream().collect(Collectors.toMap(SnapshotId::getUUID, repositoryData1::getSnapshotState)), - snapshotIds.values().stream().collect(Collectors.toMap(SnapshotId::getUUID, repositoryData1::getVersion)), - repositoryData1.getIndices().values().stream().collect(Collectors.toMap(Function.identity(), repositoryData1::getSnapshots)), - ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build(), - repositoryData1.indexMetaDataGenerations() - ); - Files.write( - repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData1.getGenId()), - BytesReference.toBytes( - BytesReference.bytes(brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT)) - ), - StandardOpenOption.TRUNCATE_EXISTING - ); - - logger.info("--> recreating repository to clear caches"); - client().admin().cluster().prepareDeleteRepository(repoName).get(); - createRepository(repoName, "fs", repoPath); - - createFullSnapshot(repoName, "snapshot-2"); - } - /** * Tests that a shard snapshot with a corrupted shard index file can still be used for restore and incremental snapshots. */ diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/delete/DeleteSnapshotRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/delete/DeleteSnapshotRequest.java index 61c4fdb9d5c14..832b37050ffe6 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/delete/DeleteSnapshotRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/delete/DeleteSnapshotRequest.java @@ -36,7 +36,6 @@ import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.snapshots.SnapshotsService; import java.io.IOException; @@ -84,27 +83,14 @@ public DeleteSnapshotRequest(String repository) { public DeleteSnapshotRequest(StreamInput in) throws IOException { super(in); repository = in.readString(); - if (in.getVersion().onOrAfter(SnapshotsService.MULTI_DELETE_VERSION)) { - snapshots = in.readStringArray(); - } else { - snapshots = new String[] { in.readString() }; - } + snapshots = in.readStringArray(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(repository); - if (out.getVersion().onOrAfter(SnapshotsService.MULTI_DELETE_VERSION)) { - out.writeStringArray(snapshots); - } else { - if (snapshots.length != 1) { - throw new IllegalArgumentException( - "Can't write snapshot delete with more than one snapshot to version [" + out.getVersion() + "]" - ); - } - out.writeString(snapshots[0]); - } + out.writeStringArray(snapshots); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/SnapshotDeletionsInProgress.java b/server/src/main/java/org/opensearch/cluster/SnapshotDeletionsInProgress.java index 0f95890e56ec2..e4ce0a8f99e02 100644 --- a/server/src/main/java/org/opensearch/cluster/SnapshotDeletionsInProgress.java +++ b/server/src/main/java/org/opensearch/cluster/SnapshotDeletionsInProgress.java @@ -34,7 +34,6 @@ import org.opensearch.Version; import org.opensearch.cluster.ClusterState.Custom; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.UUIDs; import 
org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -42,9 +41,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.repositories.RepositoryOperation; -import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotId; -import org.opensearch.snapshots.SnapshotsService; import java.io.IOException; import java.util.ArrayList; @@ -245,23 +242,12 @@ private Entry(List snapshots, String repoName, long startTime, long } public Entry(StreamInput in) throws IOException { - if (in.getVersion().onOrAfter(SnapshotsService.MULTI_DELETE_VERSION)) { - this.repoName = in.readString(); - this.snapshots = in.readList(SnapshotId::new); - } else { - final Snapshot snapshot = new Snapshot(in); - this.snapshots = Collections.singletonList(snapshot.getSnapshotId()); - this.repoName = snapshot.getRepository(); - } + this.repoName = in.readString(); + this.snapshots = in.readList(SnapshotId::new); this.startTime = in.readVLong(); this.repositoryStateId = in.readLong(); - if (in.getVersion().onOrAfter(SnapshotsService.FULL_CONCURRENCY_VERSION)) { - this.state = State.readFrom(in); - this.uuid = in.readString(); - } else { - this.state = State.STARTED; - this.uuid = IndexMetadata.INDEX_UUID_NA_VALUE; - } + this.state = State.readFrom(in); + this.uuid = in.readString(); } public Entry started() { @@ -343,22 +329,12 @@ public int hashCode() { @Override public void writeTo(StreamOutput out) throws IOException { - if (out.getVersion().onOrAfter(SnapshotsService.MULTI_DELETE_VERSION)) { - out.writeString(repoName); - out.writeCollection(snapshots); - } else { - assert snapshots.size() == 1 : "Only single deletion allowed in mixed version cluster containing [" - + out.getVersion() - + "] but saw " - + snapshots; - new Snapshot(repoName, snapshots.get(0)).writeTo(out); - } + out.writeString(repoName); + out.writeCollection(snapshots); out.writeVLong(startTime); out.writeLong(repositoryStateId); - if (out.getVersion().onOrAfter(SnapshotsService.FULL_CONCURRENCY_VERSION)) { - state.writeTo(out); - out.writeString(uuid); - } + state.writeTo(out); + out.writeString(uuid); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java index 4cbf0cfe70adb..f77b3e3b2a904 100644 --- a/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/opensearch/cluster/SnapshotsInProgress.java @@ -35,7 +35,6 @@ import com.carrotsearch.hppc.ObjectContainer; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; -import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.cluster.ClusterState.Custom; import org.opensearch.common.Nullable; @@ -54,7 +53,6 @@ import org.opensearch.snapshots.InFlightShardSnapshotStates; import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotId; -import org.opensearch.snapshots.SnapshotsService; import java.io.IOException; import java.util.Collections; @@ -66,8 +64,6 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.opensearch.snapshots.SnapshotInfo.DATA_STREAMS_IN_SNAPSHOT; - /** * Meta data about snapshots that are currently executing * @@ -77,8 +73,6 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement public static final SnapshotsInProgress EMPTY = new 
SnapshotsInProgress(Collections.emptyList()); - private static final Version VERSION_IN_SNAPSHOT_VERSION = LegacyESVersion.V_7_7_0; - public static final String TYPE = "snapshots"; public static final String ABORTED_FAILURE_TEXT = "Snapshot was aborted by deletion"; @@ -296,28 +290,10 @@ private Entry(StreamInput in) throws IOException { repositoryStateId = in.readLong(); failure = in.readOptionalString(); userMetadata = in.readMap(); - if (in.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) { - version = Version.readVersion(in); - } else if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { - // If an older cluster-manager informs us that shard generations are supported - // we use the minimum shard generation compatible version. - // If shard generations are not supported yet we use a placeholder for a version that does not use shard generations. - version = in.readBoolean() ? SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION : SnapshotsService.OLD_SNAPSHOT_FORMAT; - } else { - version = SnapshotsService.OLD_SNAPSHOT_FORMAT; - } - if (in.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { - dataStreams = in.readStringList(); - } else { - dataStreams = Collections.emptyList(); - } - if (in.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) { - source = in.readOptionalWriteable(SnapshotId::new); - clones = in.readImmutableMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom); - } else { - source = null; - clones = ImmutableOpenMap.of(); - } + version = Version.readVersion(in); + dataStreams = in.readStringList(); + source = in.readOptionalWriteable(SnapshotId::new); + clones = in.readImmutableMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom); } private static boolean assertShardsConsistent( @@ -732,18 +708,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(repositoryStateId); out.writeOptionalString(failure); out.writeMap(userMetadata); - if (out.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) { - Version.writeVersion(version, out); - } else if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { - out.writeBoolean(SnapshotsService.useShardGenerations(version)); - } - if (out.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { - out.writeStringCollection(dataStreams); - } - if (out.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) { - out.writeOptionalWriteable(source); - out.writeMap(clones); - } + Version.writeVersion(version, out); + out.writeStringCollection(dataStreams); + out.writeOptionalWriteable(source); + out.writeMap(clones); } @Override @@ -840,12 +808,7 @@ private boolean assertConsistent() { public static ShardSnapshotStatus readFrom(StreamInput in) throws IOException { String nodeId = in.readOptionalString(); final ShardState state = ShardState.fromValue(in.readByte()); - final String generation; - if (SnapshotsService.useShardGenerations(in.getVersion())) { - generation = in.readOptionalString(); - } else { - generation = null; - } + final String generation = in.readOptionalString(); final String reason = in.readOptionalString(); if (state == ShardState.QUEUED) { return UNASSIGNED_QUEUED; @@ -884,9 +847,7 @@ public boolean isActive() { public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(nodeId); out.writeByte(state.value); - if (SnapshotsService.useShardGenerations(out.getVersion())) { - out.writeOptionalString(generation); - } + out.writeOptionalString(generation); out.writeOptionalString(reason); } diff --git 
a/server/src/main/java/org/opensearch/repositories/IndexMetaDataGenerations.java b/server/src/main/java/org/opensearch/repositories/IndexMetaDataGenerations.java index afabf4ebfdb58..e8f96bb313dd1 100644 --- a/server/src/main/java/org/opensearch/repositories/IndexMetaDataGenerations.java +++ b/server/src/main/java/org/opensearch/repositories/IndexMetaDataGenerations.java @@ -92,10 +92,7 @@ public String getIndexMetaBlobId(String metaIdentifier) { } /** - * Get the blob id by {@link SnapshotId} and {@link IndexId} and fall back to the value of {@link SnapshotId#getUUID()} if none is - * known to enable backwards compatibility with versions older than - * {@link org.opensearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} which used the snapshot uuid as index metadata - * blob uuid. + * Get the blob id by {@link SnapshotId} and {@link IndexId}. * * @param snapshotId Snapshot Id * @param indexId Index Id @@ -103,11 +100,7 @@ public String getIndexMetaBlobId(String metaIdentifier) { */ public String indexMetaBlobId(SnapshotId snapshotId, IndexId indexId) { final String identifier = lookup.getOrDefault(snapshotId, Collections.emptyMap()).get(indexId); - if (identifier == null) { - return snapshotId.getUUID(); - } else { - return identifiers.get(identifier); - } + return identifiers.get(identifier); } /** diff --git a/server/src/main/java/org/opensearch/repositories/RepositoryData.java b/server/src/main/java/org/opensearch/repositories/RepositoryData.java index e8132801e4238..524a978bcf1d1 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoryData.java @@ -42,7 +42,6 @@ import org.opensearch.common.xcontent.XContentParserUtils; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotState; -import org.opensearch.snapshots.SnapshotsService; import java.io.IOException; import java.util.ArrayList; @@ -238,7 +237,6 @@ public SnapshotState getSnapshotState(final SnapshotId snapshotId) { /** * Returns the {@link Version} for the given snapshot or {@code null} if unknown. 
*/ - @Nullable public Version getVersion(SnapshotId snapshotId) { return snapshotVersions.get(snapshotId.getUUID()); } @@ -551,8 +549,6 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final builder.startObject(); // write the snapshots list builder.startArray(SNAPSHOTS); - final boolean shouldWriteIndexGens = SnapshotsService.useIndexGenerations(repoMetaVersion); - final boolean shouldWriteShardGens = SnapshotsService.useShardGenerations(repoMetaVersion); for (final SnapshotId snapshot : getSnapshotIds()) { builder.startObject(); builder.field(NAME, snapshot.getName()); @@ -562,14 +558,12 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final if (state != null) { builder.field(STATE, state.value()); } - if (shouldWriteIndexGens) { - builder.startObject(INDEX_METADATA_LOOKUP); - for (Map.Entry entry : indexMetaDataGenerations.lookup.getOrDefault(snapshot, Collections.emptyMap()) - .entrySet()) { - builder.field(entry.getKey().getId(), entry.getValue()); - } - builder.endObject(); + builder.startObject(INDEX_METADATA_LOOKUP); + for (Map.Entry entry : indexMetaDataGenerations.lookup.getOrDefault(snapshot, Collections.emptyMap()) + .entrySet()) { + builder.field(entry.getKey().getId(), entry.getValue()); } + builder.endObject(); final Version version = snapshotVersions.get(snapshotUUID); if (version != null) { builder.field(VERSION, version.toString()); @@ -589,23 +583,15 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final builder.value(snapshotId.getUUID()); } builder.endArray(); - if (shouldWriteShardGens) { - builder.startArray(SHARD_GENERATIONS); - for (String gen : shardGenerations.getGens(indexId)) { - builder.value(gen); - } - builder.endArray(); + builder.startArray(SHARD_GENERATIONS); + for (String gen : shardGenerations.getGens(indexId)) { + builder.value(gen); } + builder.endArray(); builder.endObject(); } builder.endObject(); - if (shouldWriteIndexGens) { - builder.field(MIN_VERSION, SnapshotsService.INDEX_GEN_IN_REPO_DATA_VERSION.toString()); - builder.field(INDEX_METADATA_IDENTIFIERS, indexMetaDataGenerations.identifiers); - } else if (shouldWriteShardGens) { - // Add min version field to make it impossible for older OpenSearch versions to deserialize this object - builder.field(MIN_VERSION, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.toString()); - } + builder.field(INDEX_METADATA_IDENTIFIERS, indexMetaDataGenerations.identifiers); builder.endObject(); return builder; } @@ -616,12 +602,8 @@ public IndexMetaDataGenerations indexMetaDataGenerations() { /** * Reads an instance of {@link RepositoryData} from x-content, loading the snapshots and indices metadata. - * - * @param fixBrokenShardGens set to {@code true} to filter out broken shard generations read from the {@code parser} via - * {@link ShardGenerations#fixShardGeneration}. Used to disable fixing broken generations when reading - * from cached bytes that we trust to not contain broken generations. 
*/ - public static RepositoryData snapshotsFromXContent(XContentParser parser, long genId, boolean fixBrokenShardGens) throws IOException { + public static RepositoryData snapshotsFromXContent(XContentParser parser, long genId) throws IOException { XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); final Map snapshots = new HashMap<>(); @@ -639,16 +621,16 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g parseSnapshots(parser, snapshots, snapshotStates, snapshotVersions, indexMetaLookup); break; case INDICES: - parseIndices(parser, fixBrokenShardGens, snapshots, indexSnapshots, indexLookup, shardGenerations); + parseIndices(parser, snapshots, indexSnapshots, indexLookup, shardGenerations); break; case INDEX_METADATA_IDENTIFIERS: XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); indexMetaIdentifiers = parser.mapStrings(); break; case MIN_VERSION: - XContentParserUtils.ensureExpectedToken(XContentParser.Token.VALUE_STRING, parser.nextToken(), parser); - final Version version = Version.fromString(parser.text()); - assert SnapshotsService.useShardGenerations(version); + // ignore min_version + // todo: remove in next version + parser.nextToken(); break; default: XContentParserUtils.throwUnknownField(field, parser.getTokenLocation()); @@ -763,7 +745,6 @@ private static void parseSnapshots( * {@code shardGenerations}. * * @param parser x-content parser - * @param fixBrokenShardGens whether or not to fix broken shard generation (see {@link #snapshotsFromXContent} for details) * @param snapshots map of snapshot uuid to {@link SnapshotId} that was populated by {@link #parseSnapshots} * @param indexSnapshots map of {@link IndexId} to list of {@link SnapshotId} that contain the given index * @param indexLookup map of index uuid (as returned by {@link IndexId#getId}) to {@link IndexId} @@ -771,7 +752,6 @@ private static void parseSnapshots( */ private static void parseIndices( XContentParser parser, - boolean fixBrokenShardGens, Map snapshots, Map> indexSnapshots, Map indexLookup, @@ -835,9 +815,6 @@ private static void parseIndices( indexLookup.put(indexId.getId(), indexId); for (int i = 0; i < gens.size(); i++) { String parsedGen = gens.get(i); - if (fixBrokenShardGens) { - parsedGen = ShardGenerations.fixShardGeneration(parsedGen); - } if (parsedGen != null) { shardGenerations.put(indexId, i, parsedGen); } diff --git a/server/src/main/java/org/opensearch/repositories/ShardGenerations.java b/server/src/main/java/org/opensearch/repositories/ShardGenerations.java index 27eac4dfdd8c1..d918eb7e1e476 100644 --- a/server/src/main/java/org/opensearch/repositories/ShardGenerations.java +++ b/server/src/main/java/org/opensearch/repositories/ShardGenerations.java @@ -33,7 +33,6 @@ package org.opensearch.repositories; import org.opensearch.common.Nullable; -import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import java.util.Arrays; import java.util.Collection; @@ -44,7 +43,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.regex.Pattern; import java.util.stream.Collectors; /** @@ -74,24 +72,6 @@ private ShardGenerations(Map> shardGenerations) { this.shardGenerations = shardGenerations; } - private static final Pattern IS_NUMBER = Pattern.compile("^\\d+$"); - - /** - * Filters out unreliable numeric shard generations read from {@link RepositoryData} or {@link IndexShardSnapshotStatus}, returning - * {@code null} in 
their place.
-     * @see <a href="https://github.com/elastic/elasticsearch/issues/57988">Issue #57988</a>
-     *
-     * @param shardGeneration shard generation to fix
-     * @return given shard generation or {@code null} if it was filtered out or {@code null} was passed
-     */
-    @Nullable
-    public static String fixShardGeneration(@Nullable String shardGeneration) {
-        if (shardGeneration == null) {
-            return null;
-        }
-        return IS_NUMBER.matcher(shardGeneration).matches() ? null : shardGeneration;
-    }
-
     /**
      * Returns the total number of shards tracked by this instance.
      */
@@ -145,8 +125,7 @@ public Map<IndexId, Map<Integer, String>> obsoleteShardGenerations(ShardGenerations previous) {
      * <ul>
      *     <li>{@link #DELETED_SHARD_GEN} a deleted shard that isn't referenced by any snapshot in the repository any longer</li>
      *     <li>{@link #NEW_SHARD_GEN} a new shard that we know doesn't hold any valid data yet in the repository</li>
-     *     <li>{@code null} unknown state. The shard either does not exist at all or it was created by a node older than
-     *     {@link org.opensearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}. If a caller expects a shard to exist in the
+     *     <li>{@code null} unknown state. The shard does not exist at all. If a caller expects a shard to exist in the
      *     repository but sees a {@code null} return, it should try to recover the generation by falling back to listing the contents
      *     of the respective shard directory.</li>
      * </ul>
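
As the final list item above notes, a {@code null} generation now simply means the shard is absent from the repository, and callers are expected to recover by listing the shard container. A minimal sketch of that fallback, assuming OpenSearch's BlobContainer#listBlobsByPrefix and the "index-" (SNAPSHOT_INDEX_PREFIX) blob naming; the helper itself is illustrative and not part of this change:

    import java.io.IOException;

    import org.opensearch.common.blobstore.BlobContainer;

    final class ShardGenFallback {
        // Illustrative only: recover the newest numeric shard generation by listing
        // the shard directory, as the javadoc above tells callers to do when
        // ShardGenerations#getShardGen returns null.
        static String recoverGeneration(BlobContainer shardContainer) throws IOException {
            long latest = -1L;
            for (String blobName : shardContainer.listBlobsByPrefix("index-").keySet()) {
                try {
                    latest = Math.max(latest, Long.parseLong(blobName.substring("index-".length())));
                } catch (NumberFormatException e) {
                    // UUID-named generations are not numeric; skip them here
                }
            }
            return latest < 0 ? null : Long.toString(latest);
        }
    }
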
diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index bf06191bdc8d3..1000fe4078b48 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -818,66 +818,46 @@ private void doDeleteShardSnapshots( Version repoMetaVersion, ActionListener listener ) { - - if (SnapshotsService.useShardGenerations(repoMetaVersion)) { - // First write the new shard state metadata (with the removed snapshot) and compute deletion targets - final StepListener> writeShardMetaDataAndComputeDeletesStep = new StepListener<>(); - writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, true, writeShardMetaDataAndComputeDeletesStep); - // Once we have put the new shard-level metadata into place, we can update the repository metadata as follows: - // 1. Remove the snapshots from the list of existing snapshots - // 2. Update the index shard generations of all updated shard folders - // - // Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created - // index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only - // written if all shard paths have been successfully updated. - final StepListener writeUpdatedRepoDataStep = new StepListener<>(); - writeShardMetaDataAndComputeDeletesStep.whenComplete(deleteResults -> { - final ShardGenerations.Builder builder = ShardGenerations.builder(); - for (ShardSnapshotMetaDeleteResult newGen : deleteResults) { - builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration); - } - final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, builder.build()); - writeIndexGen( - updatedRepoData, - repositoryStateId, - repoMetaVersion, - Function.identity(), - ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure) - ); - }, listener::onFailure); - // Once we have updated the repository, run the clean-ups - writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> { - // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion - final ActionListener afterCleanupsListener = new GroupedActionListener<>( - ActionListener.wrap(() -> listener.onResponse(updatedRepoData)), - 2 - ); - cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener); - asyncCleanupUnlinkedShardLevelBlobs( - repositoryData, - snapshotIds, - writeShardMetaDataAndComputeDeletesStep.result(), - afterCleanupsListener - ); - }, listener::onFailure); - } else { - // Write the new repository data first (with the removed snapshot), using no shard generations - final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY); - writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(newRepoData -> { - // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion - final ActionListener afterCleanupsListener = new GroupedActionListener<>( - ActionListener.wrap(() -> listener.onResponse(newRepoData)), - 2 - ); - cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, newRepoData, afterCleanupsListener); - final StepListener> writeMetaAndComputeDeletesStep = new StepListener<>(); - 
writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, false, writeMetaAndComputeDeletesStep); - writeMetaAndComputeDeletesStep.whenComplete( - deleteResults -> asyncCleanupUnlinkedShardLevelBlobs(repositoryData, snapshotIds, deleteResults, afterCleanupsListener), - afterCleanupsListener::onFailure - ); - }, listener::onFailure)); - } + // First write the new shard state metadata (with the removed snapshot) and compute deletion targets + final StepListener> writeShardMetaDataAndComputeDeletesStep = new StepListener<>(); + writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, true, writeShardMetaDataAndComputeDeletesStep); + // Once we have put the new shard-level metadata into place, we can update the repository metadata as follows: + // 1. Remove the snapshots from the list of existing snapshots + // 2. Update the index shard generations of all updated shard folders + // + // Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created + // index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only + // written if all shard paths have been successfully updated. + final StepListener writeUpdatedRepoDataStep = new StepListener<>(); + writeShardMetaDataAndComputeDeletesStep.whenComplete(deleteResults -> { + final ShardGenerations.Builder builder = ShardGenerations.builder(); + for (ShardSnapshotMetaDeleteResult newGen : deleteResults) { + builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration); + } + final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, builder.build()); + writeIndexGen( + updatedRepoData, + repositoryStateId, + repoMetaVersion, + Function.identity(), + ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure) + ); + }, listener::onFailure); + // Once we have updated the repository, run the clean-ups + writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> { + // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion + final ActionListener afterCleanupsListener = new GroupedActionListener<>( + ActionListener.wrap(() -> listener.onResponse(updatedRepoData)), + 2 + ); + cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener); + asyncCleanupUnlinkedShardLevelBlobs( + repositoryData, + snapshotIds, + writeShardMetaDataAndComputeDeletesStep.result(), + afterCleanupsListener + ); + }, listener::onFailure); } private void cleanupUnlinkedRootAndIndicesBlobs( @@ -1365,31 +1345,19 @@ public void finalizeSnapshot( final Collection indices = shardGenerations.indices(); final SnapshotId snapshotId = snapshotInfo.snapshotId(); // Once we are done writing the updated index-N blob we remove the now unreferenced index-${uuid} blobs in each shard - // directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION - // If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened - // when writing the index-${N} to each shard directory. 
- final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion); + // directory final Consumer onUpdateFailure = e -> listener.onFailure( new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e) ); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - final boolean writeIndexGens = SnapshotsService.useIndexGenerations(repositoryMetaVersion); - final StepListener repoDataListener = new StepListener<>(); getRepositoryData(repoDataListener); repoDataListener.whenComplete(existingRepositoryData -> { - final Map indexMetas; - final Map indexMetaIdentifiers; - if (writeIndexGens) { - indexMetaIdentifiers = ConcurrentCollections.newConcurrentMap(); - indexMetas = ConcurrentCollections.newConcurrentMap(); - } else { - indexMetas = null; - indexMetaIdentifiers = null; - } + final Map indexMetas = ConcurrentCollections.newConcurrentMap(); + final Map indexMetaIdentifiers = ConcurrentCollections.newConcurrentMap(); final ActionListener allMetaListener = new GroupedActionListener<>(ActionListener.wrap(v -> { final RepositoryData updatedRepositoryData = existingRepositoryData.addSnapshot( @@ -1406,9 +1374,7 @@ public void finalizeSnapshot( repositoryMetaVersion, stateTransformer, ActionListener.wrap(newRepoData -> { - if (writeShardGens) { - cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); - } + cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); listener.onResponse(newRepoData); }, onUpdateFailure) ); @@ -1432,24 +1398,15 @@ public void finalizeSnapshot( for (IndexId index : indices) { executor.execute(ActionRunnable.run(allMetaListener, () -> { final IndexMetadata indexMetaData = clusterMetadata.index(index.getName()); - if (writeIndexGens) { - final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData); - String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers); - if (metaUUID == null) { - // We don't yet have this version of the metadata so we write it - metaUUID = UUIDs.base64UUID(); - INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress); - indexMetaIdentifiers.put(identifiers, metaUUID); - } - indexMetas.put(index, identifiers); - } else { - INDEX_METADATA_FORMAT.write( - clusterMetadata.index(index.getName()), - indexContainer(index), - snapshotId.getUUID(), - compress - ); + final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData); + String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers); + if (metaUUID == null) { + // We don't yet have this version of the metadata so we write it + metaUUID = UUIDs.base64UUID(); + INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress); + indexMetaIdentifiers.put(identifiers, metaUUID); } + indexMetas.put(index, identifiers); })); } executor.execute( @@ -1774,8 +1731,7 @@ private RepositoryData repositoryDataFromCachedEntry(Tuple try (InputStream input = CompressorFactory.COMPRESSOR.threadLocalInputStream(cacheEntry.v2().streamInput())) { return RepositoryData.snapshotsFromXContent( XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, input), - cacheEntry.v1(), - false + cacheEntry.v1() ); } } @@ -1870,7 +1826,7 @@ private RepositoryData getRepositoryData(long indexGen) { XContentParser parser = XContentType.JSON.xContent() .createParser(NamedXContentRegistry.EMPTY, 
LoggingDeprecationHandler.INSTANCE, blob) ) { - return RepositoryData.snapshotsFromXContent(parser, indexGen, true); + return RepositoryData.snapshotsFromXContent(parser, indexGen); } } catch (IOException ioe) { if (bestEffortConsistency) { @@ -2454,7 +2410,6 @@ public void snapshotShard( ); final String indexGeneration; - final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion); // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones List newSnapshotsList = new ArrayList<>(); newSnapshotsList.add(new SnapshotFiles(snapshotId.getName(), indexCommitPointFiles, shardStateIdentifier)); @@ -2463,71 +2418,24 @@ public void snapshotShard( } final BlobStoreIndexShardSnapshots updatedBlobStoreIndexShardSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList); final Runnable afterWriteSnapBlob; - if (writeShardGens) { - // When using shard generations we can safely write the index-${uuid} blob before writing out any of the actual data - // for this shard since the uuid named blob will simply not be referenced in case of error and thus we will never - // reference a generation that has not had all its files fully upload. - indexGeneration = UUIDs.randomBase64UUID(); - try { - INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedBlobStoreIndexShardSnapshots, shardContainer, indexGeneration, compress); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException( - shardId, - "Failed to write shard level snapshot metadata for [" - + snapshotId - + "] to [" - + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) - + "]", - e - ); - } - afterWriteSnapBlob = () -> {}; - } else { - // When not using shard generations we can only write the index-${N} blob after all other work for this shard has - // completed. - // Also, in case of numeric shard generations the data node has to take care of deleting old shard generations. - final long newGen = Long.parseLong(fileListGeneration) + 1; - indexGeneration = Long.toString(newGen); - // Delete all previous index-N blobs - final List blobsToDelete = blobs.stream() - .filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)) - .collect(Collectors.toList()); - assert blobsToDelete.stream() - .mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, ""))) - .max() - .orElse(-1L) < Long.parseLong(indexGeneration) : "Tried to delete an index-N blob newer than the current generation [" - + indexGeneration - + "] when deleting index-N blobs " - + blobsToDelete; - afterWriteSnapBlob = () -> { - try { - writeShardIndexBlobAtomic(shardContainer, newGen, updatedBlobStoreIndexShardSnapshots); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException( - shardId, - "Failed to finalize snapshot creation [" - + snapshotId - + "] with shard index [" - + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) - + "]", - e - ); - } - try { - deleteFromContainer(shardContainer, blobsToDelete); - } catch (IOException e) { - logger.warn( - () -> new ParameterizedMessage( - "[{}][{}] failed to delete old index-N blobs during finalization", - snapshotId, - shardId - ), - e - ); - } - }; + // When using shard generations we can safely write the index-${uuid} blob before writing out any of the actual data + // for this shard since the uuid named blob will simply not be referenced in case of error and thus we will never + // reference a generation that has not had all its files fully upload. 
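// A minimal sketch of the write-before-reference pattern the comment above
// describes: the shard generation gets a fresh UUID name, so the blob stays
// invisible until the root RepositoryData later points at it, and a failed
// snapshot merely leaves an unreferenced blob behind. The file-based
// "container" and all names below are illustrative assumptions, not the
// repository API.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

final class ShardGenWriteSketch {
    // Writes shard metadata under a fresh UUID-named index blob and returns the
    // generation; loosely analogous to UUIDs.randomBase64UUID() followed by
    // INDEX_SHARD_SNAPSHOTS_FORMAT.write(...) in the hunk above.
    static String writeNewGeneration(Path shardDir, String metadataJson) throws IOException {
        String generation = UUID.randomUUID().toString();
        // A crash after this write is harmless: nothing references the blob yet.
        Files.writeString(shardDir.resolve("index-" + generation), metadataJson, StandardCharsets.UTF_8);
        return generation;
    }
}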
+ indexGeneration = UUIDs.randomBase64UUID(); + try { + INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedBlobStoreIndexShardSnapshots, shardContainer, indexGeneration, compress); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException( + shardId, + "Failed to write shard level snapshot metadata for [" + + snapshotId + + "] to [" + + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) + + "]", + e + ); } - + afterWriteSnapBlob = () -> {}; final StepListener> allFilesUploadedListener = new StepListener<>(); allFilesUploadedListener.whenComplete(v -> { final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration()); @@ -2970,9 +2878,7 @@ public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContaine * the given list of blobs in the shard container. * * @param blobs list of blobs in repository - * @param generation shard generation or {@code null} in case there was no shard generation tracked in the {@link RepositoryData} for - * this shard because its snapshot was created in a version older than - * {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}. + * @param generation shard generation * @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation */ private Tuple buildBlobStoreIndexShardSnapshots( diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java index 38d9df0e960e0..94ab875091caf 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java @@ -31,7 +31,6 @@ package org.opensearch.snapshots; -import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.action.ShardOperationFailedException; import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; @@ -69,8 +68,6 @@ */ public final class SnapshotInfo implements Comparable, ToXContent, Writeable { - public static final Version DATA_STREAMS_IN_SNAPSHOT = LegacyESVersion.V_7_9_0; - public static final String CONTEXT_MODE_PARAM = "context_mode"; public static final String CONTEXT_MODE_SNAPSHOT = "SNAPSHOT"; private static final DateFormatter DATE_TIME_FORMATTER = DateFormatter.forPattern("strict_date_optional_time"); @@ -401,11 +398,7 @@ public SnapshotInfo(final StreamInput in) throws IOException { version = in.readBoolean() ? 
Version.readVersion(in) : null; includeGlobalState = in.readOptionalBoolean(); userMetadata = in.readMap(); - if (in.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { - dataStreams = in.readStringList(); - } else { - dataStreams = Collections.emptyList(); - } + dataStreams = in.readStringList(); } /** @@ -836,9 +829,7 @@ public void writeTo(final StreamOutput out) throws IOException { } out.writeOptionalBoolean(includeGlobalState); out.writeMap(userMetadata); - if (out.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { - out.writeStringCollection(dataStreams); - } + out.writeStringCollection(dataStreams); } private static SnapshotState snapshotState(final String reason, final List shardFailures) { diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java index 22a85558e3b1d..86dbc31245b11 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java @@ -67,7 +67,6 @@ import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; -import org.opensearch.repositories.ShardGenerations; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportException; import org.opensearch.transport.TransportRequestDeduplicator; @@ -276,11 +275,6 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map() { @Override public void onResponse(String newGeneration) { diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index e53c2889f88e6..0123c839e4ad9 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -38,7 +38,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.ExceptionsHelper; -import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; @@ -141,18 +140,6 @@ */ public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier { - public static final Version FULL_CONCURRENCY_VERSION = LegacyESVersion.V_7_9_0; - - public static final Version CLONE_SNAPSHOT_VERSION = LegacyESVersion.V_7_10_0; - - public static final Version SHARD_GEN_IN_REPO_DATA_VERSION = LegacyESVersion.V_7_6_0; - - public static final Version INDEX_GEN_IN_REPO_DATA_VERSION = LegacyESVersion.V_7_9_0; - - public static final Version OLD_SNAPSHOT_FORMAT = LegacyESVersion.fromId(7050099); - - public static final Version MULTI_DELETE_VERSION = LegacyESVersion.V_7_8_0; - private static final Logger logger = LogManager.getLogger(SnapshotsService.class); public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status"; @@ -168,9 +155,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final Map>>> snapshotCompletionListeners = new ConcurrentHashMap<>(); - // Set of snapshots that are currently being initialized by this node - private final Set initializingSnapshots = Collections.synchronizedSet(new HashSet<>()); - /** * Listeners for snapshot deletion keyed by delete uuid as returned from {@link 
SnapshotDeletionsInProgress.Entry#uuid()} */ @@ -285,18 +269,10 @@ public ClusterState execute(ClusterState currentState) { final List runningSnapshots = snapshots.entries(); ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName); validate(repositoryName, snapshotName, currentState); - final boolean concurrentOperationsAllowed = currentState.nodes().getMinNodeVersion().onOrAfter(FULL_CONCURRENCY_VERSION); final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom( SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY ); - if (deletionsInProgress.hasDeletionsInProgress() && concurrentOperationsAllowed == false) { - throw new ConcurrentSnapshotExecutionException( - repositoryName, - snapshotName, - "cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]" - ); - } final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom( RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY @@ -308,13 +284,6 @@ public ClusterState execute(ClusterState currentState) { "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]" ); } - // Fail if there are any concurrently running snapshots. The only exception to this being a snapshot in INIT state from a - // previous cluster-manager that we can simply ignore and remove from the cluster state because we would clean it up from - // the - // cluster state anyway in #applyClusterState. - if (concurrentOperationsAllowed == false && runningSnapshots.stream().anyMatch(entry -> entry.state() != State.INIT)) { - throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); - } ensureNoCleanupInProgress(currentState, repositoryName, snapshotName); ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress); // Store newSnapshot here to be processed in clusterStateProcessed @@ -339,7 +308,6 @@ public ClusterState execute(ClusterState currentState) { currentState.metadata(), currentState.routingTable(), indexIds, - useShardGenerations(version), repositoryData, repositoryName ); @@ -1817,16 +1785,6 @@ public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionLis @Override public ClusterState execute(ClusterState currentState) throws Exception { - final Version minNodeVersion = currentState.nodes().getMinNodeVersion(); - if (snapshotNames.length > 1 && minNodeVersion.before(MULTI_DELETE_VERSION)) { - throw new IllegalArgumentException( - "Deleting multiple snapshots in a single request is only supported in version [ " - + MULTI_DELETE_VERSION - + "] but cluster contained node of version [" - + currentState.nodes().getMinNodeVersion() - + "]" - ); - } final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); final List snapshotEntries = findInProgressSnapshots(snapshots, snapshotNames, repoName); final List snapshotIds = matchingSnapshotIds( @@ -1835,76 +1793,8 @@ public ClusterState execute(ClusterState currentState) throws Exception { snapshotNames, repoName ); - if (snapshotEntries.isEmpty() || minNodeVersion.onOrAfter(SnapshotsService.FULL_CONCURRENCY_VERSION)) { - deleteFromRepoTask = createDeleteStateUpdate(snapshotIds, repoName, repositoryData, Priority.NORMAL, listener); - return deleteFromRepoTask.execute(currentState); - } - assert snapshotEntries.size() == 1 : "Expected just a single running snapshot but saw " + snapshotEntries; - final 
SnapshotsInProgress.Entry snapshotEntry = snapshotEntries.get(0); - runningSnapshot = snapshotEntry.snapshot(); - final ImmutableOpenMap shards; - - final State state = snapshotEntry.state(); - final String failure; - - outstandingDeletes = new ArrayList<>(snapshotIds); - if (state != State.INIT) { - // INIT state snapshots won't ever be physically written to the repository but all other states will end up in the repo - outstandingDeletes.add(runningSnapshot.getSnapshotId()); - } - if (state == State.INIT) { - // snapshot is still initializing, mark it as aborted - shards = snapshotEntry.shards(); - assert shards.isEmpty(); - failure = "Snapshot was aborted during initialization"; - abortedDuringInit = true; - } else if (state == State.STARTED) { - // snapshot is started - mark every non completed shard as aborted - final SnapshotsInProgress.Entry abortedEntry = snapshotEntry.abort(); - shards = abortedEntry.shards(); - failure = abortedEntry.failure(); - } else { - boolean hasUncompletedShards = false; - // Cleanup in case a node gone missing and snapshot wasn't updated for some reason - for (ObjectCursor shardStatus : snapshotEntry.shards().values()) { - // Check if we still have shard running on existing nodes - if (shardStatus.value.state().completed() == false - && shardStatus.value.nodeId() != null - && currentState.nodes().get(shardStatus.value.nodeId()) != null) { - hasUncompletedShards = true; - break; - } - } - if (hasUncompletedShards) { - // snapshot is being finalized - wait for shards to complete finalization process - logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes"); - return currentState; - } else { - // no shards to wait for but a node is gone - this is the only case - // where we force to finish the snapshot - logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately"); - shards = snapshotEntry.shards(); - } - failure = snapshotEntry.failure(); - } - return ClusterState.builder(currentState) - .putCustom( - SnapshotsInProgress.TYPE, - SnapshotsInProgress.of( - snapshots.entries() - .stream() - // remove init state snapshot we found from a previous cluster-manager if there was one - .filter(existing -> abortedDuringInit == false || existing.equals(snapshotEntry) == false) - .map(existing -> { - if (existing.equals(snapshotEntry)) { - return snapshotEntry.fail(shards, State.ABORTED, failure); - } - return existing; - }) - .collect(Collectors.toList()) - ) - ) - .build(); + deleteFromRepoTask = createDeleteStateUpdate(snapshotIds, repoName, repositoryData, Priority.NORMAL, listener); + return deleteFromRepoTask.execute(currentState); } @Override @@ -2053,14 +1943,6 @@ public ClusterState execute(ClusterState currentState) { SnapshotDeletionsInProgress.EMPTY ); final Version minNodeVersion = currentState.nodes().getMinNodeVersion(); - if (minNodeVersion.before(FULL_CONCURRENCY_VERSION)) { - if (deletionsInProgress.hasDeletionsInProgress()) { - throw new ConcurrentSnapshotExecutionException( - new Snapshot(repoName, snapshotIds.get(0)), - "cannot delete - another snapshot is currently being deleted in [" + deletionsInProgress + "]" - ); - } - } final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom( RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY @@ -2100,44 +1982,32 @@ public ClusterState execute(ClusterState currentState) { } // Snapshot ids that will have to be physically deleted from the repository final Set 
snapshotIdsRequiringCleanup = new HashSet<>(snapshotIds); - final SnapshotsInProgress updatedSnapshots; - if (minNodeVersion.onOrAfter(FULL_CONCURRENCY_VERSION)) { - updatedSnapshots = SnapshotsInProgress.of(snapshots.entries().stream().map(existing -> { - if (existing.state() == State.STARTED - && snapshotIdsRequiringCleanup.contains(existing.snapshot().getSnapshotId())) { - // snapshot is started - mark every non completed shard as aborted - final SnapshotsInProgress.Entry abortedEntry = existing.abort(); - if (abortedEntry == null) { - // No work has been done for this snapshot yet so we remove it from the cluster state directly - final Snapshot existingNotYetStartedSnapshot = existing.snapshot(); - // Adding the snapshot to #endingSnapshots since we still have to resolve its listeners to not trip - // any leaked listener assertions - if (endingSnapshots.add(existingNotYetStartedSnapshot)) { - completedNoCleanup.add(existingNotYetStartedSnapshot); - } - snapshotIdsRequiringCleanup.remove(existingNotYetStartedSnapshot.getSnapshotId()); - } else if (abortedEntry.state().completed()) { - completedWithCleanup.add(abortedEntry); + final SnapshotsInProgress updatedSnapshots = SnapshotsInProgress.of(snapshots.entries().stream().map(existing -> { + if (existing.state() == State.STARTED && snapshotIdsRequiringCleanup.contains(existing.snapshot().getSnapshotId())) { + // snapshot is started - mark every non completed shard as aborted + final SnapshotsInProgress.Entry abortedEntry = existing.abort(); + if (abortedEntry == null) { + // No work has been done for this snapshot yet so we remove it from the cluster state directly + final Snapshot existingNotYetStartedSnapshot = existing.snapshot(); + // Adding the snapshot to #endingSnapshots since we still have to resolve its listeners to not trip + // any leaked listener assertions + if (endingSnapshots.add(existingNotYetStartedSnapshot)) { + completedNoCleanup.add(existingNotYetStartedSnapshot); } - return abortedEntry; + snapshotIdsRequiringCleanup.remove(existingNotYetStartedSnapshot.getSnapshotId()); + } else if (abortedEntry.state().completed()) { + completedWithCleanup.add(abortedEntry); } - return existing; - }).filter(Objects::nonNull).collect(Collectors.toList())); - if (snapshotIdsRequiringCleanup.isEmpty()) { - // We only saw snapshots that could be removed from the cluster state right away, no need to update the deletions - return updateWithSnapshots(currentState, updatedSnapshots, null); + return abortedEntry; } - } else { - if (snapshots.entries().isEmpty() == false) { - // However other snapshots are running - cannot continue - throw new ConcurrentSnapshotExecutionException( - repoName, - snapshotIds.toString(), - "another snapshot is currently running cannot delete" - ); - } - updatedSnapshots = snapshots; + return existing; + }).filter(Objects::nonNull).collect(Collectors.toList())); + + if (snapshotIdsRequiringCleanup.isEmpty()) { + // We only saw snapshots that could be removed from the cluster state right away, no need to update the deletions + return updateWithSnapshots(currentState, updatedSnapshots, null); } + // add the snapshot deletion to the cluster state final SnapshotDeletionsInProgress.Entry replacedEntry = deletionsInProgress.getEntries() .stream() @@ -2266,41 +2136,11 @@ public Version minCompatibleVersion(Version minNodeVersion, RepositoryData repos .filter(excluded == null ? 
sn -> true : sn -> excluded.contains(sn) == false) .collect(Collectors.toList())) { final Version known = repositoryData.getVersion(snapshotId); - // If we don't have the version cached in the repository data yet we load it from the snapshot info blobs - if (known == null) { - assert repositoryData.shardGenerations().totalShards() == 0 : "Saw shard generations [" - + repositoryData.shardGenerations() - + "] but did not have versions tracked for snapshot [" - + snapshotId - + "]"; - return OLD_SNAPSHOT_FORMAT; - } else { - minCompatVersion = minCompatVersion.before(known) ? minCompatVersion : known; - } + minCompatVersion = minCompatVersion.before(known) ? minCompatVersion : known; } return minCompatVersion; } - /** - * Checks whether the metadata version supports writing {@link ShardGenerations} to the repository. - * - * @param repositoryMetaVersion version to check - * @return true if version supports {@link ShardGenerations} - */ - public static boolean useShardGenerations(Version repositoryMetaVersion) { - return repositoryMetaVersion.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION); - } - - /** - * Checks whether the metadata version supports writing {@link ShardGenerations} to the repository. - * - * @param repositoryMetaVersion version to check - * @return true if version supports {@link ShardGenerations} - */ - public static boolean useIndexGenerations(Version repositoryMetaVersion) { - return repositoryMetaVersion.onOrAfter(INDEX_GEN_IN_REPO_DATA_VERSION); - } - /** Deletes snapshot from repository * * @param deleteEntry delete entry in cluster state @@ -2578,7 +2418,6 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState currentState.metadata(), currentState.routingTable(), entry.indices(), - entry.version().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION), repositoryData, repoName ); @@ -2677,7 +2516,6 @@ private static void completeListenersIgnoringException(@Nullable List shards( @@ -2686,7 +2524,6 @@ private static ImmutableOpenMap indices, - boolean useShardGenerations, RepositoryData repositoryData, String repoName ) { @@ -2712,18 +2549,15 @@ private static ImmutableOpenMap RepositoryData.snapshotsFromXContent(xParser, corruptedRepositoryData.getGenId(), randomBoolean()) + () -> RepositoryData.snapshotsFromXContent(xParser, corruptedRepositoryData.getGenId()) ); assertThat( e.getMessage(), @@ -327,7 +327,7 @@ public void testIndexThatReferenceANullSnapshot() throws IOException { try (XContentParser xParser = createParser(builder)) { OpenSearchParseException e = expectThrows( OpenSearchParseException.class, - () -> RepositoryData.snapshotsFromXContent(xParser, randomNonNegativeLong(), randomBoolean()) + () -> RepositoryData.snapshotsFromXContent(xParser, randomNonNegativeLong()) ); assertThat( e.getMessage(), diff --git a/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java index f9bc0d1066d8e..0702866af35e3 100644 --- a/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java @@ -136,7 +136,7 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex XContentParser parser = XContentType.JSON.xContent() .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, blob) ) { - repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen, false); + 
repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen);
             }
             assertIndexUUIDs(repository, repositoryData);
             assertSnapshotUUIDs(repository, repositoryData);
diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java
index bbac1ef591418..e5eac9af13de1 100644
--- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java
@@ -419,8 +419,7 @@ protected String initWithSnapshotVersion(String repoName, Path repoPath, Version
                 DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
                 Strings.toString(jsonBuilder).replace(Version.CURRENT.toString(), version.toString())
             ),
-            repositoryData.getGenId(),
-            randomBoolean()
+            repositoryData.getGenId()
         );
         Files.write(
             repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData.getGenId()),
@@ -512,7 +511,7 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Map
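// A minimal sketch of the stricter parsing contract implied by the two-argument
// snapshotsFromXContent(parser, genId) calls above: the dropped boolean was a
// leniency flag for legacy repositories whose shard generations could be
// missing, and with OLD_SNAPSHOT_FORMAT support removed a missing generation is
// simply an error. Everything below is an illustrative assumption, not the
// OpenSearch API.

import java.util.List;

final class StrictShardGenSketch {
    // Formerly this kind of parsing took a "fix broken generations" flag; after
    // the change it is always strict.
    static List<String> validateShardGenerations(List<String> rawGenerations) {
        for (String gen : rawGenerations) {
            if (gen == null) {
                throw new IllegalStateException("missing shard generation in repository metadata");
            }
        }
        return List.copyOf(rawGenerations);
    }
}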