diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index c488127857ed5..a633cb0787f27 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -60,6 +60,7 @@ public class RemoteFsTranslog extends Translog { private final BooleanSupplier primaryModeSupplier; private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; private volatile long maxRemoteTranslogGenerationUploaded; + private final Object uploadMutex = new Object(); private volatile long minSeqNoToKeep; @@ -237,11 +238,20 @@ public static TranslogTransferManager buildTranslogTransferManager( @Override public boolean ensureSynced(Location location) throws IOException { - try (ReleasableLock ignored = writeLock.acquire()) { - assert location.generation <= current.getGeneration(); - if (location.generation == current.getGeneration()) { - ensureOpen(); - return prepareAndUpload(primaryTermSupplier.getAsLong(), location.generation); + try { + boolean shouldUpload = false; + try (ReleasableLock ignored = writeLock.acquire()) { + assert location.generation <= current.getGeneration(); + if (location.generation == current.getGeneration()) { + ensureOpen(); + if (prepareForUpload(location.generation) == false) { + return false; + } + shouldUpload = true; + } + } + if (shouldUpload) { + return performUpload(primaryTermSupplier.getAsLong(), location.generation); } } catch (final Exception ex) { closeOnTragicEvent(ex); @@ -256,10 +266,12 @@ public void rollGeneration() throws IOException { if (current.totalOperations() == 0 && primaryTermSupplier.getAsLong() == current.getPrimaryTerm()) { return; } - prepareAndUpload(primaryTermSupplier.getAsLong(), null); + if (prepareForUpload(null)) { + performUpload(primaryTermSupplier.getAsLong(), null); + } } - private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOException { + private boolean prepareForUpload(Long generation) throws IOException { try (Releasable ignored = writeLock.acquire()) { if (generation == null || generation == current.getGeneration()) { try { @@ -275,23 +287,41 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc closeOnTragicEvent(e); throw e; } - } else if (generation < current.getGeneration()) { - return false; - } + return true; + } else return generation >= current.getGeneration(); + } + } - // Do we need remote writes in sync fashion ? - // If we don't , we should swallow FileAlreadyExistsException while writing to remote store - // and also verify for same during primary-primary relocation - // Writing remote in sync fashion doesn't hurt as global ckp update - // is not updated in remote translog except in primary to primary recovery. - if (generation == null) { - if (closed.get() == false) { - return upload(primaryTerm, current.getGeneration() - 1); + /** + * This method does the remote store upload by first acquiring the lock on the uploadMutex monitor. The synchronized + * is required to restrict multiple uploads happening concurrently. The read lock is required to ensure that the + * underlying translog readers are not deleted and the current writer is not converted to a reader at the time of + * upload. + * + * @param primaryTerm current primary term + * @param generation current generation + * @return true if upload is successful + * @throws IOException if the upload fails due to any underlying exceptions. + */ + private boolean performUpload(Long primaryTerm, Long generation) throws IOException { + synchronized (uploadMutex) { + try (Releasable ignored = readLock.acquire()) { + // Do we need remote writes in sync fashion ? + // If we don't , we should swallow FileAlreadyExistsException while writing to remote store + // and also verify for same during primary-primary relocation + // Writing remote in sync fashion doesn't hurt as global ckp update + // is not updated in remote translog except in primary to primary recovery. + long generationToUpload; + if (generation == null) { + if (closed.get() == false) { + generationToUpload = current.getGeneration() - 1; + } else { + generationToUpload = current.getGeneration(); + } } else { - return upload(primaryTerm, current.getGeneration()); + generationToUpload = generation; } - } else { - return upload(primaryTerm, generation); + return upload(primaryTerm, generationToUpload); } } } @@ -343,8 +373,8 @@ private boolean syncToDisk() throws IOException { @Override public void sync() throws IOException { try { - if (syncToDisk() || syncNeeded()) { - prepareAndUpload(primaryTermSupplier.getAsLong(), null); + if ((syncToDisk() || syncNeeded()) && prepareForUpload(null)) { + performUpload(primaryTermSupplier.getAsLong(), null); } } catch (final Exception e) { tragedy.setTragicException(e); @@ -528,8 +558,6 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen @Override public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException { - transferReleasable.close(); - closeFilesIfNoPendingRetentionLocks(); maxRemoteTranslogGenerationUploaded = generation; minRemoteGenReferenced = getMinFileGeneration(); logger.trace("uploaded translog for {} {} ", primaryTerm, generation); @@ -537,13 +565,16 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti @Override public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException { - transferReleasable.close(); - closeFilesIfNoPendingRetentionLocks(); if (ex instanceof IOException) { throw (IOException) ex; } else { throw (RuntimeException) ex; } } + + @Override + public void close() { + transferReleasable.close(); + } } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java index 10dec13c81e1a..eb0eebb564b63 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java @@ -164,7 +164,6 @@ public TranslogCheckpointTransferSnapshot build() throws IOException { translogTransferSnapshot.setMinTranslogGeneration(highestGenMinTranslogGeneration); assert this.primaryTerm == highestGenPrimaryTerm : "inconsistent primary term"; - assert this.generation == highestGeneration : " inconsistent generation "; final long finalHighestGeneration = highestGeneration; assert LongStream.iterate(lowestGeneration, i -> i + 1) .limit(highestGeneration) diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index fd4936603671c..fe6b5dab9937b 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -109,7 +109,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans long prevUploadBytesSucceeded = remoteTranslogTransferTracker.getUploadBytesSucceeded(); long prevUploadTimeInMillis = remoteTranslogTransferTracker.getTotalUploadTimeInMillis(); - try { + try (translogTransferListener) { toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); if (toUpload.isEmpty()) { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java index 132d1adf916da..8805c16298d96 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java @@ -17,7 +17,7 @@ * * @opensearch.internal */ -public interface TranslogTransferListener { +public interface TranslogTransferListener extends AutoCloseable { /** * Invoked when the transfer of {@link TransferSnapshot} succeeds * @param transferSnapshot the transfer snapshot diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index de1b2990f0a50..3c654818ffc6c 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -108,7 +108,6 @@ import static org.mockito.Mockito.when; @LuceneTestCase.SuppressFileSystems("ExtrasFS") - public class RemoteFsTranslogTests extends OpenSearchTestCase { protected final ShardId shardId = new ShardId("index", "_na_", 1); diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 6fc4557a75675..0987201eb8602 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -168,6 +168,9 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) { public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { translogTransferFailed.incrementAndGet(); } + + @Override + public void close() {} })); assertEquals(4, fileTransferSucceeded.get()); assertEquals(0, fileTransferFailed.get());