Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segment gc timestamp pinned enabled #15435

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,15 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
} else {
// As delete is async its possible that the file gets created before the deletion or after
// deletion.
MatcherAssert.assertThat(
actualFileCount,
is(oneOf(lastNMetadataFilesToKeep - 1, lastNMetadataFilesToKeep, lastNMetadataFilesToKeep + 1))
);
if (RemoteStoreSettings.isPinnedTimestampsEnabled()) {
// With pinned timestamp, we also keep md files since last successful fetch
assertTrue(actualFileCount >= lastNMetadataFilesToKeep);
} else {
MatcherAssert.assertThat(
actualFileCount,
is(oneOf(lastNMetadataFilesToKeep - 1, lastNMetadataFilesToKeep, lastNMetadataFilesToKeep + 1))
);
}
}
}, 30, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -249,7 +254,12 @@ public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(4)));
if (RemoteStoreSettings.isPinnedTimestampsEnabled()) {
// With pinned timestamp, we also keep md files since last successful fetch
assertTrue(actualFileCount >= 4);
} else {
assertEquals(4, actualFileCount);
}
}

public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.threadpool.ThreadPool;

import java.io.FileNotFoundException;
Expand Down Expand Up @@ -91,6 +92,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement

private final RemoteStoreLockManager mdLockManager;

private final Map<Long, String> metadataFilePinnedTimestampMap;

private final ThreadPool threadPool;

/**
Expand Down Expand Up @@ -132,6 +135,7 @@ public RemoteSegmentStoreDirectory(
this.remoteMetadataDirectory = remoteMetadataDirectory;
this.mdLockManager = mdLockManager;
this.threadPool = threadPool;
this.metadataFilePinnedTimestampMap = new HashMap<>();
this.logger = Loggers.getLogger(getClass(), shardId);
init();
}
Expand Down Expand Up @@ -176,6 +180,38 @@ public RemoteSegmentMetadata initializeToSpecificCommit(long primaryTerm, long c
return remoteSegmentMetadata;
}

/**
* Initializes the remote segment metadata to a specific timestamp.
*
* @param timestamp The timestamp to initialize the remote segment metadata to.
* @return The RemoteSegmentMetadata object corresponding to the specified timestamp, or null if no metadata file is found for that timestamp.
* @throws IOException If an I/O error occurs while reading the metadata file.
*/
public RemoteSegmentMetadata initializeToSpecificTimestamp(long timestamp) throws IOException {
List<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
);
Set<String> lockedMetadataFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles(
metadataFiles,
Set.of(timestamp),
MetadataFilenameUtils::getTimestamp,
MetadataFilenameUtils::getNodeIdByPrimaryTermAndGen
);
if (lockedMetadataFiles.isEmpty()) {
return null;
}
assert lockedMetadataFiles.size() == 1 : "Expected exactly one metadata file but got " + lockedMetadataFiles;
String metadataFile = lockedMetadataFiles.iterator().next();
RemoteSegmentMetadata remoteSegmentMetadata = readMetadataFile(metadataFile);
if (remoteSegmentMetadata != null) {
this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(remoteSegmentMetadata.getMetadata());
} else {
this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>();
}
return remoteSegmentMetadata;
}

/**
* Read the latest metadata file to get the list of segments uploaded to the remote segment store.
* We upload a metadata file per refresh, but it is not unique per refresh. Refresh metadata file is unique for a given commit.
Expand Down Expand Up @@ -324,7 +360,8 @@ public static String getMetadataFilename(
long translogGeneration,
long uploadCounter,
int metadataVersion,
String nodeId
String nodeId,
long creationTimestamp
) {
return String.join(
SEPARATOR,
Expand All @@ -334,11 +371,30 @@ public static String getMetadataFilename(
RemoteStoreUtils.invertLong(translogGeneration),
RemoteStoreUtils.invertLong(uploadCounter),
String.valueOf(Objects.hash(nodeId)),
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
RemoteStoreUtils.invertLong(creationTimestamp),
String.valueOf(metadataVersion)
);
}

public static String getMetadataFilename(
long primaryTerm,
long generation,
long translogGeneration,
long uploadCounter,
int metadataVersion,
String nodeId
) {
return getMetadataFilename(
primaryTerm,
generation,
translogGeneration,
uploadCounter,
metadataVersion,
nodeId,
System.currentTimeMillis()
);
}

// Visible for testing
static long getPrimaryTerm(String[] filenameTokens) {
return RemoteStoreUtils.invertLong(filenameTokens[1]);
Expand Down Expand Up @@ -778,6 +834,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
);
return;
}

List<String> sortedMetadataFileList = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
Expand All @@ -791,16 +848,44 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
return;
}

List<String> metadataFilesEligibleToDelete = new ArrayList<>(
sortedMetadataFileList.subList(lastNMetadataFilesToKeep, sortedMetadataFileList.size())
// Check last fetch status of pinned timestamps. If stale, return.
if (RemoteStoreUtils.isPinnedTimestampStateStale()) {
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
return;
}

Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();

Set<String> implicitLockedFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles(
sortedMetadataFileList,
pinnedTimestampsState.v2(),
metadataFilePinnedTimestampMap,
MetadataFilenameUtils::getTimestamp,
MetadataFilenameUtils::getNodeIdByPrimaryTermAndGen
);
Set<String> allLockFiles;
final Set<String> allLockFiles = new HashSet<>(implicitLockedFiles);

try {
allLockFiles = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(MetadataFilenameUtils.METADATA_PREFIX);
allLockFiles.addAll(
((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(MetadataFilenameUtils.METADATA_PREFIX)
);
} catch (Exception e) {
logger.error("Exception while fetching segment metadata lock files, skipping deleteStaleSegments", e);
return;
}

List<String> metadataFilesEligibleToDelete = new ArrayList<>(
sortedMetadataFileList.subList(lastNMetadataFilesToKeep, sortedMetadataFileList.size())
);

// Along with last N files, we need to keep files since last successful run of scheduler
long lastSuccessfulFetchOfPinnedTimestamps = pinnedTimestampsState.v1();
metadataFilesEligibleToDelete = RemoteStoreUtils.filterOutMetadataFilesBasedOnAge(
metadataFilesEligibleToDelete,
MetadataFilenameUtils::getTimestamp,
lastSuccessfulFetchOfPinnedTimestamps
);

List<String> metadataFilesToBeDeleted = metadataFilesEligibleToDelete.stream()
.filter(metadataFile -> allLockFiles.contains(metadataFile) == false)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,9 @@ private ActionListener<Void> getListenerForWriteCallResponse(

private PinnedTimestamps readExistingPinnedTimestamps(String blobFilename, RemotePinnedTimestamps remotePinnedTimestamps) {
remotePinnedTimestamps.setBlobFileName(blobFilename);
remotePinnedTimestamps.setFullBlobName(pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps));
remotePinnedTimestamps.setFullBlobName(pinnedTimestampsBlobStore().getBlobPathForUpload(remotePinnedTimestamps));
try {
return pinnedTimestampsBlobStore.read(remotePinnedTimestamps);
return pinnedTimestampsBlobStore().read(remotePinnedTimestamps);
} catch (IOException e) {
throw new RuntimeException("Failed to read existing pinned timestamps", e);
}
Expand All @@ -245,6 +245,14 @@ public static Tuple<Long, Set<Long>> getPinnedTimestamps() {
return pinnedTimestampsSet;
}

public RemoteStorePinnedTimestampsBlobStore pinnedTimestampsBlobStore() {
return pinnedTimestampsBlobStore;
}

public BlobStoreTransferService blobStoreTransferService() {
return blobStoreTransferService;
}

/**
* Inner class for asynchronously updating the pinned timestamp set.
*/
Expand All @@ -266,11 +274,12 @@ protected void runInternal() {
clusterService.state().metadata().clusterUUID(),
blobStoreRepository.getCompressor()
);
BlobPath path = pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps);
blobStoreTransferService.listAllInSortedOrder(path, remotePinnedTimestamps.getType(), 1, new ActionListener<>() {
BlobPath path = pinnedTimestampsBlobStore().getBlobPathForUpload(remotePinnedTimestamps);
blobStoreTransferService().listAllInSortedOrder(path, remotePinnedTimestamps.getType(), 1, new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
if (blobMetadata.isEmpty()) {
pinnedTimestampsSet = new Tuple<>(triggerTimestamp, Set.of());
return;
}
PinnedTimestamps pinnedTimestamps = readExistingPinnedTimestamps(blobMetadata.get(0).name(), remotePinnedTimestamps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1081,5 +1081,4 @@ public void testIsPinnedTimestampStateStaleFeatureEnabled() {
setupRemotePinnedTimestampFeature(true);
assertTrue(RemoteStoreUtils.isPinnedTimestampStateStale());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,47 +43,19 @@ public class BaseRemoteSegmentStoreDirectoryTests extends IndexShardTestCase {
protected SegmentInfos segmentInfos;
protected ThreadPool threadPool;

protected final String metadataFilename = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
12,
23,
34,
1,
1,
"node-1"
);
protected String metadataFilename = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(12, 23, 34, 1, 1, "node-1");

protected final String metadataFilenameDup = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
protected String metadataFilenameDup = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
12,
23,
34,
2,
1,
"node-2"
);
protected final String metadataFilename2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
12,
13,
34,
1,
1,
"node-1"
);
protected final String metadataFilename3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
10,
38,
34,
1,
1,
"node-1"
);
protected final String metadataFilename4 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
10,
36,
34,
1,
1,
"node-1"
);
protected String metadataFilename2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(12, 13, 34, 1, 1, "node-1");
protected String metadataFilename3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(10, 38, 34, 1, 1, "node-1");
protected String metadataFilename4 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(10, 36, 34, 1, 1, "node-1");

public void setupRemoteSegmentStoreDirectory() throws IOException {
remoteDataDirectory = mock(RemoteDirectory.class);
Expand Down
Loading
Loading