Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unit tests for S3 + parquet #5441

Merged
merged 15 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion extensions/parquet/table/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,19 @@ dependencies {
testImplementation project(':base-test-utils')
testImplementation project(':engine-test-utils')

testImplementation TestTools.projectDependency(project, 'extensions-s3')
Classpaths.inheritJUnitClassic(project, 'testImplementation')

testImplementation "org.testcontainers:testcontainers:1.19.4"
testImplementation "org.testcontainers:localstack:1.19.4"
testImplementation "org.testcontainers:minio:1.19.4"

testRuntimeOnly project(':log-to-slf4j'),
project(path: ':configs'),
project(path: ':test-configs')
Classpaths.inheritSlf4j(project, 'slf4j-simple', 'testRuntimeOnly')

runtimeOnly project(':extensions-trackedfile')
testImplementation project(':extensions-s3')
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved

brotliTestImplementation project(':extensions-parquet-table')
brotliTestImplementation('com.github.rdblue:brotli-codec:0.1.1')
Expand All @@ -70,3 +74,6 @@ if (Architecture.fromHost() == Architecture.AMD64) {
}

TestTools.addEngineOutOfBandTest(project)

// Forward the testcontainers image names (project properties, see gradle.properties) to the
// testOutOfBand task as system properties of the same name.
['testcontainers.localstack.image', 'testcontainers.minio.image'].each { String imageKey ->
    testOutOfBand.systemProperty imageKey, project.property(imageKey)
}
4 changes: 4 additions & 0 deletions extensions/parquet/table/gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
io.deephaven.project.ProjectType=JAVA_PUBLIC

# TODO(deephaven-core#5115): EPIC: Dependency management
testcontainers.localstack.image=localstack/localstack:3.1.0
testcontainers.minio.image=minio/minio:RELEASE.2024-02-04T22-36-13Z
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par
*/
public abstract ParquetInstructions withTableDefinition(final TableDefinition tableDefinition);

/**
* Creates a new {@link ParquetInstructions} object with the same properties as the current object but with the
* layout set to the provided {@link ParquetFileLayout}.
*/
public abstract ParquetInstructions withLayout(final ParquetFileLayout fileLayout);

/**
* Creates a new {@link ParquetInstructions} object with the same properties as the current object but definition
* and layout set as the provided values.
Expand Down Expand Up @@ -350,18 +356,23 @@ public Optional<Collection<List<String>>> getIndexColumns() {
}

@Override
public ParquetInstructions withTableDefinition(@Nullable final TableDefinition tableDefinition) {
return withTableDefinitionAndLayout(tableDefinition, null);
public ParquetInstructions withTableDefinition(@Nullable final TableDefinition useDefinition) {
return withTableDefinitionAndLayout(useDefinition, null);
}

@Override
public ParquetInstructions withLayout(@Nullable final ParquetFileLayout useLayout) {
return withTableDefinitionAndLayout(null, useLayout);
}

@Override
public ParquetInstructions withTableDefinitionAndLayout(
@Nullable final TableDefinition tableDefinition,
@Nullable final ParquetFileLayout fileLayout) {
@Nullable final TableDefinition useDefinition,
@Nullable final ParquetFileLayout useLayout) {
return new ReadOnly(null, null, getCompressionCodecName(), getMaximumDictionaryKeys(),
getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(),
getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(),
fileLayout, tableDefinition, null);
useLayout, useDefinition, null);
}

@Override
Expand Down Expand Up @@ -598,7 +609,12 @@ public Optional<Collection<List<String>>> getIndexColumns() {

@Override
public ParquetInstructions withTableDefinition(@Nullable final TableDefinition useDefinition) {
return withTableDefinitionAndLayout(useDefinition, getFileLayout().orElse(null));
return withTableDefinitionAndLayout(useDefinition, fileLayout);
}

@Override
public ParquetInstructions withLayout(@Nullable final ParquetFileLayout useLayout) {
return withTableDefinitionAndLayout(tableDefinition, useLayout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public static Table readTable(
return readTableFromFileUri(sourceURI, readInstructions);
}
if (source.endsWith(METADATA_FILE_URI_SUFFIX) || source.endsWith(COMMON_METADATA_FILE_URI_SUFFIX)) {
throw new UncheckedDeephavenException("We currently do not support reading parquet metadata files " +
throw new UnsupportedOperationException("We currently do not support reading parquet metadata files " +
"from non local storage");
}
if (!isDirectory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.deephaven.api.SortColumn;
import io.deephaven.base.FileUtils;
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.primitive.function.ByteConsumer;
import io.deephaven.engine.primitive.function.CharConsumer;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table;

import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack;
import io.deephaven.extensions.s3.testlib.SingletonContainers;
import org.junit.BeforeClass;
import software.amazon.awssdk.services.s3.S3AsyncClient;

/**
 * Runs the tests inherited from {@link S3ParquetTestBase} against the shared LocalStack container exposed by
 * {@link SingletonContainers.LocalStack}.
 */
public class S3ParquetLocalStackTest extends S3ParquetTestBase {

    /**
     * Initializes the LocalStack container once per class, before any test runs.
     */
    @BeforeClass
    public static void initContainer() {
        // Start the container up front so its startup time is not attributed to whichever test happens to run first.
        SingletonContainers.LocalStack.init();
    }

    /**
     * @return the S3 async client provided by the LocalStack container helper
     */
    @Override
    public S3AsyncClient s3AsyncClient() {
        return LocalStack.s3AsyncClient();
    }

    /**
     * Delegates configuration of the supplied builder to the LocalStack container helper.
     *
     * @param builder the S3 instructions builder to configure
     * @return the builder returned by the LocalStack helper
     */
    @Override
    public Builder s3Instructions(final Builder builder) {
        return LocalStack.s3Instructions(builder);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table;

import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO;
import io.deephaven.extensions.s3.testlib.SingletonContainers;
import io.deephaven.stats.util.OSUtil;
import org.junit.Assume;
import org.junit.BeforeClass;
import software.amazon.awssdk.services.s3.S3AsyncClient;

/**
 * Runs the tests inherited from {@link S3ParquetTestBase} against the shared MinIO container exposed by
 * {@link SingletonContainers.MinIO}. The whole class is skipped when running on macOS.
 */
public class S3ParquetMinIOTest extends S3ParquetTestBase {

    /**
     * Skips the class on macOS and otherwise initializes the MinIO container once, before any test runs.
     */
    @BeforeClass
    public static void initContainer() {
        // TODO(deephaven-core#5116): MinIO testcontainers does not work on OS X
        final boolean onMacOS = OSUtil.runningMacOS();
        Assume.assumeFalse("OSUtil.runningMacOS()", onMacOS);
        // Start the container up front so its startup time is not attributed to whichever test happens to run first.
        SingletonContainers.MinIO.init();
    }

    /**
     * @return the S3 async client provided by the MinIO container helper
     */
    @Override
    public S3AsyncClient s3AsyncClient() {
        return MinIO.s3AsyncClient();
    }

    /**
     * Delegates configuration of the supplied builder to the MinIO container helper.
     *
     * @param builder the S3 instructions builder to configure
     * @return the builder returned by the MinIO helper
     */
    @Override
    public Builder s3Instructions(final Builder builder) {
        return MinIO.s3Instructions(builder);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table;

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.test.types.OutOfBandTest;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.time.Duration;

import static org.junit.Assert.assertEquals;

/**
 * Verifies the behavior of the Parquet implementation when reading from remote, publicly hosted S3 buckets. Every
 * test is guarded by {@code ENABLE_REMOTE_S3_TESTING} and is skipped while that flag is {@code false}.
 */
@Category(OutOfBandTest.class)
public class S3ParquetRemoteTest {

    // Remote tests are disabled by default; flip this flag locally to run them by hand.
    private static final boolean ENABLE_REMOTE_S3_TESTING = false;

    @Rule
    public final EngineCleanup framework = new EngineCleanup();

    /**
     * Builds S3 instructions for anonymous access to the given region with a 60 second read timeout.
     */
    private static S3Instructions anonymousS3Instructions(final String regionName) {
        return S3Instructions.builder()
                .regionName(regionName)
                .readTimeout(Duration.ofSeconds(60))
                .credentials(Credentials.anonymous())
                .build();
    }

    /**
     * Builds Parquet read instructions carrying the given S3 instructions and table definition.
     */
    private static ParquetInstructions readInstructionsFor(
            final S3Instructions s3Instructions,
            final TableDefinition definition) {
        return new ParquetInstructions.Builder()
                .setSpecialInstructions(s3Instructions)
                .setTableDefinition(definition)
                .build();
    }

    /**
     * Reads the first ten rows of two single parquet files from the public blockchain bucket using an explicit table
     * definition. There are no assertions; the test passes if the reads complete without throwing.
     */
    @Test
    public void readSampleParquetFilesFromPublicS3() {
        Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_REMOTE_S3_TESTING);
        final S3Instructions s3Instructions = anonymousS3Instructions("us-east-2");
        final TableDefinition transactionsDefinition = TableDefinition.of(
                ColumnDefinition.ofString("hash"),
                ColumnDefinition.ofLong("version"),
                ColumnDefinition.ofLong("size"),
                ColumnDefinition.ofString("block_hash"),
                ColumnDefinition.ofLong("block_number"),
                ColumnDefinition.ofLong("index"),
                ColumnDefinition.ofLong("virtual_size"),
                ColumnDefinition.ofLong("lock_time"),
                ColumnDefinition.ofLong("input_count"),
                ColumnDefinition.ofLong("output_count"),
                ColumnDefinition.ofBoolean("isCoinbase"),
                ColumnDefinition.ofDouble("output_value"),
                ColumnDefinition.ofTime("last_modified"),
                ColumnDefinition.ofDouble("input_value"));
        final ParquetInstructions readInstructions = readInstructionsFor(s3Instructions, transactionsDefinition);

        final String[] sampleFileUris = {
                "s3://aws-public-blockchain/v1.0/btc/transactions/date=2009-01-03/part-00000-bdd84ab2-82e9-4a79-8212-7accd76815e8-c000.snappy.parquet",
                "s3://aws-public-blockchain/v1.0/btc/transactions/date=2023-11-13/part-00000-da3a3c27-700d-496d-9c41-81281388eca8-c000.snappy.parquet",
        };
        for (final String sampleFileUri : sampleFileUris) {
            ParquetTools.readTable(sampleFileUri, readInstructions).head(10).select();
        }
    }

    /**
     * Reads the first ten rows of a key-value partitioned parquet directory from the public Ookla bucket and checks
     * that the resulting table has exactly the two columns named in the supplied definition.
     */
    @Test
    public void readKeyValuePartitionedParquetFromPublicS3() {
        Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_REMOTE_S3_TESTING);
        final S3Instructions s3Instructions = anonymousS3Instructions("us-east-1");
        final TableDefinition ooklaDefinition = TableDefinition.of(
                ColumnDefinition.ofInt("quarter").withPartitioning(),
                ColumnDefinition.ofString("quadkey"));
        final ParquetInstructions readInstructions = readInstructionsFor(s3Instructions, ooklaDefinition);
        final Table firstRows = ParquetTools
                .readTable("s3://ookla-open-data/parquet/performance/type=mobile/year=2023", readInstructions)
                .head(10)
                .select();
        assertEquals(2, firstRows.numColumns());
    }
}
Loading
Loading