fix: Parquet format arrays (#71)
This PR fixes the array/slice handling in the Parquet filetype by slightly updating the Parquet schema definition.

It's possible to represent UUID columns with the proper Parquet logical type, but it just didn't work with Athena. It also required some odd workarounds, like defining the UUID as a **22-byte** FIXED_LEN_BYTE_ARRAY (UUIDs are 16 bytes) while feeding it a 16-byte Go slice (not a Go array), plus Base64 decoders in the reverse transformers, and so on. Even then it didn't work for `UUIDArray`s, so I gave up on that approach for now.

The JSON logical type seems to somewhat work with DuckDB but doesn't with Athena, so I removed that as well.
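In concrete terms (the column name here is hypothetical): a string-array column such as `tags` used to be written as a flat repeated value, and is now written as an optional LIST group with a single optional `element` sub-field, using the tags from `parquet/schema.go` below:

```
before: name=tags, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=REPEATED
after:  name=tags, type=LIST, repetitiontype=OPTIONAL
        sub-field: name=element, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL
```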
disq authored Feb 19, 2023
1 parent b6fa8c8 commit 4a72a70
Showing 4 changed files with 70 additions and 41 deletions.
parquet/read.go (1 addition, 1 deletion)
@@ -20,7 +20,7 @@ func (*Client) Read(f io.Reader, table *schema.Table, sourceName string, res cha
         return err
     }
 
-    s := makeSchema(table.Columns)
+    s := makeSchema(table.Name, table.Columns)
     r, err := reader.NewParquetReader(newPQReader(buf.Bytes()), s, 2)
     if err != nil {
         return fmt.Errorf("can't create parquet reader: %w", err)
parquet/schema.go (67 additions, 39 deletions)
@@ -8,56 +8,84 @@ import (
     pschema "github.com/xitongsys/parquet-go/schema"
 )
 
-func makeSchema(cols schema.ColumnList) string {
+func makeSchema(tableName string, cols schema.ColumnList) string {
     s := pschema.JSONSchemaItemType{
-        Tag: `name=parquet_go_root, repetitiontype=REQUIRED`,
+        Tag: `name=` + tableName + `_root, repetitiontype=REQUIRED`,
     }
 
-    for i := range cols {
-        tag := `name=` + cols[i].Name
-        if opts := structOptsForColumn(cols[i]); len(opts) > 0 {
-            tag += ", " + strings.Join(opts, ", ")
+    for _, col := range cols {
+        var subFields []*pschema.JSONSchemaItemType
+
+        tag := []string{`name=` + col.Name}
+
+        switch col.Type {
+        case schema.TypeTimestamp:
+            tag = append(tag, "type=INT64", "convertedtype=TIMESTAMP_MILLIS")
+        case schema.TypeJSON, schema.TypeString, schema.TypeUUID, schema.TypeCIDR, schema.TypeInet, schema.TypeMacAddr:
+            tag = append(tag, "type=BYTE_ARRAY", "convertedtype=UTF8")
+        case schema.TypeFloat:
+            tag = append(tag, "type=DOUBLE")
+        case schema.TypeInt:
+            tag = append(tag, "type=INT64")
+        case schema.TypeByteArray:
+            tag = append(tag, "type=BYTE_ARRAY")
+        case schema.TypeBool:
+            tag = append(tag, "type=BOOLEAN")
+        case schema.TypeIntArray:
+            tag = append(tag, "type=LIST", "repetitiontype=OPTIONAL")
+            subFields = []*pschema.JSONSchemaItemType{
+                {
+                    Tag: "name=element, type=INT64, repetitiontype=OPTIONAL",
+                },
+            }
+        case schema.TypeStringArray, schema.TypeUUIDArray, schema.TypeCIDRArray, schema.TypeInetArray, schema.TypeMacAddrArray:
+            tag = append(tag, "type=LIST", "repetitiontype=OPTIONAL")
+            subFields = []*pschema.JSONSchemaItemType{
+                {
+                    Tag: "name=element, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL",
+                },
+            }
+        default:
+            panic("unhandled type: " + col.Type.String())
         }
-        s.Fields = append(s.Fields, &pschema.JSONSchemaItemType{Tag: tag})
+
+        if !isArray(col.Type) { // array types are handled differently, see above
+            if col.CreationOptions.PrimaryKey || col.CreationOptions.IncrementalKey {
+                tag = append(tag, "repetitiontype=REQUIRED")
+            } else {
+                tag = append(tag, "repetitiontype=OPTIONAL")
+            }
+        }
+
+        s.Fields = append(s.Fields, &pschema.JSONSchemaItemType{
+            Tag:    strings.Join(tag, ", "),
+            Fields: subFields,
+        })
     }
 
     b, _ := json.Marshal(s)
     return string(b)
 }
 
-func structOptsForColumn(col schema.Column) []string {
-    opts := []string{}
-
-    switch col.Type {
-    case schema.TypeJSON:
-        opts = append(opts, "type=BYTE_ARRAY", "convertedtype=UTF8")
-    case schema.TypeTimestamp:
-        opts = append(opts, "type=INT64", "convertedtype=TIMESTAMP_MILLIS")
-    case schema.TypeString, schema.TypeUUID, schema.TypeCIDR, schema.TypeInet, schema.TypeMacAddr,
-        schema.TypeStringArray, schema.TypeUUIDArray, schema.TypeCIDRArray, schema.TypeInetArray, schema.TypeMacAddrArray:
-        opts = append(opts, "type=BYTE_ARRAY", "convertedtype=UTF8")
-    case schema.TypeFloat:
-        opts = append(opts, "type=DOUBLE")
-    case schema.TypeInt, schema.TypeIntArray:
-        opts = append(opts, "type=INT64")
-    case schema.TypeByteArray:
-        opts = append(opts, "type=BYTE_ARRAY")
-    case schema.TypeBool:
-        opts = append(opts, "type=BOOLEAN")
-    default:
-        panic("unhandled type: " + col.Type.String())
-    }
+func isArray(t schema.ValueType) bool {
+    return arrayElement(t) != schema.TypeInvalid
+}
 
-    switch col.Type {
-    case schema.TypeStringArray, schema.TypeIntArray, schema.TypeUUIDArray, schema.TypeCIDRArray, schema.TypeInetArray, schema.TypeMacAddrArray:
-        opts = append(opts, "repetitiontype=REPEATED")
+func arrayElement(t schema.ValueType) schema.ValueType {
+    switch t {
+    case schema.TypeIntArray:
+        return schema.TypeInt
+    case schema.TypeStringArray:
+        return schema.TypeString
+    case schema.TypeUUIDArray:
+        return schema.TypeUUID
+    case schema.TypeCIDRArray:
+        return schema.TypeCIDR
+    case schema.TypeInetArray:
+        return schema.TypeInet
+    case schema.TypeMacAddrArray:
+        return schema.TypeMacAddr
     default:
-        if col.CreationOptions.PrimaryKey || col.CreationOptions.IncrementalKey {
-            opts = append(opts, "repetitiontype=REQUIRED")
-        } else {
-            opts = append(opts, "repetitiontype=OPTIONAL")
-        }
+        return schema.TypeInvalid
     }
-
-    return opts
 }
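For reference, a minimal sketch (not part of this commit) of what the new makeSchema emits for a small table containing an array column. The plugin-sdk import path, the ColumnCreationOptions type name, and the table/column names are assumptions for illustration; the commented JSON is only the approximate shape of the marshalled parquet-go schema.

```go
package parquet

import (
    "testing"

    "github.com/cloudquery/plugin-sdk/schema"
)

// TestMakeSchemaSketch logs the parquet-go JSON schema built for a hypothetical
// table with a UUID primary key and a string-array column.
// Run with: go test -run TestMakeSchemaSketch -v ./parquet
func TestMakeSchemaSketch(t *testing.T) {
    cols := schema.ColumnList{
        {Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
        {Name: "tags", Type: schema.TypeStringArray},
    }
    t.Log(makeSchema("items", cols))
    // Approximate output:
    // {"Tag":"name=items_root, repetitiontype=REQUIRED","Fields":[
    //   {"Tag":"name=id, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=REQUIRED"},
    //   {"Tag":"name=tags, type=LIST, repetitiontype=OPTIONAL","Fields":[
    //     {"Tag":"name=element, type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"}]}]}
}
```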
parquet/transformer.go (1 addition, 0 deletions)
@@ -177,6 +177,7 @@ func (ReverseTransformer) ReverseTransformValues(table *schema.Table, values []a
         if err := t.Set(v); err != nil {
             return nil, fmt.Errorf("failed to convert value %v to type %s: %w", v, table.Columns[i].Type, err)
         }
+        res[i] = t
         continue
}

parquet/write.go (1 addition, 1 deletion)
@@ -11,7 +11,7 @@ import (
 )
 
 func (*Client) WriteTableBatch(w io.Writer, table *schema.Table, resources [][]any) error {
-    pw, err := writer.NewJSONWriterFromWriter(makeSchema(table.Columns), w, 2)
+    pw, err := writer.NewJSONWriterFromWriter(makeSchema(table.Name, table.Columns), w, 2)
     if err != nil {
         return fmt.Errorf("can't create parquet writer: %w", err)
     }
