[Bug][Connector][FileBase] Parquet reader parsing array type exception. (apache#4457)

---------

Co-authored-by: lightzhao <zhaolianyong777@gmail.com>
2 people authored and DESKTOP-GHPCOV0\dingaolong committed Dec 19, 2023
1 parent 4db0b52 commit 11f4a76
Showing 2 changed files with 103 additions and 1 deletion.
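The first changed file is the connector's Parquet read strategy: when resolveObject handles an ARRAY field, Avro hands string elements back as org.apache.avro.util.Utf8 rather than java.lang.String, and the fix converts each Utf8 element to String while copying the array. The second file adds a regression test that writes a Parquet file containing a string array and reads it back. The sketch below is standalone illustration, not part of the commit (the class name and the toArray call are ours; toArray merely stands in for the connector's later String[] conversion), showing the failure mode and the fix:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;

import java.util.ArrayList;
import java.util.List;

public class Utf8ArrayDemo {
    public static void main(String[] args) {
        // Avro represents string array elements as Utf8, not java.lang.String.
        Schema arraySchema = Schema.createArray(Schema.create(Schema.Type.STRING));
        GenericData.Array<Utf8> avroArray = new GenericData.Array<>(2, arraySchema);
        avroArray.add(new Utf8("Java"));
        avroArray.add(new Utf8("Python"));

        // Old behavior: copy elements untouched, so they stay Utf8 ...
        List<Object> copied = new ArrayList<>();
        avroArray.iterator().forEachRemaining(copied::add);
        try {
            // ... and materializing a String[] from them blows up.
            String[] broken = copied.toArray(new String[0]);
        } catch (ArrayStoreException e) {
            System.out.println("old path fails: " + e);
        }

        // Fixed behavior: convert each Utf8 element to String while copying.
        List<Object> converted = new ArrayList<>();
        avroArray.forEach(ele -> converted.add(ele instanceof Utf8 ? ele.toString() : ele));
        String[] ok = converted.toArray(new String[0]);
        System.out.println(ok[0] + ", " + ok[1]);
    }
}

Converting at copy time keeps the element-type switch that follows unchanged: by the time its STRING branch runs, every element is already a java.lang.String.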
@@ -37,6 +37,7 @@
 import org.apache.avro.data.TimeConversions;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -132,7 +133,16 @@ private Object resolveObject(Object field, SeaTunnelDataType<?> fieldType) {
        switch (fieldType.getSqlType()) {
            case ARRAY:
                ArrayList<Object> origArray = new ArrayList<>();
-                ((GenericData.Array<?>) field).iterator().forEachRemaining(origArray::add);
+                ((GenericData.Array<?>) field)
+                        .iterator()
+                        .forEachRemaining(
+                                ele -> {
+                                    if (ele instanceof Utf8) {
+                                        origArray.add(ele.toString());
+                                    } else {
+                                        origArray.add(ele);
+                                    }
+                                });
                SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>) fieldType).getElementType();
                switch (elementType.getSqlType()) {
                    case STRING:

@@ -21,17 +21,32 @@
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.File;
+import java.io.IOException;
 import java.net.URL;
 import java.nio.file.Paths;
 import java.time.LocalDateTime;
@@ -157,6 +172,29 @@ public void testParquetReadProjection2() throws Exception {
        parquetReadStrategy.read(path, "", testCollector);
    }
 
+    @DisabledOnOs(OS.WINDOWS)
+    @Test
+    public void testParquetReadArray() throws Exception {
+        AutoGenerateParquetData.generateTestData();
+        ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        parquetReadStrategy.init(localConf);
+        SeaTunnelRowType seaTunnelRowTypeInfo =
+                parquetReadStrategy.getSeaTunnelRowTypeInfo(
+                        localConf, AutoGenerateParquetData.DATA_FILE_PATH);
+        Assertions.assertNotNull(seaTunnelRowTypeInfo);
+        Assertions.assertEquals(seaTunnelRowTypeInfo.getFieldType(3).getClass(), ArrayType.class);
+        TestCollector testCollector = new TestCollector();
+        parquetReadStrategy.read(AutoGenerateParquetData.DATA_FILE_PATH, "1", testCollector);
+        List<SeaTunnelRow> rows = testCollector.getRows();
+        SeaTunnelRow seaTunnelRow = rows.get(0);
+        Assertions.assertEquals(seaTunnelRow.getField(1).toString(), "Alice");
+        String[] arrayData = (String[]) seaTunnelRow.getField(3);
+        Assertions.assertEquals(arrayData.length, 2);
+        Assertions.assertEquals(arrayData[0], "Java");
+        AutoGenerateParquetData.deleteFile();
+    }
+
    public static class TestCollector implements Collector<SeaTunnelRow> {
 
        private final List<SeaTunnelRow> rows = new ArrayList<>();
@@ -195,4 +233,58 @@ public String getSchema() {
            return SCHEMA;
        }
    }
+
+    public static class AutoGenerateParquetData {
+
+        public static final String DATA_FILE_PATH = "/tmp/data.parquet";
+
+        public static void generateTestData() throws IOException {
+            deleteFile();
+            String schemaString =
+                    "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"salary\",\"type\":\"double\"},{\"name\":\"skills\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}";
+            Schema schema = new Schema.Parser().parse(schemaString);
+
+            Configuration conf = new Configuration();
+
+            Path file = new Path(DATA_FILE_PATH);
+
+            ParquetWriter<GenericRecord> writer =
+                    AvroParquetWriter.<GenericRecord>builder(file)
+                            .withSchema(schema)
+                            .withConf(conf)
+                            .withCompressionCodec(CompressionCodecName.SNAPPY)
+                            .build();
+
+            GenericRecord record1 = new GenericData.Record(schema);
+            record1.put("id", 1);
+            record1.put("name", "Alice");
+            record1.put("salary", 50000.0);
+            GenericArray<Utf8> skills1 =
+                    new GenericData.Array<>(2, schema.getField("skills").schema());
+            skills1.add(new Utf8("Java"));
+            skills1.add(new Utf8("Python"));
+            record1.put("skills", skills1);
+            writer.write(record1);
+
+            GenericRecord record2 = new GenericData.Record(schema);
+            record2.put("id", 2);
+            record2.put("name", "Bob");
+            record2.put("salary", 60000.0);
+            GenericArray<Utf8> skills2 =
+                    new GenericData.Array<>(2, schema.getField("skills").schema());
+            skills2.add(new Utf8("C++"));
+            skills2.add(new Utf8("Go"));
+            record2.put("skills", skills2);
+            writer.write(record2);
+
+            writer.close();
+        }
+
+        public static void deleteFile() {
+            File parquetFile = new File(DATA_FILE_PATH);
+            if (parquetFile.exists()) {
+                parquetFile.delete();
+            }
+        }
+    }
 }
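A note on the @DisabledOnOs(OS.WINDOWS) guard: the generator writes to the hard-coded POSIX path /tmp/data.parquet, which presumably is why the test cannot run on Windows. A portable variant would resolve the file under the platform temp directory instead; the sketch below is our suggestion under that assumption, not part of the commit (and in the real test class, java.nio.file.Path would need qualifying to avoid clashing with org.apache.hadoop.fs.Path):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class PortableDataFile {
    public static Path dataFilePath() throws IOException {
        // Resolves under java.io.tmpdir, so it works on Windows as well as POSIX systems.
        return Files.createTempDirectory("parquet-read-test").resolve("data.parquet");
    }
}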
