Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: NPE in reading parquet TIMESTAMP_MILLIS and TIMESTAMP_MICROS columns #5877

Merged
merged 2 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.parquet.format.Type;
import org.apache.parquet.schema.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -296,18 +297,18 @@ private static void buildChildren(Types.GroupBuilder builder, Iterator<SchemaEle
columnOrders, columnCount);
}

final LogicalTypeAnnotation logicalType;
if (schemaElement.isSetLogicalType()) {
((Types.Builder) childBuilder).as(getLogicalTypeAnnotation(schemaElement.logicalType));
logicalType = getLogicalTypeAnnotation(schemaElement.logicalType);
((Types.Builder) childBuilder).as(logicalType);
} else {
logicalType = null;
}

if (schemaElement.isSetConverted_type()) {
final LogicalTypeAnnotation originalType = getLogicalTypeAnnotation(
schemaElement.converted_type, schemaElement.logicalType, schemaElement);
final LogicalTypeAnnotation newOriginalType = schemaElement.isSetLogicalType()
&& getLogicalTypeAnnotation(schemaElement.logicalType) != null
? getLogicalTypeAnnotation(schemaElement.logicalType)
: null;
if (!originalType.equals(newOriginalType)) {
schemaElement.converted_type, schemaElement);
if (!originalType.equals(logicalType)) {
((Types.Builder) childBuilder).as(originalType);
}
}
Expand Down Expand Up @@ -335,7 +336,9 @@ private static LogicalTypeAnnotation.TimeUnit convertTimeUnit(TimeUnit unit) thr
}
}

static LogicalTypeAnnotation getLogicalTypeAnnotation(LogicalType type) throws ParquetFileReaderException {
@Nullable
private static LogicalTypeAnnotation getLogicalTypeAnnotation(@NotNull final LogicalType type)
throws ParquetFileReaderException {
switch (type.getSetField()) {
case MAP:
return LogicalTypeAnnotation.mapType();
Expand Down Expand Up @@ -405,8 +408,9 @@ private static org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(Colu
return org.apache.parquet.schema.ColumnOrder.undefined();
}

private static LogicalTypeAnnotation getLogicalTypeAnnotation(final ConvertedType convertedType,
final LogicalType logicalType, final SchemaElement schemaElement) throws ParquetFileReaderException {
private static LogicalTypeAnnotation getLogicalTypeAnnotation(
final ConvertedType convertedType,
final SchemaElement schemaElement) throws ParquetFileReaderException {
switch (convertedType) {
case UTF8:
return LogicalTypeAnnotation.stringType();
Expand All @@ -430,12 +434,13 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(final ConvertedTyp
case TIME_MICROS:
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
case TIMESTAMP_MILLIS:
// Converted type doesn't have isAdjustedToUTC parameter, so use the information from logical type
return LogicalTypeAnnotation.timestampType(isAdjustedToUTC(logicalType),
LogicalTypeAnnotation.TimeUnit.MILLIS);
// TIMESTAMP_MILLIS is always adjusted to UTC
// ref: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
case TIMESTAMP_MICROS:
return LogicalTypeAnnotation.timestampType(isAdjustedToUTC(logicalType),
LogicalTypeAnnotation.TimeUnit.MICROS);
// TIMESTAMP_MICROS is always adjusted to UTC
// ref: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
case INTERVAL:
return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
case INT_8:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class S3ParquetRemoteTest {
public final EngineCleanup framework = new EngineCleanup();

@Test
public void readSampleParquetFilesFromPublicS3() {
public void readSampleParquetFilesFromPublicS3Part1() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_REMOTE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-2")
Expand Down Expand Up @@ -71,6 +71,21 @@ public void readSampleParquetFilesFromPublicS3() {
readInstructions).head(10).select();
}

@Test
public void readSampleParquetFilesFromPublicS3Part2() {
    // Remote-S3 tests are opt-in; bail out quietly when disabled.
    Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_REMOTE_S3_TESTING);
    // Public bucket in eu-west-3, reachable with anonymous credentials.
    final S3Instructions anonymousS3 = S3Instructions.builder()
            .regionName("eu-west-3")
            .readTimeout(Duration.ofSeconds(60))
            .credentials(Credentials.anonymous())
            .build();
    final ParquetInstructions instructions = new ParquetInstructions.Builder()
            .setSpecialInstructions(anonymousS3)
            .build();
    // Smoke test: read a sample file and materialize a small prefix of it.
    readTable("s3://datasets-documentation/pypi/2023/pypi_66_7_29.snappy.parquet", instructions)
            .head(10).select();
}

@Test
public void readKeyValuePartitionedParquetFromPublicS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_REMOTE_S3_TESTING);
Expand Down
Loading