From 1e77e012b1941faa67b92e8ed0be5076939c777a Mon Sep 17 00:00:00 2001 From: xitongsys Date: Sat, 2 May 2020 12:58:11 +0800 Subject: [PATCH 1/2] add schema in table/page --- layout/chunk.go | 8 +++---- layout/dictpage.go | 17 +++++++-------- layout/page.go | 47 +++++++++++++++++++----------------------- layout/table.go | 9 +++----- marshal/csv.go | 3 +-- marshal/json.go | 3 +-- marshal/marshal.go | 3 +-- reader/columnbuffer.go | 3 +-- writer/writer.go | 2 +- 9 files changed, 40 insertions(+), 55 deletions(-) diff --git a/layout/chunk.go b/layout/chunk.go index 3c5fd1cb..ab3881ae 100644 --- a/layout/chunk.go +++ b/layout/chunk.go @@ -23,7 +23,7 @@ func PagesToChunk(pages []*Page) *Chunk { var maxVal interface{} = pages[0].MaxVal var minVal interface{} = pages[0].MinVal - pT, cT := pages[0].DataType, pages[0].DataConvertedType + pT, cT := pages[0].Schema.Type, pages[0].Schema.ConvertedType for i := 0; i < ln; i++ { if pages[i].Header.DataPageHeader != nil { @@ -41,7 +41,7 @@ func PagesToChunk(pages []*Page) *Chunk { chunk.Pages = pages chunk.ChunkHeader = parquet.NewColumnChunk() metaData := parquet.NewColumnMetaData() - metaData.Type = *pages[0].DataType + metaData.Type = *pages[0].Schema.Type metaData.Encodings = append(metaData.Encodings, parquet.Encoding_RLE) metaData.Encodings = append(metaData.Encodings, parquet.Encoding_BIT_PACKED) metaData.Encodings = append(metaData.Encodings, parquet.Encoding_PLAIN) @@ -80,7 +80,7 @@ func PagesToDictChunk(pages []*Page) *Chunk { var maxVal interface{} = pages[1].MaxVal var minVal interface{} = pages[1].MinVal - pT, cT := pages[1].DataType, pages[1].DataConvertedType + pT, cT := pages[1].Schema.Type, pages[1].Schema.ConvertedType for i := 0; i < len(pages); i++ { if pages[i].Header.DataPageHeader != nil { @@ -100,7 +100,7 @@ func PagesToDictChunk(pages []*Page) *Chunk { chunk.Pages = pages chunk.ChunkHeader = parquet.NewColumnChunk() metaData := parquet.NewColumnMetaData() - metaData.Type = *pages[1].DataType + metaData.Type 
= *pages[1].Schema.Type metaData.Encodings = append(metaData.Encodings, parquet.Encoding_RLE) metaData.Encodings = append(metaData.Encodings, parquet.Encoding_BIT_PACKED) metaData.Encodings = append(metaData.Encodings, parquet.Encoding_PLAIN) diff --git a/layout/dictpage.go b/layout/dictpage.go index 5b496ee8..c69696d1 100644 --- a/layout/dictpage.go +++ b/layout/dictpage.go @@ -35,8 +35,9 @@ func DictRecToDictPage(dictRec *DictRecType, pageSize int32, compressType parque page.DataTable = new(Table) page.DataTable.Values = dictRec.DictSlice dataType := parquet.Type_INT32 - page.DataType = &dataType - page.DataConvertedType = nil + page.Schema = &parquet.SchemaElement{ + Type: &dataType, + } page.CompressType = compressType page.DictPageCompress(compressType, dictRec.Type) @@ -75,9 +76,8 @@ func TableToDictDataPages(dictRec *DictRecType, table *Table, pageSize int32, bi totalLn := len(table.Values) res := make([]*Page, 0) i := 0 - dataType := table.Type - pT, cT := table.Type, table.ConvertedType + pT, cT := table.Schema.Type, table.Schema.ConvertedType for i < totalLn { j := i @@ -118,8 +118,7 @@ func TableToDictDataPages(dictRec *DictRecType, table *Table, pageSize int32, bi page.DataTable.RepetitionLevels = table.RepetitionLevels[i:j] page.MaxVal = maxVal page.MinVal = minVal - - page.DataType = dataType //check !!! 
+ page.Schema = table.Schema page.CompressType = compressType page.Path = table.Path page.Info = table.Info @@ -199,7 +198,6 @@ func (page *Page) DictDataPageCompress(compressType parquet.CompressionCodec, bi func TableToDictPage(table *Table, pageSize int32, compressType parquet.CompressionCodec) (*Page, int64) { var totSize int64 = 0 totalLn := len(table.Values) - pT, cT := table.Type, table.ConvertedType page := NewDataPage() page.PageSize = pageSize @@ -214,13 +212,12 @@ func TableToDictPage(table *Table, pageSize int32, compressType parquet.Compress page.DataTable.Values = table.Values page.DataTable.DefinitionLevels = table.DefinitionLevels page.DataTable.RepetitionLevels = table.RepetitionLevels - page.DataType = pT - page.DataConvertedType = cT + page.Schema = table.Schema page.CompressType = compressType page.Path = table.Path page.Info = table.Info - page.DictPageCompress(compressType, *page.DataType) + page.DictPageCompress(compressType, *page.Schema.Type) totSize += int64(len(page.RawData)) return page, totSize } diff --git a/layout/page.go b/layout/page.go index 7c0bd5a2..df09328e 100644 --- a/layout/page.go +++ b/layout/page.go @@ -25,10 +25,8 @@ type Page struct { RawData []byte //Compress type: gzip/snappy/zstd/none CompressType parquet.CompressionCodec - //Parquet type of the values in the page - DataType *parquet.Type - //Parquet converted type of values in page - DataConvertedType *parquet.ConvertedType + //Schema + Schema *parquet.SchemaElement //Path in schema(include the root) Path []string //Maximum of the values @@ -73,7 +71,7 @@ func TableToDataPages(table *Table, pageSize int32, compressType parquet.Compres totalLn := len(table.Values) res := make([]*Page, 0) i := 0 - pT, cT := table.Type, table.ConvertedType + pT, cT := table.Schema.Type, table.Schema.ConvertedType for i < totalLn { j := i + 1 @@ -108,8 +106,7 @@ func TableToDataPages(table *Table, pageSize int32, compressType parquet.Compres page.DataTable.RepetitionLevels = 
table.RepetitionLevels[i:j] page.MaxVal = maxVal page.MinVal = minVal - page.DataType = pT - page.DataConvertedType = cT + page.Schema = table.Schema page.CompressType = compressType page.Path = table.Path page.Info = table.Info @@ -159,7 +156,7 @@ func (page *Page) EncodingValues(valuesBuf []interface{}) []byte { } if encodingMethod == parquet.Encoding_RLE { bitWidth := page.Info.Length - return encoding.WriteRLEBitPackedHybrid(valuesBuf, bitWidth, *page.DataType) + return encoding.WriteRLEBitPackedHybrid(valuesBuf, bitWidth, *page.Schema.Type) } else if encodingMethod == parquet.Encoding_DELTA_BINARY_PACKED { return encoding.WriteDelta(valuesBuf) @@ -171,7 +168,7 @@ func (page *Page) EncodingValues(valuesBuf []interface{}) []byte { return encoding.WriteDeltaLengthByteArray(valuesBuf) } else { - return encoding.WritePlain(valuesBuf, *page.DataType) + return encoding.WritePlain(valuesBuf, *page.Schema.Type) } return []byte{} } @@ -235,17 +232,17 @@ func (page *Page) DataPageCompress(compressType parquet.CompressionCodec) []byte page.Header.DataPageHeader.Statistics = parquet.NewStatistics() if page.MaxVal != nil { - tmpBuf := encoding.WritePlain([]interface{}{page.MaxVal}, *page.DataType) - if (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_DECIMAL) || - (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_UTF8) { + tmpBuf := encoding.WritePlain([]interface{}{page.MaxVal}, *page.Schema.Type) + if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) || + (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) { tmpBuf = tmpBuf[4:] } page.Header.DataPageHeader.Statistics.Max = tmpBuf } if page.MinVal != nil { - tmpBuf := encoding.WritePlain([]interface{}{page.MinVal}, *page.DataType) - if (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_DECIMAL) || - (page.DataConvertedType != nil && 
*page.DataConvertedType == parquet.ConvertedType_UTF8) { + tmpBuf := encoding.WritePlain([]interface{}{page.MinVal}, *page.Schema.Type) + if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) || + (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) { tmpBuf = tmpBuf[4:] } page.Header.DataPageHeader.Statistics.Min = tmpBuf @@ -325,17 +322,17 @@ func (page *Page) DataPageV2Compress(compressType parquet.CompressionCodec) []by page.Header.DataPageHeaderV2.Statistics = parquet.NewStatistics() if page.MaxVal != nil { - tmpBuf := encoding.WritePlain([]interface{}{page.MaxVal}, *page.DataType) - if (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_DECIMAL) || - (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_UTF8) { + tmpBuf := encoding.WritePlain([]interface{}{page.MaxVal}, *page.Schema.Type) + if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) || + (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) { tmpBuf = tmpBuf[4:] } page.Header.DataPageHeaderV2.Statistics.Max = tmpBuf } if page.MinVal != nil { - tmpBuf := encoding.WritePlain([]interface{}{page.MinVal}, *page.DataType) - if (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_DECIMAL) || - (page.DataConvertedType != nil && *page.DataConvertedType == parquet.ConvertedType_UTF8) { + tmpBuf := encoding.WritePlain([]interface{}{page.MinVal}, *page.Schema.Type) + if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) || + (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) { tmpBuf = tmpBuf[4:] } page.Header.DataPageHeaderV2.Statistics.Min = tmpBuf @@ -406,9 +403,7 @@ func ReadPageRawData(thriftReader *thrift.TBufferedTransport, schemaHandler *sch page.Path 
= append(page.Path, colMetaData.GetPathInSchema()...) pathIndex := schemaHandler.MapIndex[common.PathToStr(page.Path)] schema := schemaHandler.SchemaElements[pathIndex] - pT, cT := schema.GetType(), schema.GetConvertedType() - page.DataType = &pT - page.DataConvertedType = &cT + page.Schema = schema return page, nil } @@ -550,7 +545,7 @@ func (self *Page) GetValueFromRawData(schemaHandler *schema.SchemaHandler) error case parquet.PageType_DICTIONARY_PAGE: bytesReader := bytes.NewReader(self.RawData) self.DataTable.Values, err = encoding.ReadPlain(bytesReader, - *self.DataType, + *self.Schema.Type, uint64(self.Header.DictionaryPageHeader.GetNumValues()), 0) if err != nil { @@ -581,7 +576,7 @@ func (self *Page) GetValueFromRawData(schemaHandler *schema.SchemaHandler) error values, err = ReadDataPageValues(bytesReader, encodingType, - *self.DataType, + *self.Schema.Type, ct, uint64(len(self.DataTable.DefinitionLevels))-numNulls, uint64(schemaHandler.SchemaElements[schemaHandler.MapIndex[name]].GetTypeLength())) diff --git a/layout/table.go b/layout/table.go index c5b247bd..3d69c63a 100644 --- a/layout/table.go +++ b/layout/table.go @@ -10,8 +10,7 @@ func NewTableFromTable(src *Table) *Table { return nil } table := new(Table) - table.Type = src.Type - table.ConvertedType = src.ConvertedType + table.Schema = src.Schema table.Path = append(table.Path, src.Path...) 
table.MaxDefinitionLevel = 0 table.MaxRepetitionLevel = 0 @@ -29,10 +28,8 @@ func NewEmptyTable() *Table { type Table struct { //Repetition type of the values: REQUIRED/OPTIONAL/REPEATED RepetitionType parquet.FieldRepetitionType - //Parquet type - Type *parquet.Type - //Converted type - ConvertedType *parquet.ConvertedType + //Schema + Schema *parquet.SchemaElement //Path of this column Path []string //Maximum of definition levels diff --git a/marshal/csv.go b/marshal/csv.go index 4a188f8c..a1e515a5 100644 --- a/marshal/csv.go +++ b/marshal/csv.go @@ -21,8 +21,7 @@ func MarshalCSV(records []interface{}, bgn int, end int, schemaHandler *schema.S res[pathStr].MaxDefinitionLevel = 1 res[pathStr].MaxRepetitionLevel = 0 res[pathStr].RepetitionType = parquet.FieldRepetitionType_OPTIONAL - res[pathStr].Type = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].Type - res[pathStr].ConvertedType = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].ConvertedType + res[pathStr].Schema = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]] res[pathStr].Info = schemaHandler.Infos[i+1] for j := bgn; j < end; j++ { diff --git a/marshal/json.go b/marshal/json.go index 856f45fe..332ec5ad 100644 --- a/marshal/json.go +++ b/marshal/json.go @@ -42,8 +42,7 @@ func MarshalJSON(ss []interface{}, bgn int, end int, schemaHandler *schema.Schem res[pathStr].MaxDefinitionLevel, _ = schemaHandler.MaxDefinitionLevel(res[pathStr].Path) res[pathStr].MaxRepetitionLevel, _ = schemaHandler.MaxRepetitionLevel(res[pathStr].Path) res[pathStr].RepetitionType = schema.GetRepetitionType() - res[pathStr].Type = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].Type - res[pathStr].ConvertedType = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].ConvertedType + res[pathStr].Schema = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]] res[pathStr].Info = schemaHandler.Infos[i] } } diff --git a/marshal/marshal.go b/marshal/marshal.go index 
41c4b8cc..3e04982c 100644 --- a/marshal/marshal.go +++ b/marshal/marshal.go @@ -223,8 +223,7 @@ func Marshal(srcInterface []interface{}, bgn int, end int, schemaHandler *schema res[pathStr].MaxDefinitionLevel, _ = schemaHandler.MaxDefinitionLevel(res[pathStr].Path) res[pathStr].MaxRepetitionLevel, _ = schemaHandler.MaxRepetitionLevel(res[pathStr].Path) res[pathStr].RepetitionType = schema.GetRepetitionType() - res[pathStr].Type = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].Type - res[pathStr].ConvertedType = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]].ConvertedType + res[pathStr].Schema = schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]] res[pathStr].Info = schemaHandler.Infos[i] } } diff --git a/reader/columnbuffer.go b/reader/columnbuffer.go index b6370d34..b2021023 100644 --- a/reader/columnbuffer.go +++ b/reader/columnbuffer.go @@ -112,8 +112,7 @@ func (self *ColumnBufferType) ReadPage() error { if self.DataTable == nil { index := self.SchemaHandler.MapIndex[self.PathStr] self.DataTable = layout.NewEmptyTable() - self.DataTable.Type = self.SchemaHandler.SchemaElements[index].Type - self.DataTable.ConvertedType = self.SchemaHandler.SchemaElements[index].ConvertedType + self.DataTable.Schema = self.SchemaHandler.SchemaElements[index] self.DataTable.Path = common.StrToPath(self.PathStr) } diff --git a/writer/writer.go b/writer/writer.go index 7ada2da4..3af349a1 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -227,7 +227,7 @@ func (self *ParquetWriter) flushObjs() error { table.Info.Encoding == parquet.Encoding_RLE_DICTIONARY { lock.Lock() if _, ok := self.DictRecs[name]; !ok { - self.DictRecs[name] = layout.NewDictRec(*table.Type) + self.DictRecs[name] = layout.NewDictRec(*table.Schema.Type) } pagesMapList[index][name], _ = layout.TableToDictDataPages(self.DictRecs[name], table, int32(self.PageSize), 32, self.CompressionType) From ae1632779819c7d59281a579e9478455c4d0f74b Mon Sep 17 00:00:00 2001 From: 
xitongsys Date: Sat, 2 May 2020 15:49:13 +0800 Subject: [PATCH 2/2] fix statistics bug for fixed_len_byte_array --- layout/page.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/layout/page.go b/layout/page.go index df09328e..4e8feee8 100644 --- a/layout/page.go +++ b/layout/page.go @@ -233,16 +233,18 @@ func (page *Page) DataPageCompress(compressType parquet.CompressionCodec) []byte page.Header.DataPageHeader.Statistics = parquet.NewStatistics() if page.MaxVal != nil { tmpBuf := encoding.WritePlain([]interface{}{page.MaxVal}, *page.Schema.Type) - if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) || - (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) { + if *page.Schema.Type == parquet.Type_BYTE_ARRAY { + // if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) || + // (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) { tmpBuf = tmpBuf[4:] } page.Header.DataPageHeader.Statistics.Max = tmpBuf } if page.MinVal != nil { tmpBuf := encoding.WritePlain([]interface{}{page.MinVal}, *page.Schema.Type) - if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) || - (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) { + if *page.Schema.Type == parquet.Type_BYTE_ARRAY { + // if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) || + // (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) { tmpBuf = tmpBuf[4:] } page.Header.DataPageHeader.Statistics.Min = tmpBuf @@ -323,16 +325,18 @@ func (page *Page) DataPageV2Compress(compressType parquet.CompressionCodec) []by page.Header.DataPageHeaderV2.Statistics = parquet.NewStatistics() if page.MaxVal != nil { tmpBuf := 
encoding.WritePlain([]interface{}{page.MaxVal}, *page.Schema.Type) - if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) || - (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) { + if *page.Schema.Type == parquet.Type_BYTE_ARRAY { + // if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) || + // (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) { tmpBuf = tmpBuf[4:] } page.Header.DataPageHeaderV2.Statistics.Max = tmpBuf } if page.MinVal != nil { tmpBuf := encoding.WritePlain([]interface{}{page.MinVal}, *page.Schema.Type) - if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) || - (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) { + if *page.Schema.Type == parquet.Type_BYTE_ARRAY { + // if (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_DECIMAL) || + // (page.Schema.ConvertedType != nil && *page.Schema.ConvertedType == parquet.ConvertedType_UTF8) { tmpBuf = tmpBuf[4:] } page.Header.DataPageHeaderV2.Statistics.Min = tmpBuf