-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve Parquet #1064
Merged
Merged
Improve Parquet #1064
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
4a8bdb6
Parquet refactor.
Tang8330 bcedca7
WIP.
Tang8330 7493bab
Clean up.
Tang8330 4a7b710
WIP.
Tang8330 d3990d3
WIP.
Tang8330 d9ab850
Checkpoint.
Tang8330 6c66925
Fix.
Tang8330 89337c8
Clean.
Tang8330 60e9f35
Clean up.
Tang8330 b8fde48
Remove inName
Tang8330 a489e36
Adding a comment.
Tang8330 c132d13
Clean up.
Tang8330 b32e4d2
Clean up more.
Tang8330 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,11 +2,11 @@ package s3 | |
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"log/slog" | ||
"os" | ||
"strings" | ||
"time" | ||
|
||
"github.com/xitongsys/parquet-go-source/local" | ||
"github.com/xitongsys/parquet-go/parquet" | ||
|
@@ -49,8 +49,8 @@ func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sq | |
func (s *Store) ObjectPrefix(tableData *optimization.TableData) string { | ||
tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name()) | ||
fqTableName := tableID.FullyQualifiedName() | ||
yyyyMMDDFormat := tableData.LatestCDCTs.Format(ext.PostgresDateFormat) | ||
|
||
// Adding date= prefix so that it adheres to the partitioning format for Hive. | ||
yyyyMMDDFormat := fmt.Sprintf("date=%s", time.Now().Format(ext.PostgresDateFormat)) | ||
if len(s.config.S3.FolderName) > 0 { | ||
return strings.Join([]string{s.config.S3.FolderName, fqTableName, yyyyMMDDFormat}, "/") | ||
} | ||
|
@@ -78,7 +78,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er | |
} | ||
|
||
cols := tableData.ReadOnlyInMemoryCols().ValidColumns() | ||
schema, err := parquetutil.GenerateJSONSchema(cols) | ||
schema, err := parquetutil.BuildCSVSchema(cols) | ||
if err != nil { | ||
return fmt.Errorf("failed to generate parquet schema: %w", err) | ||
} | ||
|
@@ -89,29 +89,24 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er | |
return fmt.Errorf("failed to create a local parquet file: %w", err) | ||
} | ||
|
||
pw, err := writer.NewJSONWriter(schema, fw, 4) | ||
pw, err := writer.NewCSVWriter(schema, fw, 4) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using CSV instead since JSON schema for Parquet is not widely supported |
||
if err != nil { | ||
return fmt.Errorf("failed to instantiate parquet writer: %w", err) | ||
} | ||
|
||
pw.CompressionType = parquet.CompressionCodec_GZIP | ||
for _, val := range tableData.Rows() { | ||
row := make(map[string]any) | ||
var row []any | ||
for _, col := range cols { | ||
value, err := parquetutil.ParseValue(val[col.Name()], col) | ||
value, err := parquetutil.ParseValue(val[col.Name()], col.KindDetails) | ||
if err != nil { | ||
return fmt.Errorf("failed to parse value, err: %w, value: %v, column: %q", err, val[col.Name()], col.Name()) | ||
} | ||
|
||
row[col.Name()] = value | ||
} | ||
|
||
rowBytes, err := json.Marshal(row) | ||
if err != nil { | ||
return fmt.Errorf("failed to marshal row: %w", err) | ||
row = append(row, value) | ||
} | ||
|
||
if err = pw.Write(string(rowBytes)); err != nil { | ||
if err = pw.Write(row); err != nil { | ||
return fmt.Errorf("failed to write row: %w", err) | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,34 +1,20 @@ | ||
package parquetutil | ||
|
||
import ( | ||
"encoding/json" | ||
|
||
"github.com/artie-labs/transfer/lib/typing" | ||
"github.com/artie-labs/transfer/lib/typing/columns" | ||
) | ||
|
||
func GenerateJSONSchema(columns []columns.Column) (string, error) { | ||
var fields []typing.Field | ||
func BuildCSVSchema(columns []columns.Column) ([]string, error) { | ||
var fields []string | ||
for _, column := range columns { | ||
// We don't need to escape the column name here. | ||
field, err := column.KindDetails.ParquetAnnotation(column.Name()) | ||
if err != nil { | ||
return "", err | ||
return nil, err | ||
} | ||
|
||
fields = append(fields, *field) | ||
} | ||
|
||
schemaBytes, err := json.Marshal( | ||
typing.Field{ | ||
Tag: typing.FieldTag{Name: "parquet-go-root", RepetitionType: typing.ToPtr("REQUIRED")}.String(), | ||
Fields: fields, | ||
}, | ||
) | ||
|
||
if err != nil { | ||
return "", err | ||
fields = append(fields, field.Tag) | ||
} | ||
|
||
return string(schemaBytes), nil | ||
return fields, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,17 +9,16 @@ import ( | |
"github.com/artie-labs/transfer/lib/array" | ||
"github.com/artie-labs/transfer/lib/config/constants" | ||
"github.com/artie-labs/transfer/lib/typing" | ||
"github.com/artie-labs/transfer/lib/typing/columns" | ||
"github.com/artie-labs/transfer/lib/typing/decimal" | ||
"github.com/artie-labs/transfer/lib/typing/ext" | ||
) | ||
|
||
func ParseValue(colVal any, colKind columns.Column) (any, error) { | ||
func ParseValue(colVal any, colKind typing.KindDetails) (any, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using colKind instead, we don't need the whole column |
||
if colVal == nil { | ||
return nil, nil | ||
} | ||
|
||
switch colKind.KindDetails.Kind { | ||
switch colKind.Kind { | ||
case typing.Date.Kind: | ||
_time, err := ext.ParseDateFromAny(colVal) | ||
if err != nil { | ||
|
@@ -51,7 +50,7 @@ func ParseValue(colVal any, colKind columns.Column) (any, error) { | |
case typing.String.Kind: | ||
return colVal, nil | ||
case typing.Struct.Kind: | ||
if colKind.KindDetails == typing.Struct { | ||
if colKind == typing.Struct { | ||
if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) { | ||
colVal = map[string]any{ | ||
"key": constants.ToastUnavailableValuePlaceholder, | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
date=
so that it's consistent with Hive's partitioning scheme: https://lakefs.io/blog/hive-metastore-it-didnt-age-well/#hives-partitioning-scheme