diff --git a/buildSrc/version.properties b/buildSrc/version.properties index f62f2217971f7..7a4c7321df268 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -62,3 +62,5 @@ jmh = 1.35 # compression zstd = 1.5.5-3 + +jzlib = 1.1.3 diff --git a/modules/transport-netty4/build.gradle b/modules/transport-netty4/build.gradle index a4cfef14a2300..e0a491ab63d7b 100644 --- a/modules/transport-netty4/build.gradle +++ b/modules/transport-netty4/build.gradle @@ -187,10 +187,6 @@ thirdPartyAudit { 'com.google.protobuf.nano.CodedOutputByteBufferNano', 'com.google.protobuf.nano.MessageNano', - 'com.jcraft.jzlib.Deflater', - 'com.jcraft.jzlib.Inflater', - 'com.jcraft.jzlib.JZlib$WrapperType', - 'com.jcraft.jzlib.JZlib', 'com.ning.compress.BufferRecycler', 'com.ning.compress.lzf.ChunkDecoder', 'com.ning.compress.lzf.ChunkEncoder', diff --git a/plugins/transport-nio/build.gradle b/plugins/transport-nio/build.gradle index 87e54bfed3b3d..42d0d40a1d449 100644 --- a/plugins/transport-nio/build.gradle +++ b/plugins/transport-nio/build.gradle @@ -113,10 +113,6 @@ thirdPartyAudit { 'com.google.protobuf.nano.CodedOutputByteBufferNano', 'com.google.protobuf.nano.MessageNano', - 'com.jcraft.jzlib.Deflater', - 'com.jcraft.jzlib.Inflater', - 'com.jcraft.jzlib.JZlib$WrapperType', - 'com.jcraft.jzlib.JZlib', 'com.ning.compress.BufferRecycler', 'com.ning.compress.lzf.ChunkDecoder', 'com.ning.compress.lzf.ChunkEncoder', diff --git a/server/build.gradle b/server/build.gradle index a77fdc9062c5a..4662766f77487 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -146,6 +146,9 @@ dependencies { // jna api "net.java.dev.jna:jna:${versions.jna}" + // jcraft + api "com.jcraft:jzlib:${versions.jzlib}" + // protobuf api "com.google.protobuf:protobuf-java:${versions.protobuf}" api "jakarta.annotation:jakarta.annotation-api:${versions.jakarta_annotation}" diff --git a/server/licenses/jzlib-1.1.3.jar.sha1 b/server/licenses/jzlib-1.1.3.jar.sha1 new file mode 100644 index 0000000000000..2affa9b8cd51b --- /dev/null +++ b/server/licenses/jzlib-1.1.3.jar.sha1 @@ -0,0 +1 @@ +c01428efa717624f7aabf4df319939dda9646b2d \ No newline at end of file diff --git a/server/licenses/jzlib-LICENSE.txt b/server/licenses/jzlib-LICENSE.txt new file mode 100644 index 0000000000000..245abb23411e0 --- /dev/null +++ b/server/licenses/jzlib-LICENSE.txt @@ -0,0 +1,29 @@ +JZlib 0.0.* were released under the GNU LGPL license. Later, we have switched +over to a BSD-style license. + +------------------------------------------------------------------------------ +Copyright (c) 2000-2011 ymnk, JCraft,Inc. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + The name of the authors may not be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL JCRAFT, INC. OR ANY CONTRIBUTORS TO THIS SOFTWARE +BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/server/licenses/jzlib-NOTICE.txt b/server/licenses/jzlib-NOTICE.txt new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/server/src/main/java/org/opensearch/common/CheckedTriFunction.java b/server/src/main/java/org/opensearch/common/CheckedTriFunction.java new file mode 100644 index 0000000000000..7898226b751f7 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/CheckedTriFunction.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common; + +/** + * A {@link TriFunction}-like interface which allows throwing checked exceptions. + * + * @opensearch.internal + */ +@FunctionalInterface +public interface CheckedTriFunction { + R apply(S s, T t, U u) throws E; +} diff --git a/server/src/main/java/org/opensearch/common/StreamContext.java b/server/src/main/java/org/opensearch/common/StreamContext.java new file mode 100644 index 0000000000000..32f095f8488b7 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/StreamContext.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common; + +import org.opensearch.common.io.InputStreamContainer; + +import java.io.IOException; + +/** + * StreamContext is used to supply streams to vendor plugins using {@link StreamContext#provideStream} + * + * @opensearch.internal + */ +public class StreamContext { + + private final CheckedTriFunction streamSupplier; + private final long partSize; + private final long lastPartSize; + private final int numberOfParts; + + /** + * Construct a new StreamProvider object + * + * @param streamSupplier A {@link CheckedTriFunction} that will be called with the partNumber, partSize and position in the stream + * @param partSize Size of all parts apart from the last one + * @param lastPartSize Size of the last part + * @param numberOfParts Total number of parts + */ + public StreamContext( + CheckedTriFunction streamSupplier, + long partSize, + long lastPartSize, + int numberOfParts + ) { + this.streamSupplier = streamSupplier; + this.partSize = partSize; + this.lastPartSize = lastPartSize; + this.numberOfParts = numberOfParts; + } + + /** + * Vendor plugins can use this method to create new streams only when they are required for processing + * New streams won't be created till this method is called with the specific partNumber + * + * @param partNumber The index of the part + * @return A stream reference to the part requested + */ + public InputStreamContainer provideStream(int partNumber) throws IOException { + long position = partSize * partNumber; + long size = (partNumber == numberOfParts - 1) ? lastPartSize : partSize; + return streamSupplier.apply(partNumber, size, position); + } + + /** + * @return The number of parts in which this file is supposed to be uploaded + */ + public int getNumberOfParts() { + return numberOfParts; + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/exception/CorruptFileException.java b/server/src/main/java/org/opensearch/common/blobstore/exception/CorruptFileException.java new file mode 100644 index 0000000000000..e30f515b57495 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/exception/CorruptFileException.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.exception; + +import java.io.IOException; + +/** + * Exception thrown when remote integrity checks + * + * @opensearch.internal + */ +public class CorruptFileException extends IOException { + + private final String fileName; + + public CorruptFileException(String message, String fileName) { + super(message); + this.fileName = fileName; + } + + public CorruptFileException(String message, Throwable cause, String fileName) { + super(message, cause); + this.fileName = fileName; + } + + public CorruptFileException(Throwable cause, String fileName) { + super(cause); + this.fileName = fileName; + } + + public String getFileName() { + return fileName; + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/exception/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/exception/package-info.java new file mode 100644 index 0000000000000..5f042a9c0adc3 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/exception/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Exceptions for blobstore abstractions */ +package org.opensearch.common.blobstore.exception; diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/stream/package-info.java new file mode 100644 index 0000000000000..88d6d433d837f --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Abstractions for stream based file transfers */ +package org.opensearch.common.blobstore.stream; diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/StreamContextSupplier.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/StreamContextSupplier.java new file mode 100644 index 0000000000000..24a47a5f25b2d --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/StreamContextSupplier.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.stream.write; + +import org.opensearch.common.StreamContext; + +/** + * Will return the StreamContext to the caller given the part size + * + * @opensearch.internal + */ +@FunctionalInterface +public interface StreamContextSupplier { + + /** + * @param partSize The size of a single part to be uploaded + * @return The StreamContext based on the part size provided + */ + StreamContext supplyStreamContext(long partSize); +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java new file mode 100644 index 0000000000000..ef5e3d1e8c26c --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WriteContext.java @@ -0,0 +1,120 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.stream.write; + +import org.opensearch.common.CheckedConsumer; +import org.opensearch.common.Nullable; +import org.opensearch.common.StreamContext; + +import java.io.IOException; + +/** + * WriteContext is used to encapsulate all data needed by BlobContainer#writeStreams + * + * @opensearch.internal + */ +public class WriteContext { + + private final String fileName; + private final StreamContextSupplier streamContextSupplier; + private final long fileSize; + private final boolean failIfAlreadyExists; + private final WritePriority writePriority; + private final CheckedConsumer uploadFinalizer; + private final boolean doRemoteDataIntegrityCheck; + private final Long expectedChecksum; + + /** + * Construct a new WriteContext object + * + * @param fileName The name of the file being uploaded + * @param streamContextSupplier A supplier that will provide StreamContext to the plugin + * @param fileSize The total size of the file being uploaded + * @param failIfAlreadyExists A boolean to fail the upload is the file exists + * @param writePriority The WritePriority of this upload + * @param doRemoteDataIntegrityCheck A boolean to inform vendor plugins whether remote data integrity checks need to be done + * @param expectedChecksum This parameter expected only when the vendor plugin is expected to do server side data integrity verification + */ + public WriteContext( + String fileName, + StreamContextSupplier streamContextSupplier, + long fileSize, + boolean failIfAlreadyExists, + WritePriority writePriority, + CheckedConsumer uploadFinalizer, + boolean doRemoteDataIntegrityCheck, + @Nullable Long expectedChecksum + ) { + this.fileName = fileName; + this.streamContextSupplier = streamContextSupplier; + this.fileSize = fileSize; + this.failIfAlreadyExists = failIfAlreadyExists; + this.writePriority = writePriority; + this.uploadFinalizer = uploadFinalizer; + this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck; + this.expectedChecksum = expectedChecksum; + } + + /** + * @return The file name + */ + public String getFileName() { + return fileName; + } + + /** + * @return The boolean representing whether to fail the file upload if it exists + */ + public boolean isFailIfAlreadyExists() { + return failIfAlreadyExists; + } + + /** + * @param partSize The size of a single part to be uploaded + * @return The stream context which will be used by the plugin to initialize streams from the file + */ + public StreamContext getStreamProvider(long partSize) { + return streamContextSupplier.supplyStreamContext(partSize); + } + + /** + * @return The total size of the file + */ + public long getFileSize() { + return fileSize; + } + + /** + * @return The WritePriority of the upload + */ + public WritePriority getWritePriority() { + return writePriority; + } + + /** + * @return The UploadFinalizer for this upload + */ + public CheckedConsumer getUploadFinalizer() { + return uploadFinalizer; + } + + /** + * @return A boolean for whether remote data integrity check has to be done for this upload or not + */ + public boolean doRemoteDataIntegrityCheck() { + return doRemoteDataIntegrityCheck; + } + + /** + * @return The CRC32 checksum associated with this file + */ + public Long getExpectedChecksum() { + return expectedChecksum; + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java new file mode 100644 index 0000000000000..b8c0b52f93a3c --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.stream.write; + +/** + * WritePriority for upload + * + * @opensearch.internal + */ +public enum WritePriority { + NORMAL, + HIGH +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/package-info.java new file mode 100644 index 0000000000000..a2f0e634f0a0a --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Abstractions for stream based file writes */ +package org.opensearch.common.blobstore.stream.write; diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java new file mode 100644 index 0000000000000..7864c3ab5c794 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java @@ -0,0 +1,246 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer; + +import com.jcraft.jzlib.JZlib; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.CorruptIndexException; +import org.opensearch.common.CheckedTriFunction; +import org.opensearch.common.SetOnce; +import org.opensearch.common.StreamContext; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; +import org.opensearch.common.blobstore.transfer.stream.ResettableCheckedInputStream; +import org.opensearch.common.io.InputStreamContainer; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; + +/** + * RemoteTransferContainer is an encapsulation for managing file transfers. + * + * @opensearch.internal + */ +public class RemoteTransferContainer implements Closeable { + + private int numberOfParts; + private long partSize; + private long lastPartSize; + + private final long contentLength; + private final SetOnce inputStreams = new SetOnce<>(); + private final String fileName; + private final String remoteFileName; + private final boolean failTransferIfFileExists; + private final WritePriority writePriority; + private final long expectedChecksum; + private final OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier; + private final boolean isRemoteDataIntegritySupported; + + private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class); + + /** + * Construct a new RemoteTransferContainer object + * + * @param fileName Name of the local file + * @param remoteFileName Name of the remote file + * @param contentLength Total content length of the file to be uploaded + * @param failTransferIfFileExists A boolean to determine if upload has to be failed if file exists + * @param writePriority The {@link WritePriority} of current upload + * @param offsetRangeInputStreamSupplier A supplier to create OffsetRangeInputStreams + * @param expectedChecksum The expected checksum value for the file being uploaded. This checksum will be used for local or remote data integrity checks + * @param isRemoteDataIntegritySupported A boolean to signify whether the remote repository supports server side data integrity verification + */ + public RemoteTransferContainer( + String fileName, + String remoteFileName, + long contentLength, + boolean failTransferIfFileExists, + WritePriority writePriority, + OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier, + long expectedChecksum, + boolean isRemoteDataIntegritySupported + ) { + this.fileName = fileName; + this.remoteFileName = remoteFileName; + this.contentLength = contentLength; + this.failTransferIfFileExists = failTransferIfFileExists; + this.writePriority = writePriority; + this.offsetRangeInputStreamSupplier = offsetRangeInputStreamSupplier; + this.expectedChecksum = expectedChecksum; + this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported; + } + + /** + * @return The {@link WriteContext} for the current upload + */ + public WriteContext createWriteContext() { + return new WriteContext( + remoteFileName, + this::supplyStreamContext, + contentLength, + failTransferIfFileExists, + writePriority, + this::finalizeUpload, + isRemoteDataIntegrityCheckPossible(), + isRemoteDataIntegrityCheckPossible() ? expectedChecksum : null + ); + } + + // package-private for testing + + /** + * This method is called to create the {@link StreamContext} object that will be used by the vendor plugin to + * open streams during uploads. Calling this method won't actually create the streams, for that the consumer needs + * to call {@link StreamContext#provideStream} + * + * @param partSize Part sizes of all parts apart from the last one, which is determined internally + * @return The {@link StreamContext} object that will be used by the vendor plugin to retrieve streams during upload + */ + StreamContext supplyStreamContext(long partSize) { + try { + return this.openMultipartStreams(partSize); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private StreamContext openMultipartStreams(long partSize) throws IOException { + if (inputStreams.get() != null) { + throw new IOException("Multi-part streams are already created."); + } + + this.partSize = partSize; + this.lastPartSize = (contentLength % partSize) != 0 ? contentLength % partSize : partSize; + this.numberOfParts = (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1); + InputStream[] streams = new InputStream[numberOfParts]; + inputStreams.set(streams); + + return new StreamContext(getTransferPartStreamSupplier(), partSize, lastPartSize, numberOfParts); + } + + private CheckedTriFunction getTransferPartStreamSupplier() { + return ((partNo, size, position) -> { + assert inputStreams.get() != null : "expected inputStreams to be initialised"; + return getMultipartStreamSupplier(partNo, size, position).get(); + }); + } + + /** + * OffsetRangeInputStreamSupplier is used to get the offset based input streams at runtime + */ + public interface OffsetRangeInputStreamSupplier { + OffsetRangeInputStream get(long size, long position) throws IOException; + } + + interface LocalStreamSupplier { + Stream get() throws IOException; + } + + private LocalStreamSupplier getMultipartStreamSupplier( + final int streamIdx, + final long size, + final long position + ) { + return () -> { + try { + OffsetRangeInputStream offsetRangeInputStream = offsetRangeInputStreamSupplier.get(size, position); + InputStream inputStream = !isRemoteDataIntegrityCheckPossible() + ? new ResettableCheckedInputStream(offsetRangeInputStream, fileName) + : offsetRangeInputStream; + Objects.requireNonNull(inputStreams.get())[streamIdx] = inputStream; + + return new InputStreamContainer(inputStream, size); + } catch (IOException e) { + log.error("Failed to create input stream", e); + throw e; + } + }; + } + + private boolean isRemoteDataIntegrityCheckPossible() { + return isRemoteDataIntegritySupported; + } + + private void finalizeUpload(boolean uploadSuccessful) throws IOException { + if (isRemoteDataIntegrityCheckPossible()) { + return; + } + + if (uploadSuccessful) { + long actualChecksum = getActualChecksum(); + if (actualChecksum != expectedChecksum) { + throw new CorruptIndexException( + "Data integrity check done after upload for file " + + fileName + + " failed, actual checksum: " + + actualChecksum + + ", expected checksum: " + + expectedChecksum, + fileName + ); + } + } + } + + /** + * @return The total content length of current upload + */ + public long getContentLength() { + return contentLength; + } + + private long getInputStreamChecksum(InputStream inputStream) { + assert inputStream instanceof ResettableCheckedInputStream + : "expected passed inputStream to be instance of ResettableCheckedInputStream"; + return ((ResettableCheckedInputStream) inputStream).getChecksum(); + } + + private long getActualChecksum() { + InputStream[] currentInputStreams = Objects.requireNonNull(inputStreams.get()); + long checksum = getInputStreamChecksum(currentInputStreams[0]); + for (int checkSumIdx = 1; checkSumIdx < Objects.requireNonNull(inputStreams.get()).length - 1; checkSumIdx++) { + checksum = JZlib.crc32_combine(checksum, getInputStreamChecksum(currentInputStreams[checkSumIdx]), partSize); + } + if (numberOfParts > 1) { + checksum = JZlib.crc32_combine(checksum, getInputStreamChecksum(currentInputStreams[numberOfParts - 1]), lastPartSize); + } + + return checksum; + } + + @Override + public void close() throws IOException { + if (inputStreams.get() == null) { + log.warn("Input streams cannot be closed since they are not yet set for multi stream upload"); + return; + } + + boolean closeStreamException = false; + for (InputStream is : Objects.requireNonNull(inputStreams.get())) { + try { + if (is != null) { + is.close(); + } + } catch (IOException ex) { + closeStreamException = true; + // Attempting to close all streams first before throwing exception. + log.error("Multipart stream failed to close ", ex); + } + } + + if (closeStreamException) { + throw new IOException("Closure of some of the multi-part streams failed."); + } + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/package-info.java new file mode 100644 index 0000000000000..779b6538401d0 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Contains transfer related utilities for {@link org.opensearch.common.blobstore.BlobContainer} */ +package org.opensearch.common.blobstore.transfer; diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStream.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStream.java new file mode 100644 index 0000000000000..7ba673ce34d2a --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStream.java @@ -0,0 +1,122 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +/** + * OffsetRangeFileInputStream extends InputStream to read from a specified offset using FileChannel + * + * @opensearch.internal + */ +public class OffsetRangeFileInputStream extends OffsetRangeInputStream { + private final InputStream inputStream; + private final FileChannel fileChannel; + + private final long actualSizeToRead; + // This is the maximum position till stream is to be read. If read methods exceed maxPos then bytes are read + // till maxPos. If no byte is left after maxPos, then -1 is returned from read methods. + private final long limit; + // Position in stream from which read will start. + private long counter = 0; + + private long markPointer; + private long markCounter; + + /** + * Construct a new OffsetRangeFileInputStream object + * + * @param path Path to the file + * @param size Number of bytes that need to be read from specified position + * @param position Position from where the read needs to start + * @throws IOException When FileChannel#position operation fails + */ + public OffsetRangeFileInputStream(Path path, long size, long position) throws IOException { + fileChannel = FileChannel.open(path, StandardOpenOption.READ); + fileChannel.position(position); + inputStream = Channels.newInputStream(fileChannel); + long totalLength = fileChannel.size(); + this.counter = 0; + this.limit = size; + if ((totalLength - position) > limit) { + actualSizeToRead = limit; + } else { + actualSizeToRead = totalLength - position; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + if (fileChannel.position() >= fileChannel.size()) { + return -1; + } + if (fileChannel.position() + len > fileChannel.size()) { + len = (int) (fileChannel.size() - fileChannel.position()); + } + if (counter + len > limit) { + len = (int) (limit - counter); + } + if (len <= 0) { + return -1; + } + + inputStream.read(b, off, len); + counter += len; + return len; + } + + @Override + public int read() throws IOException { + if (counter++ >= limit) { + return -1; + } + return (fileChannel.position() < fileChannel.size()) ? (inputStream.read() & 0xff) : -1; + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public synchronized void mark(int readlimit) { + try { + markPointer = fileChannel.position(); + } catch (IOException e) { + throw new RuntimeException(e); + } + markCounter = counter; + } + + @Override + public synchronized void reset() throws IOException { + fileChannel.position(markPointer); + counter = markCounter; + } + + @Override + public long getFilePointer() throws IOException { + return fileChannel.position(); + } + + @Override + public void close() throws IOException { + inputStream.close(); + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java new file mode 100644 index 0000000000000..7518f9ac569b9 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStream.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import org.apache.lucene.store.IndexInput; +import org.opensearch.common.lucene.store.InputStreamIndexInput; + +import java.io.IOException; + +/** + * OffsetRangeIndexInputStream extends InputStream to read from a specified offset using IndexInput + * + * @opensearch.internal + */ +public class OffsetRangeIndexInputStream extends OffsetRangeInputStream { + + private final InputStreamIndexInput inputStreamIndexInput; + private final IndexInput indexInput; + + /** + * Construct a new OffsetRangeIndexInputStream object + * + * @param indexInput IndexInput opened on the file to read from + * @param size The maximum length to read from specified position + * @param position Position from where read needs to start + * @throws IOException When IndexInput#seek operation fails + */ + public OffsetRangeIndexInputStream(IndexInput indexInput, long size, long position) throws IOException { + indexInput.seek(position); + this.indexInput = indexInput; + this.inputStreamIndexInput = new InputStreamIndexInput(indexInput, size); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return inputStreamIndexInput.read(b, off, len); + } + + @Override + public int read() throws IOException { + return inputStreamIndexInput.read(); + } + + @Override + public boolean markSupported() { + return inputStreamIndexInput.markSupported(); + } + + @Override + public synchronized void mark(int readlimit) { + inputStreamIndexInput.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + inputStreamIndexInput.reset(); + } + + @Override + public long getFilePointer() throws IOException { + return indexInput.getFilePointer(); + } + + @Override + public void close() throws IOException { + inputStreamIndexInput.close(); + indexInput.close(); + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeInputStream.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeInputStream.java new file mode 100644 index 0000000000000..e8b889db1f3b0 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeInputStream.java @@ -0,0 +1,22 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import java.io.IOException; +import java.io.InputStream; + +/** + * OffsetRangeInputStream is an abstract class that extends from {@link InputStream} + * and adds a method to get the file pointer to the specific part being read + * + * @opensearch.internal + */ +public abstract class OffsetRangeInputStream extends InputStream { + public abstract long getFilePointer() throws IOException; +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStream.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStream.java new file mode 100644 index 0000000000000..c3e1e815e9eab --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStream.java @@ -0,0 +1,128 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import com.jcraft.jzlib.CRC32; + +import java.io.FilterInputStream; +import java.io.IOException; + +/** + * ResettableCheckedInputStream is a modified implementation of {@link java.util.zip.CheckedInputStream} that supports + * mark and reset and modifies the file checksum during mark and reset calls. + * + * @opensearch.internal + */ +public class ResettableCheckedInputStream extends FilterInputStream { + private final CRC32 checksum; + private final CRC32 markedChecksum; + private final long startPos; + private final String file; + private final OffsetRangeInputStream offsetRangeInputStream; + + /** + * Creates a new ResettableCheckedInputStream object + * + * @param offsetRangeInputStream The underlying input stream + * @param file Name of the file + */ + public ResettableCheckedInputStream(OffsetRangeInputStream offsetRangeInputStream, String file) throws IOException { + super(offsetRangeInputStream); + this.checksum = new CRC32(); + this.markedChecksum = new CRC32(); + this.offsetRangeInputStream = offsetRangeInputStream; + this.startPos = offsetRangeInputStream.getFilePointer(); + this.file = file; + } + + /** + * Reads a byte. Will block if no input is available. + * @return the byte read, or -1 if the end of the stream is reached. + * @exception IOException if an I/O error has occurred + */ + public int read() throws IOException { + byte[] buffer = new byte[1]; + int len = read(buffer, 0, 1); + if (len == -1) { + return -1; + } + return buffer[0]; + } + + /** + * Reads into an array of bytes. If len is not zero, the method + * blocks until some input is available; otherwise, no + * bytes are read and 0 is returned. + * @param buf the buffer into which the data is read + * @param off the start offset in the destination array buf + * @param len the maximum number of bytes read + * @return the actual number of bytes read, or -1 if the end + * of the stream is reached. + * @exception NullPointerException If buf is null. + * @exception IndexOutOfBoundsException If off is negative, + * len is negative, or len is greater than + * buf.length - off + * @exception IOException if an I/O error has occurred + */ + public int read(byte[] buf, int off, int len) throws IOException { + len = in.read(buf, off, len); + if (len != -1) { + checksum.update(buf, off, len); + } + return len; + } + + /** + * Skips specified number of bytes of input. + * @param n the number of bytes to skip + * @return the actual number of bytes skipped + * @exception IOException if an I/O error has occurred + */ + public long skip(long n) throws IOException { + byte[] buf = new byte[512]; + long total = 0; + while (total < n) { + long len = n - total; + len = read(buf, 0, len < buf.length ? (int) len : buf.length); + if (len == -1) { + return total; + } + total += len; + } + return total; + } + + /** + * Returns the Checksum for this input stream. + * @return the Checksum value + */ + public long getChecksum() { + return checksum.getValue(); + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public synchronized void mark(int readlimit) { + markedChecksum.reset(checksum.getValue()); + super.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + if (startPos == offsetRangeInputStream.getFilePointer()) { + return; + } + checksum.reset(markedChecksum.getValue()); + super.reset(); + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/package-info.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/package-info.java new file mode 100644 index 0000000000000..c5e7e45b9f015 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/stream/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Contains encapsulations to create streams for part uploads */ +package org.opensearch.common.blobstore.transfer.stream; diff --git a/server/src/main/java/org/opensearch/common/io/InputStreamContainer.java b/server/src/main/java/org/opensearch/common/io/InputStreamContainer.java new file mode 100644 index 0000000000000..ce5dcff9f5349 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/io/InputStreamContainer.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.io; + +import java.io.InputStream; + +/** + * Model composed of an input stream and the total content length of the stream + * + * @opensearch.internal + */ +public class InputStreamContainer { + + private final InputStream inputStream; + private final long contentLength; + + /** + * Construct a new stream object + * + * @param inputStream The input stream that is to be encapsulated + * @param contentLength The total content length that is to be read from the stream + */ + public InputStreamContainer(InputStream inputStream, long contentLength) { + this.inputStream = inputStream; + this.contentLength = contentLength; + } + + /** + * @return The input stream this object is reading from + */ + public InputStream getInputStream() { + return inputStream; + } + + /** + * @return The total length of the content that has to be read from this stream + */ + public long getContentLength() { + return contentLength; + } +} diff --git a/server/src/main/java/org/opensearch/common/util/UploadListener.java b/server/src/main/java/org/opensearch/common/util/UploadListener.java new file mode 100644 index 0000000000000..66feb68f7bf0d --- /dev/null +++ b/server/src/main/java/org/opensearch/common/util/UploadListener.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util; + +/** + * A tracker class that is fed to FileUploader. + * + * @opensearch.internal + */ +public interface UploadListener { + + void beforeUpload(String file); + + void onSuccess(String file); + + void onFailure(String file); +} diff --git a/server/src/main/java/org/opensearch/index/store/exception/ChecksumCombinationException.java b/server/src/main/java/org/opensearch/index/store/exception/ChecksumCombinationException.java new file mode 100644 index 0000000000000..a355473aa2afd --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/exception/ChecksumCombinationException.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store.exception; + +import org.apache.lucene.index.CorruptIndexException; + +/** + * Exception is raised when combination to two crc checksums fail. + * + * @opensearch.internal + */ +public class ChecksumCombinationException extends CorruptIndexException { + public ChecksumCombinationException(String msg, String resourceDescription) { + super(msg, resourceDescription); + } + + public ChecksumCombinationException(String msg, String resourceDescription, Throwable cause) { + super(msg, resourceDescription, cause); + } +} diff --git a/server/src/main/java/org/opensearch/index/store/exception/package-info.java b/server/src/main/java/org/opensearch/index/store/exception/package-info.java new file mode 100644 index 0000000000000..b93ce9ffd6201 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/exception/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Contains exceptions thrown during segment file uploads */ +package org.opensearch.index.store.exception; diff --git a/server/src/main/java/org/opensearch/index/translog/checked/TranslogCheckedContainer.java b/server/src/main/java/org/opensearch/index/translog/checked/TranslogCheckedContainer.java new file mode 100644 index 0000000000000..b90794e29d2b1 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/checked/TranslogCheckedContainer.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.checked; + +import org.opensearch.common.io.Channels; +import org.opensearch.common.util.concurrent.ReleasableLock; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import java.util.zip.CRC32; +import java.util.zip.Checksum; + +/** + * TranslogCheckedContainer is used to store, update and retrieve checksums for translog files. + * + * @opensearch.internal + */ +public class TranslogCheckedContainer { + + private final Checksum checksum; + private final AtomicLong contentLength; + private final ReleasableLock updateLock = new ReleasableLock(new ReentrantLock()); + private final String file; + + /** + * Creates TranslogCheckedContainer from provided channel. + * + * @param channel {@link FileChannel} to read from + * @param offset offset of channel from which bytes are to be read. + * @param len Length of bytes to be read. + */ + public TranslogCheckedContainer(FileChannel channel, int offset, int len, String file) throws IOException { + this.checksum = new CRC32(); + this.contentLength = new AtomicLong(); + this.file = file; + + byte[] bytes = Channels.readFromFileChannel(channel, offset, len); + updateFromBytes(bytes, 0, bytes.length); + } + + /** + * Updates checksum from bytes array + * + * @param bytes Input bytes to update checksum from + * @param offset Position in bytesReference to buffer bytes from + * @param len Length of bytes to be buffered + */ + public void updateFromBytes(byte[] bytes, int offset, int len) { + try (ReleasableLock ignored = updateLock.acquire()) { + checksum.update(bytes, offset, len); + updateContentLength(len); + } + } + + private void updateContentLength(long delta) { + assert updateLock.isHeldByCurrentThread(); + contentLength.addAndGet(delta); + } + + /** + * @return checksum value of bytes which have been supplied to container so far. + */ + public long getChecksum() { + return checksum.getValue(); + } + + /** + * @return Content length of bytes which have been supplied to container so far. + */ + public long getContentLength() { + return contentLength.get(); + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/checked/package-info.java b/server/src/main/java/org/opensearch/index/translog/checked/package-info.java new file mode 100644 index 0000000000000..ddb235fdbedce --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/checked/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Contains checksum related utilities for translog files */ +package org.opensearch.index.translog.checked; diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java new file mode 100644 index 0000000000000..1ebec46042247 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainerTests.java @@ -0,0 +1,175 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer; + +import org.junit.Before; +import org.opensearch.common.io.InputStreamContainer; +import org.opensearch.common.StreamContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeFileInputStream; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; +import org.opensearch.common.blobstore.transfer.stream.ResettableCheckedInputStream; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +public class RemoteTransferContainerTests extends OpenSearchTestCase { + + private static final int TEST_FILE_SIZE_BYTES = 128; + + private Path testFile; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(TEST_FILE_SIZE_BYTES), StandardOpenOption.APPEND); + } + + public void testSupplyStreamContextDivisibleParts() throws IOException, InterruptedException { + try ( + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + testFile.getFileName().toString(), + testFile.getFileName().toString(), + TEST_FILE_SIZE_BYTES, + true, + WritePriority.HIGH, + new RemoteTransferContainer.OffsetRangeInputStreamSupplier() { + @Override + public OffsetRangeInputStream get(long size, long position) throws IOException { + return new OffsetRangeFileInputStream(testFile, size, position); + } + }, + 0, + false + ) + ) { + testSupplyStreamContext(remoteTransferContainer, 16, 16, 8); + } + } + + public void testSupplyStreamContextNonDivisibleParts() throws IOException, InterruptedException { + try ( + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + testFile.getFileName().toString(), + testFile.getFileName().toString(), + TEST_FILE_SIZE_BYTES, + true, + WritePriority.HIGH, + new RemoteTransferContainer.OffsetRangeInputStreamSupplier() { + @Override + public OffsetRangeInputStream get(long size, long position) throws IOException { + return new OffsetRangeFileInputStream(testFile, size, position); + } + }, + 0, + false + ) + ) { + testSupplyStreamContext(remoteTransferContainer, 10, 8, 13); + } + } + + private void testSupplyStreamContext( + RemoteTransferContainer remoteTransferContainer, + long partSize, + long lastPartSize, + int expectedPartCount + ) throws InterruptedException { + StreamContext streamContext = remoteTransferContainer.supplyStreamContext(partSize); + int partCount = streamContext.getNumberOfParts(); + assertEquals(expectedPartCount, partCount); + Thread[] threads = new Thread[partCount]; + long totalContentLength = remoteTransferContainer.getContentLength(); + assert partSize * (partCount - 1) + lastPartSize == totalContentLength + : "part sizes and last part size don't add up to total content length"; + logger.info("partSize: {}, lastPartSize: {}, partCount: {}", partSize, lastPartSize, streamContext.getNumberOfParts()); + for (int partIdx = 0; partIdx < partCount; partIdx++) { + int finalPartIdx = partIdx; + long expectedPartSize = (partIdx == partCount - 1) ? lastPartSize : partSize; + threads[partIdx] = new Thread(() -> { + try { + InputStreamContainer inputStreamContainer = streamContext.provideStream(finalPartIdx); + assertEquals(expectedPartSize, inputStreamContainer.getContentLength()); + } catch (IOException e) { + fail("IOException during stream creation"); + } + }); + threads[partIdx].start(); + } + for (int i = 0; i < partCount; i++) { + threads[i].join(); + } + } + + public void testSupplyStreamContextCalledTwice() throws IOException { + try ( + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + testFile.getFileName().toString(), + testFile.getFileName().toString(), + TEST_FILE_SIZE_BYTES, + true, + WritePriority.HIGH, + new RemoteTransferContainer.OffsetRangeInputStreamSupplier() { + @Override + public OffsetRangeInputStream get(long size, long position) throws IOException { + return new OffsetRangeFileInputStream(testFile, size, position); + } + }, + 0, + false + ) + ) { + remoteTransferContainer.supplyStreamContext(16); + assertThrows(RuntimeException.class, () -> remoteTransferContainer.supplyStreamContext(16)); + } + } + + public void testTypeOfProvidedStreamsAllCases() throws IOException { + testTypeOfProvidedStreams(true); + testTypeOfProvidedStreams(false); + } + + private void testTypeOfProvidedStreams(boolean isRemoteDataIntegritySupported) throws IOException { + try ( + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + testFile.getFileName().toString(), + testFile.getFileName().toString(), + TEST_FILE_SIZE_BYTES, + true, + WritePriority.HIGH, + new RemoteTransferContainer.OffsetRangeInputStreamSupplier() { + @Override + public OffsetRangeInputStream get(long size, long position) throws IOException { + return new OffsetRangeFileInputStream(testFile, size, position); + } + }, + 0, + isRemoteDataIntegritySupported + ) + ) { + StreamContext streamContext = remoteTransferContainer.supplyStreamContext(16); + InputStreamContainer inputStreamContainer = streamContext.provideStream(0); + if (shouldOffsetInputStreamsBeChecked(isRemoteDataIntegritySupported)) { + assertTrue(inputStreamContainer.getInputStream() instanceof ResettableCheckedInputStream); + } else { + assertTrue(inputStreamContainer.getInputStream() instanceof OffsetRangeInputStream); + } + assertThrows(RuntimeException.class, () -> remoteTransferContainer.supplyStreamContext(16)); + } + } + + private boolean shouldOffsetInputStreamsBeChecked(boolean isRemoteDataIntegritySupported) { + return !isRemoteDataIntegritySupported; + } +} diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStreamTests.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStreamTests.java new file mode 100644 index 0000000000000..f7e093f8357cc --- /dev/null +++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeFileInputStreamTests.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import java.io.IOException; + +public class OffsetRangeFileInputStreamTests extends ResettableCheckedInputStreamBaseTest { + + @Override + protected OffsetRangeInputStream getOffsetRangeInputStream(long size, long position) throws IOException { + return new OffsetRangeFileInputStream(testFile, size, position); + } +} diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStreamTests.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStreamTests.java new file mode 100644 index 0000000000000..7381d49bcdf99 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/OffsetRangeIndexInputStreamTests.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.NIOFSDirectory; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; + +public class OffsetRangeIndexInputStreamTests extends ResettableCheckedInputStreamBaseTest { + + private Directory directory; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + directory = new NIOFSDirectory(testFile.getParent()); + } + + @Override + protected OffsetRangeInputStream getOffsetRangeInputStream(long size, long position) throws IOException { + return new OffsetRangeIndexInputStream(directory.openInput(testFile.getFileName().toString(), IOContext.DEFAULT), size, position); + } + + @Override + @After + public void tearDown() throws Exception { + directory.close(); + super.tearDown(); + } +} diff --git a/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStreamBaseTest.java b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStreamBaseTest.java new file mode 100644 index 0000000000000..07e86cd64524f --- /dev/null +++ b/server/src/test/java/org/opensearch/common/blobstore/transfer/stream/ResettableCheckedInputStreamBaseTest.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.blobstore.transfer.stream; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +public abstract class ResettableCheckedInputStreamBaseTest extends OpenSearchTestCase { + + private static final int TEST_FILE_SIZE_BYTES = 10; + + private final byte[] bytesToWrite = randomByteArrayOfLength(TEST_FILE_SIZE_BYTES); + protected Path testFile; + private ResettableCheckedInputStream[] resettableCheckedInputStreams; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + testFile = createTempFile(); + Files.write(testFile, bytesToWrite, StandardOpenOption.TRUNCATE_EXISTING); + } + + protected abstract OffsetRangeInputStream getOffsetRangeInputStream(long size, long position) throws IOException; + + public void testReadSingleByte() throws IOException, InterruptedException { + final int nParallelReads = 10; + Thread[] threads = new Thread[nParallelReads]; + resettableCheckedInputStreams = new ResettableCheckedInputStream[nParallelReads]; + for (int readIdx = 0; readIdx < nParallelReads; readIdx++) { + int offset = randomInt(TEST_FILE_SIZE_BYTES - 1); + OffsetRangeInputStream offsetRangeInputStream = getOffsetRangeInputStream(1, offset); + ResettableCheckedInputStream resettableCheckedInputStream = new ResettableCheckedInputStream( + offsetRangeInputStream, + testFile.getFileName().toString() + ); + resettableCheckedInputStreams[readIdx] = resettableCheckedInputStream; + threads[readIdx] = new Thread(() -> { + try { + assertEquals(bytesToWrite[offset], resettableCheckedInputStream.read()); + } catch (IOException e) { + fail("Failure while reading single byte from offset stream"); + } + }); + threads[readIdx].start(); + } + for (Thread thread : threads) { + thread.join(); + } + } + + public void testReadMultipleBytes() throws IOException, InterruptedException { + final int nParallelReads = 10; + Thread[] threads = new Thread[nParallelReads]; + resettableCheckedInputStreams = new ResettableCheckedInputStream[nParallelReads]; + for (int readIdx = 0; readIdx < nParallelReads; readIdx++) { + int readByteCount = randomInt(TEST_FILE_SIZE_BYTES - 1) + 1; + int offset = randomInt(TEST_FILE_SIZE_BYTES - readByteCount); + OffsetRangeInputStream offsetRangeInputStream = getOffsetRangeInputStream(readByteCount, offset); + ResettableCheckedInputStream resettableCheckedInputStream = new ResettableCheckedInputStream( + offsetRangeInputStream, + testFile.getFileName().toString() + ); + resettableCheckedInputStreams[readIdx] = resettableCheckedInputStream; + threads[readIdx] = new Thread(() -> { + try { + byte[] buffer = new byte[readByteCount]; + int bytesRead = resettableCheckedInputStream.read(buffer, 0, readByteCount); + assertEquals(readByteCount, bytesRead); + for (int bufferIdx = 0; bufferIdx < readByteCount; bufferIdx++) { + assertEquals(bytesToWrite[offset + bufferIdx], buffer[bufferIdx]); + } + } catch (IOException e) { + fail("Failure while reading bytes from offset stream"); + } + }); + threads[readIdx].start(); + } + for (Thread thread : threads) { + thread.join(); + } + } + + public void testMarkAndReset() throws IOException, InterruptedException { + final int nParallelReads = 100; + Thread[] threads = new Thread[nParallelReads]; + resettableCheckedInputStreams = new ResettableCheckedInputStream[nParallelReads]; + for (int readIdx = 0; readIdx < nParallelReads; readIdx++) { + int readByteCount = randomInt(TEST_FILE_SIZE_BYTES - 1) + 1; + int offset = randomInt(TEST_FILE_SIZE_BYTES - readByteCount); + OffsetRangeInputStream offsetRangeInputStream = getOffsetRangeInputStream(readByteCount, offset); + ResettableCheckedInputStream resettableCheckedInputStream = new ResettableCheckedInputStream( + offsetRangeInputStream, + testFile.getFileName().toString() + ); + resettableCheckedInputStreams[readIdx] = resettableCheckedInputStream; + threads[readIdx] = new Thread(() -> { + try { + boolean streamMarked = false; + long streamMarkPosition = -1; + long streamMarkChecksum = -1; + for (int byteIdx = 0; byteIdx < readByteCount - 1; byteIdx++) { + resettableCheckedInputStream.read(); + if (!streamMarked && randomBoolean()) { + streamMarked = true; + streamMarkPosition = offsetRangeInputStream.getFilePointer(); + resettableCheckedInputStream.mark(readByteCount); + streamMarkChecksum = resettableCheckedInputStream.getChecksum(); + } + } + if (!streamMarked) { + streamMarkPosition = offsetRangeInputStream.getFilePointer(); + resettableCheckedInputStream.mark(readByteCount); + streamMarkChecksum = resettableCheckedInputStream.getChecksum(); + } + resettableCheckedInputStream.reset(); + assertEquals(streamMarkChecksum, resettableCheckedInputStream.getChecksum()); + assertEquals(bytesToWrite[(int) streamMarkPosition], resettableCheckedInputStream.read()); + } catch (IOException e) { + fail("Failure while reading bytes from offset stream"); + } + }); + threads[readIdx].start(); + } + for (Thread thread : threads) { + thread.join(); + } + } + + public void testReadAfterSkip() throws IOException { + OffsetRangeInputStream offsetRangeInputStream = getOffsetRangeInputStream(TEST_FILE_SIZE_BYTES, 0); + ResettableCheckedInputStream resettableCheckedInputStream = new ResettableCheckedInputStream( + offsetRangeInputStream, + testFile.getFileName().toString() + ); + resettableCheckedInputStreams = new ResettableCheckedInputStream[] { resettableCheckedInputStream }; + + long skipBytes = randomLongBetween(1, TEST_FILE_SIZE_BYTES - 1); + long actualBytesSkipped = resettableCheckedInputStream.skip(skipBytes); + assertEquals(skipBytes, actualBytesSkipped); + assertEquals(bytesToWrite[(int) skipBytes], resettableCheckedInputStream.read()); + } + + public void testReadLastByte() throws IOException { + OffsetRangeInputStream offsetRangeInputStream = getOffsetRangeInputStream(TEST_FILE_SIZE_BYTES, 0); + ResettableCheckedInputStream resettableCheckedInputStream = new ResettableCheckedInputStream( + offsetRangeInputStream, + testFile.getFileName().toString() + ); + resettableCheckedInputStreams = new ResettableCheckedInputStream[] { resettableCheckedInputStream }; + + long skipBytes = TEST_FILE_SIZE_BYTES; + long actualBytesSkipped = resettableCheckedInputStream.skip(skipBytes); + assertEquals(skipBytes, actualBytesSkipped); + assertEquals(-1, resettableCheckedInputStream.read()); + } + + @Override + @After + public void tearDown() throws Exception { + for (ResettableCheckedInputStream resettableCheckedInputStream : resettableCheckedInputStreams) { + if (resettableCheckedInputStream != null) { + resettableCheckedInputStream.close(); + } + } + super.tearDown(); + } +}