Add support to skip pinned timestamp in remote segment garbage collector (opensearch-project#15017)
---------
Signed-off-by: Sachin Kale <kalsac@amazon.com>
Co-authored-by: Sachin Kale <kalsac@amazon.com>
sachinpkale authored and dk2k committed Oct 17, 2024
1 parent 8228f60 commit c8b1dcc
Showing 8 changed files with 430 additions and 51 deletions.
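At a high level, stale-segment garbage collection now treats a metadata file as undeletable if it is implicitly locked by a pinned timestamp or was created after the last successful fetch of pinned timestamps. Before the diffs, a minimal standalone sketch of that retention rule; the names, the `>=` age comparison, and the newest-first ordering assumption are illustrative, not the actual OpenSearch API:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import java.util.function.ToLongFunction;

    // Illustrative sketch of the new deletion rule, not the actual OpenSearch code.
    public class PinnedTimestampGcSketch {
        // sortedNewestFirst: metadata files in newest-first order (the remote store
        // achieves this by embedding inverted timestamps in filenames).
        static List<String> filesToDelete(
            List<String> sortedNewestFirst,
            int lastNToKeep,
            Set<String> lockedFiles,            // implicit (pinned) + explicit locks
            long lastSuccessfulFetchMillis,
            ToLongFunction<String> creationTime
        ) {
            int from = Math.min(lastNToKeep, sortedNewestFirst.size());
            List<String> eligible = new ArrayList<>(sortedNewestFirst.subList(from, sortedNewestFirst.size()));
            List<String> toDelete = new ArrayList<>();
            for (String file : eligible) {
                boolean pinned = lockedFiles.contains(file);
                // Keep anything newer than the last pinned-timestamp fetch: it could
                // be pinned by a timestamp this node has not observed yet.
                boolean tooNew = creationTime.applyAsLong(file) >= lastSuccessfulFetchMillis;
                if (pinned == false && tooNew == false) {
                    toDelete.add(file);
                }
            }
            return toDelete;
        }
    }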
@@ -217,10 +217,15 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
             } else {
                 // As delete is async its possible that the file gets created before the deletion or after
                 // deletion.
-                MatcherAssert.assertThat(
-                    actualFileCount,
-                    is(oneOf(lastNMetadataFilesToKeep - 1, lastNMetadataFilesToKeep, lastNMetadataFilesToKeep + 1))
-                );
+                if (RemoteStoreSettings.isPinnedTimestampsEnabled()) {
+                    // With pinned timestamp, we also keep md files since last successful fetch
+                    assertTrue(actualFileCount >= lastNMetadataFilesToKeep);
+                } else {
+                    MatcherAssert.assertThat(
+                        actualFileCount,
+                        is(oneOf(lastNMetadataFilesToKeep - 1, lastNMetadataFilesToKeep, lastNMetadataFilesToKeep + 1))
+                    );
+                }
             }
         }, 30, TimeUnit.SECONDS);
     }
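Note the asymmetry in the assertion above: with pinned timestamps enabled, garbage collection additionally retains metadata files that are pinned or were written since the last successful pinned-timestamp fetch, so the test can only assert a lower bound on the file count rather than an exact value.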
@@ -249,7 +254,12 @@ public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
         Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
         int actualFileCount = getFileCount(indexPath);
         // We also allow (numberOfIterations + 1) as index creation also triggers refresh.
-        MatcherAssert.assertThat(actualFileCount, is(oneOf(4)));
+        if (RemoteStoreSettings.isPinnedTimestampsEnabled()) {
+            // With pinned timestamp, we also keep md files since last successful fetch
+            assertTrue(actualFileCount >= 4);
+        } else {
+            assertEquals(4, actualFileCount);
+        }
     }
 
     public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Exception {
@@ -19,6 +19,7 @@
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.index.store.RemoteBufferedOutputDirectory;
+import org.opensearch.indices.RemoteStoreSettings;
 import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
 import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.repositories.blobstore.BlobStoreRepository;
@@ -287,8 +288,14 @@ public void testDeleteMultipleShallowCopySnapshotsCase3() throws Exception {
     public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
         disableRepoConsistencyCheck("Remote store repository is being used in the test");
         final Path remoteStoreRepoPath = randomRepoPath();
-        internalCluster().startClusterManagerOnlyNode(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath));
-        internalCluster().startDataOnlyNode(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath));
+        Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath);
+        // Disabling pinned timestamp as this test is specifically for shallow snapshot.
+        settings = Settings.builder()
+            .put(settings)
+            .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), false)
+            .build();
+        internalCluster().startClusterManagerOnlyNode(settings);
+        internalCluster().startDataOnlyNode(settings);
         final Client clusterManagerClient = internalCluster().clusterManagerClient();
         ensureStableCluster(2);
 
@@ -40,6 +40,7 @@
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
+import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
 import org.opensearch.threadpool.ThreadPool;
 
 import java.io.FileNotFoundException;
@@ -91,6 +92,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
 
     private final RemoteStoreLockManager mdLockManager;
 
+    private final Map<Long, String> metadataFilePinnedTimestampMap;
+
     private final ThreadPool threadPool;
 
     /**
@@ -132,6 +135,7 @@ public RemoteSegmentStoreDirectory(
         this.remoteMetadataDirectory = remoteMetadataDirectory;
         this.mdLockManager = mdLockManager;
         this.threadPool = threadPool;
+        this.metadataFilePinnedTimestampMap = new HashMap<>();
         this.logger = Loggers.getLogger(getClass(), shardId);
         init();
     }
@@ -176,6 +180,42 @@ public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long c
         return remoteSegmentMetadata;
     }
 
+    /**
+     * Initializes the remote segment metadata to a specific timestamp.
+     *
+     * @param timestamp The timestamp to initialize the remote segment metadata to.
+     * @return The RemoteSegmentMetadata object corresponding to the specified timestamp, or null if no metadata file is found for that timestamp.
+     * @throws IOException If an I/O error occurs while reading the metadata file.
+     */
+    public RemoteSegmentMetadata initializeToSpecificTimestamp(long timestamp) throws IOException {
+        List<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
+            MetadataFilenameUtils.METADATA_PREFIX,
+            Integer.MAX_VALUE
+        );
+        Set<String> lockedMetadataFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles(
+            metadataFiles,
+            Set.of(timestamp),
+            MetadataFilenameUtils::getTimestamp,
+            MetadataFilenameUtils::getNodeIdByPrimaryTermAndGen
+        );
+        if (lockedMetadataFiles.isEmpty()) {
+            return null;
+        }
+        if (lockedMetadataFiles.size() > 1) {
+            throw new IOException(
+                "Expected exactly one metadata file matching timestamp: " + timestamp + " but got " + lockedMetadataFiles
+            );
+        }
+        String metadataFile = lockedMetadataFiles.iterator().next();
+        RemoteSegmentMetadata remoteSegmentMetadata = readMetadataFile(metadataFile);
+        if (remoteSegmentMetadata != null) {
+            this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(remoteSegmentMetadata.getMetadata());
+        } else {
+            this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>();
+        }
+        return remoteSegmentMetadata;
+    }
+
     /**
      * Read the latest metadata file to get the list of segments uploaded to the remote segment store.
      * We upload a metadata file per refresh, but it is not unique per refresh. Refresh metadata file is unique for a given commit.
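As a usage note, a hedged sketch of how a caller might restore a shard's segment view as of a pinned timestamp; the directory variable, the pinnedTimestamp value, and the error handling are illustrative, not part of this change:

    // Hypothetical caller; assumes an already-constructed RemoteSegmentStoreDirectory.
    RemoteSegmentMetadata metadata = directory.initializeToSpecificTimestamp(pinnedTimestamp);
    if (metadata == null) {
        // No metadata file is implicitly locked by this timestamp, e.g. the
        // timestamp predates the oldest retained metadata file.
        throw new IllegalStateException("No remote segment metadata at timestamp " + pinnedTimestamp);
    }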
Expand Down Expand Up @@ -324,7 +364,8 @@ public static String getMetadataFilename(
long translogGeneration,
long uploadCounter,
int metadataVersion,
String nodeId
String nodeId,
long creationTimestamp
) {
return String.join(
SEPARATOR,
@@ -334,11 +375,30 @@
             RemoteStoreUtils.invertLong(translogGeneration),
             RemoteStoreUtils.invertLong(uploadCounter),
             String.valueOf(Objects.hash(nodeId)),
-            RemoteStoreUtils.invertLong(System.currentTimeMillis()),
+            RemoteStoreUtils.invertLong(creationTimestamp),
             String.valueOf(metadataVersion)
         );
     }
 
+    public static String getMetadataFilename(
+        long primaryTerm,
+        long generation,
+        long translogGeneration,
+        long uploadCounter,
+        int metadataVersion,
+        String nodeId
+    ) {
+        return getMetadataFilename(
+            primaryTerm,
+            generation,
+            translogGeneration,
+            uploadCounter,
+            metadataVersion,
+            nodeId,
+            System.currentTimeMillis()
+        );
+    }
+
     // Visible for testing
     static long getPrimaryTerm(String[] filenameTokens) {
         return RemoteStoreUtils.invertLong(filenameTokens[1]);
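For context, the creation timestamp is embedded through RemoteStoreUtils.invertLong, so a plain lexicographic listing of metadata files comes back newest-first; the refactoring above only makes that timestamp an explicit parameter so tests can pin it. A minimal sketch of the inversion trick, assuming a zero-padded fixed-width format rather than quoting the real implementation:

    // Sketch only; the real method lives in RemoteStoreUtils.
    // Subtracting from Long.MAX_VALUE and zero-padding makes larger (newer)
    // timestamps sort lexicographically before smaller (older) ones.
    static String invertLong(long value) {
        if (value < 0) {
            throw new IllegalArgumentException("value must be non-negative: " + value);
        }
        return String.format(java.util.Locale.ROOT, "%019d", Long.MAX_VALUE - value);
    }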
@@ -778,6 +838,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
             );
             return;
         }
+
         List<String> sortedMetadataFileList = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
             MetadataFilenameUtils.METADATA_PREFIX,
             Integer.MAX_VALUE
@@ -791,16 +852,44 @@
             return;
         }
 
-        List<String> metadataFilesEligibleToDelete = new ArrayList<>(
-            sortedMetadataFileList.subList(lastNMetadataFilesToKeep, sortedMetadataFileList.size())
-        );
-        Set<String> allLockFiles;
-        try {
-            allLockFiles = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(MetadataFilenameUtils.METADATA_PREFIX);
-        } catch (Exception e) {
-            logger.error("Exception while fetching segment metadata lock files, skipping deleteStaleSegments", e);
-            return;
-        }
+        // Check last fetch status of pinned timestamps. If stale, return.
+        if (RemoteStoreUtils.isPinnedTimestampStateStale()) {
+            logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
+            return;
+        }
+
+        Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();
+
+        Set<String> implicitLockedFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles(
+            sortedMetadataFileList,
+            pinnedTimestampsState.v2(),
+            metadataFilePinnedTimestampMap,
+            MetadataFilenameUtils::getTimestamp,
+            MetadataFilenameUtils::getNodeIdByPrimaryTermAndGen
+        );
+        final Set<String> allLockFiles = new HashSet<>(implicitLockedFiles);
+
+        try {
+            allLockFiles.addAll(
+                ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(MetadataFilenameUtils.METADATA_PREFIX)
+            );
+        } catch (Exception e) {
+            logger.error("Exception while fetching segment metadata lock files, skipping deleteStaleSegments", e);
+            return;
+        }
+
+        List<String> metadataFilesEligibleToDelete = new ArrayList<>(
+            sortedMetadataFileList.subList(lastNMetadataFilesToKeep, sortedMetadataFileList.size())
+        );
+
+        // Along with last N files, we need to keep files since last successful run of scheduler
+        long lastSuccessfulFetchOfPinnedTimestamps = pinnedTimestampsState.v1();
+        metadataFilesEligibleToDelete = RemoteStoreUtils.filterOutMetadataFilesBasedOnAge(
+            metadataFilesEligibleToDelete,
+            MetadataFilenameUtils::getTimestamp,
+            lastSuccessfulFetchOfPinnedTimestamps
+        );
+
         List<String> metadataFilesToBeDeleted = metadataFilesEligibleToDelete.stream()
             .filter(metadataFile -> allLockFiles.contains(metadataFile) == false)
             .collect(Collectors.toList());
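The age filter's intent, per the comment in the hunk above, is to retain any metadata file created since the last successful pinned-timestamp fetch, because such a file could be pinned by a timestamp the node has not yet observed. A hedged sketch of the semantics assumed for RemoteStoreUtils.filterOutMetadataFilesBasedOnAge, not a quote of the actual helper:

    // Assumed semantics: return only files old enough to be safe to delete,
    // i.e. created strictly before the last successful pinned-timestamp fetch.
    static List<String> filterOutMetadataFilesBasedOnAge(
        List<String> eligible,
        java.util.function.Function<String, Long> getTimestamp,
        long lastSuccessfulFetchMillis
    ) {
        return eligible.stream()
            .filter(file -> getTimestamp.apply(file) < lastSuccessfulFetchMillis)
            .collect(java.util.stream.Collectors.toList());
    }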
@@ -219,9 +219,9 @@ private ActionListener<Void> getListenerForWriteCallResponse(
 
     private PinnedTimestamps readExistingPinnedTimestamps(String blobFilename, RemotePinnedTimestamps remotePinnedTimestamps) {
         remotePinnedTimestamps.setBlobFileName(blobFilename);
-        remotePinnedTimestamps.setFullBlobName(pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps));
+        remotePinnedTimestamps.setFullBlobName(pinnedTimestampsBlobStore().getBlobPathForUpload(remotePinnedTimestamps));
         try {
-            return pinnedTimestampsBlobStore.read(remotePinnedTimestamps);
+            return pinnedTimestampsBlobStore().read(remotePinnedTimestamps);
         } catch (IOException e) {
             throw new RuntimeException("Failed to read existing pinned timestamps", e);
         }
@@ -245,6 +245,14 @@ public static Tuple<Long, Set<Long>> getPinnedTimestamps() {
         return pinnedTimestampsSet;
     }
 
+    public RemoteStorePinnedTimestampsBlobStore pinnedTimestampsBlobStore() {
+        return pinnedTimestampsBlobStore;
+    }
+
+    public BlobStoreTransferService blobStoreTransferService() {
+        return blobStoreTransferService;
+    }
+
     /**
      * Inner class for asynchronously updating the pinned timestamp set.
      */
@@ -266,11 +274,12 @@ protected void runInternal() {
                 clusterService.state().metadata().clusterUUID(),
                 blobStoreRepository.getCompressor()
             );
-            BlobPath path = pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps);
-            blobStoreTransferService.listAllInSortedOrder(path, remotePinnedTimestamps.getType(), 1, new ActionListener<>() {
+            BlobPath path = pinnedTimestampsBlobStore().getBlobPathForUpload(remotePinnedTimestamps);
+            blobStoreTransferService().listAllInSortedOrder(path, remotePinnedTimestamps.getType(), 1, new ActionListener<>() {
                 @Override
                 public void onResponse(List<BlobMetadata> blobMetadata) {
                     if (blobMetadata.isEmpty()) {
+                        pinnedTimestampsSet = new Tuple<>(triggerTimestamp, Set.of());
                         return;
                     }
                     PinnedTimestamps pinnedTimestamps = readExistingPinnedTimestamps(blobMetadata.get(0).name(), remotePinnedTimestamps);
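The one-line addition in onResponse above matters for staleness tracking: when no pinned-timestamp blob exists yet, the fetch still succeeded, so pinnedTimestampsSet is refreshed to the trigger time with an empty set rather than left at its previous value. Without it, RemoteStoreUtils.isPinnedTimestampStateStale() would presumably keep reporting a stale state on fresh clusters, and garbage collection would stay skipped indefinitely.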
@@ -1081,5 +1081,4 @@ public void testIsPinnedTimestampStateStaleFeatureEnabled() {
         setupRemotePinnedTimestampFeature(true);
         assertTrue(RemoteStoreUtils.isPinnedTimestampStateStale());
     }
-
 }
@@ -43,47 +43,19 @@ public class BaseRemoteSegmentStoreDirectoryTests extends IndexShardTestCase {
     protected SegmentInfos segmentInfos;
     protected ThreadPool threadPool;
 
-    protected final String metadataFilename = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
-        12,
-        23,
-        34,
-        1,
-        1,
-        "node-1"
-    );
+    protected String metadataFilename = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(12, 23, 34, 1, 1, "node-1");
 
-    protected final String metadataFilenameDup = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
+    protected String metadataFilenameDup = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
         12,
         23,
         34,
         2,
         1,
         "node-2"
     );
-    protected final String metadataFilename2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
-        12,
-        13,
-        34,
-        1,
-        1,
-        "node-1"
-    );
-    protected final String metadataFilename3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
-        10,
-        38,
-        34,
-        1,
-        1,
-        "node-1"
-    );
-    protected final String metadataFilename4 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
-        10,
-        36,
-        34,
-        1,
-        1,
-        "node-1"
-    );
+    protected String metadataFilename2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(12, 13, 34, 1, 1, "node-1");
+    protected String metadataFilename3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(10, 38, 34, 1, 1, "node-1");
+    protected String metadataFilename4 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(10, 36, 34, 1, 1, "node-1");
 
     public void setupRemoteSegmentStoreDirectory() throws IOException {
         remoteDataDirectory = mock(RemoteDirectory.class);