diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index a633cb0787f27..23903e5a9d110 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.Logger; import org.opensearch.common.SetOnce; import org.opensearch.common.lease.Releasable; -import org.opensearch.common.lease.Releasables; import org.opensearch.common.logging.Loggers; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.io.IOUtils; @@ -60,7 +59,6 @@ public class RemoteFsTranslog extends Translog { private final BooleanSupplier primaryModeSupplier; private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; private volatile long maxRemoteTranslogGenerationUploaded; - private final Object uploadMutex = new Object(); private volatile long minSeqNoToKeep; @@ -239,19 +237,10 @@ public static TranslogTransferManager buildTranslogTransferManager( @Override public boolean ensureSynced(Location location) throws IOException { try { - boolean shouldUpload = false; - try (ReleasableLock ignored = writeLock.acquire()) { - assert location.generation <= current.getGeneration(); - if (location.generation == current.getGeneration()) { - ensureOpen(); - if (prepareForUpload(location.generation) == false) { - return false; - } - shouldUpload = true; - } - } - if (shouldUpload) { - return performUpload(primaryTermSupplier.getAsLong(), location.generation); + assert location.generation <= current.getGeneration(); + if (location.generation == current.getGeneration()) { + ensureOpen(); + return prepareAndUpload(primaryTermSupplier.getAsLong(), location.generation); } } catch (final Exception ex) { closeOnTragicEvent(ex); @@ -266,12 +255,10 @@ public void rollGeneration() throws IOException { if (current.totalOperations() == 0 && primaryTermSupplier.getAsLong() == current.getPrimaryTerm()) { return; } - if (prepareForUpload(null)) { - performUpload(primaryTermSupplier.getAsLong(), null); - } + prepareAndUpload(primaryTermSupplier.getAsLong(), null); } - private boolean prepareForUpload(Long generation) throws IOException { + private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOException { try (Releasable ignored = writeLock.acquire()) { if (generation == null || generation == current.getGeneration()) { try { @@ -282,46 +269,36 @@ private boolean prepareForUpload(Long generation) throws IOException { logger.trace("Creating new writer for gen: [{}]", current.getGeneration() + 1); current = createWriter(current.getGeneration() + 1); } + assert writeLock.isHeldByCurrentThread() : "Write lock must be held before we acquire the read lock"; + // Here we are downgrading the write lock by acquiring the read lock and releasing the write lock + // This ensures that other threads can still acquire the read locks while also protecting the + // readers and writer to not be mutated any further. + readLock.acquire(); } catch (final Exception e) { tragedy.setTragicException(e); closeOnTragicEvent(e); throw e; } - return true; - } else return generation >= current.getGeneration(); + } else if (generation < current.getGeneration()) { + return false; + } } - } - /** - * This method does the remote store upload by first acquiring the lock on the uploadMutex monitor. The synchronized - * is required to restrict multiple uploads happening concurrently. The read lock is required to ensure that the - * underlying translog readers are not deleted and the current writer is not converted to a reader at the time of - * upload. - * - * @param primaryTerm current primary term - * @param generation current generation - * @return true if upload is successful - * @throws IOException if the upload fails due to any underlying exceptions. - */ - private boolean performUpload(Long primaryTerm, Long generation) throws IOException { - synchronized (uploadMutex) { - try (Releasable ignored = readLock.acquire()) { - // Do we need remote writes in sync fashion ? - // If we don't , we should swallow FileAlreadyExistsException while writing to remote store - // and also verify for same during primary-primary relocation - // Writing remote in sync fashion doesn't hurt as global ckp update - // is not updated in remote translog except in primary to primary recovery. - long generationToUpload; - if (generation == null) { - if (closed.get() == false) { - generationToUpload = current.getGeneration() - 1; - } else { - generationToUpload = current.getGeneration(); - } + assert readLock.isHeldByCurrentThread() == true; + try (Releasable ignored = readLock; Releasable ignoredGenLock = deletionPolicy.acquireTranslogGen(getMinFileGeneration())) { + // Do we need remote writes in sync fashion ? + // If we don't , we should swallow FileAlreadyExistsException while writing to remote store + // and also verify for same during primary-primary relocation + // Writing remote in sync fashion doesn't hurt as global ckp update + // is not updated in remote translog except in primary to primary recovery. + if (generation == null) { + if (closed.get() == false) { + return upload(primaryTerm, current.getGeneration() - 1); } else { - generationToUpload = generation; + return upload(primaryTerm, current.getGeneration()); } - return upload(primaryTerm, generationToUpload); + } else { + return upload(primaryTerm, generation); } } } @@ -347,10 +324,9 @@ private boolean upload(Long primaryTerm, Long generation) throws IOException { Translog::getCommitCheckpointFileName ).build() ) { - Releasable transferReleasable = Releasables.wrap(deletionPolicy.acquireTranslogGen(getMinFileGeneration())); return translogTransferManager.transferSnapshot( transferSnapshotProvider, - new RemoteFsTranslogTransferListener(transferReleasable, generation, primaryTerm) + new RemoteFsTranslogTransferListener(generation, primaryTerm) ); } @@ -373,8 +349,8 @@ private boolean syncToDisk() throws IOException { @Override public void sync() throws IOException { try { - if ((syncToDisk() || syncNeeded()) && prepareForUpload(null)) { - performUpload(primaryTermSupplier.getAsLong(), null); + if (syncToDisk() || syncNeeded()) { + prepareAndUpload(primaryTermSupplier.getAsLong(), null); } } catch (final Exception e) { tragedy.setTragicException(e); @@ -529,16 +505,17 @@ protected void onDelete() { translogTransferManager.delete(); } + // Visible for testing + boolean isRemoteGenerationDeletionPermitsAvailable() { + return remoteGenerationDeletionPermits.availablePermits() == REMOTE_DELETION_PERMITS; + } + /** * TranslogTransferListener implementation for RemoteFsTranslog * * @opensearch.internal */ private class RemoteFsTranslogTransferListener implements TranslogTransferListener { - /** - * Releasable instance for the translog - */ - private final Releasable transferReleasable; /** * Generation for the translog @@ -550,8 +527,7 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen */ private final Long primaryTerm; - RemoteFsTranslogTransferListener(Releasable transferReleasable, Long generation, Long primaryTerm) { - this.transferReleasable = transferReleasable; + RemoteFsTranslogTransferListener(Long generation, Long primaryTerm) { this.generation = generation; this.primaryTerm = primaryTerm; } @@ -571,10 +547,5 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro throw (RuntimeException) ex; } } - - @Override - public void close() { - transferReleasable.close(); - } } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java index eb0eebb564b63..10dec13c81e1a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java @@ -164,6 +164,7 @@ public TranslogCheckpointTransferSnapshot build() throws IOException { translogTransferSnapshot.setMinTranslogGeneration(highestGenMinTranslogGeneration); assert this.primaryTerm == highestGenPrimaryTerm : "inconsistent primary term"; + assert this.generation == highestGeneration : " inconsistent generation "; final long finalHighestGeneration = highestGeneration; assert LongStream.iterate(lowestGeneration, i -> i + 1) .limit(highestGeneration) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index fe6b5dab9937b..fd4936603671c 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -109,7 +109,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans long prevUploadBytesSucceeded = remoteTranslogTransferTracker.getUploadBytesSucceeded(); long prevUploadTimeInMillis = remoteTranslogTransferTracker.getTotalUploadTimeInMillis(); - try (translogTransferListener) { + try { toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); if (toUpload.isEmpty()) { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java index 8805c16298d96..132d1adf916da 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java @@ -17,7 +17,7 @@ * * @opensearch.internal */ -public interface TranslogTransferListener extends AutoCloseable { +public interface TranslogTransferListener { /** * Invoked when the transfer of {@link TransferSnapshot} succeeds * @param transferSnapshot the transfer snapshot diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 3c654818ffc6c..233d6f319b797 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -620,6 +620,7 @@ public void testSimpleOperationsUpload() throws Exception { translog.setMinSeqNoToKeep(2); + assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); translog.trimUnreferencedReaders(); assertEquals(1, translog.readers.size()); assertEquals(1, translog.stats().estimatedNumberOfOperations()); diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 0987201eb8602..6fc4557a75675 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -168,9 +168,6 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) { public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { translogTransferFailed.incrementAndGet(); } - - @Override - public void close() {} })); assertEquals(4, fileTransferSucceeded.get()); assertEquals(0, fileTransferFailed.get());