diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java
index e4fcc7328f2a7..670251aadfdd3 100644
--- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java
+++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java
@@ -25,7 +25,6 @@
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.function.Supplier;
 
 /**
  * This acts as entry point to fetch {@link BlobFetchRequest} and return actual {@link IndexInput}. Utilizes the BlobContainer interface to
@@ -53,21 +52,20 @@ public TransferManager(final BlobContainer blobContainer, final ExecutorService
      * @return future of IndexInput augmented with internal caching maintenance tasks
      */
     public CompletableFuture<IndexInput> asyncFetchBlob(BlobFetchRequest blobFetchRequest) {
-        return asyncFetchBlob(blobFetchRequest.getFilePath(), () -> {
+        return invocationLinearizer.linearize(blobFetchRequest.getFilePath(), p -> {
             try {
                 return fetchBlob(blobFetchRequest);
             } catch (IOException e) {
                 throw new IllegalStateException(e);
             }
-        });
-    }
-
-    private CompletableFuture<IndexInput> asyncFetchBlob(Path path, Supplier<IndexInput> indexInputSupplier) {
-        return invocationLinearizer.linearize(path, p -> indexInputSupplier.get());
+        }).thenApply(IndexInput::clone);
     }
 
-    /*
-    This method accessed through the ConcurrentInvocationLinearizer so read-check-write is acceptable here
+    /**
+     * Fetches the "origin" IndexInput from the cache, downloading it first if it is
+     * not already cached. This instance must be cloned before using. This method is
+     * accessed through the ConcurrentInvocationLinearizer so read-check-write is
+     * acceptable here
      */
     private IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException {
         // check if the origin is already in block cache
@@ -101,8 +99,7 @@ private IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExcepti
             fileCache.put(blobFetchRequest.getFilePath(), newOrigin);
             origin = newOrigin;
         }
-        // always, need to clone to do refcount += 1, and rely on GC to clean these IndexInput which will refcount -= 1
-        return origin.clone();
+        return origin;
     }
 
     private IndexInput downloadBlockLocally(BlobFetchRequest blobFetchRequest) throws IOException {
diff --git a/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java
new file mode 100644
index 0000000000000..b7aa38ec6573e
--- /dev/null
+++ b/server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java
@@ -0,0 +1,102 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.store.remote.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.MMapDirectory;
+import org.apache.lucene.store.SimpleFSLockFactory;
+import org.hamcrest.MatcherAssert;
+import org.junit.After;
+import org.junit.Before;
+import org.opensearch.common.blobstore.BlobContainer;
+import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
+import org.opensearch.index.store.remote.filecache.FileCache;
+import org.opensearch.index.store.remote.filecache.FileCacheFactory;
+import org.opensearch.test.OpenSearchTestCase;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
+public class TransferManagerTests extends OpenSearchTestCase {
+    private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024 * 1024, 8);
+    private final ExecutorService executor = Executors.newSingleThreadExecutor();
+    private MMapDirectory directory;
+    private BlobContainer blobContainer;
+    private TransferManager transferManager;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        directory = new MMapDirectory(createTempDir(), SimpleFSLockFactory.INSTANCE);
+        blobContainer = mock(BlobContainer.class);
+        doAnswer(i -> new ByteArrayInputStream(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7 })).when(blobContainer).readBlob("blob", 0, 8);
+        transferManager = new TransferManager(blobContainer, executor, fileCache);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        executor.shutdown();
+        assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
+    }
+
+    public void testSingleAccess() throws Exception {
+        try (IndexInput i = fetchBlob()) {
+            i.seek(7);
+            MatcherAssert.assertThat(i.readByte(), equalTo((byte) 7));
+        }
+    }
+
+    public void testConcurrentAccess() throws Exception {
+        // Kick off multiple threads that all concurrently request the same resource
+        final ExecutorService testRunner = Executors.newFixedThreadPool(8);
+        try {
+            final List<Future<IndexInput>> futures = new ArrayList<>();
+            for (int i = 0; i < 8; i++) {
+                futures.add(testRunner.submit(this::fetchBlob));
+            }
+            // Wait for all threads to complete
+            for (Future<IndexInput> future : futures) {
+                future.get(1, TimeUnit.SECONDS);
+            }
+            // Assert that all IndexInputs are independently positioned by seeking
+            // to the end and closing each one. If not independent, then this would
+            // result in EOFExceptions and/or NPEs.
+            for (Future<IndexInput> future : futures) {
+                try (IndexInput i = future.get()) {
+                    i.seek(7);
+                    MatcherAssert.assertThat(i.readByte(), equalTo((byte) 7));
+                }
+            }
+        } finally {
+            testRunner.shutdown();
+            assertTrue(testRunner.awaitTermination(1, TimeUnit.SECONDS));
+        }
+    }
+
+    private IndexInput fetchBlob() throws ExecutionException, InterruptedException {
+        return transferManager.asyncFetchBlob(
+            BlobFetchRequest.builder().blobName("blob").position(0).fileName("file").directory(directory).length(8).build()
+        ).get();
+    }
+}
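
Note on the change above: asyncFetchBlob now returns a clone of the cached "origin" IndexInput to every caller (via thenApply(IndexInput::clone)), so concurrent callers each get an independent file pointer instead of sharing one instance. A minimal caller-side sketch, assuming a transferManager and directory wired up as in the test's setUp; the blob name, file name, and length are illustrative values only, not part of the PR:

    // Hypothetical usage sketch: each call yields an independent clone of the cached origin.
    IndexInput input = transferManager.asyncFetchBlob(
        BlobFetchRequest.builder().blobName("blob").position(0).fileName("file").directory(directory).length(8).build()
    ).get();
    try {
        input.seek(7);
        byte last = input.readByte(); // independent position; other callers' reads do not move this clone
    } finally {
        input.close(); // each caller closes its own clone, as the tests do via try-with-resources
    }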