Skip to content

Commit

Permalink
util/parquet: support tuples
Browse files Browse the repository at this point in the history
This change adds support for writing tuples. Implementation details below.

The standard way to write a tuple in parquet is to use a group:
```
message schema {                 -- toplevel schema
   optional group a (LIST) {
       optional T1 element;       -- physical column for the first field
       ...
       optional Tn element;       -- physical column for the nth field
   }
}
```

Because parquet has a very strict format, it does not write such groups
as one column with all the fields adjacent to each other. Instead, it
writes each field in the tuple as its own column. This 1:N mapping
from CRDB datum to physical column in parquet violates the assumption
used in this library that the mapping is 1:1.

This change aims to update the library to break that assumption. Firstly,
there is now a clear distiction between a "datum column" and a "physical
column". Also, the `Writer` is updated to be able to write to multiple
physical columns for a given datum, and the reader is updated
to "squash" physical columns into single tuple datums if needed. Finally,
randomized testing and benchmarking is extended to cover tuples.

Informs: #99028
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-15071
Release note: None
  • Loading branch information
jayshrivastava committed May 19, 2023
1 parent b5283bc commit 71f2e7c
Show file tree
Hide file tree
Showing 8 changed files with 653 additions and 218 deletions.
20 changes: 7 additions & 13 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1232,29 +1232,23 @@ func extractKeyFromJSONValue(isBare bool, wrapped []byte) (key []byte, value []b
func (c *cloudFeed) appendParquetTestFeedMessages(
path string, topic string, envelopeType changefeedbase.EnvelopeType,
) (err error) {
meta, datums, closeReader, err := parquet.ReadFile(path)
meta, datums, err := parquet.ReadFile(path)
if err != nil {
return err
}
defer func() {
closeErr := closeReader()
if closeErr != nil {
err = errors.CombineErrors(err, closeErr)
}
}()

primaryKeyColumnsString := meta.KeyValueMetadata().FindValue("keyCols")
if primaryKeyColumnsString == nil {
primaryKeyColumnsString, ok := meta.KVMeta["keyCols"]
if !ok {
return errors.Errorf("could not find primary key column names in parquet metadata")
}

columnsNamesString := meta.KeyValueMetadata().FindValue("allCols")
if columnsNamesString == nil {
columnsNamesString, ok := meta.KVMeta["allCols"]
if !ok {
return errors.Errorf("could not find column names in parquet metadata")
}

primaryKeys := strings.Split(*primaryKeyColumnsString, ",")
columns := strings.Split(*columnsNamesString, ",")
primaryKeys := strings.Split(primaryKeyColumnsString, ",")
columns := strings.Split(columnsNamesString, ",")

columnNameSet := make(map[string]struct{})
primaryKeyColumnSet := make(map[string]struct{})
Expand Down
1 change: 1 addition & 0 deletions pkg/util/parquet/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ go_test(
"//pkg/util/bitarray",
"//pkg/util/duration",
"//pkg/util/ipaddr",
"//pkg/util/json",
"//pkg/util/timeutil",
"//pkg/util/timeutil/pgdate",
"//pkg/util/uuid",
Expand Down
Loading

0 comments on commit 71f2e7c

Please sign in to comment.