Unit tests for S3 + parquet #5441

Merged · 15 commits · May 16, 2024

Changes from 1 commit
1 change: 0 additions & 1 deletion extensions/parquet/table/build.gradle
@@ -52,7 +52,6 @@ dependencies {
Classpaths.inheritSlf4j(project, 'slf4j-simple', 'testRuntimeOnly')

runtimeOnly project(':extensions-trackedfile')
testImplementation project(':extensions-s3')

brotliTestImplementation project(':extensions-parquet-table')
brotliTestImplementation('com.github.rdblue:brotli-codec:0.1.1')
ParquetInstructions.java
@@ -224,6 +224,12 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par
*/
public abstract ParquetInstructions withTableDefinition(final TableDefinition tableDefinition);

/**
* Creates a new {@link ParquetInstructions} object with the same properties as the current object, but with the
* layout set to the provided {@link ParquetFileLayout}.
*/
public abstract ParquetInstructions withLayout(final ParquetFileLayout fileLayout);

/**
* Creates a new {@link ParquetInstructions} object with the same properties as the current object but definition
* and layout set as the provided values.
@@ -350,18 +356,23 @@ public Optional<Collection<List<String>>> getIndexColumns() {
}

@Override
public ParquetInstructions withTableDefinition(@Nullable final TableDefinition tableDefinition) {
return withTableDefinitionAndLayout(tableDefinition, null);
public ParquetInstructions withTableDefinition(@Nullable final TableDefinition useDefinition) {
return withTableDefinitionAndLayout(useDefinition, null);
}

@Override
public ParquetInstructions withLayout(@Nullable final ParquetFileLayout useLayout) {
return withTableDefinitionAndLayout(null, useLayout);
}

@Override
public ParquetInstructions withTableDefinitionAndLayout(
@Nullable final TableDefinition tableDefinition,
@Nullable final ParquetFileLayout fileLayout) {
@Nullable final TableDefinition useDefinition,
@Nullable final ParquetFileLayout useLayout) {
return new ReadOnly(null, null, getCompressionCodecName(), getMaximumDictionaryKeys(),
getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(),
getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(),
fileLayout, tableDefinition, null);
useLayout, useDefinition, null);
}

@Override
@@ -601,6 +612,11 @@ public ParquetInstructions withTableDefinition(@Nullable final TableDefinition u
return withTableDefinitionAndLayout(useDefinition, getFileLayout().orElse(null));
}

@Override
public ParquetInstructions withLayout(@Nullable final ParquetFileLayout useLayout) {
return withTableDefinitionAndLayout(getTableDefinition().orElse(null), useLayout);
}

@Override
public ParquetInstructions withTableDefinitionAndLayout(
@Nullable final TableDefinition useDefinition,
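As an aside, here is a minimal sketch of how the new withLayout overload could be used. This is not code from the PR; the directory path and builder settings are illustrative.

import io.deephaven.engine.table.Table;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetInstructions.ParquetFileLayout;
import io.deephaven.parquet.table.ParquetTools;

public class WithLayoutSketch {
    public static void main(final String[] args) {
        // Derive a copy of existing instructions with only the layout pinned;
        // every other property carries over unchanged.
        final ParquetInstructions base = ParquetInstructions.builder()
                .setIsRefreshing(false)
                .build();
        final ParquetInstructions kvInstructions = base.withLayout(ParquetFileLayout.KV_PARTITIONED);
        // Pinning the layout skips inference and reads the directory as key-value partitioned.
        final Table table = ParquetTools.readTable("/data/partitioned", kvInstructions);
        System.out.println(table.getDefinition());
    }
}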
ParquetTableReadWriteTest.java
@@ -7,7 +7,6 @@
import io.deephaven.api.Selectable;
import io.deephaven.base.FileUtils;
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.primitive.function.ByteConsumer;
import io.deephaven.engine.primitive.function.CharConsumer;
@@ -35,8 +34,6 @@
import io.deephaven.engine.util.BigDecimalUtils;
import io.deephaven.engine.util.TableTools;
import io.deephaven.engine.util.file.TrackedFileHandleFactory;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.parquet.base.InvalidParquetFileException;
import io.deephaven.parquet.base.NullStatistics;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
@@ -72,7 +69,6 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
@@ -125,11 +121,6 @@ public final class ParquetTableReadWriteTest {
private static final ParquetInstructions EMPTY = ParquetInstructions.EMPTY;
private static final ParquetInstructions REFRESHING = ParquetInstructions.builder().setIsRefreshing(true).build();

// TODO(deephaven-core#5064): Add support for local S3 testing
// The following tests are disabled by default, as they are verifying against a remote system
private static final boolean ENABLE_S3_TESTING =
Configuration.getInstance().getBooleanWithDefault("ParquetTest.enableS3Testing", false);

private static File rootFile;

@Rule
@@ -1358,184 +1349,6 @@ public void testArrayColumns() {
&& !firstColumnMetadata.contains("RLE_DICTIONARY"));
}

@Test
public void readSampleParquetFilesFromDeephavenS3Bucket() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.defaultCredentials())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.build();
final Table fromAws1 =
ParquetTools.readTable("s3://dh-s3-parquet-test1/multiColFile.parquet", readInstructions).select();
final Table dhTable1 = TableTools.emptyTable(1_000_000).update("A=(int)i", "B=(double)(i+1)");
assertTableEquals(fromAws1, dhTable1);

final Table fromAws2 =
ParquetTools.readTable("s3://dh-s3-parquet-test1/singleColFile.parquet", readInstructions).select();
final Table dhTable2 = TableTools.emptyTable(5).update("A=(int)i");
assertTableEquals(fromAws2, dhTable2);

final Table fromAws3 = ParquetTools
.readTable("s3://dh-s3-parquet-test1/single%20col%20file%20with%20spaces%20in%20name.parquet",
readInstructions)
.select();
assertTableEquals(fromAws3, dhTable2);

final Table fromAws4 =
ParquetTools.readTable("s3://dh-s3-parquet-test1/singleColFile.parquet", readInstructions)
.select().sumBy();
final Table dhTable4 = TableTools.emptyTable(5).update("A=(int)i").sumBy();
assertTableEquals(fromAws4, dhTable4);
}

@Test
public void readSampleParquetFilesFromPublicS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-2")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.connectionTimeout(Duration.ofSeconds(1))
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.anonymous())
.build();
final TableDefinition tableDefinition = TableDefinition.of(
ColumnDefinition.ofString("hash"),
ColumnDefinition.ofLong("version"),
ColumnDefinition.ofLong("size"),
ColumnDefinition.ofString("block_hash"),
ColumnDefinition.ofLong("block_number"),
ColumnDefinition.ofLong("index"),
ColumnDefinition.ofLong("virtual_size"),
ColumnDefinition.ofLong("lock_time"),
ColumnDefinition.ofLong("input_count"),
ColumnDefinition.ofLong("output_count"),
ColumnDefinition.ofBoolean("isCoinbase"),
ColumnDefinition.ofDouble("output_value"),
ColumnDefinition.ofTime("last_modified"),
ColumnDefinition.ofDouble("input_value"));
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setTableDefinition(tableDefinition)
.build();
ParquetTools.readTable(
"s3://aws-public-blockchain/v1.0/btc/transactions/date=2009-01-03/part-00000-bdd84ab2-82e9-4a79-8212-7accd76815e8-c000.snappy.parquet",
readInstructions).head(10).select();

ParquetTools.readTable(
"s3://aws-public-blockchain/v1.0/btc/transactions/date=2023-11-13/part-00000-da3a3c27-700d-496d-9c41-81281388eca8-c000.snappy.parquet",
readInstructions).head(10).select();
}

@Test
public void readFlatPartitionedParquetFromS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.defaultCredentials())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setFileLayout(ParquetInstructions.ParquetFileLayout.FLAT_PARTITIONED)
.build();
final Table table = ParquetTools.readTable("s3://dh-s3-parquet-test1/flatPartitionedParquet/",
readInstructions);
final Table expected = emptyTable(30).update("A = (int)i % 10");
assertTableEquals(expected, table);
}

@Test
public void readFlatPartitionedDataAsKeyValuePartitionedParquetFromS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.defaultCredentials())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setFileLayout(ParquetInstructions.ParquetFileLayout.KV_PARTITIONED)
.build();
final Table table = ParquetTools.readTable("s3://dh-s3-parquet-test1/flatPartitionedParquet3/",
readInstructions);
final Table expected = emptyTable(30).update("A = (int)i % 10");
assertTableEquals(expected, table);
}

@Test
public void readKeyValuePartitionedParquetFromS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.defaultCredentials())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setFileLayout(ParquetInstructions.ParquetFileLayout.KV_PARTITIONED)
.build();
final Table table = ParquetTools.readTable("s3://dh-s3-parquet-test1/KeyValuePartitionedData/",
readInstructions);
final List<ColumnDefinition<?>> partitioningColumns = table.getDefinition().getPartitioningColumns();
assertEquals(3, partitioningColumns.size());
assertEquals("PC1", partitioningColumns.get(0).getName());
assertEquals("PC2", partitioningColumns.get(1).getName());
assertEquals("PC3", partitioningColumns.get(2).getName());
assertEquals(100, table.size());
assertEquals(3, table.selectDistinct("PC1").size());
assertEquals(2, table.selectDistinct("PC2").size());
assertEquals(2, table.selectDistinct("PC3").size());
assertEquals(100, table.selectDistinct("I").size());
assertEquals(1, table.selectDistinct("J").size());
}

@Test
public void readKeyValuePartitionedParquetFromPublicS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.anonymous())
.build();
final TableDefinition ookla_table_definition = TableDefinition.of(
ColumnDefinition.ofInt("quarter").withPartitioning(),
ColumnDefinition.ofString("quadkey"));
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.setTableDefinition(ookla_table_definition)
.build();
final Table table = ParquetTools.readTable("s3://ookla-open-data/parquet/performance/type=mobile/year=2023",
readInstructions).head(10).select();
assertEquals(2, table.numColumns());
}

@Test
public void stringDictionaryTest() {
final int nullPos = -5;
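In the spirit of this PR (the removed TODO points at deephaven-core#5064, local S3 testing), here is a rough sketch of a LocalStack-backed replacement for the remote-bucket tests deleted above. It is not the PR's actual test code: the endpointOverride(...) setter on S3Instructions, the OutOfBandTest category, the TstUtils import location, and the bucket/image names are assumptions; the expected table is reused from the deleted singleColFile test.

import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;
import io.deephaven.test.types.OutOfBandTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;

import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;

@Category(OutOfBandTest.class) // assumed category behind the out-of-band task wired up in build.gradle below
public class S3ParquetLocalStackSketch {

    private static LocalStackContainer localstack;

    @BeforeClass
    public static void startLocalStack() {
        localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:3.0"))
                .withServices(LocalStackContainer.Service.S3);
        localstack.start();
    }

    @AfterClass
    public static void stopLocalStack() {
        localstack.stop();
    }

    @Test
    public void readSingleColFileFromLocalS3() {
        final S3Instructions s3Instructions = S3Instructions.builder()
                .regionName(localstack.getRegion())
                // Assumed setter: point the S3 client at the container instead of AWS
                .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3))
                .credentials(Credentials.basic(localstack.getAccessKey(), localstack.getSecretKey()))
                .build();
        final ParquetInstructions readInstructions = ParquetInstructions.builder()
                .setSpecialInstructions(s3Instructions)
                .build();
        // Assumes singleColFile.parquet was uploaded to the bucket in a setup step (not shown).
        final Table fromS3 = ParquetTools
                .readTable("s3://test-bucket/singleColFile.parquet", readInstructions)
                .select();
        assertTableEquals(TableTools.emptyTable(5).update("A=(int)i"), fromS3);
    }
}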
22 changes: 7 additions & 15 deletions extensions/s3/build.gradle
@@ -25,14 +25,14 @@ dependencies {
Classpaths.inheritAutoService(project)
Classpaths.inheritImmutables(project)

Classpaths.inheritJUnitPlatform(project)
Classpaths.inheritJUnitClassic(project, 'testImplementation')
Classpaths.inheritAssertJ(project)
testImplementation 'org.junit.jupiter:junit-jupiter'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'

testImplementation project(':engine-table')
testImplementation project(':engine-test-utils')
testImplementation project(':extensions-parquet-table')

testImplementation "org.testcontainers:testcontainers:1.19.4"
testImplementation "org.testcontainers:junit-jupiter:1.19.4"
testImplementation "org.testcontainers:localstack:1.19.4"
testImplementation "org.testcontainers:minio:1.19.4"

@@ -41,17 +41,9 @@ dependencies {
Classpaths.inheritSlf4j(project, 'slf4j-simple', 'testRuntimeOnly')
}

test {
useJUnitPlatform {
excludeTags("testcontainers")
}
}
TestTools.addEngineOutOfBandTest(project)

tasks.register('testOutOfBand', Test) {
useJUnitPlatform {
includeTags("testcontainers")
}
testOutOfBand {
systemProperty 'testcontainers.localstack.image', project.property('testcontainers.localstack.image')
systemProperty 'testcontainers.minio.image', project.property('testcontainers.minio.image')
}

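For reference, a small sketch of how a test might consume the image coordinates that the block above forwards to the test JVM as system properties. The property name comes from the diff; the fallback image tag is an illustrative assumption.

import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;

final class ContainerImages {

    // Property name matches the systemProperty key set in build.gradle above;
    // the fallback tag is an illustrative assumption.
    static LocalStackContainer localStackFromProperty() {
        final String image =
                System.getProperty("testcontainers.localstack.image", "localstack/localstack:3.0");
        // A mirrored image would additionally need asCompatibleSubstituteFor("localstack/localstack").
        return new LocalStackContainer(DockerImageName.parse(image))
                .withServices(LocalStackContainer.Service.S3);
    }

    private ContainerImages() {}
}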
CredentialsTest.java
@@ -3,29 +3,28 @@
//
package io.deephaven.extensions.s3;


import org.junit.jupiter.api.Test;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class CredentialsTest {

@Test
void defaultCredentials() {
public void defaultCredentials() {
isCredentials(Credentials.defaultCredentials());
}

@Test
void basic() {
public void basic() {
isCredentials(Credentials.basic("accessKeyId", "secretAccessKey"));
}

@Test
void anonymous() {
public void anonymous() {
isCredentials(Credentials.anonymous());
}

private void isCredentials(Credentials c) {
private static void isCredentials(final Credentials c) {
assertThat(c).isInstanceOf(AwsSdkV2Credentials.class);
}
}