diff --git a/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectory.java b/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectory.java index e723c831b213d..2fcad7cee3449 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectory.java @@ -24,9 +24,9 @@ import org.apache.lucene.store.NoLockFactory; import org.opensearch.LegacyESVersion; import org.opensearch.Version; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput; -import org.opensearch.index.store.remote.file.OnDemandVirtualFileSnapshotIndexInput; import org.opensearch.index.store.remote.utils.TransferManager; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -72,7 +72,7 @@ public IndexInput openInput(String name, IOContext context) throws IOException { final BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfoMap.get(name); if (fileInfo.name().startsWith(VIRTUAL_FILE_PREFIX)) { - return new OnDemandVirtualFileSnapshotIndexInput(fileInfo, localStoreDir, transferManager); + return new ByteArrayIndexInput(fileInfo.physicalName(), fileInfo.metadata().hash().bytes); } return new OnDemandBlockSnapshotIndexInput(fileInfo, localStoreDir, transferManager); } diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java index c8e56844b0819..c80c29dbc845a 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java +++ b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java @@ -338,7 +338,7 @@ private void demandBlock(int blockId) throws IOException { currentBlock.close(); } - currentBlock = fetchBlock(blockId).clone(); + currentBlock = fetchBlock(blockId); currentBlockId = blockId; } diff --git a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandVirtualFileSnapshotIndexInput.java b/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandVirtualFileSnapshotIndexInput.java deleted file mode 100644 index da3478561a2c7..0000000000000 --- a/server/src/main/java/org/opensearch/index/store/remote/file/OnDemandVirtualFileSnapshotIndexInput.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.store.remote.file; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.store.FSDirectory; -import org.apache.lucene.store.IndexInput; -import org.opensearch.common.lucene.store.ByteArrayIndexInput; -import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; -import org.opensearch.index.store.remote.utils.TransferManager; - -import java.io.IOException; -import java.nio.file.Path; -import java.util.concurrent.ExecutionException; - -/** - * This is an implementation of {@link OnDemandBlockIndexInput} where this class provides the main IndexInput using shard snapshot virtual - * files and will basically read virtual file from memory and write it to disk . - * - * @opensearch.internal - */ -public class OnDemandVirtualFileSnapshotIndexInput extends OnDemandBlockIndexInput { - private static final Logger logger = LogManager.getLogger(OnDemandVirtualFileSnapshotIndexInput.class); - - // 2^30 should keep the virtual file in memory un-partitioned when written to disk - private static final int BLOCK_SIZE_SHIFT = 30; - - /** - * Where this class fetches IndexInput parts from - */ - private final TransferManager transferManager; - - /** - * FileInfo contains snapshot metadata references for this IndexInput - */ - private final FileInfo fileInfo; - - /** - * underlying lucene directory to open file - */ - protected final FSDirectory directory; - - /** - * file name - */ - protected final String fileName; - - public OnDemandVirtualFileSnapshotIndexInput(FileInfo fileInfo, FSDirectory directory, TransferManager transferManager) { - this( - "VirtualFileSnapshotIndexInput(path=\"" - + directory.getDirectory().toString() - + "/" - + fileInfo.physicalName() - + "\", " - + "offset=" - + 0 - + ", length= " - + fileInfo.length() - + ")", - fileInfo, - directory, - transferManager, - fileInfo.physicalName(), - 0L, - fileInfo.length(), - false - ); - } - - public OnDemandVirtualFileSnapshotIndexInput( - String resourceDescription, - FileInfo fileInfo, - FSDirectory directory, - TransferManager transferManager, - String fileName, - long offset, - long length, - boolean isClone - ) { - super( - OnDemandBlockIndexInput.builder() - .resourceDescription(resourceDescription) - .isClone(isClone) - .offset(offset) - .length(length) - .blockSizeShift(BLOCK_SIZE_SHIFT) - ); - this.fileInfo = fileInfo; - this.directory = directory; - this.fileName = fileName; - this.transferManager = transferManager; - } - - @Override - protected OnDemandVirtualFileSnapshotIndexInput buildSlice(String sliceDescription, long offset, long length) { - return new OnDemandVirtualFileSnapshotIndexInput( - sliceDescription, - this.fileInfo, - this.directory, - this.transferManager, - this.fileName, - offset, - length, - true - ); - } - - @Override - public OnDemandVirtualFileSnapshotIndexInput clone() { - OnDemandVirtualFileSnapshotIndexInput clone = buildSlice("clone", 0L, this.length); - // ensures that clones may be positioned at the same point as the blocked file they were cloned from - if (currentBlock != null) { - clone.currentBlock = currentBlock.clone(); - clone.currentBlockId = currentBlockId; - } - return clone; - } - - @Override - protected IndexInput fetchBlock(int blockId) throws IOException { - // will always have one block. - final Path filePath = directory.getDirectory().resolve(fileName); - try { - return transferManager.asyncFetchBlob(filePath, () -> new ByteArrayIndexInput(fileName, fileInfo.metadata().hash().bytes)) - .get(); - } catch (InterruptedException | ExecutionException e) { - logger.error(() -> new ParameterizedMessage("unexpected failure while fetching [{}]", filePath), e); - throw new IllegalStateException(e); - } - } -} diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index ad46f7ef4ae42..e4fcc7328f2a7 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -62,7 +62,7 @@ public CompletableFuture asyncFetchBlob(BlobFetchRequest blobFetchRe }); } - public CompletableFuture asyncFetchBlob(Path path, Supplier indexInputSupplier) { + private CompletableFuture asyncFetchBlob(Path path, Supplier indexInputSupplier) { return invocationLinearizer.linearize(path, p -> indexInputSupplier.get()); }