Skip to content

Commit

Permalink
version: Update Parquet library to latest release (#14502)
Browse files Browse the repository at this point in the history
The upstream Parquet library currently pinned for use in the S3 destination plugin is over a year old. The pinned version generates invalid schemas for date-time fields with time zones; this appears to be fixed in the `1.12.3` release of the library (commit apache/parquet-java@c72862b).
  • Loading branch information
blarghmatey authored Jul 11, 2022
1 parent 9c6c092 commit f4524e3
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ dependencies {
implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.3'
implementation group: 'org.apache.hadoop', name: 'hadoop-aws', version: '3.3.3'
implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.3'
implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.0'
implementation group: 'org.apache.parquet', name: 'parquet-avro', version: '1.12.3'
implementation group: 'com.github.airbytehq', name: 'json-avro-converter', version: '1.0.1'

testImplementation 'org.apache.commons:commons-lang3:3.11'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public class ParquetSerializedBufferTest {
"field1", 10000,
"column2", "string value",
"another field", true,
"nested_column", Map.of("array_column", List.of(1, 2, 3))));
"nested_column", Map.of("array_column", List.of(1, 2, 3)),
"datetime_with_timezone", "2022-05-12T15:35:44.192950Z"));
private static final String STREAM = "stream1";
private static final AirbyteStreamNameNamespacePair streamPair = new AirbyteStreamNameNamespacePair(STREAM, null);
private static final AirbyteRecordMessage message = new AirbyteRecordMessage()
Expand All @@ -50,7 +51,8 @@ public class ParquetSerializedBufferTest {
Field.of("field1", JsonSchemaType.NUMBER),
Field.of("column2", JsonSchemaType.STRING),
Field.of("another field", JsonSchemaType.BOOLEAN),
Field.of("nested_column", JsonSchemaType.OBJECT));
Field.of("nested_column", JsonSchemaType.OBJECT),
Field.of("datetime_with_timezone", JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE));
private static final ConfiguredAirbyteCatalog catalog = CatalogHelpers.createConfiguredAirbyteCatalog(STREAM, null, FIELDS);

@Test
Expand All @@ -60,7 +62,7 @@ public void testUncompressedParquetWriter() throws Exception {
"format_type", "parquet"),
"s3_bucket_name", "test",
"s3_bucket_region", "us-east-2")));
runTest(195L, 205L, config, getExpectedString());
runTest(195L, 215L, config, getExpectedString());
}

@Test
Expand All @@ -72,14 +74,15 @@ public void testCompressedParquetWriter() throws Exception {
"s3_bucket_name", "test",
"s3_bucket_region", "us-east-2")));
// TODO: Compressed parquet is the same size as uncompressed??
runTest(195L, 205L, config, getExpectedString());
runTest(195L, 215L, config, getExpectedString());
}

private static String getExpectedString() {
return "{\"_airbyte_ab_id\": \"<UUID>\", \"_airbyte_emitted_at\": \"<timestamp>\", "
+ "\"field1\": 10000.0, \"another_field\": true, "
+ "\"nested_column\": {\"_airbyte_additional_properties\": {\"array_column\": \"[1,2,3]\"}}, "
+ "\"column2\": \"string value\", "
+ "\"datetime_with_timezone\": 1652369744192000, "
+ "\"_airbyte_additional_properties\": null}";
}

Expand Down

0 comments on commit f4524e3

Please sign in to comment.