-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Correctly clone IndexInputs to avoid race (#6367)
In PR #6345 I did remove a duplicate clone, however this resulted in cloning the IndexInput in the wrong place. When requesting a file that needs to be downloaded, we have a mechanism to ensure that concurrent calls do not end up duplicating the download, which results in multiple threads being given the same instance. The clone must happen _after_ this point to ensure that each thread gets its own clone. Signed-off-by: Andrew Ross <andrross@amazon.com> (cherry picked from commit 5e5c83b) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
- Loading branch information
1 parent
81f14a4
commit 8744ff4
Showing
2 changed files
with
109 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
101 changes: 101 additions & 0 deletions
101
server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.index.store.remote.utils; | ||
|
||
import java.io.ByteArrayInputStream; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import org.apache.lucene.store.IndexInput; | ||
import org.apache.lucene.store.MMapDirectory; | ||
import org.apache.lucene.store.SimpleFSLockFactory; | ||
import org.hamcrest.MatcherAssert; | ||
import org.junit.After; | ||
import org.junit.Before; | ||
import org.opensearch.common.blobstore.BlobContainer; | ||
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; | ||
import org.opensearch.index.store.remote.filecache.FileCache; | ||
import org.opensearch.index.store.remote.filecache.FileCacheFactory; | ||
import org.opensearch.test.OpenSearchTestCase; | ||
|
||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; | ||
|
||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.mockito.Mockito.doAnswer; | ||
import static org.mockito.Mockito.mock; | ||
|
||
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) | ||
public class TransferManagerTests extends OpenSearchTestCase { | ||
private final FileCache fileCache = FileCacheFactory.createConcurrentLRUFileCache(1024 * 1024, 8); | ||
private final ExecutorService executor = Executors.newSingleThreadExecutor(); | ||
private MMapDirectory directory; | ||
private BlobContainer blobContainer; | ||
private TransferManager transferManager; | ||
|
||
@Before | ||
public void setUp() throws Exception { | ||
super.setUp(); | ||
directory = new MMapDirectory(createTempDir(), SimpleFSLockFactory.INSTANCE); | ||
blobContainer = mock(BlobContainer.class); | ||
doAnswer(i -> new ByteArrayInputStream(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7 })).when(blobContainer).readBlob("blob", 0, 8); | ||
transferManager = new TransferManager(blobContainer, executor, fileCache); | ||
} | ||
|
||
@After | ||
public void tearDown() throws Exception { | ||
super.tearDown(); | ||
executor.shutdown(); | ||
assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS)); | ||
} | ||
|
||
public void testSingleAccess() throws Exception { | ||
try (IndexInput i = fetchBlob()) { | ||
i.seek(7); | ||
MatcherAssert.assertThat(i.readByte(), equalTo((byte) 7)); | ||
} | ||
} | ||
|
||
public void testConcurrentAccess() throws Exception { | ||
// Kick off multiple threads that all concurrently request the same resource | ||
final ExecutorService testRunner = Executors.newFixedThreadPool(8); | ||
try { | ||
final List<Future<IndexInput>> futures = new ArrayList<>(); | ||
for (int i = 0; i < 8; i++) { | ||
futures.add(testRunner.submit(this::fetchBlob)); | ||
} | ||
// Wait for all threads to complete | ||
for (Future<IndexInput> future : futures) { | ||
future.get(1, TimeUnit.SECONDS); | ||
} | ||
// Assert that all IndexInputs are independently positioned by seeking | ||
// to the end and closing each one. If not independent, then this would | ||
// result in EOFExceptions and/or NPEs. | ||
for (Future<IndexInput> future : futures) { | ||
try (IndexInput i = future.get()) { | ||
i.seek(7); | ||
MatcherAssert.assertThat(i.readByte(), equalTo((byte) 7)); | ||
} | ||
} | ||
} finally { | ||
testRunner.shutdown(); | ||
assertTrue(testRunner.awaitTermination(1, TimeUnit.SECONDS)); | ||
} | ||
} | ||
|
||
private IndexInput fetchBlob() throws ExecutionException, InterruptedException { | ||
return transferManager.asyncFetchBlob( | ||
BlobFetchRequest.builder().blobName("blob").position(0).fileName("file").directory(directory).length(8).build() | ||
).get(); | ||
} | ||
} |