From 33afac1898114a4f3b61092ce4d61d7aa1b0795c Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 16 Sep 2024 15:50:11 +0530 Subject: [PATCH 1/9] Optimize GC flow with pinned timestamps Signed-off-by: Sachin Kale --- ...rePinnedTimestampsGarbageCollectionIT.java | 433 ++++++++++++++++++ .../snapshots/DeleteSnapshotV2IT.java | 231 +++++----- .../store/RemoteSegmentStoreDirectory.java | 25 +- .../RemoteFsTimestampAwareTranslog.java | 171 ++++--- .../index/translog/RemoteFsTranslog.java | 5 +- .../transfer/TranslogTransferMetadata.java | 10 + .../RemoteStorePinnedTimestampService.java | 56 ++- .../blobstore/BlobStoreRepository.java | 68 ++- .../snapshots/SnapshotsService.java | 10 +- .../RemoteSegmentStoreDirectoryTests.java | 4 +- .../RemoteFsTimestampAwareTranslogTests.java | 57 ++- 11 files changed, 831 insertions(+), 239 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java new file mode 100644 index 0000000000000..dedc6352f8140 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java @@ -0,0 +1,433 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.remotestore; + +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.index.translog.transfer.TranslogTransferMetadata; +import org.opensearch.indices.RemoteStoreSettings; +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING; +import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; +import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; +import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStorePinnedTimestampsGarbageCollectionIT extends RemoteStoreBaseIntegTestCase { + static final String INDEX_NAME = "remote-store-test-idx-1"; + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), false) + .build(); + } + + private void keepPinnedTimestampSchedulerUpdated() throws InterruptedException { + long currentTime = System.currentTimeMillis(); + int maxRetry = 10; + while (maxRetry > 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) { + 
Thread.sleep(1000); + maxRetry--; + } + } + + ActionListener noOpActionListener = new ActionListener<>() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) {} + }; + + public void testLiveIndexNoPinnedTimestamps() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(1, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); 
+ }); + } + + public void testLiveIndexNoPinnedTimestampsWithExtraGenSettingWithinLimit() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 10) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 9); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(numDocs + 1, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + } + + public void testLiveIndexNoPinnedTimestampsWithExtraGenSetting() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = 
Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 3) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = 5; + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(3, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + } + + public void testLiveIndexWithPinnedTimestamps() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + 
ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + if (i == 2) { + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1)); + remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener); + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + } + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(2, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + } + + public void testIndexDeletionNoPinnedTimestamps() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0) + .build(); + 
createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(1, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + + keepPinnedTimestampSchedulerUpdated(); + client().admin().indices().prepareDelete(INDEX_NAME).get(); + + assertBusy(() -> { + assertEquals(0, Files.list(translogMetadataPath).collect(Collectors.toList()).size()); + assertEquals(0, Files.list(translogDataPath).collect(Collectors.toList()).size()); + }); + } + + public void testIndexDeletionWithPinnedTimestamps() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + 
.put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + if (i == 2) { + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1)); + remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener); + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + } + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(2, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }, 30, TimeUnit.SECONDS); + + keepPinnedTimestampSchedulerUpdated(); + 
client().admin().indices().prepareDelete(INDEX_NAME).get(); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(1, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + } + + private void verifyTranslogDataFileCount(List metadataFiles, Path translogDataPath) throws IOException { + List mdFiles = metadataFiles.stream().map(p -> p.getFileName().toString()).collect(Collectors.toList()); + Set generations = new HashSet<>(); + for (String mdFile : mdFiles) { + Tuple minMaxGen = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(mdFile); + generations.addAll(LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList())); + } + assertEquals(generations.size() * 2, Files.list(translogDataPath).collect(Collectors.toList()).size()); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java index 1d7a58384c0be..7279e4de427e2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java @@ -23,19 +23,31 @@ import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchIntegTestCase; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class DeleteSnapshotV2IT extends 
AbstractSnapshotIntegTestCase { private static final String REMOTE_REPO_NAME = "remote-store-repo-name"; + private void keepPinnedTimestampSchedulerUpdated() throws InterruptedException { + long currentTime = System.currentTimeMillis(); + int maxRetry = 10; + while (maxRetry > 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) { + Thread.sleep(1000); + maxRetry--; + } + } + public void testDeleteShallowCopyV2() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used in the test"); @@ -74,8 +86,8 @@ public void testDeleteShallowCopyV2() throws Exception { createIndex(indexName1, getRemoteStoreBackedIndexSettings()); createIndex(indexName2, getRemoteStoreBackedIndexSettings()); - final int numDocsInIndex1 = 10; - final int numDocsInIndex2 = 20; + final int numDocsInIndex1 = 1; + final int numDocsInIndex2 = 2; indexRandomDocs(indexName1, numDocsInIndex1); indexRandomDocs(indexName2, numDocsInIndex2); ensureGreen(indexName1, indexName2); @@ -92,7 +104,7 @@ public void testDeleteShallowCopyV2() throws Exception { assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName1)); createIndex(indexName3, getRemoteStoreBackedIndexSettings()); - indexRandomDocs(indexName3, 10); + indexRandomDocs(indexName3, 1); CreateSnapshotResponse createSnapshotResponse2 = client().admin() .cluster() .prepareCreateSnapshot(snapshotRepoName, snapshotName2) @@ -105,109 +117,101 @@ public void testDeleteShallowCopyV2() throws Exception { assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2)); assertAcked(client().admin().indices().prepareDelete(indexName1)); - Thread.sleep(100); - AcknowledgedResponse deleteResponse = client().admin() - .cluster() - .prepareDeleteSnapshot(snapshotRepoName, snapshotName2) - .setSnapshots(snapshotName2) - .get(); + AcknowledgedResponse deleteResponse = client().admin().cluster().prepareDeleteSnapshot(snapshotRepoName, snapshotName2).get(); 
assertTrue(deleteResponse.isAcknowledged()); // test delete non-existent snapshot assertThrows( SnapshotMissingException.class, - () -> client().admin().cluster().prepareDeleteSnapshot(snapshotRepoName, "random-snapshot").setSnapshots(snapshotName2).get() + () -> client().admin().cluster().prepareDeleteSnapshot(snapshotRepoName, "random-snapshot").get() ); - } - public void testDeleteShallowCopyV2MultipleSnapshots() throws Exception { + public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used in the test"); final Path remoteStoreRepoPath = randomRepoPath(); + Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath); + settings = Settings.builder() + .put(settings) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED.toString()) + .build(); + String clusterManagerName = internalCluster().startClusterManagerOnlyNode(settings); + internalCluster().startDataOnlyNode(settings); + final Client clusterManagerClient = internalCluster().clusterManagerClient(); + ensureStableCluster(2); - internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath)); - - String indexName1 = "testindex1"; - String indexName2 = "testindex2"; - String indexName3 = "testindex3"; - String snapshotRepoName = "test-create-snapshot-repo"; - String snapshotName1 = "test-create-snapshot1"; - String snapshotName2 = "test-create-snapshot2"; - Path absolutePath1 = randomRepoPath().toAbsolutePath(); - logger.info("Snapshot Path [{}]", absolutePath1); - - Client client = client(); - - assertAcked( - client.admin() - .cluster() - 
.preparePutRepository(snapshotRepoName) - .setType(FsRepository.TYPE) - .setSettings( - Settings.builder() - .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) - .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) - .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) - .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) - .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) - ) + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + clusterManagerName ); - createIndex(indexName1, getRemoteStoreBackedIndexSettings()); + final String snapshotRepoName = "snapshot-repo-name"; + final Path snapshotRepoPath = randomRepoPath(); + createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowV2(snapshotRepoPath)); - createIndex(indexName2, getRemoteStoreBackedIndexSettings()); + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, 25); - final int numDocsInIndex1 = 10; - final int numDocsInIndex2 = 20; - indexRandomDocs(indexName1, numDocsInIndex1); - indexRandomDocs(indexName2, numDocsInIndex2); - ensureGreen(indexName1, indexName2); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(remoteStoreEnabledIndexName) + .get() + .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID); + logger.info("--> create two remote index shallow snapshots"); CreateSnapshotResponse createSnapshotResponse = client().admin() .cluster() - .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .prepareCreateSnapshot(snapshotRepoName, "snap1") .setWaitForCompletion(true) .get(); - SnapshotInfo snapshotInfo = 
createSnapshotResponse.getSnapshotInfo(); - assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); - assertThat(snapshotInfo.successfulShards(), greaterThan(0)); - assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); - assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName1)); + SnapshotInfo snapshotInfo1 = createSnapshotResponse.getSnapshotInfo(); - createIndex(indexName3, getRemoteStoreBackedIndexSettings()); - indexRandomDocs(indexName3, 10); + Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID); + Path shardPath = Path.of(String.valueOf(indexPath), "0"); - CreateSnapshotResponse createSnapshotResponse2 = client().admin() - .cluster() - .prepareCreateSnapshot(snapshotRepoName, snapshotName2) - .setWaitForCompletion(true) - .get(); - snapshotInfo = createSnapshotResponse2.getSnapshotInfo(); - assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); - assertThat(snapshotInfo.successfulShards(), greaterThan(0)); - assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); - assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2)); + // delete remote store index + assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName)); + + logger.info("--> delete snapshot 1"); + + Path segmentsPath = Path.of(String.valueOf(shardPath), "segments"); + Path translogPath = Path.of(String.valueOf(shardPath), "translog"); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); - AcknowledgedResponse deleteResponse = client().admin() + keepPinnedTimestampSchedulerUpdated(); + + AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin() .cluster() - .prepareDeleteSnapshot(snapshotRepoName, snapshotName1, snapshotName2) - .setSnapshots(snapshotName2) + .prepareDeleteSnapshot(snapshotRepoName, 
snapshotInfo1.snapshotId().getName()) .get(); - assertTrue(deleteResponse.isAcknowledged()); + assertAcked(deleteSnapshotResponse); - // test delete non-existent snapshot - assertThrows( - SnapshotMissingException.class, - () -> client().admin().cluster().prepareDeleteSnapshot(snapshotRepoName, "random-snapshot").setSnapshots(snapshotName2).get() - ); + // Delete is async. Give time for it + assertBusy(() -> { + try { + assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath)); + } catch (NoSuchFileException e) { + fail(); + } + }, 60, TimeUnit.SECONDS); + assertBusy(() -> { + try { + assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(translogPath)); + } catch (NoSuchFileException e) { + fail(); + } + }, 60, TimeUnit.SECONDS); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15692") - public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception { + public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2MultipleSnapshots() throws Exception { disableRepoConsistencyCheck("Remote store repository is being used in the test"); final Path remoteStoreRepoPath = randomRepoPath(); Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath); @@ -242,11 +246,11 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio .get() .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID); - String numShards = client().admin() - .indices() - .prepareGetSettings(remoteStoreEnabledIndexName) - .get() - .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_NUMBER_OF_SHARDS); + Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID); + Path shardPath = Path.of(String.valueOf(indexPath), "0"); + + Path segmentsPath = Path.of(String.valueOf(shardPath), "segments", "data"); + Path translogPath = Path.of(String.valueOf(shardPath), "translog", "data", "1"); logger.info("--> create two remote index shallow snapshots"); 
CreateSnapshotResponse createSnapshotResponse = client().admin() @@ -256,6 +260,10 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio .get(); SnapshotInfo snapshotInfo1 = createSnapshotResponse.getSnapshotInfo(); + List segmentsPostSnapshot1 = Files.list(segmentsPath).collect(Collectors.toList()); + List translogPostSnapshot1 = Files.list(translogPath).collect(Collectors.toList()); + + forceMerge(1); indexRandomDocs(remoteStoreEnabledIndexName, 25); CreateSnapshotResponse createSnapshotResponse2 = client().admin() @@ -264,70 +272,47 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio .setWaitForCompletion(true) .get(); SnapshotInfo snapshotInfo2 = createSnapshotResponse2.getSnapshotInfo(); + + List segmentsPostSnapshot2 = Files.list(segmentsPath).collect(Collectors.toList()); + List translogPostSnapshot2 = Files.list(translogPath).collect(Collectors.toList()); + assertThat(snapshotInfo2.state(), equalTo(SnapshotState.SUCCESS)); assertThat(snapshotInfo2.successfulShards(), greaterThan(0)); assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards())); assertThat(snapshotInfo2.snapshotId().getName(), equalTo("snap2")); - // delete remote store index - assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName)); - - logger.info("--> delete snapshot 2"); - - Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID); - Path shardPath = Path.of(String.valueOf(indexPath), "0"); - Path segmentsPath = Path.of(String.valueOf(shardPath), "segments"); - Path translogPath = Path.of(String.valueOf(shardPath), "translog"); - - // Get total segments remote store directory file count for deleted index and shard 0 - int segmentFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath); - int translogFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(translogPath); - - 
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); - - AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin() - .cluster() - .prepareDeleteSnapshot(snapshotRepoName, snapshotInfo2.snapshotId().getName()) - .get(); - assertAcked(deleteSnapshotResponse); + assertBusy(() -> assertTrue(translogPostSnapshot2.size() > translogPostSnapshot1.size()), 60, TimeUnit.SECONDS); + assertBusy(() -> assertTrue(segmentsPostSnapshot2.size() > segmentsPostSnapshot1.size()), 60, TimeUnit.SECONDS); - Thread.sleep(5000); + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + keepPinnedTimestampSchedulerUpdated(); - assertBusy(() -> { - try { - assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountBeforeDeletingSnapshot1)); - } catch (Exception e) {} - }, 30, TimeUnit.SECONDS); - int segmentFilesCountAfterDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath); + // delete remote store index + assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName)); logger.info("--> delete snapshot 1"); RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + keepPinnedTimestampSchedulerUpdated(); // on snapshot deletion, remote store segment files should get cleaned up for deleted index - `remote-index-1` - deleteSnapshotResponse = clusterManagerClient.admin() + AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin() .cluster() .prepareDeleteSnapshot(snapshotRepoName, snapshotInfo1.snapshotId().getName()) .get(); assertAcked(deleteSnapshotResponse); - // Delete is async. 
Give time for it - assertBusy(() -> { - try { - assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountAfterDeletingSnapshot1)); - } catch (Exception e) {} - }, 60, TimeUnit.SECONDS); - - assertBusy(() -> { - try { - assertThat(RemoteStoreBaseIntegTestCase.getFileCount(translogPath), lessThan(translogFilesCountBeforeDeletingSnapshot1)); - } catch (Exception e) {} - }, 60, TimeUnit.SECONDS); + List segmentsPostDeletionOfSnapshot1 = Files.list(segmentsPath).collect(Collectors.toList()); + List translogPostDeletionOfSnapshot1 = Files.list(translogPath).collect(Collectors.toList()); + // Delete is async. Give time for it + assertBusy(() -> assertEquals(translogPostSnapshot2.size(), translogPostDeletionOfSnapshot1.size()), 60, TimeUnit.SECONDS); + assertBusy(() -> assertEquals(segmentsPostSnapshot2.size(), segmentsPostDeletionOfSnapshot1.size()), 60, TimeUnit.SECONDS); } private Settings snapshotV2Settings(Path remoteStoreRepoPath) { Settings settings = Settings.builder() .put(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath)) .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), false) .build(); return settings; } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 72bf07d4b03b2..cde63b37c1289 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -817,6 +817,10 @@ Set getMetadataFilesToFilterActiveSegments( return metadataFilesToFilterActiveSegments; } + public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { + deleteStaleSegments(lastNMetadataFilesToKeep, Map.of()); + } + /** * Delete stale segment and metadata files * One 
metadata file is kept per commit (refresh updates the same file). To read segments uploaded to remote store, @@ -832,7 +836,7 @@ Set getMetadataFilesToFilterActiveSegments( * @param lastNMetadataFilesToKeep number of metadata files to keep * @throws IOException in case of I/O error while reading from / writing to remote segment store */ - public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { + private void deleteStaleSegments(int lastNMetadataFilesToKeep, Map pinnedTimestampsToSkip) throws IOException { if (lastNMetadataFilesToKeep == -1) { logger.info( "Stale segment deletion is disabled if cluster.remote_store.index.segment_metadata.retention.max_count is set to -1" @@ -854,12 +858,12 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException } // Check last fetch status of pinned timestamps. If stale, return. - if (RemoteStoreUtils.isPinnedTimestampStateStale()) { + if (lastNMetadataFilesToKeep != 0 && RemoteStoreUtils.isPinnedTimestampStateStale()) { logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale"); return; } - Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(); + Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(pinnedTimestampsToSkip); Set implicitLockedFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles( sortedMetadataFileList, @@ -994,7 +998,9 @@ public static void remoteDirectoryCleanup( String remoteStoreRepoForIndex, String indexUUID, ShardId shardId, - RemoteStorePathStrategy pathStrategy + RemoteStorePathStrategy pathStrategy, + boolean forceClean, + Map pinnedTimestampsToSkip ) { try { RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory( @@ -1003,8 +1009,12 @@ public static void remoteDirectoryCleanup( shardId, pathStrategy ); - remoteSegmentStoreDirectory.deleteStaleSegments(0); - 
remoteSegmentStoreDirectory.deleteIfEmpty(); + if (forceClean) { + remoteSegmentStoreDirectory.delete(); + } else { + remoteSegmentStoreDirectory.deleteStaleSegments(0, pinnedTimestampsToSkip); + remoteSegmentStoreDirectory.deleteIfEmpty(); + } } catch (Exception e) { staticLogger.error("Exception occurred while deleting directory", e); } @@ -1023,7 +1033,10 @@ private boolean deleteIfEmpty() throws IOException { logger.info("Remote directory still has files, not deleting the path"); return false; } + return delete(); + } + private boolean delete() { try { remoteDataDirectory.delete(); remoteMetadataDirectory.delete(); diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index 27d34ec0d05af..9391bc81c437c 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -61,6 +61,7 @@ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog { private final Map> oldFormatMetadataFileGenerationMap; private final Map> oldFormatMetadataFilePrimaryTermMap; private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE); + private long maxDeletedGenerationOnRemote = 0; public RemoteFsTimestampAwareTranslog( TranslogConfig config, @@ -135,13 +136,20 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) // This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote // store. - if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) { + if ((indexDeleted == false && startedPrimarySupplier.getAsBoolean() == false) || pauseSync.get()) { return; } // This is to fail fast and avoid listing md files un-necessarily. 
if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) { - logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale"); + logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale"); + return; + } + + // This code block ensures parity with RemoteFsTranslog. Without this, we will end up making list translog metadata + // call in each invocation of trimUnreferencedReaders + long minGenerationToKeep = minRemoteGenReferenced - indexSettings().getRemoteTranslogExtraKeep(); + if (indexDeleted == false && (minGenerationToKeep <= maxDeletedGenerationOnRemote)) { return; } @@ -158,7 +166,7 @@ public void onResponse(List blobMetadata) { List metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); try { - if (metadataFiles.size() <= 1) { + if (indexDeleted == false && metadataFiles.size() <= 1) { logger.debug("No stale translog metadata files found"); remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); return; @@ -166,16 +174,12 @@ public void onResponse(List blobMetadata) { // Check last fetch status of pinned timestamps. If stale, return. 
if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) { - logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale"); + logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale"); remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); return; } - List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted( - metadataFiles, - metadataFilePinnedTimestampMap, - logger - ); + List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, indexDeleted); // If index is not deleted, make sure to keep latest metadata file if (indexDeleted == false) { @@ -194,21 +198,28 @@ public void onResponse(List blobMetadata) { metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted); logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted); + Set generationsToBeDeleted = getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, metadataFilesToBeDeleted, - indexDeleted + indexDeleted ? 
Long.MAX_VALUE : minRemoteGenReferenced ); logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted); if (generationsToBeDeleted.isEmpty() == false) { + maxDeletedGenerationOnRemote = generationsToBeDeleted.stream().max(Long::compareTo).get(); + // Delete stale generations translogTransferManager.deleteGenerationAsync( primaryTermSupplier.getAsLong(), generationsToBeDeleted, remoteGenerationDeletionPermits::release ); + } else { + remoteGenerationDeletionPermits.release(); + } + if (metadataFilesToBeDeleted.isEmpty() == false) { // Delete stale metadata files translogTransferManager.deleteMetadataFilesAsync( metadataFilesToBeDeleted, @@ -217,11 +228,10 @@ public void onResponse(List blobMetadata) { // Update cache to keep only those metadata files that are not getting deleted oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted); - // Delete stale primary terms deleteStaleRemotePrimaryTerms(metadataFilesNotToBeDeleted); } else { - remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); + remoteGenerationDeletionPermits.release(); } } catch (Exception e) { remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); @@ -241,14 +251,8 @@ public void onFailure(Exception e) { protected Set getGenerationsToBeDeleted( List metadataFilesNotToBeDeleted, List metadataFilesToBeDeleted, - boolean indexDeleted + long minRemoteGenReferenced ) throws IOException { - long maxGenerationToBeDeleted = Long.MAX_VALUE; - - if (indexDeleted == false) { - maxGenerationToBeDeleted = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep(); - } - Set generationsFromMetadataFilesToBeDeleted = new HashSet<>(); for (String mdFile : metadataFilesToBeDeleted) { Tuple minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager); @@ -262,24 +266,36 @@ protected Set getGenerationsToBeDeleted( Set generationsToBeDeleted = new HashSet<>(); for (long generation : 
generationsFromMetadataFilesToBeDeleted) { // Check if the generation is not referred by metadata file matching pinned timestamps - if (generation <= maxGenerationToBeDeleted && isGenerationPinned(generation, pinnedGenerations) == false) { + // The check with minRemoteGenReferenced is redundant but kept as to make sure we don't delete generations + // that are not persisted in remote segment store yet. + if (generation < minRemoteGenReferenced && isGenerationPinned(generation, pinnedGenerations) == false) { generationsToBeDeleted.add(generation); } } return generationsToBeDeleted; } - protected List getMetadataFilesToBeDeleted(List metadataFiles) { - return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, logger); + protected List getMetadataFilesToBeDeleted(List metadataFiles, boolean indexDeleted) { + return getMetadataFilesToBeDeleted( + metadataFiles, + metadataFilePinnedTimestampMap, + minRemoteGenReferenced, + Map.of(), + indexDeleted, + logger + ); } // Visible for testing protected static List getMetadataFilesToBeDeleted( List metadataFiles, Map metadataFilePinnedTimestampMap, + long minRemoteGenReferenced, + Map pinnedTimestampsToSkip, + boolean indexDeleted, Logger logger ) { - Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(); + Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(pinnedTimestampsToSkip); // Keep files since last successful run of scheduler List metadataFilesToBeDeleted = RemoteStoreUtils.filterOutMetadataFilesBasedOnAge( @@ -312,6 +328,22 @@ protected static List getMetadataFilesToBeDeleted( metadataFilesToBeDeleted.size() ); + if (indexDeleted == false) { + // Filter out metadata files based on minRemoteGenReferenced + List metadataFilesContainingMinRemoteGenReferenced = metadataFilesToBeDeleted.stream().filter(md -> { + long maxGeneration = TranslogTransferMetadata.getMaxGenerationFromFileName(md); + return maxGeneration == -1 || 
maxGeneration > minRemoteGenReferenced; + }).collect(Collectors.toList()); + metadataFilesToBeDeleted.removeAll(metadataFilesContainingMinRemoteGenReferenced); + + logger.trace( + "metadataFilesContainingMinRemoteGenReferenced.size = {}, metadataFilesToBeDeleted based on minRemoteGenReferenced filtering = {}, minRemoteGenReferenced = {}", + metadataFilesContainingMinRemoteGenReferenced.size(), + metadataFilesToBeDeleted.size(), + minRemoteGenReferenced + ); + } + return metadataFilesToBeDeleted; } @@ -472,50 +504,65 @@ protected static Tuple getMinMaxPrimaryTermFromMetadataFile( } } - public static void cleanup(TranslogTransferManager translogTransferManager) throws IOException { - ActionListener> listMetadataFilesListener = new ActionListener<>() { - @Override - public void onResponse(List blobMetadata) { - List metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); + public static void cleanup( + TranslogTransferManager translogTransferManager, + boolean forceClean, + Map pinnedTimestampsToSkip + ) throws IOException { + if (forceClean) { + translogTransferManager.delete(); + } else { + ActionListener> listMetadataFilesListener = new ActionListener<>() { + @Override + public void onResponse(List blobMetadata) { + List metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); + + try { + if (metadataFiles.isEmpty()) { + staticLogger.debug("No stale translog metadata files found"); + return; + } + List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + Long.MAX_VALUE, + pinnedTimestampsToSkip, + true, + staticLogger + ); + if (metadataFilesToBeDeleted.isEmpty()) { + staticLogger.debug("No metadata files to delete"); + return; + } + staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted); - try { - if (metadataFiles.isEmpty()) { - staticLogger.debug("No stale translog metadata files found"); - return; - } - List 
metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), staticLogger); - if (metadataFilesToBeDeleted.isEmpty()) { - staticLogger.debug("No metadata files to delete"); - return; - } - staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted); + // For all the files that we are keeping, fetch min and max generations + List metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles); + metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted); + staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted); - // For all the files that we are keeping, fetch min and max generations - List metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles); - metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted); - staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted); + // Delete stale metadata files + translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {}); - // Delete stale metadata files - translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {}); + // Delete stale primary terms + deleteStaleRemotePrimaryTerms( + metadataFilesNotToBeDeleted, + translogTransferManager, + new HashMap<>(), + new AtomicLong(Long.MAX_VALUE), + staticLogger + ); + } catch (Exception e) { + staticLogger.error("Exception while cleaning up metadata and primary terms", e); + } + } - // Delete stale primary terms - deleteStaleRemotePrimaryTerms( - metadataFilesNotToBeDeleted, - translogTransferManager, - new HashMap<>(), - new AtomicLong(Long.MAX_VALUE), - staticLogger - ); - } catch (Exception e) { + @Override + public void onFailure(Exception e) { staticLogger.error("Exception while cleaning up metadata and primary terms", e); } - } - - @Override - public void onFailure(Exception e) { - staticLogger.error("Exception while cleaning up metadata and primary terms", e); - } - }; - 
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener); + }; + translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener); + } } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 812852d107682..a54a31cea41ef 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -685,10 +685,11 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti maxRemoteTranslogGenerationUploaded = generation; minRemoteGenReferenced = getMinFileGeneration(); logger.debug( - "Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}", + "Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}, minRemoteGenReferenced = {}", primaryTerm, generation, - maxSeqNo + maxSeqNo, + minRemoteGenReferenced ); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 3b8885055e8f7..7fe3305545085 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -170,6 +170,16 @@ public static Tuple getMinMaxTranslogGenerationFromFilename(String f } } + public static long getMaxGenerationFromFileName(String filename) { + String[] tokens = filename.split(METADATA_SEPARATOR); + try { + return RemoteStoreUtils.invertLong(tokens[2]); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Exception while getting max generation from: {}", filename), e); + return -1; + } + } + public static Tuple getMinMaxPrimaryTermFromFilename(String filename) { String[] tokens 
= filename.split(METADATA_SEPARATOR); if (tokens.length < 7) { diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index 71133615ed056..6d801f9f5e50e 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -37,7 +37,6 @@ import java.util.Map; import java.util.Set; import java.util.function.Supplier; -import java.util.stream.Collectors; /** * Service for managing pinned timestamps in a remote store. @@ -48,7 +47,8 @@ @ExperimentalApi public class RemoteStorePinnedTimestampService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class); - private static Tuple> pinnedTimestampsSet = new Tuple<>(-1L, Set.of()); + private static Tuple>> pinningEntityTimestampMap = new Tuple<>(-1L, Map.of()); + public static final String PINNED_TIMESTAMPS_PATH_TOKEN = "pinned_timestamps"; public static final String PINNED_TIMESTAMPS_FILENAME_SEPARATOR = "__"; @@ -199,21 +199,23 @@ public void cloneTimestamp(long timestamp, String existingPinningEntity, String } } - private String getBlobName(long timestamp, String pinningEntity) { + public static String getBlobName(long timestamp, String pinningEntity) { return String.join(PINNED_TIMESTAMPS_FILENAME_SEPARATOR, pinningEntity, String.valueOf(timestamp)); } - private long getTimestampFromBlobName(String blobName) { + public static Tuple getPinningEntityTimestampFromBlobName(String blobName) { String[] blobNameTokens = blobName.split(PINNED_TIMESTAMPS_FILENAME_SEPARATOR); if (blobNameTokens.length < 2) { logger.error("Pinned timestamps blob name contains invalid format: {}", blobName); } try { - return Long.parseLong(blobNameTokens[blobNameTokens.length - 1]); + String pinningEntity = 
blobName.substring(0, blobName.lastIndexOf(PINNED_TIMESTAMPS_FILENAME_SEPARATOR)); + Long timestamp = Long.parseLong(blobNameTokens[blobNameTokens.length - 1]); + return new Tuple<>(pinningEntity, timestamp); } catch (NumberFormatException e) { logger.error(() -> new ParameterizedMessage("Pinned timestamps blob name contains invalid format: {}", blobName), e); } - return -1; + return null; } /** @@ -248,14 +250,32 @@ public void close() throws IOException { // Used in integ tests public void rescheduleAsyncUpdatePinnedTimestampTask(TimeValue pinnedTimestampsSchedulerInterval) { if (pinnedTimestampsSchedulerInterval != null) { - pinnedTimestampsSet = new Tuple<>(-1L, Set.of()); + pinningEntityTimestampMap = new Tuple<>(-1L, Map.of()); asyncUpdatePinnedTimestampTask.close(); startAsyncUpdateTask(pinnedTimestampsSchedulerInterval); } } public static Tuple> getPinnedTimestamps() { - return pinnedTimestampsSet; + return getPinnedTimestamps(null); + } + + public static Tuple> getPinnedTimestamps(Map pinnedTimestampsToSkip) { + Set allPinnedTimestamps = new HashSet<>(); + if (pinnedTimestampsToSkip == null || pinnedTimestampsToSkip.isEmpty()) { + pinningEntityTimestampMap.v2().values().forEach(allPinnedTimestamps::addAll); + } else { + for (String pinningEntity : pinningEntityTimestampMap.v2().keySet()) { + if (pinnedTimestampsToSkip.containsKey(pinningEntity)) { + Set timestamps = new HashSet<>(pinningEntityTimestampMap.v2().get(pinningEntity)); + timestamps.remove(pinnedTimestampsToSkip.get(pinningEntity)); + allPinnedTimestamps.addAll(timestamps); + } else { + allPinnedTimestamps.addAll(pinningEntityTimestampMap.v2().get(pinningEntity)); + } + } + } + return new Tuple<>(pinningEntityTimestampMap.v1(), allPinnedTimestamps); } /** @@ -278,16 +298,22 @@ protected void runInternal() { try { Map pinnedTimestampList = blobContainer.listBlobs(); if (pinnedTimestampList.isEmpty()) { - pinnedTimestampsSet = new Tuple<>(triggerTimestamp, Set.of()); + logger.debug("Fetched empty 
pinned timestamps from remote store: {}", triggerTimestamp); + pinningEntityTimestampMap = new Tuple<>(triggerTimestamp, Map.of()); return; } - Set pinnedTimestamps = pinnedTimestampList.keySet() - .stream() - .map(RemoteStorePinnedTimestampService.this::getTimestampFromBlobName) - .filter(timestamp -> timestamp != -1) - .collect(Collectors.toSet()); + Map> pinnedTimestamps = new HashMap<>(); + for (String blobName : pinnedTimestampList.keySet()) { + Tuple pinningEntityTimestamp = getPinningEntityTimestampFromBlobName(blobName); + if (pinningEntityTimestamp != null) { + if (pinnedTimestamps.containsKey(pinningEntityTimestamp.v1()) == false) { + pinnedTimestamps.put(pinningEntityTimestamp.v1(), new HashSet<>()); + } + pinnedTimestamps.get(pinningEntityTimestamp.v1()).add(pinningEntityTimestamp.v2()); + } + } logger.debug("Fetched pinned timestamps from remote store: {} - {}", triggerTimestamp, pinnedTimestamps); - pinnedTimestampsSet = new Tuple<>(triggerTimestamp, pinnedTimestamps); + pinningEntityTimestampMap = new Tuple<>(triggerTimestamp, pinnedTimestamps); } catch (Throwable t) { logger.error("Exception while fetching pinned timestamp details", t); } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 0292cecc36a81..1a093ab120582 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -199,7 +199,6 @@ import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_COMPOSITE_1; import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS; -import static 
org.opensearch.snapshots.SnapshotsService.SNAPSHOT_PINNED_TIMESTAMP_DELIMITER; /** * BlobStore - based implementation of Snapshot Repository @@ -1281,7 +1280,8 @@ private void doDeleteShardSnapshots( snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(), remoteSegmentStoreDirectoryFactory, - afterCleanupsListener + afterCleanupsListener, + snapshotIdPinnedTimestampMap ); } else { asyncCleanupUnlinkedShardLevelBlobs( @@ -1300,7 +1300,8 @@ private void cleanUpRemoteStoreFilesForDeletedIndicesV2( Collection snapshotIds, Collection result, RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory, - ActionListener afterCleanupsListener + ActionListener afterCleanupsListener, + Map snapshotIdPinnedTimestampMap ) { try { Set uniqueIndexIds = new HashSet<>(); @@ -1309,7 +1310,14 @@ private void cleanUpRemoteStoreFilesForDeletedIndicesV2( } // iterate through all the indices and trigger remote store directory cleanup for deleted index segments for (String indexId : uniqueIndexIds) { - cleanRemoteStoreDirectoryIfNeeded(snapshotIds, indexId, repositoryData, remoteSegmentStoreDirectoryFactory); + cleanRemoteStoreDirectoryIfNeeded( + snapshotIds, + indexId, + repositoryData, + remoteSegmentStoreDirectoryFactory, + snapshotIdPinnedTimestampMap, + false + ); } afterCleanupsListener.onResponse(null); } catch (Exception e) { @@ -1357,7 +1365,7 @@ private void removeSnapshotPinnedTimestamp( ) { remoteStorePinnedTimestampService.unpinTimestamp( timestampToUnpin, - repository + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshotId.getUUID(), + SnapshotsService.getPinningEntity(repository, snapshotId.getUUID()), new ActionListener() { @Override public void onResponse(Void unused) { @@ -1466,7 +1474,9 @@ public static void remoteDirectoryCleanupAsync( String indexUUID, ShardId shardId, String threadPoolName, - RemoteStorePathStrategy pathStrategy + RemoteStorePathStrategy pathStrategy, + boolean forceClean, + Map pinnedTimestampsToSkip ) { 
threadpool.executor(threadPoolName) .execute( @@ -1476,7 +1486,9 @@ public static void remoteDirectoryCleanupAsync( remoteStoreRepoForIndex, indexUUID, shardId, - pathStrategy + pathStrategy, + forceClean, + pinnedTimestampsToSkip ), indexUUID, shardId @@ -1532,7 +1544,9 @@ protected void releaseRemoteStoreLockAndCleanup( indexUUID, new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)), ThreadPool.Names.REMOTE_PURGE, - remoteStoreShardShallowCopySnapshot.getRemoteStorePathStrategy() + remoteStoreShardShallowCopySnapshot.getRemoteStorePathStrategy(), + false, + null ); } } @@ -2095,7 +2109,14 @@ private void executeOneStaleIndexDelete( deleteResult = deleteResult.add(cleanUpStaleSnapshotShardPathsFile(matchingShardPaths, snapshotShardPaths)); if (remoteSegmentStoreDirectoryFactory != null) { - cleanRemoteStoreDirectoryIfNeeded(deletedSnapshots, indexSnId, oldRepoData, remoteSegmentStoreDirectoryFactory); + cleanRemoteStoreDirectoryIfNeeded( + deletedSnapshots, + indexSnId, + oldRepoData, + remoteSegmentStoreDirectoryFactory, + new HashMap<>(), + true + ); } // Finally, we delete the [base_path]/indexId folder @@ -2167,7 +2188,9 @@ private void cleanRemoteStoreDirectoryIfNeeded( Collection deletedSnapshots, String indexSnId, RepositoryData oldRepoData, - RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory + RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory, + Map snapshotIdPinnedTimestampMap, + boolean forceClean ) { assert (indexSnId != null); @@ -2210,6 +2233,12 @@ private void cleanRemoteStoreDirectoryIfNeeded( prevIndexMetadata ); + String pinningEntity = SnapshotsService.getPinningEntity(getMetadata().name(), snapshotId.getUUID()); + Map pinnedTimestampsToSkip = new HashMap<>(); + if (snapshotIdPinnedTimestampMap.get(snapshotId) != null) { + pinnedTimestampsToSkip.put(pinningEntity, snapshotIdPinnedTimestampMap.get(snapshotId)); + } + for (int shardId = 0; shardId < 
prevIndexMetadata.getNumberOfShards(); shardId++) { ShardId shard = new ShardId(Index.UNKNOWN_INDEX_NAME, prevIndexMetadata.getIndexUUID(), shardId); remoteDirectoryCleanupAsync( @@ -2219,9 +2248,18 @@ private void cleanRemoteStoreDirectoryIfNeeded( prevIndexMetadata.getIndexUUID(), shard, ThreadPool.Names.REMOTE_PURGE, - remoteStorePathStrategy + remoteStorePathStrategy, + forceClean, + pinnedTimestampsToSkip + ); + remoteTranslogCleanupAsync( + remoteTranslogRepository, + shard, + remoteStorePathStrategy, + prevIndexMetadata, + forceClean, + pinnedTimestampsToSkip ); - remoteTranslogCleanupAsync(remoteTranslogRepository, shard, remoteStorePathStrategy, prevIndexMetadata); } } } catch (Exception e) { @@ -2245,7 +2283,9 @@ private void remoteTranslogCleanupAsync( Repository remoteTranslogRepository, ShardId shardId, RemoteStorePathStrategy remoteStorePathStrategy, - IndexMetadata prevIndexMetadata + IndexMetadata prevIndexMetadata, + boolean forceClean, + Map pinnedTimestampsToSkip ) { assert remoteTranslogRepository instanceof BlobStoreRepository; boolean indexMetadataEnabled = RemoteStoreUtils.determineTranslogMetadataEnabled(prevIndexMetadata); @@ -2262,7 +2302,7 @@ private void remoteTranslogCleanupAsync( indexMetadataEnabled ); try { - RemoteFsTimestampAwareTranslog.cleanup(translogTransferManager); + RemoteFsTimestampAwareTranslog.cleanup(translogTransferManager, forceClean, pinnedTimestampsToSkip); } catch (IOException e) { logger.error("Exception while cleaning up remote translog for shard: " + shardId, e); } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index f6e550525a3e5..23f6deff3715d 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -613,7 +613,7 @@ private void updateSnapshotPinnedTimestamp( ) { remoteStorePinnedTimestampService.pinTimestamp( 
timestampToPin, - snapshot.getRepository() + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshot.getSnapshotId().getUUID(), + getPinningEntity(snapshot.getRepository(), snapshot.getSnapshotId().getUUID()), new ActionListener() { @Override public void onResponse(Void unused) { @@ -631,6 +631,10 @@ public void onFailure(Exception e) { ); } + public static String getPinningEntity(String repositoryName, String snapshotUUID) { + return repositoryName + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshotUUID; + } + private void cloneSnapshotPinnedTimestamp( RepositoryData repositoryData, SnapshotId sourceSnapshot, @@ -640,8 +644,8 @@ private void cloneSnapshotPinnedTimestamp( ) { remoteStorePinnedTimestampService.cloneTimestamp( timestampToPin, - snapshot.getRepository() + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + sourceSnapshot.getUUID(), - snapshot.getRepository() + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshot.getSnapshotId().getUUID(), + getPinningEntity(snapshot.getRepository(), sourceSnapshot.getUUID()), + getPinningEntity(snapshot.getRepository(), snapshot.getSnapshotId().getUUID()), new ActionListener() { @Override public void onResponse(Void unused) { diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index ecd6620dbea15..fe0b67c9816dc 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -565,7 +565,9 @@ public void testCleanupAsync() throws Exception { repositoryName, indexUUID, shardId, - pathStrategy + pathStrategy, + false, + Map.of() ); verify(remoteSegmentStoreDirectoryFactory).newDirectory(repositoryName, indexUUID, shardId, pathStrategy); verify(threadPool, times(0)).executor(ThreadPool.Names.REMOTE_PURGE); diff --git 
a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java index 4ec68f7fb75b4..bc561a6b2214c 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java @@ -347,7 +347,7 @@ public void testSimpleOperationsUpload() throws Exception { // Fetch pinned timestamps so that it won't be stale updatePinnedTimstampTask.run(); - translog.setMinSeqNoToKeep(4); + translog.setMinSeqNoToKeep(6); translog.trimUnreferencedReaders(); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("7", 7, primaryTerm.get(), new byte[] { 1 })); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("8", 8, primaryTerm.get(), new byte[] { 1 })); @@ -356,14 +356,14 @@ public void testSimpleOperationsUpload() throws Exception { // Fetch pinned timestamps so that it won't be stale updatePinnedTimstampTask.run(); translog.trimUnreferencedReaders(); - assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); - assertEquals(5, translog.readers.size()); + + assertEquals(3, translog.readers.size()); assertBusy(() -> { - assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); - assertEquals(10, translog.allUploaded().size()); + assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertEquals(6, translog.allUploaded().size()); assertEquals( - 10, + 6, blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() ); }, 60, TimeUnit.SECONDS); @@ -397,7 +397,7 @@ public void testMetadataFileDeletion() throws Exception { ); updatePinnedTimstampTask.run(); translog.trimUnreferencedReaders(); - assertBusy(() -> { assertEquals(1, 
blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); }); + assertBusy(() -> { assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); }); } public void testMetadataFileDeletionWithPinnedTimestamps() throws Exception { @@ -647,7 +647,7 @@ public void testGetGenerationsToBeDeletedEmptyMetadataFilesNotToBeDeleted() thro Set generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, metadataFilesToBeDeleted, - true + Long.MAX_VALUE ); Set md1Generations = LongStream.rangeClosed(4, 7).boxed().collect(Collectors.toSet()); Set md2Generations = LongStream.rangeClosed(17, 37).boxed().collect(Collectors.toSet()); @@ -683,7 +683,7 @@ public void testGetGenerationsToBeDeleted() throws IOException { Set generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, metadataFilesToBeDeleted, - true + Long.MAX_VALUE ); Set md1Generations = LongStream.rangeClosed(5, 7).boxed().collect(Collectors.toSet()); Set md2Generations = LongStream.rangeClosed(17, 25).boxed().collect(Collectors.toSet()); @@ -708,7 +708,17 @@ public void testGetMetadataFilesToBeDeletedNoExclusion() { "metadata__9223372036438563903__9223372036854775701__9223370311919910403__31__9223372036854775701__1" ); - assertEquals(metadataFiles, ((RemoteFsTimestampAwareTranslog) translog).getMetadataFilesToBeDeleted(metadataFiles)); + assertEquals( + metadataFiles, + RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + Long.MAX_VALUE, + Map.of(), + false, + logger + ) + ); } public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeOnly() { @@ -724,7 +734,14 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeOnly() { "metadata__9223372036438563903__9223372036854775701__" + md3Timestamp + "__31__9223372036854775701__1" ); - List metadataFilesToBeDeleted = 
((RemoteFsTimestampAwareTranslog) translog).getMetadataFilesToBeDeleted(metadataFiles); + List metadataFilesToBeDeleted = RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + Long.MAX_VALUE, + Map.of(), + false, + logger + ); assertEquals(1, metadataFilesToBeDeleted.size()); assertEquals(metadataFiles.get(0), metadataFilesToBeDeleted.get(0)); } @@ -746,7 +763,14 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnPinningOnly() throws "metadata__9223372036438563903__9223372036854775701__" + md3Timestamp + "__31__9223372036854775701__1" ); - List metadataFilesToBeDeleted = ((RemoteFsTimestampAwareTranslog) translog).getMetadataFilesToBeDeleted(metadataFiles); + List metadataFilesToBeDeleted = RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + Long.MAX_VALUE, + Map.of(), + false, + logger + ); assertEquals(2, metadataFilesToBeDeleted.size()); assertEquals(metadataFiles.get(0), metadataFilesToBeDeleted.get(0)); assertEquals(metadataFiles.get(2), metadataFilesToBeDeleted.get(1)); @@ -769,7 +793,14 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeAndPinning() throw "metadata__9223372036438563903__9223372036854775701__" + md3Timestamp + "__31__9223372036854775701__1" ); - List metadataFilesToBeDeleted = ((RemoteFsTimestampAwareTranslog) translog).getMetadataFilesToBeDeleted(metadataFiles); + List metadataFilesToBeDeleted = RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + Long.MAX_VALUE, + Map.of(), + false, + logger + ); assertEquals(1, metadataFilesToBeDeleted.size()); assertEquals(metadataFiles.get(2), metadataFilesToBeDeleted.get(0)); } From 33327f383387744f1e785454a82f87874db4c8d1 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 18 Sep 2024 19:31:57 +0530 Subject: [PATCH 2/9] Fix flakiness of the test Signed-off-by: Sachin Kale --- .../opensearch/snapshots/DeleteSnapshotV2IT.java | 16 
++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java index 7279e4de427e2..c4e3a478c8540 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java @@ -264,6 +264,7 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2MultipleSnapshots( List translogPostSnapshot1 = Files.list(translogPath).collect(Collectors.toList()); forceMerge(1); + refresh(remoteStoreEnabledIndexName); indexRandomDocs(remoteStoreEnabledIndexName, 25); CreateSnapshotResponse createSnapshotResponse2 = client().admin() @@ -284,7 +285,6 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2MultipleSnapshots( assertBusy(() -> assertTrue(translogPostSnapshot2.size() > translogPostSnapshot1.size()), 60, TimeUnit.SECONDS); assertBusy(() -> assertTrue(segmentsPostSnapshot2.size() > segmentsPostSnapshot1.size()), 60, TimeUnit.SECONDS); - remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); keepPinnedTimestampSchedulerUpdated(); // delete remote store index @@ -300,12 +300,16 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2MultipleSnapshots( .get(); assertAcked(deleteSnapshotResponse); - List segmentsPostDeletionOfSnapshot1 = Files.list(segmentsPath).collect(Collectors.toList()); - List translogPostDeletionOfSnapshot1 = Files.list(translogPath).collect(Collectors.toList()); - // Delete is async. 
Give time for it - assertBusy(() -> assertEquals(translogPostSnapshot2.size(), translogPostDeletionOfSnapshot1.size()), 60, TimeUnit.SECONDS); - assertBusy(() -> assertEquals(segmentsPostSnapshot2.size(), segmentsPostDeletionOfSnapshot1.size()), 60, TimeUnit.SECONDS); + assertBusy(() -> { + List segmentsPostDeletionOfSnapshot1 = Files.list(segmentsPath).collect(Collectors.toList()); + assertTrue(segmentsPostDeletionOfSnapshot1.size() < segmentsPostSnapshot2.size()); + }, 60, TimeUnit.SECONDS); + // To uncomment following, we need to handle deletion of generations in translog cleanup flow + // List translogPostDeletionOfSnapshot1 = Files.list(translogPath).collect(Collectors.toList()); + // Delete is async. Give time for it + // assertBusy(() -> assertEquals(translogPostSnapshot2.size() - translogPostSnapshot1.size(), + // translogPostDeletionOfSnapshot1.size()), 60, TimeUnit.SECONDS); } private Settings snapshotV2Settings(Path remoteStoreRepoPath) { From 51f64f98851295a3e0b98f31d46a958a86344928 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Thu, 19 Sep 2024 10:56:36 +0530 Subject: [PATCH 3/9] Add more UTs Signed-off-by: Sachin Kale --- .../RemoteFsTimestampAwareTranslogTests.java | 69 ++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java index bc561a6b2214c..9608246a81d1f 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java @@ -214,6 +214,7 @@ public void onFailure(Exception e) { // Old format metadata file String oldFormatMdFilename = "metadata__9223372036438563903__9223372036854774799__9223370311919910393__31__1"; 
assertNull(TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(oldFormatMdFilename)); + assertEquals(Long.MAX_VALUE - 9223372036854774799L, TranslogTransferMetadata.getMaxGenerationFromFileName(oldFormatMdFilename)); // Node id containing separator String nodeIdWithSeparator = @@ -221,10 +222,14 @@ public void onFailure(Exception e) { Tuple minMaxGen = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(nodeIdWithSeparator); Long minGen = Long.MAX_VALUE - 9223372036438563958L; assertEquals(minGen, minMaxGen.v1()); + Long maxGen = Long.MAX_VALUE - 9223372036854774799L; + assertEquals(maxGen, minMaxGen.v2()); + assertEquals(Long.MAX_VALUE - 9223372036854774799L, TranslogTransferMetadata.getMaxGenerationFromFileName(nodeIdWithSeparator)); // Malformed md filename String malformedMdFileName = "metadata__9223372036438563903__9223372036854774799__9223370311919910393__node1__xyz__3__1"; assertNull(TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(malformedMdFileName)); + assertEquals(Long.MAX_VALUE - 9223372036854774799L, TranslogTransferMetadata.getMaxGenerationFromFileName(malformedMdFileName)); } public void testGetMinMaxPrimaryTermFromFilename() throws Exception { @@ -778,7 +783,7 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnPinningOnly() throws public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeAndPinning() throws IOException { long currentTimeInMillis = System.currentTimeMillis(); - String md1Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis + 100000); + String md1Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 200000); String md2Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 300000); String md3Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 600000); @@ -805,6 +810,68 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeAndPinning() throw assertEquals(metadataFiles.get(2), metadataFilesToBeDeleted.get(0)); } + public void 
testGetMetadataFilesToBeDeletedExclusionBasedOnGenerationOnly() throws IOException { + long currentTimeInMillis = System.currentTimeMillis(); + String md1Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 200000); + String md2Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 300000); + String md3Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 600000); + + when(blobContainer.listBlobs()).thenReturn(Map.of()); + + updatePinnedTimstampTask.run(); + + List metadataFiles = List.of( + // MaxGen 7 + "metadata__9223372036438563903__9223372036854775800__" + md1Timestamp + "__31__9223372036854775106__1", + // MaxGen 12 + "metadata__9223372036438563903__9223372036854775795__" + md2Timestamp + "__31__9223372036854775803__1", + // MaxGen 10 + "metadata__9223372036438563903__9223372036854775797__" + md3Timestamp + "__31__9223372036854775701__1" + ); + + List metadataFilesToBeDeleted = RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + 10L, + Map.of(), + false, + logger + ); + assertEquals(2, metadataFilesToBeDeleted.size()); + assertEquals(metadataFiles.get(0), metadataFilesToBeDeleted.get(0)); + assertEquals(metadataFiles.get(2), metadataFilesToBeDeleted.get(1)); + } + + public void testGetMetadataFilesToBeDeletedExclusionBasedOnGenerationDeleteIndex() throws IOException { + long currentTimeInMillis = System.currentTimeMillis(); + String md1Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 200000); + String md2Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 300000); + String md3Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 600000); + + when(blobContainer.listBlobs()).thenReturn(Map.of()); + + updatePinnedTimstampTask.run(); + + List metadataFiles = List.of( + // MaxGen 7 + "metadata__9223372036438563903__9223372036854775800__" + md1Timestamp + "__31__9223372036854775106__1", + // MaxGen 12 + "metadata__9223372036438563903__9223372036854775795__" + 
md2Timestamp + "__31__9223372036854775803__1", + // MaxGen 17 + "metadata__9223372036438563903__9223372036854775790__" + md3Timestamp + "__31__9223372036854775701__1" + ); + + List metadataFilesToBeDeleted = RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( + metadataFiles, + new HashMap<>(), + 10L, + Map.of(), + true, + logger + ); + assertEquals(metadataFiles, metadataFilesToBeDeleted); + } + public void testIsGenerationPinned() { TreeSet> pinnedGenerations = new TreeSet<>(new TreeSet<>((o1, o2) -> { if (Objects.equals(o1.v1(), o2.v1()) == false) { From 39fcf8b4f4f5ec229b564aed843853006cf69a69 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Thu, 19 Sep 2024 15:27:28 +0530 Subject: [PATCH 4/9] Change the way we skip calling list metadata repeatedly Signed-off-by: Sachin Kale --- ...rePinnedTimestampsGarbageCollectionIT.java | 65 ++----------------- .../RemoteFsTimestampAwareTranslog.java | 11 ++-- .../RemoteFsTimestampAwareTranslogTests.java | 2 +- 3 files changed, 11 insertions(+), 67 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java index dedc6352f8140..a0e4281a11569 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java @@ -118,17 +118,15 @@ public void testLiveIndexNoPinnedTimestamps() throws Exception { }); } - public void testLiveIndexNoPinnedTimestampsWithExtraGenSettingWithinLimit() throws Exception { + public void testLiveIndexNoPinnedTimestampsWithMetadataSkippedOnLastDeletionCheck() throws Exception { prepareCluster(1, 1, Settings.EMPTY); - Settings indexSettings = Settings.builder() - .put(remoteStoreIndexSettings(0, 1)) - 
.put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 10) - .build(); + Settings indexSettings = Settings.builder().put(remoteStoreIndexSettings(0, 1)).build(); createIndex(INDEX_NAME, indexSettings); ensureYellowAndNoInitializingShards(INDEX_NAME); ensureGreen(INDEX_NAME); - RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + // We don't set look-back interval to 0 as we want GC to skip based on last deletion check + // RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( RemoteStorePinnedTimestampService.class, @@ -173,61 +171,6 @@ public void testLiveIndexNoPinnedTimestampsWithExtraGenSettingWithinLimit() thro }); } - public void testLiveIndexNoPinnedTimestampsWithExtraGenSetting() throws Exception { - prepareCluster(1, 1, Settings.EMPTY); - Settings indexSettings = Settings.builder() - .put(remoteStoreIndexSettings(0, 1)) - .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 3) - .build(); - createIndex(INDEX_NAME, indexSettings); - ensureYellowAndNoInitializingShards(INDEX_NAME); - ensureGreen(INDEX_NAME); - - RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); - - RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( - RemoteStorePinnedTimestampService.class, - primaryNodeName(INDEX_NAME) - ); - - remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); - - int numDocs = 5; - for (int i = 0; i < numDocs; i++) { - keepPinnedTimestampSchedulerUpdated(); - indexSingleDoc(INDEX_NAME, true); - } - - String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); - String shardDataPath = getShardLevelBlobPath( - client(), - INDEX_NAME, - BlobPath.cleanPath(), - "0", - TRANSLOG, - DATA, - translogPathFixedPrefix - ).buildAsString(); - Path 
translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); - String shardMetadataPath = getShardLevelBlobPath( - client(), - INDEX_NAME, - BlobPath.cleanPath(), - "0", - TRANSLOG, - METADATA, - translogPathFixedPrefix - ).buildAsString(); - Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); - - assertBusy(() -> { - List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); - assertEquals(3, metadataFiles.size()); - - verifyTranslogDataFileCount(metadataFiles, translogDataPath); - }); - } - public void testLiveIndexWithPinnedTimestamps() throws Exception { prepareCluster(1, 1, Settings.EMPTY); Settings indexSettings = Settings.builder() diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index 9391bc81c437c..cb8e9fbeba4ab 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -61,7 +61,7 @@ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog { private final Map> oldFormatMetadataFileGenerationMap; private final Map> oldFormatMetadataFilePrimaryTermMap; private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE); - private long maxDeletedGenerationOnRemote = 0; + private long lastTimestampOfMetadataDeletionOnRemote = System.currentTimeMillis(); public RemoteFsTimestampAwareTranslog( TranslogConfig config, @@ -148,8 +148,10 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) // This code block ensures parity with RemoteFsTranslog. 
Without this, we will end up making list translog metadata // call in each invocation of trimUnreferencedReaders - long minGenerationToKeep = minRemoteGenReferenced - indexSettings().getRemoteTranslogExtraKeep(); - if (indexDeleted == false && (minGenerationToKeep <= maxDeletedGenerationOnRemote)) { + if (indexDeleted == false + && (System.currentTimeMillis() - lastTimestampOfMetadataDeletionOnRemote <= RemoteStoreSettings + .getPinnedTimestampsLookbackInterval() + .millis() * 2)) { return; } @@ -207,8 +209,6 @@ public void onResponse(List blobMetadata) { logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted); if (generationsToBeDeleted.isEmpty() == false) { - maxDeletedGenerationOnRemote = generationsToBeDeleted.stream().max(Long::compareTo).get(); - // Delete stale generations translogTransferManager.deleteGenerationAsync( primaryTermSupplier.getAsLong(), @@ -220,6 +220,7 @@ public void onResponse(List blobMetadata) { } if (metadataFilesToBeDeleted.isEmpty() == false) { + lastTimestampOfMetadataDeletionOnRemote = System.currentTimeMillis(); // Delete stale metadata files translogTransferManager.deleteMetadataFilesAsync( metadataFilesToBeDeleted, diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java index 9608246a81d1f..799d858b7dd12 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java @@ -783,7 +783,7 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnPinningOnly() throws public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeAndPinning() throws IOException { long currentTimeInMillis = System.currentTimeMillis(); - String md1Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 200000); + String md1Timestamp = 
RemoteStoreUtils.invertLong(currentTimeInMillis + 100000); String md2Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 300000); String md3Timestamp = RemoteStoreUtils.invertLong(currentTimeInMillis - 600000); From 6f77e5109167ca0f75d05e38d6273947c6726d51 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Thu, 19 Sep 2024 20:19:16 +0530 Subject: [PATCH 5/9] Address PR comments Signed-off-by: Sachin Kale --- ...rePinnedTimestampsGarbageCollectionIT.java | 65 +++++++++++++++++-- .../store/RemoteSegmentStoreDirectory.java | 13 ++-- .../RemoteFsTimestampAwareTranslog.java | 57 +++++++--------- .../RemoteStorePinnedTimestampService.java | 60 ++++++----------- .../blobstore/BlobStoreRepository.java | 59 +++++------------ .../RemoteSegmentStoreDirectoryTests.java | 3 +- .../RemoteFsTimestampAwareTranslogTests.java | 16 +---- 7 files changed, 128 insertions(+), 145 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java index a0e4281a11569..15c52fdc03b9a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java @@ -118,15 +118,17 @@ public void testLiveIndexNoPinnedTimestamps() throws Exception { }); } - public void testLiveIndexNoPinnedTimestampsWithMetadataSkippedOnLastDeletionCheck() throws Exception { + public void testLiveIndexNoPinnedTimestampsWithExtraGenSettingWithinLimit() throws Exception { prepareCluster(1, 1, Settings.EMPTY); - Settings indexSettings = Settings.builder().put(remoteStoreIndexSettings(0, 1)).build(); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 10) + 
.build(); createIndex(INDEX_NAME, indexSettings); ensureYellowAndNoInitializingShards(INDEX_NAME); ensureGreen(INDEX_NAME); - // We don't set look-back interval to 0 as we want GC to skip based on last deletion check - // RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( RemoteStorePinnedTimestampService.class, @@ -171,6 +173,61 @@ public void testLiveIndexNoPinnedTimestampsWithMetadataSkippedOnLastDeletionChec }); } + public void testLiveIndexNoPinnedTimestampsWithExtraGenSetting() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 3) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = 5; + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + 
BlobPath.cleanPath(), + "0", + TRANSLOG, + METADATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(4, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + } + public void testLiveIndexWithPinnedTimestamps() throws Exception { prepareCluster(1, 1, Settings.EMPTY); Settings indexSettings = Settings.builder() diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index cde63b37c1289..5be516166803e 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -817,10 +817,6 @@ Set getMetadataFilesToFilterActiveSegments( return metadataFilesToFilterActiveSegments; } - public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { - deleteStaleSegments(lastNMetadataFilesToKeep, Map.of()); - } - /** * Delete stale segment and metadata files * One metadata file is kept per commit (refresh updates the same file). 
To read segments uploaded to remote store, @@ -836,7 +832,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException * @param lastNMetadataFilesToKeep number of metadata files to keep * @throws IOException in case of I/O error while reading from / writing to remote segment store */ - private void deleteStaleSegments(int lastNMetadataFilesToKeep, Map pinnedTimestampsToSkip) throws IOException { + public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { if (lastNMetadataFilesToKeep == -1) { logger.info( "Stale segment deletion is disabled if cluster.remote_store.index.segment_metadata.retention.max_count is set to -1" @@ -863,7 +859,7 @@ private void deleteStaleSegments(int lastNMetadataFilesToKeep, Map return; } - Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(pinnedTimestampsToSkip); + Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(); Set implicitLockedFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles( sortedMetadataFileList, @@ -999,8 +995,7 @@ public static void remoteDirectoryCleanup( String indexUUID, ShardId shardId, RemoteStorePathStrategy pathStrategy, - boolean forceClean, - Map pinnedTimestampsToSkip + boolean forceClean ) { try { RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory( @@ -1012,7 +1007,7 @@ public static void remoteDirectoryCleanup( if (forceClean) { remoteSegmentStoreDirectory.delete(); } else { - remoteSegmentStoreDirectory.deleteStaleSegments(0, pinnedTimestampsToSkip); + remoteSegmentStoreDirectory.deleteStaleSegments(0); remoteSegmentStoreDirectory.deleteIfEmpty(); } } catch (Exception e) { diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index cb8e9fbeba4ab..92b09188eb1ce 100644 --- 
a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -61,7 +61,7 @@ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog { private final Map> oldFormatMetadataFileGenerationMap; private final Map> oldFormatMetadataFilePrimaryTermMap; private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE); - private long lastTimestampOfMetadataDeletionOnRemote = System.currentTimeMillis(); + private long previousMinRemoteGenReferenced = -1; public RemoteFsTimestampAwareTranslog( TranslogConfig config, @@ -148,11 +148,10 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) // This code block ensures parity with RemoteFsTranslog. Without this, we will end up making list translog metadata // call in each invocation of trimUnreferencedReaders - if (indexDeleted == false - && (System.currentTimeMillis() - lastTimestampOfMetadataDeletionOnRemote <= RemoteStoreSettings - .getPinnedTimestampsLookbackInterval() - .millis() * 2)) { + if (indexDeleted == false && previousMinRemoteGenReferenced == minRemoteGenReferenced) { return; + } else if (previousMinRemoteGenReferenced != minRemoteGenReferenced) { + previousMinRemoteGenReferenced = minRemoteGenReferenced; } // Since remote generation deletion is async, this ensures that only one generation deletion happens at a time. @@ -204,7 +203,7 @@ public void onResponse(List blobMetadata) { Set generationsToBeDeleted = getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, metadataFilesToBeDeleted, - indexDeleted ? Long.MAX_VALUE : minRemoteGenReferenced + indexDeleted ? 
Long.MAX_VALUE : getMinGenerationToKeep() ); logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted); @@ -220,7 +219,6 @@ public void onResponse(List blobMetadata) { } if (metadataFilesToBeDeleted.isEmpty() == false) { - lastTimestampOfMetadataDeletionOnRemote = System.currentTimeMillis(); // Delete stale metadata files translogTransferManager.deleteMetadataFilesAsync( metadataFilesToBeDeleted, @@ -248,11 +246,15 @@ public void onFailure(Exception e) { translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener); } + private long getMinGenerationToKeep() { + return minRemoteGenReferenced - indexSettings().getRemoteTranslogExtraKeep(); + } + // Visible for testing protected Set getGenerationsToBeDeleted( List metadataFilesNotToBeDeleted, List metadataFilesToBeDeleted, - long minRemoteGenReferenced + long minGenerationToKeep ) throws IOException { Set generationsFromMetadataFilesToBeDeleted = new HashSet<>(); for (String mdFile : metadataFilesToBeDeleted) { @@ -267,9 +269,9 @@ protected Set getGenerationsToBeDeleted( Set generationsToBeDeleted = new HashSet<>(); for (long generation : generationsFromMetadataFilesToBeDeleted) { // Check if the generation is not referred by metadata file matching pinned timestamps - // The check with minRemoteGenReferenced is redundant but kept as to make sure we don't delete generations + // The check with minGenerationToKeep is redundant but kept as to make sure we don't delete generations // that are not persisted in remote segment store yet. 
- if (generation < minRemoteGenReferenced && isGenerationPinned(generation, pinnedGenerations) == false) { + if (generation < minGenerationToKeep && isGenerationPinned(generation, pinnedGenerations) == false) { generationsToBeDeleted.add(generation); } } @@ -277,26 +279,18 @@ protected Set getGenerationsToBeDeleted( } protected List getMetadataFilesToBeDeleted(List metadataFiles, boolean indexDeleted) { - return getMetadataFilesToBeDeleted( - metadataFiles, - metadataFilePinnedTimestampMap, - minRemoteGenReferenced, - Map.of(), - indexDeleted, - logger - ); + return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, getMinGenerationToKeep(), indexDeleted, logger); } // Visible for testing protected static List getMetadataFilesToBeDeleted( List metadataFiles, Map metadataFilePinnedTimestampMap, - long minRemoteGenReferenced, - Map pinnedTimestampsToSkip, + long minGenerationToKeep, boolean indexDeleted, Logger logger ) { - Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(pinnedTimestampsToSkip); + Tuple> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(); // Keep files since last successful run of scheduler List metadataFilesToBeDeleted = RemoteStoreUtils.filterOutMetadataFilesBasedOnAge( @@ -330,18 +324,18 @@ protected static List getMetadataFilesToBeDeleted( ); if (indexDeleted == false) { - // Filter out metadata files based on minRemoteGenReferenced - List metadataFilesContainingMinRemoteGenReferenced = metadataFilesToBeDeleted.stream().filter(md -> { + // Filter out metadata files based on minGenerationToKeep + List metadataFilesContainingMinGenerationToKeep = metadataFilesToBeDeleted.stream().filter(md -> { long maxGeneration = TranslogTransferMetadata.getMaxGenerationFromFileName(md); - return maxGeneration == -1 || maxGeneration > minRemoteGenReferenced; + return maxGeneration == -1 || maxGeneration > minGenerationToKeep; }).collect(Collectors.toList()); - 
metadataFilesToBeDeleted.removeAll(metadataFilesContainingMinRemoteGenReferenced); + metadataFilesToBeDeleted.removeAll(metadataFilesContainingMinGenerationToKeep); logger.trace( - "metadataFilesContainingMinRemoteGenReferenced.size = {}, metadataFilesToBeDeleted based on minRemoteGenReferenced filtering = {}, minRemoteGenReferenced = {}", - metadataFilesContainingMinRemoteGenReferenced.size(), + "metadataFilesContainingMinGenerationToKeep.size = {}, metadataFilesToBeDeleted based on minGenerationToKeep filtering = {}, minGenerationToKeep = {}", + metadataFilesContainingMinGenerationToKeep.size(), metadataFilesToBeDeleted.size(), - minRemoteGenReferenced + minGenerationToKeep ); } @@ -505,11 +499,7 @@ protected static Tuple getMinMaxPrimaryTermFromMetadataFile( } } - public static void cleanup( - TranslogTransferManager translogTransferManager, - boolean forceClean, - Map pinnedTimestampsToSkip - ) throws IOException { + public static void cleanup(TranslogTransferManager translogTransferManager, boolean forceClean) throws IOException { if (forceClean) { translogTransferManager.delete(); } else { @@ -527,7 +517,6 @@ public void onResponse(List blobMetadata) { metadataFiles, new HashMap<>(), Long.MAX_VALUE, - pinnedTimestampsToSkip, true, staticLogger ); diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index 6d801f9f5e50e..1448c46583f6a 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Supplier; +import java.util.stream.Collectors; /** * Service for managing pinned timestamps in a remote store. 
@@ -47,8 +48,7 @@ @ExperimentalApi public class RemoteStorePinnedTimestampService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class); - private static Tuple>> pinningEntityTimestampMap = new Tuple<>(-1L, Map.of()); - + private static Tuple> pinnedTimestampsSet = new Tuple<>(-1L, Set.of()); public static final String PINNED_TIMESTAMPS_PATH_TOKEN = "pinned_timestamps"; public static final String PINNED_TIMESTAMPS_FILENAME_SEPARATOR = "__"; @@ -199,23 +199,21 @@ public void cloneTimestamp(long timestamp, String existingPinningEntity, String } } - public static String getBlobName(long timestamp, String pinningEntity) { + private String getBlobName(long timestamp, String pinningEntity) { return String.join(PINNED_TIMESTAMPS_FILENAME_SEPARATOR, pinningEntity, String.valueOf(timestamp)); } - public static Tuple getPinningEntityTimestampFromBlobName(String blobName) { + private long getTimestampFromBlobName(String blobName) { String[] blobNameTokens = blobName.split(PINNED_TIMESTAMPS_FILENAME_SEPARATOR); if (blobNameTokens.length < 2) { logger.error("Pinned timestamps blob name contains invalid format: {}", blobName); } try { - String pinningEntity = blobName.substring(blobName.lastIndexOf(PINNED_TIMESTAMPS_FILENAME_SEPARATOR)); - Long timestamp = Long.parseLong(blobNameTokens[blobNameTokens.length - 1]); - return new Tuple<>(pinningEntity, timestamp); + return Long.parseLong(blobNameTokens[blobNameTokens.length - 1]); } catch (NumberFormatException e) { logger.error(() -> new ParameterizedMessage("Pinned timestamps blob name contains invalid format: {}", blobName), e); } - return null; + return -1; } /** @@ -242,6 +240,10 @@ public void unpinTimestamp(long timestamp, String pinningEntity, ActionListener< } } + public void forceSyncPinnedTimestamps() { + asyncUpdatePinnedTimestampTask.run(); + } + @Override public void close() throws IOException { asyncUpdatePinnedTimestampTask.close(); @@ -250,32 
+252,14 @@ public void close() throws IOException { // Used in integ tests public void rescheduleAsyncUpdatePinnedTimestampTask(TimeValue pinnedTimestampsSchedulerInterval) { if (pinnedTimestampsSchedulerInterval != null) { - pinningEntityTimestampMap = new Tuple<>(-1L, Map.of()); + pinnedTimestampsSet = new Tuple<>(-1L, Set.of()); asyncUpdatePinnedTimestampTask.close(); startAsyncUpdateTask(pinnedTimestampsSchedulerInterval); } } public static Tuple> getPinnedTimestamps() { - return getPinnedTimestamps(null); - } - - public static Tuple> getPinnedTimestamps(Map pinnedTimestampsToSkip) { - Set allPinnedTimestamps = new HashSet<>(); - if (pinnedTimestampsToSkip == null || pinnedTimestampsToSkip.isEmpty()) { - pinningEntityTimestampMap.v2().values().forEach(allPinnedTimestamps::addAll); - } else { - for (String pinningEntity : pinningEntityTimestampMap.v2().keySet()) { - if (pinnedTimestampsToSkip.containsKey(pinningEntity)) { - Set timestamps = new HashSet<>(pinningEntityTimestampMap.v2().get(pinningEntity)); - timestamps.remove(pinnedTimestampsToSkip.get(pinningEntity)); - allPinnedTimestamps.addAll(timestamps); - } else { - allPinnedTimestamps.addAll(pinningEntityTimestampMap.v2().get(pinningEntity)); - } - } - } - return new Tuple<>(pinningEntityTimestampMap.v1(), allPinnedTimestamps); + return pinnedTimestampsSet; } /** @@ -298,22 +282,16 @@ protected void runInternal() { try { Map pinnedTimestampList = blobContainer.listBlobs(); if (pinnedTimestampList.isEmpty()) { - logger.debug("Fetched empty pinned timestamps from remote store: {}", triggerTimestamp); - pinningEntityTimestampMap = new Tuple<>(triggerTimestamp, Map.of()); + pinnedTimestampsSet = new Tuple<>(triggerTimestamp, Set.of()); return; } - Map> pinnedTimestamps = new HashMap<>(); - for (String blobName : pinnedTimestampList.keySet()) { - Tuple pinningEntityTimestamp = getPinningEntityTimestampFromBlobName(blobName); - if (pinningEntityTimestamp != null) { - if 
(pinnedTimestamps.containsKey(pinningEntityTimestamp.v1()) == false) { - pinnedTimestamps.put(pinningEntityTimestamp.v1(), new HashSet<>()); - } - pinnedTimestamps.get(pinningEntityTimestamp.v1()).add(pinningEntityTimestamp.v2()); - } - } + Set pinnedTimestamps = pinnedTimestampList.keySet() + .stream() + .map(RemoteStorePinnedTimestampService.this::getTimestampFromBlobName) + .filter(timestamp -> timestamp != -1) + .collect(Collectors.toSet()); logger.debug("Fetched pinned timestamps from remote store: {} - {}", triggerTimestamp, pinnedTimestamps); - pinningEntityTimestampMap = new Tuple<>(triggerTimestamp, pinnedTimestamps); + pinnedTimestampsSet = new Tuple<>(triggerTimestamp, pinnedTimestamps); } catch (Throwable t) { logger.error("Exception while fetching pinned timestamp details", t); } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 1a093ab120582..86f8f1c2a821a 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -1280,8 +1280,7 @@ private void doDeleteShardSnapshots( snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(), remoteSegmentStoreDirectoryFactory, - afterCleanupsListener, - snapshotIdPinnedTimestampMap + afterCleanupsListener ); } else { asyncCleanupUnlinkedShardLevelBlobs( @@ -1300,8 +1299,7 @@ private void cleanUpRemoteStoreFilesForDeletedIndicesV2( Collection snapshotIds, Collection result, RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory, - ActionListener afterCleanupsListener, - Map snapshotIdPinnedTimestampMap + ActionListener afterCleanupsListener ) { try { Set uniqueIndexIds = new HashSet<>(); @@ -1310,14 +1308,7 @@ private void cleanUpRemoteStoreFilesForDeletedIndicesV2( } // iterate through all the indices and trigger remote store 
directory cleanup for deleted index segments for (String indexId : uniqueIndexIds) { - cleanRemoteStoreDirectoryIfNeeded( - snapshotIds, - indexId, - repositoryData, - remoteSegmentStoreDirectoryFactory, - snapshotIdPinnedTimestampMap, - false - ); + cleanRemoteStoreDirectoryIfNeeded(snapshotIds, indexId, repositoryData, remoteSegmentStoreDirectoryFactory, false); } afterCleanupsListener.onResponse(null); } catch (Exception e) { @@ -1369,7 +1360,13 @@ private void removeSnapshotPinnedTimestamp( new ActionListener() { @Override public void onResponse(Void unused) { - logger.debug("Timestamp {} unpinned successfully for snapshot {}", timestampToUnpin, snapshotId.getName()); + logger.info("Timestamp {} unpinned successfully for snapshot {}", timestampToUnpin, snapshotId.getName()); + try { + remoteStorePinnedTimestampService.forceSyncPinnedTimestamps(); + logger.debug("Successfully synced pinned timestamp state"); + } catch (Exception e) { + logger.warn("Exception while updating pinning timestamp state, snapshot deletion will continue", e); + } listener.onResponse(null); } @@ -1475,8 +1472,7 @@ public static void remoteDirectoryCleanupAsync( ShardId shardId, String threadPoolName, RemoteStorePathStrategy pathStrategy, - boolean forceClean, - Map pinnedTimestampsToSkip + boolean forceClean ) { threadpool.executor(threadPoolName) .execute( @@ -1487,8 +1483,7 @@ public static void remoteDirectoryCleanupAsync( indexUUID, shardId, pathStrategy, - forceClean, - pinnedTimestampsToSkip + forceClean ), indexUUID, shardId @@ -1545,8 +1540,7 @@ protected void releaseRemoteStoreLockAndCleanup( new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)), ThreadPool.Names.REMOTE_PURGE, remoteStoreShardShallowCopySnapshot.getRemoteStorePathStrategy(), - false, - null + false ); } } @@ -2109,14 +2103,7 @@ private void executeOneStaleIndexDelete( deleteResult = deleteResult.add(cleanUpStaleSnapshotShardPathsFile(matchingShardPaths, snapshotShardPaths)); if 
(remoteSegmentStoreDirectoryFactory != null) { - cleanRemoteStoreDirectoryIfNeeded( - deletedSnapshots, - indexSnId, - oldRepoData, - remoteSegmentStoreDirectoryFactory, - new HashMap<>(), - true - ); + cleanRemoteStoreDirectoryIfNeeded(deletedSnapshots, indexSnId, oldRepoData, remoteSegmentStoreDirectoryFactory, true); } // Finally, we delete the [base_path]/indexId folder @@ -2189,7 +2176,6 @@ private void cleanRemoteStoreDirectoryIfNeeded( String indexSnId, RepositoryData oldRepoData, RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory, - Map snapshotIdPinnedTimestampMap, boolean forceClean ) { assert (indexSnId != null); @@ -2233,12 +2219,6 @@ private void cleanRemoteStoreDirectoryIfNeeded( prevIndexMetadata ); - String pinningEntity = SnapshotsService.getPinningEntity(getMetadata().name(), snapshotId.getUUID()); - Map pinnedTimestampsToSkip = new HashMap<>(); - if (snapshotIdPinnedTimestampMap.get(snapshotId) != null) { - pinnedTimestampsToSkip.put(pinningEntity, snapshotIdPinnedTimestampMap.get(snapshotId)); - } - for (int shardId = 0; shardId < prevIndexMetadata.getNumberOfShards(); shardId++) { ShardId shard = new ShardId(Index.UNKNOWN_INDEX_NAME, prevIndexMetadata.getIndexUUID(), shardId); remoteDirectoryCleanupAsync( @@ -2249,16 +2229,14 @@ private void cleanRemoteStoreDirectoryIfNeeded( shard, ThreadPool.Names.REMOTE_PURGE, remoteStorePathStrategy, - forceClean, - pinnedTimestampsToSkip + forceClean ); remoteTranslogCleanupAsync( remoteTranslogRepository, shard, remoteStorePathStrategy, prevIndexMetadata, - forceClean, - pinnedTimestampsToSkip + forceClean ); } } @@ -2284,8 +2262,7 @@ private void remoteTranslogCleanupAsync( ShardId shardId, RemoteStorePathStrategy remoteStorePathStrategy, IndexMetadata prevIndexMetadata, - boolean forceClean, - Map pinnedTimestampsToSkip + boolean forceClean ) { assert remoteTranslogRepository instanceof BlobStoreRepository; boolean indexMetadataEnabled = 
RemoteStoreUtils.determineTranslogMetadataEnabled(prevIndexMetadata); @@ -2302,7 +2279,7 @@ private void remoteTranslogCleanupAsync( indexMetadataEnabled ); try { - RemoteFsTimestampAwareTranslog.cleanup(translogTransferManager, forceClean, pinnedTimestampsToSkip); + RemoteFsTimestampAwareTranslog.cleanup(translogTransferManager, forceClean); } catch (IOException e) { logger.error("Exception while cleaning up remote translog for shard: " + shardId, e); } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index fe0b67c9816dc..df3df81361a12 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -566,8 +566,7 @@ public void testCleanupAsync() throws Exception { indexUUID, shardId, pathStrategy, - false, - Map.of() + false ); verify(remoteSegmentStoreDirectoryFactory).newDirectory(repositoryName, indexUUID, shardId, pathStrategy); verify(threadPool, times(0)).executor(ThreadPool.Names.REMOTE_PURGE); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java index 799d858b7dd12..2a97c42346d31 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java @@ -402,7 +402,7 @@ public void testMetadataFileDeletion() throws Exception { ); updatePinnedTimstampTask.run(); translog.trimUnreferencedReaders(); - assertBusy(() -> { assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); }); + assertBusy(() -> { assertEquals(3, 
blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); }); } public void testMetadataFileDeletionWithPinnedTimestamps() throws Exception { @@ -715,14 +715,7 @@ public void testGetMetadataFilesToBeDeletedNoExclusion() { assertEquals( metadataFiles, - RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( - metadataFiles, - new HashMap<>(), - Long.MAX_VALUE, - Map.of(), - false, - logger - ) + RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), Long.MAX_VALUE, false, logger) ); } @@ -743,7 +736,6 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeOnly() { metadataFiles, new HashMap<>(), Long.MAX_VALUE, - Map.of(), false, logger ); @@ -772,7 +764,6 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnPinningOnly() throws metadataFiles, new HashMap<>(), Long.MAX_VALUE, - Map.of(), false, logger ); @@ -802,7 +793,6 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnAgeAndPinning() throw metadataFiles, new HashMap<>(), Long.MAX_VALUE, - Map.of(), false, logger ); @@ -833,7 +823,6 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnGenerationOnly() thro metadataFiles, new HashMap<>(), 10L, - Map.of(), false, logger ); @@ -865,7 +854,6 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnGenerationDeleteIndex metadataFiles, new HashMap<>(), 10L, - Map.of(), true, logger ); From ca6af65c4e65760486371493731efb4b13b9f7e2 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Fri, 20 Sep 2024 12:23:51 +0530 Subject: [PATCH 6/9] Fix tests Signed-off-by: Sachin Kale --- ...rePinnedTimestampsGarbageCollectionIT.java | 10 ++--- .../RemoteFsTimestampAwareTranslog.java | 22 ++++++---- .../RemoteFsTimestampAwareTranslogTests.java | 41 +++++++++++++------ 3 files changed, 48 insertions(+), 25 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java 
b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java index 15c52fdc03b9a..46d43926e7e34 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java @@ -112,7 +112,7 @@ public void testLiveIndexNoPinnedTimestamps() throws Exception { assertBusy(() -> { List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); - assertEquals(1, metadataFiles.size()); + assertEquals(2, metadataFiles.size()); verifyTranslogDataFileCount(metadataFiles, translogDataPath); }); @@ -222,7 +222,7 @@ public void testLiveIndexNoPinnedTimestampsWithExtraGenSetting() throws Exceptio assertBusy(() -> { List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); - assertEquals(4, metadataFiles.size()); + assertEquals(5, metadataFiles.size()); verifyTranslogDataFileCount(metadataFiles, translogDataPath); }); @@ -282,7 +282,7 @@ public void testLiveIndexWithPinnedTimestamps() throws Exception { assertBusy(() -> { List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); - assertEquals(2, metadataFiles.size()); + assertEquals(3, metadataFiles.size()); verifyTranslogDataFileCount(metadataFiles, translogDataPath); }); @@ -337,7 +337,7 @@ public void testIndexDeletionNoPinnedTimestamps() throws Exception { assertBusy(() -> { List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); - assertEquals(1, metadataFiles.size()); + assertEquals(2, metadataFiles.size()); verifyTranslogDataFileCount(metadataFiles, translogDataPath); }); @@ -405,7 +405,7 @@ public void testIndexDeletionWithPinnedTimestamps() throws Exception { assertBusy(() -> { List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); - assertEquals(2, 
metadataFiles.size()); + assertEquals(3, metadataFiles.size()); verifyTranslogDataFileCount(metadataFiles, translogDataPath); }, 30, TimeUnit.SECONDS); diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index 92b09188eb1ce..7f52110843645 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -203,7 +203,7 @@ public void onResponse(List blobMetadata) { Set generationsToBeDeleted = getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, metadataFilesToBeDeleted, - indexDeleted ? Long.MAX_VALUE : getMinGenerationToKeep() + indexDeleted ? Long.MAX_VALUE : getMinGenerationToKeepInRemote() ); logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted); @@ -246,7 +246,7 @@ public void onFailure(Exception e) { translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener); } - private long getMinGenerationToKeep() { + private long getMinGenerationToKeepInRemote() { return minRemoteGenReferenced - indexSettings().getRemoteTranslogExtraKeep(); } @@ -254,7 +254,7 @@ private long getMinGenerationToKeep() { protected Set getGenerationsToBeDeleted( List metadataFilesNotToBeDeleted, List metadataFilesToBeDeleted, - long minGenerationToKeep + long minGenerationToKeepInRemote ) throws IOException { Set generationsFromMetadataFilesToBeDeleted = new HashSet<>(); for (String mdFile : metadataFilesToBeDeleted) { @@ -271,7 +271,7 @@ protected Set getGenerationsToBeDeleted( // Check if the generation is not referred by metadata file matching pinned timestamps // The check with minGenerationToKeep is redundant but kept as to make sure we don't delete generations // that are not persisted in remote segment store yet. 
- if (generation < minGenerationToKeep && isGenerationPinned(generation, pinnedGenerations) == false) { + if (generation < minGenerationToKeepInRemote && isGenerationPinned(generation, pinnedGenerations) == false) { generationsToBeDeleted.add(generation); } } @@ -279,14 +279,20 @@ protected Set getGenerationsToBeDeleted( } protected List getMetadataFilesToBeDeleted(List metadataFiles, boolean indexDeleted) { - return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, getMinGenerationToKeep(), indexDeleted, logger); + return getMetadataFilesToBeDeleted( + metadataFiles, + metadataFilePinnedTimestampMap, + getMinGenerationToKeepInRemote(), + indexDeleted, + logger + ); } // Visible for testing protected static List getMetadataFilesToBeDeleted( List metadataFiles, Map metadataFilePinnedTimestampMap, - long minGenerationToKeep, + long minGenerationToKeepInRemote, boolean indexDeleted, Logger logger ) { @@ -327,7 +333,7 @@ protected static List getMetadataFilesToBeDeleted( // Filter out metadata files based on minGenerationToKeep List metadataFilesContainingMinGenerationToKeep = metadataFilesToBeDeleted.stream().filter(md -> { long maxGeneration = TranslogTransferMetadata.getMaxGenerationFromFileName(md); - return maxGeneration == -1 || maxGeneration > minGenerationToKeep; + return maxGeneration == -1 || maxGeneration >= minGenerationToKeepInRemote; }).collect(Collectors.toList()); metadataFilesToBeDeleted.removeAll(metadataFilesContainingMinGenerationToKeep); @@ -335,7 +341,7 @@ protected static List getMetadataFilesToBeDeleted( "metadataFilesContainingMinGenerationToKeep.size = {}, metadataFilesToBeDeleted based on minGenerationToKeep filtering = {}, minGenerationToKeep = {}", metadataFilesContainingMinGenerationToKeep.size(), metadataFilesToBeDeleted.size(), - minGenerationToKeep + minGenerationToKeepInRemote ); } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java 
b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java index 2a97c42346d31..e6871414cf5e0 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java @@ -335,29 +335,46 @@ public void testSimpleOperationsUpload() throws Exception { addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 2, primaryTerm.get(), new byte[] { 1 })); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 3, primaryTerm.get(), new byte[] { 1 })); - addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 4, primaryTerm.get(), new byte[] { 1 })); - addToTranslogAndListAndUpload(translog, ops, new Translog.Index("5", 5, primaryTerm.get(), new byte[] { 1 })); - addToTranslogAndListAndUpload(translog, ops, new Translog.Index("6", 6, primaryTerm.get(), new byte[] { 1 })); assertBusy(() -> { assertEquals( - 16, + 10, blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() ); }); - assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); - RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); // Fetch pinned timestamps so that it won't be stale updatePinnedTimstampTask.run(); + translog.setMinSeqNoToKeep(3); + translog.trimUnreferencedReaders(); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("4", 4, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("5", 5, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("6", 6, primaryTerm.get(), new byte[] { 1 })); + + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + // Fetch pinned 
timestamps so that it won't be stale + updatePinnedTimstampTask.run(); translog.setMinSeqNoToKeep(6); translog.trimUnreferencedReaders(); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + + assertEquals(1, translog.readers.size()); + assertBusy(() -> { + assertEquals(2, translog.allUploaded().size()); + assertEquals(4, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertEquals( + 16, + blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() + ); + }, 30, TimeUnit.SECONDS); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("7", 7, primaryTerm.get(), new byte[] { 1 })); addToTranslogAndListAndUpload(translog, ops, new Translog.Index("8", 8, primaryTerm.get(), new byte[] { 1 })); - assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); // Fetch pinned timestamps so that it won't be stale updatePinnedTimstampTask.run(); translog.trimUnreferencedReaders(); @@ -365,13 +382,13 @@ public void testSimpleOperationsUpload() throws Exception { assertEquals(3, translog.readers.size()); assertBusy(() -> { - assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); assertEquals(6, translog.allUploaded().size()); + assertEquals(3, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); assertEquals( - 6, + 12, blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() ); - }, 60, TimeUnit.SECONDS); + }, 30, TimeUnit.SECONDS); } @Override @@ -573,7 +590,7 @@ public void testDrainSync() throws Exception { assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertEquals(1, translog.readers.size()); assertBusy(() -> assertEquals(2, 
translog.allUploaded().size())); - assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); + assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); } @Override @@ -816,7 +833,7 @@ public void testGetMetadataFilesToBeDeletedExclusionBasedOnGenerationOnly() thro // MaxGen 12 "metadata__9223372036438563903__9223372036854775795__" + md2Timestamp + "__31__9223372036854775803__1", // MaxGen 10 - "metadata__9223372036438563903__9223372036854775797__" + md3Timestamp + "__31__9223372036854775701__1" + "metadata__9223372036438563903__9223372036854775798__" + md3Timestamp + "__31__9223372036854775701__1" ); List metadataFilesToBeDeleted = RemoteFsTimestampAwareTranslog.getMetadataFilesToBeDeleted( From 4f83dac81bb1803b49874042021e378eff5e38f5 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 24 Sep 2024 12:01:26 +0530 Subject: [PATCH 7/9] Address PR comments Signed-off-by: Sachin Kale --- .../index/translog/RemoteFsTimestampAwareTranslog.java | 6 +++--- .../repositories/blobstore/BlobStoreRepository.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index 7f52110843645..46ea1d44f5d30 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -136,7 +136,7 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) // This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote // store. 
- if ((indexDeleted == false && startedPrimarySupplier.getAsBoolean() == false) || pauseSync.get()) { + if (indexDeleted == false && (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get())) { return; } @@ -505,7 +505,7 @@ protected static Tuple getMinMaxPrimaryTermFromMetadataFile( } } - public static void cleanup(TranslogTransferManager translogTransferManager, boolean forceClean) throws IOException { + public static void cleanupOfDeletedIndex(TranslogTransferManager translogTransferManager, boolean forceClean) throws IOException { if (forceClean) { translogTransferManager.delete(); } else { @@ -523,7 +523,7 @@ public void onResponse(List blobMetadata) { metadataFiles, new HashMap<>(), Long.MAX_VALUE, - true, + true, // This method gets called when the index is no longer present staticLogger ); if (metadataFilesToBeDeleted.isEmpty()) { diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 86f8f1c2a821a..14c201e819994 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -2279,7 +2279,7 @@ private void remoteTranslogCleanupAsync( indexMetadataEnabled ); try { - RemoteFsTimestampAwareTranslog.cleanup(translogTransferManager, forceClean); + RemoteFsTimestampAwareTranslog.cleanupOfDeletedIndex(translogTransferManager, forceClean); } catch (IOException e) { logger.error("Exception while cleaning up remote translog for shard: " + shardId, e); } From 8d3e2558936a048518659402c9460a0b7a4c74dc Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 24 Sep 2024 18:36:47 +0530 Subject: [PATCH 8/9] Fix flaky test Signed-off-by: Sachin Kale --- .../RemoteStorePinnedTimestampsGarbageCollectionIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java index 46d43926e7e34..d21b447c0b67f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java @@ -170,7 +170,7 @@ public void testLiveIndexNoPinnedTimestampsWithExtraGenSettingWithinLimit() thro assertEquals(numDocs + 1, metadataFiles.size()); verifyTranslogDataFileCount(metadataFiles, translogDataPath); - }); + }, 30, TimeUnit.SECONDS); } public void testLiveIndexNoPinnedTimestampsWithExtraGenSetting() throws Exception { From af7fcbf8b1cf4cf757f1bd26dade326f03ad0db3 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 25 Sep 2024 10:17:57 +0530 Subject: [PATCH 9/9] Address PR comments and fix flaky tests Signed-off-by: Sachin Kale --- ...teStorePinnedTimestampsGarbageCollectionIT.java | 2 +- .../translog/RemoteFsTimestampAwareTranslog.java | 14 ++++++++++---- .../index/translog/RemoteFsTranslog.java | 8 ++++++++ 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java index d21b447c0b67f..0a2668c60d3bd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java @@ -167,7 +167,7 @@ public void testLiveIndexNoPinnedTimestampsWithExtraGenSettingWithinLimit() thro assertBusy(() -> { List metadataFiles = 
Files.list(translogMetadataPath).collect(Collectors.toList()); - assertEquals(numDocs + 1, metadataFiles.size()); + assertTrue(metadataFiles.size() >= numDocs + 1); verifyTranslogDataFileCount(metadataFiles, translogDataPath); }, 30, TimeUnit.SECONDS); diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index 46ea1d44f5d30..e61a9606175ee 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -34,6 +34,7 @@ import java.util.Optional; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; @@ -61,7 +62,7 @@ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog { private final Map> oldFormatMetadataFileGenerationMap; private final Map> oldFormatMetadataFilePrimaryTermMap; private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE); - private long previousMinRemoteGenReferenced = -1; + private final AtomicBoolean triggerTrimOnMinRemoteGenReferencedChange = new AtomicBoolean(false); public RemoteFsTimestampAwareTranslog( TranslogConfig config, @@ -106,6 +107,11 @@ protected void onDelete() { } } + @Override + protected void onMinRemoteGenReferencedChange() { + triggerTrimOnMinRemoteGenReferencedChange.set(true); + } + @Override public void trimUnreferencedReaders() throws IOException { trimUnreferencedReaders(false, true); @@ -148,10 +154,10 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) // This code block ensures parity with RemoteFsTranslog. 
Without this, we will end up making list translog metadata // call in each invocation of trimUnreferencedReaders - if (indexDeleted == false && previousMinRemoteGenReferenced == minRemoteGenReferenced) { + if (indexDeleted == false && triggerTrimOnMinRemoteGenReferencedChange.get() == false) { return; - } else if (previousMinRemoteGenReferenced != minRemoteGenReferenced) { - previousMinRemoteGenReferenced = minRemoteGenReferenced; + } else if (triggerTrimOnMinRemoteGenReferencedChange.get()) { + triggerTrimOnMinRemoteGenReferencedChange.set(false); } // Since remote generation deletion is async, this ensures that only one generation deletion happens at a time. diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index a54a31cea41ef..85f58f898826f 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -683,7 +683,11 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen @Override public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException { maxRemoteTranslogGenerationUploaded = generation; + long previousMinRemoteGenReferenced = minRemoteGenReferenced; minRemoteGenReferenced = getMinFileGeneration(); + if (previousMinRemoteGenReferenced != minRemoteGenReferenced) { + onMinRemoteGenReferencedChange(); + } logger.debug( "Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}, minRemoteGenReferenced = {}", primaryTerm, @@ -703,6 +707,10 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro } } + protected void onMinRemoteGenReferencedChange() { + + } + @Override public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) { return minSeqNoToKeep;