Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added S3 test tools to help with directory uploading #5442

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions extensions/s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'

testImplementation 'software.amazon.awssdk:s3-transfer-manager'
testImplementation "org.testcontainers:testcontainers:1.19.4"
testImplementation "org.testcontainers:junit-jupiter:1.19.4"
testImplementation "org.testcontainers:localstack:1.19.4"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@


import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.SingletonContainers.LocalStack;
import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3AsyncClient;

@Tag("testcontainers")
public class S3SeekableChannelLocalStackTest extends S3SeekableChannelTestBase {
Expand All @@ -25,7 +25,7 @@ public Builder s3Instructions(Builder builder) {
}

@Override
public S3Client s3Client() {
return LocalStack.s3Client();
public S3AsyncClient s3AsyncClient() {
return LocalStack.s3AsyncClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@


import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.SingletonContainers.MinIO;
import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO;
import io.deephaven.stats.util.OSUtil;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3AsyncClient;

@Tag("testcontainers")
public class S3SeekableChannelMinIOTest extends S3SeekableChannelTestBase {
Expand All @@ -29,7 +29,7 @@ public Builder s3Instructions(Builder builder) {
}

@Override
public S3Client s3Client() {
return MinIO.s3Client();
public S3AsyncClient s3AsyncClient() {
return MinIO.s3AsyncClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,82 +4,101 @@
package io.deephaven.extensions.s3;


import io.deephaven.extensions.s3.testlib.S3Helper;
import io.deephaven.util.channel.CachedChannelProvider;
import io.deephaven.util.channel.SeekableChannelContext;
import io.deephaven.util.channel.SeekableChannelsProvider;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.assertj.core.api.Assertions.assertThat;

public abstract class S3SeekableChannelTestBase {

public abstract S3Client s3Client();
public abstract S3AsyncClient s3AsyncClient();

public abstract S3Instructions.Builder s3Instructions(S3Instructions.Builder builder);

private S3Client client;

private ExecutorService executor;
private S3AsyncClient asyncClient;
private String bucket;

private final List<String> keys = new ArrayList<>();

@BeforeEach
void setUp() {
void setUp() throws ExecutionException, InterruptedException, TimeoutException {
executor = Executors.newCachedThreadPool();
bucket = UUID.randomUUID().toString();
client = s3Client();
client.createBucket(CreateBucketRequest.builder().bucket(bucket).build());
asyncClient = s3AsyncClient();
asyncClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS);
}

@AfterEach
void tearDown() {
for (String key : keys) {
client.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build());
}
keys.clear();
client.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build());
client.close();
void tearDown() throws ExecutionException, InterruptedException, TimeoutException {
S3Helper.deleteAllKeys(asyncClient, bucket);
asyncClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS);
Comment on lines +59 to +60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More a question than a comment: is there a way to upload a set of files and share between individual tests? Am I reading this correctly that each test must do its own setup / upload?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's just how this specific test orchestrates them. You can definitely upload once per class (org.junit.jupiter.api.BeforeAll JUnit 5, org.junit.BeforeClass JUnit 4), or even statically once per JVM.

asyncClient.close();
executor.shutdownNow();
}

@Test
void readEmptyFile() throws IOException {
putObject("empty.txt", RequestBody.empty());
final URI uri = uri("empty.txt");
final ByteBuffer buffer = ByteBuffer.allocate(1);
try (
final SeekableChannelsProvider providerImpl = providerImpl(uri);
final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32);
final SeekableChannelContext context = provider.makeContext();
final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) {
assertThat(readChannel.read(buffer)).isEqualTo(-1);
void readSimpleFiles()
throws IOException, URISyntaxException, ExecutionException, InterruptedException, TimeoutException {
uploadDirectory("readSimpleFiles");
{
final URI uri = uri("empty.txt");
final ByteBuffer buffer = ByteBuffer.allocate(1);
try (
final SeekableChannelsProvider providerImpl = providerImpl(uri);
final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32);
final SeekableChannelContext context = provider.makeContext();
final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) {
assertThat(readChannel.read(buffer)).isEqualTo(-1);
}
}
{
final URI uri = uri("hello/world.txt");
try (
final SeekableChannelsProvider providerImpl = providerImpl(uri);
final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32);
final SeekableChannelContext context = provider.makeContext();
final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) {
final ByteBuffer bytes = readAll(readChannel, 32);
assertThat(bytes).isEqualTo(ByteBuffer.wrap("Hello, world!".getBytes(StandardCharsets.UTF_8)));
}
}
}

@Test
void read32MiB() throws IOException {
void read32MiB() throws IOException, ExecutionException, InterruptedException, TimeoutException {
final int numBytes = 33554432;
putObject("32MiB.bin", RequestBody.fromInputStream(new InputStream() {
putObject("32MiB.bin", AsyncRequestBody.fromInputStream(new InputStream() {
@Override
public int read() {
return 42;
}
}, numBytes));
}, (long) numBytes, executor));
final URI uri = uri("32MiB.bin");
final ByteBuffer buffer = ByteBuffer.allocate(1);
try (
Expand All @@ -96,18 +115,43 @@ public int read() {
}
}

private void uploadDirectory(String resourceDir)
throws URISyntaxException, ExecutionException, InterruptedException, TimeoutException {
S3Helper.uploadDirectory(
asyncClient,
Path.of(S3SeekableChannelTestBase.class.getResource(resourceDir).toURI()),
bucket,
null,
Duration.ofSeconds(5));
}

private URI uri(String key) {
return URI.create(String.format("s3://%s/%s", bucket, key));
}

private void putObject(String key, RequestBody body) {
client.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(), body);
keys.add(key);
private void putObject(String key, AsyncRequestBody body)
throws ExecutionException, InterruptedException, TimeoutException {
asyncClient.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(), body).get(5,
TimeUnit.SECONDS);
}

private SeekableChannelsProvider providerImpl(URI uri) {
final S3SeekableChannelProviderPlugin plugin = new S3SeekableChannelProviderPlugin();
final S3Instructions instructions = s3Instructions(S3Instructions.builder()).build();
return plugin.createProvider(uri, instructions);
}

private static ByteBuffer readAll(ReadableByteChannel channel, int maxBytes) throws IOException {
final ByteBuffer dst = ByteBuffer.allocate(maxBytes);
while (dst.remaining() > 0 && channel.read(dst) != -1) {
// continue
}
if (dst.remaining() == 0) {
if (channel.read(ByteBuffer.allocate(1)) != -1) {
throw new RuntimeException(String.format("channel has more than %d bytes", maxBytes));
}
}
dst.flip();
return dst;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.s3.testlib;

import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryUpload;
import software.amazon.awssdk.transfer.s3.model.DirectoryUpload;
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;

import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

public final class S3Helper {
public static void uploadDirectory(
S3AsyncClient s3AsyncClient,
Path dir,
String bucket,
String prefix,
Duration timeout) throws ExecutionException, InterruptedException, TimeoutException {
try (final S3TransferManager manager = S3TransferManager.builder().s3Client(s3AsyncClient).build()) {
uploadDirectory(manager, dir, bucket, prefix, timeout);
}
}

public static void uploadDirectory(
S3TransferManager transferManager,
Path dir,
String bucket,
String prefix,
Duration timeout) throws ExecutionException, InterruptedException, TimeoutException {
// Not a way to get a list of the uploaded files, even when using a TransferListener.
final DirectoryUpload directoryUpload = transferManager.uploadDirectory(UploadDirectoryRequest.builder()
.source(dir)
.bucket(bucket)
.s3Prefix(prefix)
.build());
final CompletedDirectoryUpload upload =
directoryUpload.completionFuture().get(timeout.toNanos(), TimeUnit.NANOSECONDS);
if (!upload.failedTransfers().isEmpty()) {
throw new RuntimeException("Upload has failed transfers");
}
}

public static void deleteAllKeys(S3AsyncClient s3AsyncClient, String bucket)
throws ExecutionException, InterruptedException, TimeoutException {
ListObjectsV2Response response = s3AsyncClient
.listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS);
final List<CompletableFuture<?>> futures = new ArrayList<>();
while (true) {
final List<ObjectIdentifier> deletes = response.contents()
.stream()
.map(S3Object::key)
.map(S3Helper::objectId)
.collect(Collectors.toList());
futures.add(s3AsyncClient.deleteObjects(DeleteObjectsRequest.builder()
.bucket(bucket)
.delete(Delete.builder().objects(deletes).build())
.build()));
final String nextContinuationToken = response.nextContinuationToken();
if (nextContinuationToken == null) {
break;
}
response = s3AsyncClient.listObjectsV2(
ListObjectsV2Request.builder().bucket(bucket).continuationToken(nextContinuationToken).build())
.get(5, TimeUnit.SECONDS);
}
CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).get(5, TimeUnit.SECONDS);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).get(5, TimeUnit.SECONDS);
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).get(5, TimeUnit.SECONDS);

}

private static ObjectIdentifier objectId(String o) {
return ObjectIdentifier.builder().key(o).build();
}
}
Loading
Loading