Skip to content

Commit

Permalink
Optimize read write lock constructs during translog upload to remote …
Browse files Browse the repository at this point in the history
…store (opensearch-project#9636)

Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Sep 18, 2023
1 parent d7aa6dd commit b5cc002
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class RemoteFsTranslog extends Translog {
private final BooleanSupplier primaryModeSupplier;
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;
private volatile long maxRemoteTranslogGenerationUploaded;
private final Object uploadMutex = new Object();

private volatile long minSeqNoToKeep;

Expand Down Expand Up @@ -237,11 +238,20 @@ public static TranslogTransferManager buildTranslogTransferManager(

@Override
public boolean ensureSynced(Location location) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
assert location.generation <= current.getGeneration();
if (location.generation == current.getGeneration()) {
ensureOpen();
return prepareAndUpload(primaryTermSupplier.getAsLong(), location.generation);
try {
boolean shouldUpload = false;
try (ReleasableLock ignored = writeLock.acquire()) {
assert location.generation <= current.getGeneration();
if (location.generation == current.getGeneration()) {
ensureOpen();
if (prepareForUpload(location.generation) == false) {
return false;
}
shouldUpload = true;
}
}
if (shouldUpload) {
return performUpload(primaryTermSupplier.getAsLong(), location.generation);
}
} catch (final Exception ex) {
closeOnTragicEvent(ex);
Expand All @@ -256,10 +266,12 @@ public void rollGeneration() throws IOException {
if (current.totalOperations() == 0 && primaryTermSupplier.getAsLong() == current.getPrimaryTerm()) {
return;
}
prepareAndUpload(primaryTermSupplier.getAsLong(), null);
if (prepareForUpload(null)) {
performUpload(primaryTermSupplier.getAsLong(), null);
}
}

private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOException {
private boolean prepareForUpload(Long generation) throws IOException {
try (Releasable ignored = writeLock.acquire()) {
if (generation == null || generation == current.getGeneration()) {
try {
Expand All @@ -275,23 +287,41 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
closeOnTragicEvent(e);
throw e;
}
} else if (generation < current.getGeneration()) {
return false;
}
return true;
} else return generation >= current.getGeneration();
}
}

// Do we need remote writes in sync fashion ?
// If we don't , we should swallow FileAlreadyExistsException while writing to remote store
// and also verify for same during primary-primary relocation
// Writing remote in sync fashion doesn't hurt as global ckp update
// is not updated in remote translog except in primary to primary recovery.
if (generation == null) {
if (closed.get() == false) {
return upload(primaryTerm, current.getGeneration() - 1);
/**
* This method does the remote store upload by first acquiring the lock on the uploadMutex monitor. The synchronized
* is required to restrict multiple uploads happening concurrently. The read lock is required to ensure that the
* underlying translog readers are not deleted and the current writer is not converted to a reader at the time of
* upload.
*
* @param primaryTerm current primary term
* @param generation current generation
* @return true if upload is successful
* @throws IOException if the upload fails due to any underlying exceptions.
*/
private boolean performUpload(Long primaryTerm, Long generation) throws IOException {
synchronized (uploadMutex) {
try (Releasable ignored = readLock.acquire()) {
// Do we need remote writes in sync fashion ?
// If we don't , we should swallow FileAlreadyExistsException while writing to remote store
// and also verify for same during primary-primary relocation
// Writing remote in sync fashion doesn't hurt as global ckp update
// is not updated in remote translog except in primary to primary recovery.
long generationToUpload;
if (generation == null) {
if (closed.get() == false) {
generationToUpload = current.getGeneration() - 1;
} else {
generationToUpload = current.getGeneration();
}
} else {
return upload(primaryTerm, current.getGeneration());
generationToUpload = generation;
}
} else {
return upload(primaryTerm, generation);
return upload(primaryTerm, generationToUpload);
}
}
}
Expand Down Expand Up @@ -343,8 +373,8 @@ private boolean syncToDisk() throws IOException {
@Override
public void sync() throws IOException {
try {
if (syncToDisk() || syncNeeded()) {
prepareAndUpload(primaryTermSupplier.getAsLong(), null);
if ((syncToDisk() || syncNeeded()) && prepareForUpload(null)) {
performUpload(primaryTermSupplier.getAsLong(), null);
}
} catch (final Exception e) {
tragedy.setTragicException(e);
Expand Down Expand Up @@ -528,22 +558,23 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen

@Override
public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException {
transferReleasable.close();
closeFilesIfNoPendingRetentionLocks();
maxRemoteTranslogGenerationUploaded = generation;
minRemoteGenReferenced = getMinFileGeneration();
logger.trace("uploaded translog for {} {} ", primaryTerm, generation);
}

@Override
public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException {
transferReleasable.close();
closeFilesIfNoPendingRetentionLocks();
if (ex instanceof IOException) {
throw (IOException) ex;
} else {
throw (RuntimeException) ex;
}
}

@Override
public void close() {
transferReleasable.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ public TranslogCheckpointTransferSnapshot build() throws IOException {
translogTransferSnapshot.setMinTranslogGeneration(highestGenMinTranslogGeneration);

assert this.primaryTerm == highestGenPrimaryTerm : "inconsistent primary term";
assert this.generation == highestGeneration : " inconsistent generation ";
final long finalHighestGeneration = highestGeneration;
assert LongStream.iterate(lowestGeneration, i -> i + 1)
.limit(highestGeneration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
long prevUploadBytesSucceeded = remoteTranslogTransferTracker.getUploadBytesSucceeded();
long prevUploadTimeInMillis = remoteTranslogTransferTracker.getTotalUploadTimeInMillis();

try {
try (translogTransferListener) {
toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots()));
toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots())));
if (toUpload.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
* @opensearch.internal
*/
public interface TranslogTransferListener {
public interface TranslogTransferListener extends AutoCloseable {
/**
* Invoked when the transfer of {@link TransferSnapshot} succeeds
* @param transferSnapshot the transfer snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@
import static org.mockito.Mockito.when;

@LuceneTestCase.SuppressFileSystems("ExtrasFS")

public class RemoteFsTranslogTests extends OpenSearchTestCase {

protected final ShardId shardId = new ShardId("index", "_na_", 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) {
public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) {
translogTransferFailed.incrementAndGet();
}

@Override
public void close() {}
}));
assertEquals(4, fileTransferSucceeded.get());
assertEquals(0, fileTransferFailed.get());
Expand Down

0 comments on commit b5cc002

Please sign in to comment.