From 519682e320df557cb869bb5152f9b7766e4ee0fd Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Wed, 1 May 2024 14:43:21 -0700 Subject: [PATCH] Added S3 test tools to help with directory uploading (#5442) As part of this, the bootstrapping S3 client was changed from sync to async to support S3TransferManager. Deleting keys has also been automated (unit tests no longer responsible for keeping track of keys used). SingletonContainers, LocalStack, and MinIO have been made public; this allows the container bootstrapping logic to be used by other projects' tests. These changes should make it easier for Iceberg testing in #5277 --- extensions/s3/build.gradle | 1 + .../s3/S3SeekableChannelLocalStackTest.java | 8 +- .../s3/S3SeekableChannelMinIOTest.java | 8 +- .../s3/S3SeekableChannelTestBase.java | 116 ++++++++++++------ .../extensions/s3/testlib/S3Helper.java | 88 +++++++++++++ .../s3/{ => testlib}/SingletonContainers.java | 32 ++--- .../extensions/s3/readSimpleFiles/empty.txt | 0 .../s3/readSimpleFiles/hello/world.txt | 1 + 8 files changed, 195 insertions(+), 59 deletions(-) create mode 100644 extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3Helper.java rename extensions/s3/src/test/java/io/deephaven/extensions/s3/{ => testlib}/SingletonContainers.java (80%) create mode 100644 extensions/s3/src/test/resources/io/deephaven/extensions/s3/readSimpleFiles/empty.txt create mode 100644 extensions/s3/src/test/resources/io/deephaven/extensions/s3/readSimpleFiles/hello/world.txt diff --git a/extensions/s3/build.gradle b/extensions/s3/build.gradle index d96e2cf4713..64954314387 100644 --- a/extensions/s3/build.gradle +++ b/extensions/s3/build.gradle @@ -31,6 +31,7 @@ dependencies { testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' + testImplementation 'software.amazon.awssdk:s3-transfer-manager' testImplementation "org.testcontainers:testcontainers:1.19.4" testImplementation 
"org.testcontainers:junit-jupiter:1.19.4" testImplementation "org.testcontainers:localstack:1.19.4" diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java index 63c47e5d498..41daca3e3a6 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelLocalStackTest.java @@ -5,10 +5,10 @@ import io.deephaven.extensions.s3.S3Instructions.Builder; -import io.deephaven.extensions.s3.SingletonContainers.LocalStack; +import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; -import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3AsyncClient; @Tag("testcontainers") public class S3SeekableChannelLocalStackTest extends S3SeekableChannelTestBase { @@ -25,7 +25,7 @@ public Builder s3Instructions(Builder builder) { } @Override - public S3Client s3Client() { - return LocalStack.s3Client(); + public S3AsyncClient s3AsyncClient() { + return LocalStack.s3AsyncClient(); } } diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java index b7c2464c4c7..e427fd3e64e 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelMinIOTest.java @@ -5,12 +5,12 @@ import io.deephaven.extensions.s3.S3Instructions.Builder; -import io.deephaven.extensions.s3.SingletonContainers.MinIO; +import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO; import io.deephaven.stats.util.OSUtil; import org.junit.jupiter.api.Assumptions; import 
org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; -import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3AsyncClient; @Tag("testcontainers") public class S3SeekableChannelMinIOTest extends S3SeekableChannelTestBase { @@ -29,7 +29,7 @@ public Builder s3Instructions(Builder builder) { } @Override - public S3Client s3Client() { - return MinIO.s3Client(); + public S3AsyncClient s3AsyncClient() { + return MinIO.s3AsyncClient(); } } diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java index 41a788ea634..74e941966f2 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelTestBase.java @@ -4,82 +4,101 @@ package io.deephaven.extensions.s3; +import io.deephaven.extensions.s3.testlib.S3Helper; import io.deephaven.util.channel.CachedChannelProvider; import io.deephaven.util.channel.SeekableChannelContext; import io.deephaven.util.channel.SeekableChannelsProvider; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; -import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import 
java.nio.channels.SeekableByteChannel; -import java.util.ArrayList; -import java.util.List; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.time.Duration; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.assertj.core.api.Assertions.assertThat; public abstract class S3SeekableChannelTestBase { - public abstract S3Client s3Client(); + public abstract S3AsyncClient s3AsyncClient(); public abstract S3Instructions.Builder s3Instructions(S3Instructions.Builder builder); - private S3Client client; - + private ExecutorService executor; + private S3AsyncClient asyncClient; private String bucket; - private final List keys = new ArrayList<>(); - @BeforeEach - void setUp() { + void setUp() throws ExecutionException, InterruptedException, TimeoutException { + executor = Executors.newCachedThreadPool(); bucket = UUID.randomUUID().toString(); - client = s3Client(); - client.createBucket(CreateBucketRequest.builder().bucket(bucket).build()); + asyncClient = s3AsyncClient(); + asyncClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS); } @AfterEach - void tearDown() { - for (String key : keys) { - client.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build()); - } - keys.clear(); - client.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build()); - client.close(); + void tearDown() throws ExecutionException, InterruptedException, TimeoutException { + S3Helper.deleteAllKeys(asyncClient, bucket); + asyncClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS); + asyncClient.close(); + executor.shutdownNow(); } @Test - void readEmptyFile() throws IOException { - putObject("empty.txt", RequestBody.empty()); - final URI uri = 
uri("empty.txt"); - final ByteBuffer buffer = ByteBuffer.allocate(1); - try ( - final SeekableChannelsProvider providerImpl = providerImpl(uri); - final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32); - final SeekableChannelContext context = provider.makeContext(); - final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { - assertThat(readChannel.read(buffer)).isEqualTo(-1); + void readSimpleFiles() + throws IOException, URISyntaxException, ExecutionException, InterruptedException, TimeoutException { + uploadDirectory("readSimpleFiles"); + { + final URI uri = uri("empty.txt"); + final ByteBuffer buffer = ByteBuffer.allocate(1); + try ( + final SeekableChannelsProvider providerImpl = providerImpl(uri); + final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32); + final SeekableChannelContext context = provider.makeContext(); + final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { + assertThat(readChannel.read(buffer)).isEqualTo(-1); + } + } + { + final URI uri = uri("hello/world.txt"); + try ( + final SeekableChannelsProvider providerImpl = providerImpl(uri); + final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32); + final SeekableChannelContext context = provider.makeContext(); + final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { + final ByteBuffer bytes = readAll(readChannel, 32); + assertThat(bytes).isEqualTo(ByteBuffer.wrap("Hello, world!".getBytes(StandardCharsets.UTF_8))); + } } } @Test - void read32MiB() throws IOException { + void read32MiB() throws IOException, ExecutionException, InterruptedException, TimeoutException { final int numBytes = 33554432; - putObject("32MiB.bin", RequestBody.fromInputStream(new InputStream() { + putObject("32MiB.bin", AsyncRequestBody.fromInputStream(new InputStream() { @Override public int read() { return 42; } - }, numBytes)); + }, (long) numBytes, 
executor)); final URI uri = uri("32MiB.bin"); final ByteBuffer buffer = ByteBuffer.allocate(1); try ( @@ -96,13 +115,24 @@ public int read() { } } + private void uploadDirectory(String resourceDir) + throws URISyntaxException, ExecutionException, InterruptedException, TimeoutException { + S3Helper.uploadDirectory( + asyncClient, + Path.of(S3SeekableChannelTestBase.class.getResource(resourceDir).toURI()), + bucket, + null, + Duration.ofSeconds(5)); + } + private URI uri(String key) { return URI.create(String.format("s3://%s/%s", bucket, key)); } - private void putObject(String key, RequestBody body) { - client.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(), body); - keys.add(key); + private void putObject(String key, AsyncRequestBody body) + throws ExecutionException, InterruptedException, TimeoutException { + asyncClient.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(), body).get(5, + TimeUnit.SECONDS); } private SeekableChannelsProvider providerImpl(URI uri) { @@ -110,4 +140,18 @@ private SeekableChannelsProvider providerImpl(URI uri) { final S3Instructions instructions = s3Instructions(S3Instructions.builder()).build(); return plugin.createProvider(uri, instructions); } + + private static ByteBuffer readAll(ReadableByteChannel channel, int maxBytes) throws IOException { + final ByteBuffer dst = ByteBuffer.allocate(maxBytes); + while (dst.remaining() > 0 && channel.read(dst) != -1) { + // continue + } + if (dst.remaining() == 0) { + if (channel.read(ByteBuffer.allocate(1)) != -1) { + throw new RuntimeException(String.format("channel has more than %d bytes", maxBytes)); + } + } + dst.flip(); + return dst; + } } diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3Helper.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3Helper.java new file mode 100644 index 00000000000..6d2b839f471 --- /dev/null +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3Helper.java 
@@ -0,0 +1,124 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.extensions.s3.testlib;
+
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.Delete;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.transfer.s3.S3TransferManager;
+import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryUpload;
+import software.amazon.awssdk.transfer.s3.model.DirectoryUpload;
+import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;
+
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+/**
+ * Static test helpers for uploading a local directory to an S3 bucket and for deleting every key in a bucket.
+ */
+public final class S3Helper {
+    /**
+     * Uploads {@code dir} to {@code bucket} through a temporary {@link S3TransferManager} built on
+     * {@code s3AsyncClient}; the manager is closed before this method returns.
+     *
+     * @param s3AsyncClient the client the transfer manager is built on
+     * @param dir the local directory to upload
+     * @param bucket the destination bucket
+     * @param prefix the S3 key prefix handed to the upload request
+     * @param timeout how long to wait for the upload to complete
+     * @throws RuntimeException if the completed upload reports failed transfers
+     */
+    public static void uploadDirectory(
+            S3AsyncClient s3AsyncClient,
+            Path dir,
+            String bucket,
+            String prefix,
+            Duration timeout) throws ExecutionException, InterruptedException, TimeoutException {
+        try (final S3TransferManager manager = S3TransferManager.builder().s3Client(s3AsyncClient).build()) {
+            uploadDirectory(manager, dir, bucket, prefix, timeout);
+        }
+    }
+
+    /**
+     * Uploads {@code dir} to {@code bucket} with the supplied {@code transferManager} and waits up to
+     * {@code timeout} for completion.
+     *
+     * @param transferManager the transfer manager that performs the upload; it is not closed by this method
+     * @param dir the local directory to upload
+     * @param bucket the destination bucket
+     * @param prefix the S3 key prefix handed to the upload request
+     * @param timeout how long to wait for the upload to complete
+     * @throws RuntimeException if the completed upload reports failed transfers
+     */
+    public static void uploadDirectory(
+            S3TransferManager transferManager,
+            Path dir,
+            String bucket,
+            String prefix,
+            Duration timeout) throws ExecutionException, InterruptedException, TimeoutException {
+        // There is no way to get a list of the uploaded files, even when using a TransferListener.
+        final DirectoryUpload directoryUpload = transferManager.uploadDirectory(UploadDirectoryRequest.builder()
+                .source(dir)
+                .bucket(bucket)
+                .s3Prefix(prefix)
+                .build());
+        final CompletedDirectoryUpload upload =
+                directoryUpload.completionFuture().get(timeout.toNanos(), TimeUnit.NANOSECONDS);
+        if (!upload.failedTransfers().isEmpty()) {
+            throw new RuntimeException("Upload has failed transfers");
+        }
+    }
+
+    /**
+     * Deletes every key in {@code bucket}, listing one page at a time and bulk-deleting each non-empty page.
+     * Every listing call, and the final wait on the outstanding deletes, times out after 5 seconds.
+     *
+     * @param s3AsyncClient the client used to list and delete
+     * @param bucket the bucket to empty
+     */
+    public static void deleteAllKeys(S3AsyncClient s3AsyncClient, String bucket)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        ListObjectsV2Response response = s3AsyncClient
+                .listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS);
+        final List<CompletableFuture<?>> futures = new ArrayList<>();
+        while (true) {
+            final List<ObjectIdentifier> deletes = response.contents()
+                    .stream()
+                    .map(S3Object::key)
+                    .map(S3Helper::objectId)
+                    .collect(Collectors.toList());
+            // A bulk delete needs at least one key, so skip the request when the page has no objects.
+            if (!deletes.isEmpty()) {
+                futures.add(s3AsyncClient.deleteObjects(DeleteObjectsRequest.builder()
+                        .bucket(bucket)
+                        .delete(Delete.builder().objects(deletes).build())
+                        .build()));
+            }
+            final String nextContinuationToken = response.nextContinuationToken();
+            if (nextContinuationToken == null) {
+                break;
+            }
+            response = s3AsyncClient.listObjectsV2(
+                    ListObjectsV2Request.builder().bucket(bucket).continuationToken(nextContinuationToken).build())
+                    .get(5, TimeUnit.SECONDS);
+        }
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get(5, TimeUnit.SECONDS);
+    }
+
+    /** Wraps a key in the {@link ObjectIdentifier} form that a bulk delete request takes. */
+    private static ObjectIdentifier objectId(String o) {
+        return ObjectIdentifier.builder().key(o).build();
+    }
+}
diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/SingletonContainers.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/SingletonContainers.java
similarity index 80%
rename from extensions/s3/src/test/java/io/deephaven/extensions/s3/SingletonContainers.java
rename to extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/SingletonContainers.java
index
f4432b6e1e4..3711235a4ec 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/SingletonContainers.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/SingletonContainers.java @@ -1,8 +1,10 @@ // // Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending // -package io.deephaven.extensions.s3; +package io.deephaven.extensions.s3.testlib; +import io.deephaven.extensions.s3.Credentials; +import io.deephaven.extensions.s3.S3Instructions.Builder; import org.testcontainers.DockerClientFactory; import org.testcontainers.containers.MinIOContainer; import org.testcontainers.containers.localstack.LocalStackContainer; @@ -11,18 +13,18 @@ import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3AsyncClient; import java.net.URI; -final class SingletonContainers { +public final class SingletonContainers { // This pattern allows the respective images to be spun up as a container once per-JVM as opposed to once per-class // or once per-test. 
// https://java.testcontainers.org/test_framework_integration/manual_lifecycle_control/#singleton-containers // https://testcontainers.com/guides/testcontainers-container-lifecycle/#_using_singleton_containers - static final class LocalStack { + public static final class LocalStack { private static final LocalStackContainer LOCALSTACK_S3 = new LocalStackContainer(DockerImageName.parse(System.getProperty("testcontainers.localstack.image"))) .withServices(Service.S3); @@ -30,20 +32,20 @@ static final class LocalStack { LOCALSTACK_S3.start(); } - static void init() { + public static void init() { // no-op, ensures this class is initialized } - static S3Instructions.Builder s3Instructions(S3Instructions.Builder builder) { + public static Builder s3Instructions(Builder builder) { return builder .endpointOverride(LOCALSTACK_S3.getEndpoint()) .regionName(LOCALSTACK_S3.getRegion()) .credentials(Credentials.basic(LOCALSTACK_S3.getAccessKey(), LOCALSTACK_S3.getSecretKey())); } - static S3Client s3Client() { - return S3Client - .builder() + public static S3AsyncClient s3AsyncClient() { + return S3AsyncClient + .crtBuilder() .endpointOverride(LOCALSTACK_S3.getEndpoint()) .region(Region.of(LOCALSTACK_S3.getRegion())) .credentialsProvider(StaticCredentialsProvider.create( @@ -52,7 +54,7 @@ static S3Client s3Client() { } } - static final class MinIO { + public static final class MinIO { // MINIO_DOMAIN is set so MinIO will accept virtual-host style requests; see virtual-host style implementation // comments in S3Instructions. 
// https://min.io/docs/minio/linux/reference/minio-server/settings/core.html#domain @@ -63,20 +65,20 @@ static final class MinIO { MINIO.start(); } - static void init() { + public static void init() { // no-op, ensures this class is initialized } - static S3Instructions.Builder s3Instructions(S3Instructions.Builder builder) { + public static Builder s3Instructions(Builder builder) { return builder .endpointOverride(URI.create(MINIO.getS3URL())) .regionName(Region.AWS_GLOBAL.id()) .credentials(Credentials.basic(MINIO.getUserName(), MINIO.getPassword())); } - static S3Client s3Client() { - return S3Client - .builder() + public static S3AsyncClient s3AsyncClient() { + return S3AsyncClient + .crtBuilder() .endpointOverride(URI.create(MINIO.getS3URL())) .region(Region.AWS_GLOBAL) .credentialsProvider(StaticCredentialsProvider.create( diff --git a/extensions/s3/src/test/resources/io/deephaven/extensions/s3/readSimpleFiles/empty.txt b/extensions/s3/src/test/resources/io/deephaven/extensions/s3/readSimpleFiles/empty.txt new file mode 100644 index 00000000000..e69de29bb2d diff --git a/extensions/s3/src/test/resources/io/deephaven/extensions/s3/readSimpleFiles/hello/world.txt b/extensions/s3/src/test/resources/io/deephaven/extensions/s3/readSimpleFiles/hello/world.txt new file mode 100644 index 00000000000..5dd01c177f5 --- /dev/null +++ b/extensions/s3/src/test/resources/io/deephaven/extensions/s3/readSimpleFiles/hello/world.txt @@ -0,0 +1 @@ +Hello, world! \ No newline at end of file