Skip to content

Commit

Permalink
Fix deletion permits flow in RemoteFsTimestampAwareTranslog (#16282)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Sachin Kale <sachinpkale@gmail.com>
(cherry picked from commit 23d1c7a)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Oct 15, 2024
1 parent b68fff8 commit ce20379
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,21 +215,42 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted);
if (generationsToBeDeleted.isEmpty() == false) {
// Delete stale generations
translogTransferManager.deleteGenerationAsync(
primaryTermSupplier.getAsLong(),
generationsToBeDeleted,
remoteGenerationDeletionPermits::release
);
try {
translogTransferManager.deleteGenerationAsync(
primaryTermSupplier.getAsLong(),
generationsToBeDeleted,
remoteGenerationDeletionPermits::release
);
} catch (Exception e) {
logger.error("Exception in delete generations flow", e);

Check warning on line 225 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L224-L225

Added lines #L224 - L225 were not covered by tests
// Release permit that is meant for metadata files and return
remoteGenerationDeletionPermits.release();

Check warning on line 227 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L227

Added line #L227 was not covered by tests
assert remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS : "Available permits "
+ remoteGenerationDeletionPermits.availablePermits()

Check warning on line 229 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L229

Added line #L229 was not covered by tests
+ " is not equal to "
+ REMOTE_DELETION_PERMITS;
return;

Check warning on line 232 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L232

Added line #L232 was not covered by tests
}
} else {
remoteGenerationDeletionPermits.release();
}

if (metadataFilesToBeDeleted.isEmpty() == false) {
// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(
metadataFilesToBeDeleted,
remoteGenerationDeletionPermits::release
);
try {
translogTransferManager.deleteMetadataFilesAsync(
metadataFilesToBeDeleted,
remoteGenerationDeletionPermits::release
);
} catch (Exception e) {
logger.error("Exception in delete metadata files flow", e);

Check warning on line 246 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L245-L246

Added lines #L245 - L246 were not covered by tests
// Permit is already released by deleteMetadataFilesAsync
assert remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS : "Available permits "
+ remoteGenerationDeletionPermits.availablePermits()

Check warning on line 249 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L249

Added line #L249 was not covered by tests
+ " is not equal to "
+ REMOTE_DELETION_PERMITS;
return;

Check warning on line 252 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L252

Added line #L252 was not covered by tests
}

// Update cache to keep only those metadata files that are not getting deleted
oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);
Expand All @@ -240,7 +261,12 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
remoteGenerationDeletionPermits.release();
}
} catch (Exception e) {
logger.error("Exception in trimUnreferencedReaders", e);

Check warning on line 264 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L264

Added line #L264 was not covered by tests
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
assert remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS : "Available permits "
+ remoteGenerationDeletionPermits.availablePermits()

Check warning on line 267 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L267

Added line #L267 was not covered by tests
+ " is not equal to "
+ REMOTE_DELETION_PERMITS;
}
}

Expand Down Expand Up @@ -441,7 +467,8 @@ protected static void deleteStaleRemotePrimaryTerms(
}
Optional<Long> minPrimaryTermFromMetadataFiles = metadataFilesNotToBeDeleted.stream().map(file -> {
try {
return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap).v1();
return getMinMaxPrimaryTermFromMetadataFile(file, translogTransferManager, oldFormatMetadataFilePrimaryTermMap, logger)
.v1();
} catch (IOException e) {
return Long.MIN_VALUE;
}
Expand Down Expand Up @@ -482,7 +509,8 @@ protected static Long getMinPrimaryTermInRemote(
protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
String metadataFile,
TranslogTransferManager translogTransferManager,
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap
Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap,
Logger logger
) throws IOException {
Tuple<Long, Long> minMaxPrimaryTermFromFileName = TranslogTransferMetadata.getMinMaxPrimaryTermFromFilename(metadataFile);
if (minMaxPrimaryTermFromFileName != null) {
Expand All @@ -504,6 +532,8 @@ protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
if (primaryTerm.isPresent()) {
minPrimaryTem = primaryTerm.get();
}
} else {
logger.warn("No primary term found from GenerationToPrimaryTermMap for file [{}]", metadataFile);
}
Tuple<Long, Long> minMaxPrimaryTermTuple = new Tuple<>(minPrimaryTem, maxPrimaryTem);
oldFormatMetadataFilePrimaryTermMap.put(metadataFile, minMaxPrimaryTermTuple);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,14 @@ protected void trimUnreferencedReaders(boolean onlyTrimLocal) throws IOException
generationsToDelete.add(generation);
}
if (generationsToDelete.isEmpty() == false) {
deleteRemoteGeneration(generationsToDelete);
try {
deleteRemoteGeneration(generationsToDelete);
} catch (Exception e) {
logger.error("Exception in delete generations flow", e);

Check warning on line 596 in server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java#L595-L596

Added lines #L595 - L596 were not covered by tests
// Release permit that is meant for metadata files and return
remoteGenerationDeletionPermits.release();
return;

Check warning on line 599 in server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java#L598-L599

Added lines #L598 - L599 were not covered by tests
}
translogTransferManager.deleteStaleTranslogMetadataFilesAsync(remoteGenerationDeletionPermits::release);
deleteStaleRemotePrimaryTerms();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,19 +496,24 @@ public byte[] getMetadataBytes(TranslogTransferMetadata metadata) throws IOExcep
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
*/
public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runnable onCompletion) {
List<String> translogFiles = new ArrayList<>();
generations.forEach(generation -> {
// Add .ckp and .tlog file to translog file list which is located in basePath/<primaryTerm>
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFileName = Translog.getFilename(generation);
if (isTranslogMetadataEnabled == false) {
translogFiles.addAll(List.of(ckpFileName, translogFileName));
} else {
translogFiles.add(translogFileName);
}
});
// Delete the translog and checkpoint files asynchronously
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
try {
List<String> translogFiles = new ArrayList<>();
generations.forEach(generation -> {
// Add .ckp and .tlog file to translog file list which is located in basePath/<primaryTerm>
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFileName = Translog.getFilename(generation);
if (isTranslogMetadataEnabled == false) {
translogFiles.addAll(List.of(ckpFileName, translogFileName));
} else {
translogFiles.add(translogFileName);

Check warning on line 508 in server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java#L508

Added line #L508 was not covered by tests
}
});
// Delete the translog and checkpoint files asynchronously
deleteTranslogFilesAsync(primaryTerm, translogFiles, onCompletion);
} catch (Exception e) {
onCompletion.run();
throw e;

Check warning on line 515 in server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java#L513-L515

Added lines #L513 - L515 were not covered by tests
}
}

/**
Expand Down Expand Up @@ -658,37 +663,32 @@ public void deleteTranslogFiles() throws IOException {
* @param onCompletion runnable to run on completion of deletion regardless of success/failure.
*/
private void deleteTranslogFilesAsync(long primaryTerm, List<String> files, Runnable onCompletion) {
try {
transferService.deleteBlobsAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteDataTransferPath.add(String.valueOf(primaryTerm)),
files,
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
fileTransferTracker.delete(files);
logger.trace("Deleted translogs for primaryTerm={} files={}", primaryTerm, files);
onCompletion.run();
}
transferService.deleteBlobsAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteDataTransferPath.add(String.valueOf(primaryTerm)),
files,
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
fileTransferTracker.delete(files);
logger.trace("Deleted translogs for primaryTerm={} files={}", primaryTerm, files);
onCompletion.run();
}

@Override
public void onFailure(Exception e) {
onCompletion.run();
logger.error(
() -> new ParameterizedMessage(
"Exception occurred while deleting translog for primaryTerm={} files={}",
primaryTerm,
files
),
e
);
}
@Override
public void onFailure(Exception e) {
onCompletion.run();
logger.error(
() -> new ParameterizedMessage(
"Exception occurred while deleting translog for primaryTerm={} files={}",
primaryTerm,
files
),
e
);
}
);
} catch (Exception e) {
onCompletion.run();
throw e;
}
}
);
}

/**
Expand Down
Loading

0 comments on commit ce20379

Please sign in to comment.