Skip to content

Commit

Permalink
Added S3 test tools to help with directory uploading (#5442)
Browse files Browse the repository at this point in the history
As part of this, the bootstrapping S3 client was changed from sync to async to support S3TransferManager. Deleting keys has also been automated (unit tests no longer responsible for keeping track of keys used).

SingletonContainers, LocalStack, and MinIO have been made public; this allows the container bootstrapping logic to be used by other projects' tests.

These changes should make it easier for Iceberg testing in #5277
  • Loading branch information
devinrsmith authored and stanbrub committed May 17, 2024
1 parent b8448a6 commit 519682e
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 59 deletions.
1 change: 1 addition & 0 deletions extensions/s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'

testImplementation 'software.amazon.awssdk:s3-transfer-manager'
testImplementation "org.testcontainers:testcontainers:1.19.4"
testImplementation "org.testcontainers:junit-jupiter:1.19.4"
testImplementation "org.testcontainers:localstack:1.19.4"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@


import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.SingletonContainers.LocalStack;
import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3AsyncClient;

@Tag("testcontainers")
public class S3SeekableChannelLocalStackTest extends S3SeekableChannelTestBase {
Expand All @@ -25,7 +25,7 @@ public Builder s3Instructions(Builder builder) {
}

@Override
public S3Client s3Client() {
return LocalStack.s3Client();
public S3AsyncClient s3AsyncClient() {
return LocalStack.s3AsyncClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@


import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.SingletonContainers.MinIO;
import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO;
import io.deephaven.stats.util.OSUtil;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3AsyncClient;

@Tag("testcontainers")
public class S3SeekableChannelMinIOTest extends S3SeekableChannelTestBase {
Expand All @@ -29,7 +29,7 @@ public Builder s3Instructions(Builder builder) {
}

@Override
public S3Client s3Client() {
return MinIO.s3Client();
public S3AsyncClient s3AsyncClient() {
return MinIO.s3AsyncClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,82 +4,101 @@
package io.deephaven.extensions.s3;


import io.deephaven.extensions.s3.testlib.S3Helper;
import io.deephaven.util.channel.CachedChannelProvider;
import io.deephaven.util.channel.SeekableChannelContext;
import io.deephaven.util.channel.SeekableChannelsProvider;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.assertj.core.api.Assertions.assertThat;

public abstract class S3SeekableChannelTestBase {

public abstract S3Client s3Client();
public abstract S3AsyncClient s3AsyncClient();

public abstract S3Instructions.Builder s3Instructions(S3Instructions.Builder builder);

private S3Client client;

private ExecutorService executor;
private S3AsyncClient asyncClient;
private String bucket;

private final List<String> keys = new ArrayList<>();

@BeforeEach
void setUp() {
void setUp() throws ExecutionException, InterruptedException, TimeoutException {
executor = Executors.newCachedThreadPool();
bucket = UUID.randomUUID().toString();
client = s3Client();
client.createBucket(CreateBucketRequest.builder().bucket(bucket).build());
asyncClient = s3AsyncClient();
asyncClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS);
}

@AfterEach
void tearDown() {
for (String key : keys) {
client.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build());
}
keys.clear();
client.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build());
client.close();
void tearDown() throws ExecutionException, InterruptedException, TimeoutException {
S3Helper.deleteAllKeys(asyncClient, bucket);
asyncClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS);
asyncClient.close();
executor.shutdownNow();
}

@Test
void readEmptyFile() throws IOException {
putObject("empty.txt", RequestBody.empty());
final URI uri = uri("empty.txt");
final ByteBuffer buffer = ByteBuffer.allocate(1);
try (
final SeekableChannelsProvider providerImpl = providerImpl(uri);
final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32);
final SeekableChannelContext context = provider.makeContext();
final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) {
assertThat(readChannel.read(buffer)).isEqualTo(-1);
void readSimpleFiles()
throws IOException, URISyntaxException, ExecutionException, InterruptedException, TimeoutException {
uploadDirectory("readSimpleFiles");
{
final URI uri = uri("empty.txt");
final ByteBuffer buffer = ByteBuffer.allocate(1);
try (
final SeekableChannelsProvider providerImpl = providerImpl(uri);
final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32);
final SeekableChannelContext context = provider.makeContext();
final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) {
assertThat(readChannel.read(buffer)).isEqualTo(-1);
}
}
{
final URI uri = uri("hello/world.txt");
try (
final SeekableChannelsProvider providerImpl = providerImpl(uri);
final SeekableChannelsProvider provider = new CachedChannelProvider(providerImpl, 32);
final SeekableChannelContext context = provider.makeContext();
final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) {
final ByteBuffer bytes = readAll(readChannel, 32);
assertThat(bytes).isEqualTo(ByteBuffer.wrap("Hello, world!".getBytes(StandardCharsets.UTF_8)));
}
}
}

@Test
void read32MiB() throws IOException {
void read32MiB() throws IOException, ExecutionException, InterruptedException, TimeoutException {
final int numBytes = 33554432;
putObject("32MiB.bin", RequestBody.fromInputStream(new InputStream() {
putObject("32MiB.bin", AsyncRequestBody.fromInputStream(new InputStream() {
@Override
public int read() {
return 42;
}
}, numBytes));
}, (long) numBytes, executor));
final URI uri = uri("32MiB.bin");
final ByteBuffer buffer = ByteBuffer.allocate(1);
try (
Expand All @@ -96,18 +115,43 @@ public int read() {
}
}

private void uploadDirectory(String resourceDir)
throws URISyntaxException, ExecutionException, InterruptedException, TimeoutException {
S3Helper.uploadDirectory(
asyncClient,
Path.of(S3SeekableChannelTestBase.class.getResource(resourceDir).toURI()),
bucket,
null,
Duration.ofSeconds(5));
}

private URI uri(String key) {
return URI.create(String.format("s3://%s/%s", bucket, key));
}

private void putObject(String key, RequestBody body) {
client.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(), body);
keys.add(key);
private void putObject(String key, AsyncRequestBody body)
throws ExecutionException, InterruptedException, TimeoutException {
asyncClient.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(), body).get(5,
TimeUnit.SECONDS);
}

private SeekableChannelsProvider providerImpl(URI uri) {
final S3SeekableChannelProviderPlugin plugin = new S3SeekableChannelProviderPlugin();
final S3Instructions instructions = s3Instructions(S3Instructions.builder()).build();
return plugin.createProvider(uri, instructions);
}

private static ByteBuffer readAll(ReadableByteChannel channel, int maxBytes) throws IOException {
final ByteBuffer dst = ByteBuffer.allocate(maxBytes);
while (dst.remaining() > 0 && channel.read(dst) != -1) {
// continue
}
if (dst.remaining() == 0) {
if (channel.read(ByteBuffer.allocate(1)) != -1) {
throw new RuntimeException(String.format("channel has more than %d bytes", maxBytes));
}
}
dst.flip();
return dst;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.s3.testlib;

import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryUpload;
import software.amazon.awssdk.transfer.s3.model.DirectoryUpload;
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;

import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

public final class S3Helper {
public static void uploadDirectory(
S3AsyncClient s3AsyncClient,
Path dir,
String bucket,
String prefix,
Duration timeout) throws ExecutionException, InterruptedException, TimeoutException {
try (final S3TransferManager manager = S3TransferManager.builder().s3Client(s3AsyncClient).build()) {
uploadDirectory(manager, dir, bucket, prefix, timeout);
}
}

public static void uploadDirectory(
S3TransferManager transferManager,
Path dir,
String bucket,
String prefix,
Duration timeout) throws ExecutionException, InterruptedException, TimeoutException {
// Not a way to get a list of the uploaded files, even when using a TransferListener.
final DirectoryUpload directoryUpload = transferManager.uploadDirectory(UploadDirectoryRequest.builder()
.source(dir)
.bucket(bucket)
.s3Prefix(prefix)
.build());
final CompletedDirectoryUpload upload =
directoryUpload.completionFuture().get(timeout.toNanos(), TimeUnit.NANOSECONDS);
if (!upload.failedTransfers().isEmpty()) {
throw new RuntimeException("Upload has failed transfers");
}
}

public static void deleteAllKeys(S3AsyncClient s3AsyncClient, String bucket)
throws ExecutionException, InterruptedException, TimeoutException {
ListObjectsV2Response response = s3AsyncClient
.listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).build()).get(5, TimeUnit.SECONDS);
final List<CompletableFuture<?>> futures = new ArrayList<>();
while (true) {
final List<ObjectIdentifier> deletes = response.contents()
.stream()
.map(S3Object::key)
.map(S3Helper::objectId)
.collect(Collectors.toList());
futures.add(s3AsyncClient.deleteObjects(DeleteObjectsRequest.builder()
.bucket(bucket)
.delete(Delete.builder().objects(deletes).build())
.build()));
final String nextContinuationToken = response.nextContinuationToken();
if (nextContinuationToken == null) {
break;
}
response = s3AsyncClient.listObjectsV2(
ListObjectsV2Request.builder().bucket(bucket).continuationToken(nextContinuationToken).build())
.get(5, TimeUnit.SECONDS);
}
CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).get(5, TimeUnit.SECONDS);
}

private static ObjectIdentifier objectId(String o) {
return ObjectIdentifier.builder().key(o).build();
}
}
Loading

0 comments on commit 519682e

Please sign in to comment.