Skip to content

Commit

Permalink
Introduce interface changes to support read/write blob with object me…
Browse files Browse the repository at this point in the history
…tadata (#13023) (#13757)

* Introduce interface changes to read/write blob with object metadata
---------
Signed-off-by: Sandeep Kumawat <skumwt@amazon.com>
  • Loading branch information
skumawat2025 authored May 21, 2024
1 parent 215ffc8 commit fe89133
Show file tree
Hide file tree
Showing 13 changed files with 406 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
);
}
}
listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum));
listener.onResponse(new ReadContext.Builder(blobSize, blobPartInputStreamFutures).blobChecksum(blobChecksum).build());
} catch (Exception ex) {
listener.onFailure(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import org.apache.lucene.store.IndexInput;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.CheckedTriFunction;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.StreamContextSupplier;
Expand All @@ -52,6 +52,7 @@
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.Scheduler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -513,24 +514,30 @@ private void testWriteBlobByStreams(boolean expectException, boolean throwExcept
exceptionRef.set(ex);
countDownLatch.countDown();
});
blobContainer.asyncBlobUpload(new WriteContext("write_blob_by_streams_max_retries", new StreamContextSupplier() {
@Override
public StreamContext supplyStreamContext(long partSize) {
return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
@Override
public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
}
}, bytes.length, false, WritePriority.NORMAL, uploadSuccess -> {

StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));

CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
assertTrue(uploadSuccess);
if (throwExceptionOnFinalizeUpload) {
throw new RuntimeException();
}
}, false, null), completionListener);
};

WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries")
.streamContextSupplier(streamContextSupplier)
.fileSize(bytes.length)
.failIfAlreadyExists(false)
.writePriority(WritePriority.NORMAL)
.uploadFinalizer(uploadFinalizer)
.doRemoteDataIntegrityCheck(false)
.build();

blobContainer.asyncBlobUpload(writeContext, completionListener);

assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
// wait for completableFuture to finish
Expand Down Expand Up @@ -563,24 +570,30 @@ private void testWriteBlobByStreamsLargeBlob(boolean expectException, boolean th
countDownLatch.countDown();
});
List<InputStream> openInputStreams = new ArrayList<>();
blobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() {
@Override
public StreamContext supplyStreamContext(long partSize) {
return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
@Override
public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}
}, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize));
}
}, blobSize, false, WritePriority.NORMAL, uploadSuccess -> {

StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));

CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
assertTrue(uploadSuccess);
if (throwExceptionOnFinalizeUpload) {
throw new RuntimeException();
}
}, false, null), completionListener);
};

WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob")
.streamContextSupplier(streamContextSupplier)
.fileSize(blobSize)
.failIfAlreadyExists(false)
.writePriority(WritePriority.NORMAL)
.uploadFinalizer(uploadFinalizer)
.doRemoteDataIntegrityCheck(false)
.build();

blobContainer.asyncBlobUpload(writeContext, completionListener);

assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
if (expectException || throwExceptionOnFinalizeUpload) {
Expand Down Expand Up @@ -695,20 +708,23 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W

List<InputStream> openInputStreams = new ArrayList<>();
final S3BlobContainer s3BlobContainer = Mockito.spy(new S3BlobContainer(blobPath, blobStore));
s3BlobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() {
@Override
public StreamContext supplyStreamContext(long partSize) {
return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
@Override
public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}
}, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize));
}
}, blobSize, false, writePriority, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null), completionListener);

StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));

WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob")
.streamContextSupplier(streamContextSupplier)
.fileSize(blobSize)
.failIfAlreadyExists(false)
.writePriority(writePriority)
.uploadFinalizer(Assert::assertTrue)
.doRemoteDataIntegrityCheck(false)
.build();

s3BlobContainer.asyncBlobUpload(writeContext, completionListener);
assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
if (expectException) {
assertNotNull(exceptionRef.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

import org.apache.http.HttpStatus;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.CheckedTriFunction;
import org.opensearch.common.Nullable;
import org.opensearch.common.StreamContext;
import org.opensearch.common.SuppressForbidden;
Expand Down Expand Up @@ -375,22 +374,24 @@ public void testWriteBlobByStreamsWithRetries() throws Exception {
exceptionRef.set(ex);
countDownLatch.countDown();
});
blobContainer.asyncBlobUpload(new WriteContext("write_blob_by_streams_max_retries", new StreamContextSupplier() {
@Override
public StreamContext supplyStreamContext(long partSize) {
return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
@Override
public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
}
}, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null), completionListener);

StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));

WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries")
.streamContextSupplier(streamContextSupplier)
.fileSize(bytes.length)
.failIfAlreadyExists(false)
.writePriority(WritePriority.NORMAL)
.uploadFinalizer(Assert::assertTrue)
.doRemoteDataIntegrityCheck(false)
.build();

blobContainer.asyncBlobUpload(writeContext, completionListener);
assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));

assertThat(countDown.isCountedDown(), is(true));

openInputStreams.forEach(inputStream -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream));
}
ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null);
ReadContext blobReadContext = new ReadContext.Builder(contentLength, blobPartStreams).build();
listener.onResponse(blobReadContext);
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.common.blobstore;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
Expand Down Expand Up @@ -77,6 +78,20 @@ public interface BlobContainer {
*/
InputStream readBlob(String blobName) throws IOException;

/**
* Creates a new {@link BlobDownloadResponse} for the given blob name.
*
* @param blobName
* The name of the blob to get an {@link InputStream} for.
* @return The {@link BlobDownloadResponse} of the blob.
* @throws NoSuchFileException if the blob does not exist
* @throws IOException if the blob can not be read.
*/
@ExperimentalApi
default BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException {
throw new UnsupportedOperationException("readBlobWithMetadata is not implemented yet");
};

/**
* Creates a new {@link InputStream} that can be used to read the given blob starting from
* a specific {@code position} in the blob. The {@code length} is an indication of the
Expand Down Expand Up @@ -128,6 +143,36 @@ default long readBlobPreferredLength() {
*/
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata.
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
*
* @param blobName
* The name of the blob to write the contents of the input stream to.
* @param inputStream
* The input stream from which to retrieve the bytes to write to the blob.
* @param metadata
* The metadata to be associate with the blob upload.
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @param failIfAlreadyExists
* whether to throw a FileAlreadyExistsException if the given blob already exists
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
@ExperimentalApi
default void writeBlobWithMetadata(
String blobName,
InputStream inputStream,
Map<String, String> metadata,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {
throw new UnsupportedOperationException("writeBlobWithMetadata is not implemented yet");
};

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,
* using an atomic write operation if the implementation supports it.
Expand All @@ -149,6 +194,38 @@ default long readBlobPreferredLength() {
*/
void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata
* using an atomic write operation if the implementation supports it.
* <p>
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
*
* @param blobName
* The name of the blob to write the contents of the input stream to.
* @param inputStream
* The input stream from which to retrieve the bytes to write to the blob.
* @param metadata
* The metadata to be associate with the blob upload.
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @param failIfAlreadyExists
* whether to throw a FileAlreadyExistsException if the given blob already exists
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
@ExperimentalApi
default void writeBlobAtomicWithMetadata(
String blobName,
InputStream inputStream,
Map<String, String> metadata,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {
throw new UnsupportedOperationException("writeBlobAtomicWithMetadata is not implemented yet");
};

/**
* Deletes this container and all its contents from the repository.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore;

import java.io.InputStream;
import java.util.Map;

/**
* Represents the response from a blob download operation, containing both the
* input stream of the blob content and the associated metadata.
*
* @opensearch.experimental
*/
public class BlobDownloadResponse {

/**
* Downloaded blob InputStream
*/
private final InputStream inputStream;

/**
* Metadata of the downloaded blob
*/
private final Map<String, String> metadata;

public InputStream getInputStream() {
return inputStream;
}

public Map<String, String> getMetadata() {
return metadata;
}

public BlobDownloadResponse(InputStream inputStream, Map<String, String> metadata) {
this.inputStream = inputStream;
this.metadata = metadata;
}

}
Loading

0 comments on commit fe89133

Please sign in to comment.