Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement interface changes for s3 plugin to read/write blob with obj… #13765

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
import software.amazon.awssdk.utils.CollectionUtils;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -77,6 +78,7 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.FetchBlobResult;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
Expand Down Expand Up @@ -139,6 +141,13 @@
}
}

@ExperimentalApi
@Override
public FetchBlobResult readBlobWithMetadata(String blobName) throws IOException {
S3RetryingInputStream s3RetryingInputStream = new S3RetryingInputStream(blobStore, buildKey(blobName));
return new FetchBlobResult(s3RetryingInputStream, s3RetryingInputStream.getMetadata());

Check warning on line 148 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java#L147-L148

Added lines #L147 - L148 were not covered by tests
}

@Override
public InputStream readBlob(String blobName) throws IOException {
return new S3RetryingInputStream(blobStore, buildKey(blobName));
Expand Down Expand Up @@ -170,12 +179,27 @@
*/
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
writeBlobWithMetadata(blobName, inputStream, blobSize, failIfAlreadyExists, null);
}

/**
* Write blob with its object metadata.
*/
@ExperimentalApi
@Override
public void writeBlobWithMetadata(
String blobName,
InputStream inputStream,
long blobSize,
boolean failIfAlreadyExists,
@Nullable Map<String, String> metadata
) throws IOException {
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
SocketAccess.doPrivilegedIOException(() -> {
if (blobSize <= getLargeBlobThresholdInBytes()) {
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize);
executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
} else {
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize);
executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata);
}
return null;
});
Expand All @@ -191,7 +215,8 @@
writeContext.getUploadFinalizer(),
writeContext.doRemoteDataIntegrityCheck(),
writeContext.getExpectedChecksum(),
blobStore.isUploadRetryEnabled()
blobStore.isUploadRetryEnabled(),
writeContext.getMetadata()
);
try {
// If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload.
Expand All @@ -211,7 +236,8 @@
blobStore,
uploadRequest.getKey(),
inputStream.getInputStream(),
uploadRequest.getContentLength()
uploadRequest.getContentLength(),
uploadRequest.getMetadata()
);
completionListener.onResponse(null);
} catch (Exception ex) {
Expand Down Expand Up @@ -582,8 +608,13 @@
/**
* Uploads a blob using a single upload request
*/
void executeSingleUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
throws IOException {
void executeSingleUpload(
final S3BlobStore blobStore,
final String blobName,
final InputStream input,
final long blobSize,
final Map<String, String> metadata
) throws IOException {

// Extra safety checks
if (blobSize > MAX_FILE_SIZE.getBytes()) {
Expand All @@ -600,6 +631,10 @@
.storageClass(blobStore.getStorageClass())
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher));

if (CollectionUtils.isNotEmpty(metadata)) {
putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
}
if (blobStore.serverSideEncryption()) {
putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
Expand All @@ -623,8 +658,13 @@
/**
* Uploads a blob using multipart upload requests.
*/
void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, final InputStream input, final long blobSize)
throws IOException {
void executeMultipartUpload(
final S3BlobStore blobStore,
final String blobName,
final InputStream input,
final long blobSize,
final Map<String, String> metadata
) throws IOException {

ensureMultiPartUploadSize(blobSize);
final long partSize = blobStore.bufferSizeInBytes();
Expand All @@ -649,6 +689,10 @@
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector));

if (CollectionUtils.isNotEmpty(metadata)) {
createMultipartUploadRequestBuilder.metadata(metadata);
}

if (blobStore.serverSideEncryption()) {
createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand Down Expand Up @@ -77,6 +78,7 @@ class S3RetryingInputStream extends InputStream {
private long currentOffset;
private boolean closed;
private boolean eof;
private Map<String, String> metadata;

S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException {
this(blobStore, blobKey, 0, Long.MAX_VALUE - 1);
Expand Down Expand Up @@ -122,6 +124,7 @@ private void openStream() throws IOException {
getObjectResponseInputStream.response().contentLength()
);
this.currentStream = getObjectResponseInputStream;
this.metadata = getObjectResponseInputStream.response().metadata();
this.isStreamAborted.set(false);
} catch (final SdkException e) {
if (e instanceof S3Exception) {
Expand Down Expand Up @@ -265,4 +268,8 @@ boolean isEof() {
boolean isAborted() {
return isStreamAborted.get();
}

Map<String, String> getMetadata() {
return this.metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.awssdk.utils.CompletableFutureUtils;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -154,6 +155,10 @@ private void uploadInParts(
.bucket(uploadRequest.getBucket())
.key(uploadRequest.getKey())
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector));

if (CollectionUtils.isNotEmpty(uploadRequest.getMetadata())) {
createMultipartUploadRequestBuilder.metadata(uploadRequest.getMetadata());
}
if (uploadRequest.doRemoteDataIntegrityCheck()) {
createMultipartUploadRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
}
Expand Down Expand Up @@ -352,6 +357,10 @@ private void uploadInOneChunk(
.key(uploadRequest.getKey())
.contentLength(uploadRequest.getContentLength())
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.putObjectMetricPublisher));

if (CollectionUtils.isNotEmpty(uploadRequest.getMetadata())) {
putObjectRequestBuilder.metadata(uploadRequest.getMetadata());
}
if (uploadRequest.doRemoteDataIntegrityCheck()) {
putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
putObjectRequestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
package org.opensearch.repositories.s3.async;

import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.stream.write.WritePriority;

import java.io.IOException;
import java.util.Map;

/**
* A model encapsulating all details for an upload to S3
Expand All @@ -25,6 +27,7 @@ public class UploadRequest {
private final boolean doRemoteDataIntegrityCheck;
private final Long expectedChecksum;
private final boolean uploadRetryEnabled;
private final Map<String, String> metadata;

/**
* Construct a new UploadRequest object
Expand All @@ -36,6 +39,7 @@ public class UploadRequest {
* @param uploadFinalizer An upload finalizer to call once all parts are uploaded
* @param doRemoteDataIntegrityCheck A boolean to inform vendor plugins whether remote data integrity checks need to be done
* @param expectedChecksum Checksum of the file being uploaded for remote data integrity check
* @param metadata Metadata of the file being uploaded
*/
public UploadRequest(
String bucket,
Expand All @@ -45,7 +49,8 @@ public UploadRequest(
CheckedConsumer<Boolean, IOException> uploadFinalizer,
boolean doRemoteDataIntegrityCheck,
Long expectedChecksum,
boolean uploadRetryEnabled
boolean uploadRetryEnabled,
@Nullable Map<String, String> metadata
) {
this.bucket = bucket;
this.key = key;
Expand All @@ -55,6 +60,7 @@ public UploadRequest(
this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
this.expectedChecksum = expectedChecksum;
this.uploadRetryEnabled = uploadRetryEnabled;
this.metadata = metadata;
}

public String getBucket() {
Expand Down Expand Up @@ -88,4 +94,11 @@ public Long getExpectedChecksum() {
public boolean isUploadRetryEnabled() {
return uploadRetryEnabled;
}

/**
* @return metadata of the blob to be uploaded
*/
public Map<String, String> getMetadata() {
return metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
Expand All @@ -79,6 +80,7 @@

import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -722,6 +724,7 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W
.writePriority(writePriority)
.uploadFinalizer(Assert::assertTrue)
.doRemoteDataIntegrityCheck(false)
.metadata(new HashMap<>())
.build();

s3BlobContainer.asyncBlobUpload(writeContext, completionListener);
Expand All @@ -731,7 +734,13 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException, W
} else {
assertNull(exceptionRef.get());
}
verify(s3BlobContainer, times(1)).executeMultipartUpload(any(S3BlobStore.class), anyString(), any(InputStream.class), anyLong());
verify(s3BlobContainer, times(1)).executeMultipartUpload(
any(S3BlobStore.class),
anyString(),
any(InputStream.class),
anyLong(),
anyMap()
);

if (expectException) {
verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class));
Expand Down
Loading
Loading