Skip to content

Commit

Permalink
Add support for S3TransferManager TransferListeners (#2770)
Browse files Browse the repository at this point in the history
Add support for S3TransferManager TransferListeners

This adds initial support for S3TransferManager TransferListeners. The
motivation and design is consistent as outlined in
#2729. It also addresses
some customer asks as mentioned in
#37. For more context,
see the discussion in #2770.

Every @SdkPublicApi has been thoroughly documented with its description
and usage instructions where applicable.
  • Loading branch information
Bennett-Lynch authored Oct 21, 2021
1 parent de77ae5 commit da53f77
Show file tree
Hide file tree
Showing 36 changed files with 2,421 additions and 62 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/feature-S3TransferManager-acedb56.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "S3TransferManager",
"contributor": "",
"type": "feature",
"description": "Add support for S3TransferManager TransferListeners"
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
* Based on code developed by Brian Goetz and Tim Peierls and concepts
* published in 'Java Concurrency in Practice' by Brian Goetz, Tim Peierls,
* Joshua Bloch, Joseph Bowbeer, David Holmes and Doug Lea.
*
* @see Mutable
*/
@Documented
@Target(ElementType.TYPE)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* The class to which this annotation is applied is explicitly mutable,
* meaning that its state is subject to change between calls. Mutable
* classes offer no inherent guarantees on thread-safety. Where possible,
* classes may be further annotated as either {@link ThreadSafe} or
* {@link NotThreadSafe}.
*
* @see Immutable
*/
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.CLASS)
@SdkProtectedApi
public @interface Mutable {
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,45 @@
import org.junit.Test;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.testutils.RandomTempFile;
import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;
import software.amazon.awssdk.utils.Md5Utils;

public class S3TransferManagerDownloadIntegrationTest extends S3IntegrationTestBase {
private static final String BUCKET = temporaryBucketName(S3TransferManagerDownloadIntegrationTest.class);
private static final String KEY = "key";
private static S3TransferManager transferManager;
private static final int OBJ_SIZE = 16 * 1024 * 1024;
private static S3TransferManager tm;
private static File file;

@BeforeClass
public static void setup() throws IOException {
createBucket(BUCKET);
file = new RandomTempFile(10_000);
file = new RandomTempFile(OBJ_SIZE);
s3.putObject(PutObjectRequest.builder()
.bucket(BUCKET)
.key(KEY)
.build(), file.toPath());
transferManager = S3TransferManager.builder()
.s3ClientConfiguration(b -> b.region(DEFAULT_REGION)
tm = S3TransferManager.builder()
.s3ClientConfiguration(b -> b.region(DEFAULT_REGION)
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN))
.build();
.build();
}

@AfterClass
public static void cleanup() {
deleteBucketAndAllContents(BUCKET);
transferManager.close();
tm.close();
S3IntegrationTestBase.cleanUp();
}

@Test
public void download_shouldWork() throws IOException {
Path path = RandomTempFile.randomUncreatedFile().toPath();
Download download = transferManager.download(b -> b.getObjectRequest(r -> r.bucket(BUCKET).key(KEY))
.destination(path));
Download download = tm.download(DownloadRequest.builder()
.getObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.destination(path)
.overrideConfiguration(b -> b.addListener(LoggingTransferListener.create()))
.build());
CompletedDownload completedDownload = download.completionFuture().join();
assertThat(Md5Utils.md5AsBase64(path.toFile())).isEqualTo(Md5Utils.md5AsBase64(file));
assertThat(completedDownload.response().responseMetadata().requestId()).isNotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,23 @@

import static org.assertj.core.api.Assertions.assertThat;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;

import java.io.IOException;
import java.nio.file.Files;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.transfer.s3.util.ChecksumUtils;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.testutils.RandomTempFile;
import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;
import software.amazon.awssdk.transfer.s3.util.ChecksumUtils;

public class S3TransferManagerUploadIntegrationTest extends S3IntegrationTestBase {
private static final String TEST_BUCKET = temporaryBucketName(S3TransferManagerUploadIntegrationTest.class);
private static final String TEST_KEY = "8mib_file.dat";
private static final int OBJ_SIZE = 8 * 1024 * 1024;
private static final String TEST_KEY = "16mib_file.dat";
private static final int OBJ_SIZE = 16 * 1024 * 1024;

private static RandomTempFile testFile;
private static S3TransferManager tm;
Expand Down Expand Up @@ -64,6 +66,7 @@ public void upload_fileSentCorrectly() throws IOException {
Upload upload = tm.upload(UploadRequest.builder()
.putObjectRequest(b -> b.bucket(TEST_BUCKET).key(TEST_KEY))
.source(testFile.toPath())
.overrideConfiguration(b -> b.addListener(LoggingTransferListener.create()))
.build());

CompletedUpload completedUpload = upload.completionFuture().join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkPreviewApi;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.transfer.s3.progress.TransferProgress;

/**
* A download transfer of a single object from S3.
Expand All @@ -28,4 +29,9 @@ public interface Download extends Transfer {

@Override
CompletableFuture<CompletedDownload> completionFuture();

/**
* The stateful {@link TransferProgress} associated with this transfer.
*/
TransferProgress progress();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import java.io.File;
import java.nio.file.Path;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.NotThreadSafe;
import software.amazon.awssdk.annotations.SdkPreviewApi;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.utils.ToString;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.CopyableBuilder;
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;
Expand All @@ -35,10 +37,12 @@
public final class DownloadRequest implements TransferRequest, ToCopyableBuilder<DownloadRequest.Builder, DownloadRequest> {
private final Path destination;
private final GetObjectRequest getObjectRequest;
private final TransferRequestOverrideConfiguration overrideConfiguration;

private DownloadRequest(BuilderImpl builder) {
this.destination = Validate.paramNotNull(builder.destination, "destination");
this.getObjectRequest = Validate.paramNotNull(builder.getObjectRequest, "getObjectRequest");
this.overrideConfiguration = builder.configuration;
}

/**
Expand Down Expand Up @@ -72,6 +76,23 @@ public GetObjectRequest getObjectRequest() {
return getObjectRequest;
}

/**
* @return the optional override configuration
* @see Builder#overrideConfiguration(TransferRequestOverrideConfiguration)
*/
public Optional<TransferRequestOverrideConfiguration> overrideConfiguration() {
return Optional.ofNullable(overrideConfiguration);
}

@Override
public String toString() {
return ToString.builder("DownloadRequest")
.add("destination", destination)
.add("getObjectRequest", getObjectRequest)
.add("overrideConfiguration", overrideConfiguration)
.build();
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -86,13 +107,17 @@ public boolean equals(Object o) {
if (!Objects.equals(destination, that.destination)) {
return false;
}
return Objects.equals(getObjectRequest, that.getObjectRequest);
if (!Objects.equals(getObjectRequest, that.getObjectRequest)) {
return false;
}
return Objects.equals(overrideConfiguration, that.overrideConfiguration);
}

@Override
public int hashCode() {
int result = destination != null ? destination.hashCode() : 0;
result = 31 * result + (getObjectRequest != null ? getObjectRequest.hashCode() : 0);
result = 31 * result + (overrideConfiguration != null ? overrideConfiguration.hashCode() : 0);
return result;
}

Expand Down Expand Up @@ -157,6 +182,31 @@ default Builder getObjectRequest(Consumer<GetObjectRequest.Builder> getObjectReq
return this;
}

/**
* Add an optional request override configuration.
*
* @param configuration The override configuration.
* @return This builder for method chaining.
*/
Builder overrideConfiguration(TransferRequestOverrideConfiguration configuration);

/**
* Similar to {@link #overrideConfiguration(TransferRequestOverrideConfiguration)}, but takes a lambda to configure a new
* {@link TransferRequestOverrideConfiguration.Builder}. This removes the need to call
* {@link TransferRequestOverrideConfiguration#builder()} and
* {@link TransferRequestOverrideConfiguration.Builder#build()}.
*
* @param configurationBuilder the upload configuration
* @return this builder for method chaining.
* @see #overrideConfiguration(TransferRequestOverrideConfiguration)
*/
default Builder overrideConfiguration(Consumer<TransferRequestOverrideConfiguration.Builder> configurationBuilder) {
Validate.paramNotNull(configurationBuilder, "configurationBuilder");
return overrideConfiguration(TransferRequestOverrideConfiguration.builder()
.applyMutation(configurationBuilder)
.build());
}

/**
* @return The built request.
*/
Expand All @@ -167,6 +217,7 @@ default Builder getObjectRequest(Consumer<GetObjectRequest.Builder> getObjectReq
private static final class BuilderImpl implements Builder {
private Path destination;
private GetObjectRequest getObjectRequest;
private TransferRequestOverrideConfiguration configuration;

private BuilderImpl() {
}
Expand Down Expand Up @@ -199,6 +250,20 @@ public void setGetObjectRequest(GetObjectRequest getObjectRequest) {
getObjectRequest(getObjectRequest);
}

@Override
public Builder overrideConfiguration(TransferRequestOverrideConfiguration configuration) {
this.configuration = configuration;
return this;
}

public void setOverrideConfiguration(TransferRequestOverrideConfiguration configuration) {
overrideConfiguration(configuration);
}

public TransferRequestOverrideConfiguration getOverrideConfiguration() {
return configuration;
}

@Override
public DownloadRequest build() {
return new DownloadRequest(this);
Expand Down
Loading

0 comments on commit da53f77

Please sign in to comment.