Integrate translog cleanup with snapshot deletion and fix primary term deletion logic (#15657)

---------

Signed-off-by: Sachin Kale <kalsac@amazon.com>
Co-authored-by: Sachin Kale <kalsac@amazon.com>
sachinpkale and Sachin Kale authored Sep 4, 2024
1 parent 3681b52 commit 7b0846e
Showing 11 changed files with 422 additions and 45 deletions.
@@ -32,7 +32,7 @@
import static org.hamcrest.Matchers.lessThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
-public class DeleteSnapshotITV2 extends AbstractSnapshotIntegTestCase {
+public class DeleteSnapshotV2IT extends AbstractSnapshotIntegTestCase {

private static final String REMOTE_REPO_NAME = "remote-store-repo-name";

@@ -276,9 +276,11 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception
Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
Path shardPath = Path.of(String.valueOf(indexPath), "0");
Path segmentsPath = Path.of(String.valueOf(shardPath), "segments");
Path translogPath = Path.of(String.valueOf(shardPath), "translog");

// Get total remote store directory file counts (segments and translog) for deleted index and shard 0
int segmentFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath);
int translogFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(translogPath);

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

@@ -312,6 +314,13 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountAfterDeletingSnapshot1));
} catch (Exception e) {}
}, 60, TimeUnit.SECONDS);

assertBusy(() -> {
try {
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(translogPath), lessThan(translogFilesCountBeforeDeletingSnapshot1));
} catch (Exception e) {}
}, 60, TimeUnit.SECONDS);

}
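The new translog assertions mirror the existing segment-count checks: once the snapshot pinning the old timestamp is deleted, the translog files it kept alive become eligible for cleanup, so the remote translog directory must eventually drop below its pre-deletion file count.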

private Settings snapshotV2Settings(Path remoteStoreRepoPath) {
@@ -412,7 +412,7 @@ static long getGeneration(String[] filenameTokens) {

public static long getTimestamp(String filename) {
String[] filenameTokens = filename.split(SEPARATOR);
-return RemoteStoreUtils.invertLong(filenameTokens[6]);
+return RemoteStoreUtils.invertLong(filenameTokens[filenameTokens.length - 2]);
}
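Indexing from the end of the filename makes the timestamp lookup robust to tokens being added earlier in the name; a fixed index 6 only held for the old layout. This matches the backwards-parsing convention called out in TranslogTransferMetadata ("we go backwards to avoid running into same separator in nodeId").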

public static Tuple<String, String> getNodeIdByPrimaryTermAndGen(String filename) {
@@ -116,4 +116,8 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
}
}

public Supplier<RepositoriesService> getRepositoriesService() {
return this.repositoriesService;
}

}
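Exposing the RepositoriesService supplier gives code outside the factory, plausibly the snapshot-deletion cleanup path this commit integrates, a way to reach repository handles when building remote-store components for a shard.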
@@ -8,6 +8,7 @@

package org.opensearch.index.translog;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobMetadata;
@@ -33,6 +34,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
@@ -52,10 +54,13 @@
*/
public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog {

private static Logger staticLogger = LogManager.getLogger(RemoteFsTimestampAwareTranslog.class);
private final Logger logger;
private final Map<Long, String> metadataFilePinnedTimestampMap;
// For metadata files with no min generation in the name, we cache generation data to avoid multiple reads.
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFileGenerationMap;
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap;
private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE);

public RemoteFsTimestampAwareTranslog(
TranslogConfig config,
@@ -86,6 +91,7 @@ public RemoteFsTimestampAwareTranslog(
logger = Loggers.getLogger(getClass(), shardId);
this.metadataFilePinnedTimestampMap = new HashMap<>();
this.oldFormatMetadataFileGenerationMap = new HashMap<>();
this.oldFormatMetadataFilePrimaryTermMap = new HashMap<>();
}
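The new oldFormatMetadataFilePrimaryTermMap mirrors the existing generation cache: for pre-2.17 metadata files, whose names do not carry a min primary term, the metadata blob is read once and the min/max primary terms are cached per file. Likewise, minPrimaryTermInRemote caches the smallest primary-term folder observed in the remote store, so repeated cleanup passes can skip re-listing it.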

@Override
@@ -165,7 +171,11 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
return;
}

-List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles);
+List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(
+metadataFiles,
+metadataFilePinnedTimestampMap,
+logger
+);

// If index is not deleted, make sure to keep latest metadata file
if (indexDeleted == false) {
@@ -209,7 +219,7 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);

// Delete stale primary terms
-deleteStaleRemotePrimaryTerms(metadataFiles);
+deleteStaleRemotePrimaryTerms(metadataFilesNotToBeDeleted);
} else {
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
}
@@ -259,8 +269,16 @@ protected Set<Long> getGenerationsToBeDeleted(
return generationsToBeDeleted;
}

// Visible for testing
protected List<String> getMetadataFilesToBeDeleted(List<String> metadataFiles) {
return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, logger);
}

// Visible for testing
protected static List<String> getMetadataFilesToBeDeleted(
List<String> metadataFiles,
Map<Long, String> metadataFilePinnedTimestampMap,
Logger logger
) {
Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();

// Keep files since last successful run of scheduler
@@ -351,27 +369,153 @@ protected Tuple<Long, Long> getMinMaxTranslogGenerationFromMetadataFile(
}
}

private void deleteStaleRemotePrimaryTerms(List<String> metadataFiles) {
deleteStaleRemotePrimaryTerms(
metadataFiles,
translogTransferManager,
oldFormatMetadataFilePrimaryTermMap,
minPrimaryTermInRemote,
logger
);
}

/**
* This method must be called only after there are valid generations to delete in trimUnreferencedReaders, as it implicitly
* ensures that the minimum primary term in the latest translog metadata in the remote store is the current primary term.
* <br>
* This will also delete all stale translog metadata files from the remote store, except the latest one as determined by the metadata file comparator.
*/
-private void deleteStaleRemotePrimaryTerms(List<String> metadataFiles) {
+protected static void deleteStaleRemotePrimaryTerms(
+List<String> metadataFiles,
+TranslogTransferManager translogTransferManager,
+Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap,
+AtomicLong minPrimaryTermInRemoteAtomicLong,
+Logger logger
+) {
// The deletion of older translog files in the remote store is on a best-effort basis. There is a possibility that there
// are older files that are no longer needed and should be cleaned up. Here, we delete all files that are part
// of an older primary term.
-if (olderPrimaryCleaned.trySet(Boolean.TRUE)) {
-if (metadataFiles.isEmpty()) {
-logger.trace("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms");
-return;
+if (metadataFiles.isEmpty()) {
+logger.trace("No metadata is uploaded yet, returning from deleteStaleRemotePrimaryTerms");
+return;
+}
Optional<Long> minPrimaryTermFromMetadataFiles = metadataFiles.stream().map(file -> {
try {
return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap).v1();
} catch (IOException e) {
return Long.MAX_VALUE;
}
}).min(Long::compareTo);
// First we delete all stale primary terms folders from remote store
Long minPrimaryTermInRemote = getMinPrimaryTermInRemote(minPrimaryTermInRemoteAtomicLong, translogTransferManager, logger);
if (minPrimaryTermFromMetadataFiles.get() > minPrimaryTermInRemote) {
translogTransferManager.deletePrimaryTermsAsync(minPrimaryTermFromMetadataFiles.get());
minPrimaryTermInRemoteAtomicLong.set(minPrimaryTermFromMetadataFiles.get());
} else {
logger.debug(
"Skipping primary term cleanup. minimumReferencedPrimaryTerm = {}, minPrimaryTermInRemote = {}",
minPrimaryTermFromMetadataFiles.get(),
minPrimaryTermInRemote
);
}
}
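To make the guard above concrete, a hypothetical run (numbers invented; this assumes deletePrimaryTermsAsync(x) removes remote folders for terms strictly below x):

// Live metadata files reference primary terms {5, 6}  -> minPrimaryTermFromMetadataFiles = 5
// Remote store still holds term folders {3, 4, 5, 6}  -> minPrimaryTermInRemote = 3
// Since 5 > 3, deletePrimaryTermsAsync(5) reclaims the term-3 and term-4 folders
// and the cached minimum advances to 5. On a later pass, 5 > 5 is false, so the
// deletion round trip is skipped and only the debug line is logged.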

private static Long getMinPrimaryTermInRemote(
AtomicLong minPrimaryTermInRemote,
TranslogTransferManager translogTransferManager,
Logger logger
) {
if (minPrimaryTermInRemote.get() == Long.MAX_VALUE) {
try {
Set<Long> primaryTermsInRemote = translogTransferManager.listPrimaryTermsInRemote();
if (primaryTermsInRemote.isEmpty() == false) {
Optional<Long> minPrimaryTerm = primaryTermsInRemote.stream().min(Long::compareTo);
minPrimaryTerm.ifPresent(minPrimaryTermInRemote::set);
}
} catch (IOException e) {
logger.error("Exception while listing primary terms in remote translog", e);
}
}
return minPrimaryTermInRemote.get();
}

protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
String metadataFile,
TranslogTransferManager translogTransferManager,
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap
) throws IOException {
Tuple<Long, Long> minMaxPrimaryTermFromFileName = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(metadataFile);
if (minMaxPrimaryTermFromFileName != null) {
return minMaxPrimaryTermFromFileName;
} else {
if (oldFormatMetadataFilePrimaryTermMap.containsKey(metadataFile)) {
return oldFormatMetadataFilePrimaryTermMap.get(metadataFile);
} else {
TranslogTransferMetadata metadata = translogTransferManager.readMetadata(metadataFile);
long maxPrimaryTerm = TranslogTransferMetadata.getPrimaryTermFromFileName(metadataFile);
long minPrimaryTerm = -1;
if (metadata.getGenerationToPrimaryTermMapper() != null
&& metadata.getGenerationToPrimaryTermMapper().values().isEmpty() == false) {
Optional<Long> primaryTerm = metadata.getGenerationToPrimaryTermMapper()
.values()
.stream()
.map(s -> Long.parseLong(s))
.min(Long::compareTo);
if (primaryTerm.isPresent()) {
minPrimaryTerm = primaryTerm.get();
}
}
Tuple<Long, Long> minMaxPrimaryTermTuple = new Tuple<>(minPrimaryTerm, maxPrimaryTerm);
oldFormatMetadataFilePrimaryTermMap.put(metadataFile, minMaxPrimaryTermTuple);
return minMaxPrimaryTermTuple;
}
-Optional<Long> minPrimaryTerm = metadataFiles.stream()
-.map(file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[1]))
-.min(Long::compareTo);
-// First we delete all stale primary terms folders from remote store
-long minimumReferencedPrimaryTerm = minPrimaryTerm.get() - 1;
-translogTransferManager.deletePrimaryTermsAsync(minimumReferencedPrimaryTerm);
}
}

public static void cleanup(TranslogTransferManager translogTransferManager) throws IOException {
ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());

try {
if (metadataFiles.isEmpty()) {
staticLogger.debug("No stale translog metadata files found");
return;
}
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), staticLogger);
if (metadataFilesToBeDeleted.isEmpty()) {
staticLogger.debug("No metadata files to delete");
return;
}
staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted);

// For all the files that we are keeping, fetch min and max generations
List<String> metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles);
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);

// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {});

// Delete stale primary terms
deleteStaleRemotePrimaryTerms(
metadataFilesNotToBeDeleted,
translogTransferManager,
new HashMap<>(),
new AtomicLong(Long.MAX_VALUE),
staticLogger
);
} catch (Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
}
}

@Override
public void onFailure(Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
}
};
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
}
}
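The static cleanup entry point is what lets the snapshot-deletion flow reuse this logic without a live Translog instance. A rough usage sketch, with hypothetical wiring (buildTransferManagerForShard is an invented helper; the actual call site is in one of this commit's files not rendered above):

// Hypothetical: a TranslogTransferManager already built for the shard's remote translog path.
TranslogTransferManager transferManager = buildTransferManagerForShard(shardId);
try {
    // Lists remote metadata files, deletes stale ones, then prunes stale primary-term folders.
    RemoteFsTimestampAwareTranslog.cleanup(transferManager);
} catch (IOException e) {
    logger.error("Remote translog cleanup failed for shard " + shardId, e);
}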
@@ -545,6 +545,14 @@ public void onFailure(Exception e) {
});
}

public Set<Long> listPrimaryTermsInRemote() throws IOException {
Set<String> primaryTermsStr = transferService.listFolders(remoteDataTransferPath);
if (primaryTermsStr != null) {
return primaryTermsStr.stream().map(Long::parseLong).collect(Collectors.toSet());
}
return new HashSet<>();
}
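Each folder directly under the remote translog data path is named for a primary term, so the listing parses folder names straight to longs; a null listing from the transfer service is treated as no terms.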

/**
* Handles deletion of all translog files associated with a primary term.
*
@@ -19,6 +19,7 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
* The metadata associated with every transfer {@link TransferSnapshot}. The metadata is uploaded at the end of the
@@ -108,11 +109,28 @@ public String getFileName() {
RemoteStoreUtils.invertLong(createdAt),
String.valueOf(Objects.hash(nodeId)),
RemoteStoreUtils.invertLong(minTranslogGeneration),
String.valueOf(getMinPrimaryTermReferred()),
String.valueOf(CURRENT_VERSION)
)
);
}

private long getMinPrimaryTermReferred() {
if (generationToPrimaryTermMapper.get() == null || generationToPrimaryTermMapper.get().values().isEmpty()) {
return -1;
}
Optional<Long> minPrimaryTerm = generationToPrimaryTermMapper.get()
.values()
.stream()
.map(s -> Long.parseLong(s))
.min(Long::compareTo);
if (minPrimaryTerm.isPresent()) {
return minPrimaryTerm.get();
} else {
return -1;
}
}
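Putting the pieces together, the 2.17+ metadata filename implied by getFileName() and the parsers below carries these tokens, split on METADATA_SEPARATOR (the leading prefix token is inferred, since the parsers read the primary term from tokens[1]):

tokens[0]           <metadata prefix>
tokens[1]           inv(primaryTerm)            // max primary term of the upload
tokens[2]           inv(generation)             // max generation
tokens[3]           inv(createdAt)
tokens[4]           hash(nodeId)
tokens[length - 3]  inv(minTranslogGeneration)
tokens[length - 2]  minPrimaryTerm              // new in this change
tokens[length - 1]  CURRENT_VERSION

This is why getMinMaxTranslogGenerationFromFilename moves the min generation from tokens.length - 2 to tokens.length - 3: appending the min primary term shifted it one slot from the end.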

public static Tuple<Tuple<Long, Long>, String> getNodeIdByPrimaryTermAndGeneration(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
if (tokens.length < 6) {
@@ -143,15 +161,43 @@ public static Tuple<Long, Long> getMinMaxTranslogGenerationFromFilename(String f
assert Version.CURRENT.onOrAfter(Version.V_2_17_0);
try {
// instead of direct index, we go backwards to avoid running into same separator in nodeId
-String minGeneration = tokens[tokens.length - 2];
+String minGeneration = tokens[tokens.length - 3];
String maxGeneration = tokens[2];
return new Tuple<>(RemoteStoreUtils.invertLong(minGeneration), RemoteStoreUtils.invertLong(maxGeneration));
-} catch (NumberFormatException e) {
+} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Exception while getting min and max translog generation from: {}", filename), e);
return null;
}
}

public static Tuple<Long, Long> getMinMaxPrimaryTermFromFilename(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
if (tokens.length < 7) {
// For versions < 2.17, we don't have min primary term.
return null;
}
assert Version.CURRENT.onOrAfter(Version.V_2_17_0);
try {
// instead of direct index, we go backwards to avoid running into same separator in nodeId
String minPrimaryTerm = tokens[tokens.length - 2];
String maxPrimaryTerm = tokens[1];
return new Tuple<>(Long.parseLong(minPrimaryTerm), RemoteStoreUtils.invertLong(maxPrimaryTerm));
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Exception while getting min and max primary term from: {}", filename), e);
return null;
}
}

public static long getPrimaryTermFromFileName(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
try {
return RemoteStoreUtils.invertLong(tokens[1]);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Exception while getting max primary term from: {}", filename), e);
return -1;
}
}

@Override
public int hashCode() {
return Objects.hash(primaryTerm, generation);