Skip to content

Commit

Permalink
[Remote Translog] Trimming based on remote segment upload and cleanin…
Browse files Browse the repository at this point in the history
…g older tlog files (opensearch-project#5662) (opensearch-project#5792)

* RemoteFSTranslog Trimming and GC Logic

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
sachinpkale authored Jan 10, 2023
1 parent 3210fa8 commit 36a4727
Show file tree
Hide file tree
Showing 15 changed files with 140 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
store.decRef();
}
}

@Override
public void setMinSeqNoToKeep(long seqNo) {}
};
} catch (IOException ex) {
throw new RuntimeException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ public void afterRefresh(boolean didRefresh) {
.filter(file -> !localSegmentsPostRefresh.contains(file))
.collect(Collectors.toSet())
.forEach(localSegmentChecksumMap::remove);
final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine())
.lastRefreshedCheckpoint();
((InternalEngine) indexShard.getEngine()).translogManager()
.setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
}
}
} catch (EngineException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,11 @@ public void ensureCanFlush() {
}
}

@Override
public void setMinSeqNoToKeep(long seqNo) {
translog.setMinSeqNoToKeep(seqNo);
}

/**
* Reads operations from the translog
* @param location
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws T
@Override
public void ensureCanFlush() {}

@Override
public void setMinSeqNoToKeep(long seqNo) {}

@Override
public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class RemoteFsTranslog extends Translog {
private final FileTransferTracker fileTransferTracker;
private volatile long maxRemoteTranslogGenerationUploaded;

private volatile long minSeqNoToKeep;

public RemoteFsTranslog(
TranslogConfig config,
String translogUUID,
Expand Down Expand Up @@ -282,4 +284,42 @@ public void close() throws IOException {
}
}
}

protected long getMinReferencedGen() throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
long minReferencedGen = Math.min(
deletionPolicy.minTranslogGenRequired(readers, current),
minGenerationForSeqNo(Math.min(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, minSeqNoToKeep), current, readers)
);
assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of ["
+ minReferencedGen
+ "] but the lowest gen available is ["
+ getMinFileGeneration()
+ "]";
assert minReferencedGen <= currentFileGeneration() : "deletion policy requires a minReferenceGen of ["
+ minReferencedGen
+ "] which is higher than the current generation ["
+ currentFileGeneration()
+ "]";
return minReferencedGen;
}

protected void setMinSeqNoToKeep(long seqNo) {
if (seqNo < this.minSeqNoToKeep) {
throw new IllegalArgumentException(
"min seq number required can't go backwards: " + "current [" + this.minSeqNoToKeep + "] new [" + seqNo + "]"
);
}
this.minSeqNoToKeep = seqNo;
}

@Override
void deleteReaderFiles(TranslogReader reader) {
try {
translogTransferManager.deleteTranslog(primaryTermSupplier.getAsLong(), reader.generation);
} catch (IOException ignored) {
logger.error("Exception {} while deleting generation {}", ignored, reader.generation);
}
super.deleteReaderFiles(reader);
}
}
10 changes: 8 additions & 2 deletions server/src/main/java/org/opensearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -1684,7 +1684,7 @@ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
}
}

private static long minGenerationForSeqNo(long seqNo, TranslogWriter writer, List<TranslogReader> readers) {
static long minGenerationForSeqNo(long seqNo, TranslogWriter writer, List<TranslogReader> readers) {
long minGen = writer.generation;
for (final TranslogReader reader : readers) {
if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) {
Expand Down Expand Up @@ -1781,7 +1781,7 @@ public void trimUnreferencedReaders() throws IOException {
}
}

private long getMinReferencedGen() throws IOException {
protected long getMinReferencedGen() throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
long minReferencedGen = Math.min(
deletionPolicy.minTranslogGenRequired(readers, current),
Expand All @@ -1800,6 +1800,12 @@ private long getMinReferencedGen() throws IOException {
return minReferencedGen;
}

/*
Min Seq number required in translog to restore the complete data .
This might be required when segments are persisted via other mechanism than flush.
*/
protected void setMinSeqNoToKeep(long seqNo) {}

/**
* deletes all files associated with a reader. package-private to be able to simulate node failures at this point
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,11 @@ public interface TranslogManager {
* Checks if the translog has a pending recovery
*/
void ensureCanFlush();

/**
*
* @param seqNo : operations greater or equal to seqNo should be persisted
* This might be required when segments are persisted via other mechanism than flush.
*/
void setMinSeqNoToKeep(long seqNo);
}
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ synchronized boolean assertNoSeqAbove(long belowTerm, long aboveSeqNo) {
*
* Note: any exception during the sync process will be interpreted as a tragic exception and the writer will be closed before
* raising the exception.
* @return
* @return <code>true</code> if this call caused an actual sync operation
*/
public boolean sync() throws IOException {
return syncUpTo(Long.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;

Expand Down Expand Up @@ -75,6 +76,11 @@ public InputStream downloadBlob(Iterable<String> path, String fileName) throws I
return blobStore.blobContainer((BlobPath) path).readBlob(fileName);
}

@Override
public void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException {
blobStore.blobContainer((BlobPath) path).deleteBlobsIgnoringIfNotExists(fileNames);
}

@Override
public Set<String> listAll(Iterable<String> path) throws IOException {
return blobStore.blobContainer((BlobPath) path).listBlobs().keySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) {
add(fileSnapshot.getName(), TransferState.FAILED);
}

@Override
public void onDelete(String name) {
fileTransferTracker.remove(name);
}

public Set<TransferFileSnapshot> exclusionFilter(Set<TransferFileSnapshot> original) {
return original.stream()
.filter(fileSnapshot -> fileTransferTracker.get(fileSnapshot.getName()) != TransferState.SUCCESS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Set;

/**
Expand Down Expand Up @@ -42,6 +43,8 @@ void uploadBlobAsync(
*/
void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remotePath) throws IOException;

void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException;

/**
* Lists the files
* @param path : the path to list
Expand All @@ -52,8 +55,8 @@ void uploadBlobAsync(

/**
*
* @param path
* @param fileName
* @param path the remote path from where download should be made
* @param fileName the name of the file
* @return inputstream of the remote file
* @throws IOException the exception while reading the data
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,15 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot)
translogTransferMetadata.getPrimaryTerm()
);
}

public void deleteTranslog(long primaryTerm, long generation) throws IOException {
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFilename = Translog.getFilename(generation);
// ToDo - Take care of metadata file cleanup
// https://github.com/opensearch-project/OpenSearch/issues/5677
fileTransferTracker.onDelete(ckpFileName);
fileTransferTracker.onDelete(translogFilename);
List<String> files = List.of(ckpFileName, translogFilename);
transferService.deleteBlobs(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;

/**
* The listener to be invoked on the completion or failure of a {@link TransferFileSnapshot}
* The listener to be invoked on the completion or failure of a {@link TransferFileSnapshot} or deletion of file
*
* @opensearch.internal
*/
Expand All @@ -29,4 +29,6 @@ public interface FileTransferListener {
* @param e the exception while processing the {@link TransferFileSnapshot}
*/
void onFailure(TransferFileSnapshot fileSnapshot, Exception e);

void onDelete(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -448,13 +448,13 @@ public void testSimpleOperationsUpload() throws IOException {
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
}

assertEquals(translog.allUploaded().size(), 4);
assertEquals(translog.allUploaded().size(), 2);

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 }));
assertEquals(translog.allUploaded().size(), 6);
assertEquals(translog.allUploaded().size(), 4);

translog.rollGeneration();
assertEquals(translog.allUploaded().size(), 6);
assertEquals(translog.allUploaded().size(), 4);

Set<String> mdFiles = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add("metadata")
Expand Down Expand Up @@ -495,6 +495,38 @@ public void testSimpleOperationsUpload() throws IOException {
assertArrayEquals(ckp, content);
}
}

// expose the new checkpoint (simulating a commit), before we trim the translog
translog.deletionPolicy.setLocalCheckpointOfSafeCommit(0);
// simulating the remote segment upload .
translog.setMinSeqNoToKeep(0);
// This should not trim anything
translog.trimUnreferencedReaders();
assertEquals(translog.allUploaded().size(), 4);
assertEquals(
blobStoreTransferService.listAll(
repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(String.valueOf(primaryTerm.get()))
).size(),
4
);

// This should trim tlog-2.* files as it contains seq no 0
translog.setMinSeqNoToKeep(1);
translog.trimUnreferencedReaders();
assertEquals(translog.allUploaded().size(), 2);
assertEquals(
blobStoreTransferService.listAll(
repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(String.valueOf(primaryTerm.get()))
).size(),
2
);

}

private Long populateTranslogOps(boolean withMissingOps) throws IOException {
Expand Down Expand Up @@ -684,6 +716,7 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep
// expose the new checkpoint (simulating a commit), before we trim the translog
lastCommittedLocalCheckpoint.set(localCheckpoint);
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
translog.setMinSeqNoToKeep(localCheckpoint + 1);
translog.trimUnreferencedReaders();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ public void onSuccess(TransferFileSnapshot fileSnapshot) {
public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) {
fileTransferFailed.incrementAndGet();
}

@Override
public void onDelete(String name) {}
}
);

Expand Down

0 comments on commit 36a4727

Please sign in to comment.