From 6321829c492b1d7efb165552aa503890c06124be Mon Sep 17 00:00:00 2001 From: xitongsys Date: Fri, 8 Nov 2019 10:51:53 +0800 Subject: [PATCH 1/3] add rle-dict --- layout/page.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/layout/page.go b/layout/page.go index e4064485..d24a28e2 100644 --- a/layout/page.go +++ b/layout/page.go @@ -614,7 +614,7 @@ func ReadDataPageValues(bytesReader *bytes.Reader, encodingMethod parquet.Encodi if encodingMethod == parquet.Encoding_PLAIN { return encoding.ReadPlain(bytesReader, dataType, cnt, bitWidth) - } else if encodingMethod == parquet.Encoding_PLAIN_DICTIONARY { + } else if encodingMethod == parquet.Encoding_PLAIN_DICTIONARY || encodingMethod == parquet.Encoding_RLE_DICTIONARY { b, err := bytesReader.ReadByte() if err != nil { return res, err From 86471e072247a95378c066221db67d7d826efd25 Mon Sep 17 00:00:00 2001 From: xitongsys Date: Mon, 11 Nov 2019 08:11:27 +0000 Subject: [PATCH 2/3] add rle_dictionary --- common/common.go | 6 ++++-- layout/chunk.go | 5 +++-- writer/writer.go | 15 ++++++++------- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/common/common.go b/common/common.go index 2832eaf6..a343eea1 100644 --- a/common/common.go +++ b/common/common.go @@ -157,8 +157,8 @@ func StringToTag(tag string) *Tag { } case "encoding": switch strings.ToLower(val) { - case "plain": - mp.Encoding = parquet.Encoding_PLAIN + case "plain": + mp.Encoding = parquet.Encoding_PLAIN case "rle": mp.Encoding = parquet.Encoding_RLE case "delta_binary_packed": @@ -169,6 +169,8 @@ func StringToTag(tag string) *Tag { mp.Encoding = parquet.Encoding_DELTA_BYTE_ARRAY case "plain_dictionary": mp.Encoding = parquet.Encoding_PLAIN_DICTIONARY + case "rle_dictionary": + mp.Encoding = parquet.Encoding_RLE_DICTIONARY default: panic(fmt.Errorf("Unknown encoding type: '%v'", val)) } diff --git a/layout/chunk.go b/layout/chunk.go index a8f716a9..8b595efb 100644 --- a/layout/chunk.go +++ b/layout/chunk.go @@ -4,9 +4,9 @@ import ( "github.com/apache/thrift/lib/go/thrift" "github.com/xitongsys/parquet-go/common" "github.com/xitongsys/parquet-go/encoding" - "github.com/xitongsys/parquet-go/types" - "github.com/xitongsys/parquet-go/schema" "github.com/xitongsys/parquet-go/parquet" + "github.com/xitongsys/parquet-go/schema" + "github.com/xitongsys/parquet-go/types" ) //Chunk stores the ColumnChunk in parquet file @@ -107,6 +107,7 @@ func PagesToDictChunk(pages []*Page) *Chunk { metaData.Encodings = append(metaData.Encodings, parquet.Encoding_BIT_PACKED) metaData.Encodings = append(metaData.Encodings, parquet.Encoding_PLAIN) metaData.Encodings = append(metaData.Encodings, parquet.Encoding_PLAIN_DICTIONARY) + metaData.Encodings = append(metaData.Encodings, parquet.Encoding_RLE_DICTIONARY) metaData.Codec = pages[1].CompressType metaData.NumValues = numValues diff --git a/writer/writer.go b/writer/writer.go index 4a091588..7c9efd5f 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -3,17 +3,17 @@ package writer import ( "context" "encoding/binary" + "errors" "reflect" "sync" - "errors" "github.com/apache/thrift/lib/go/thrift" "github.com/xitongsys/parquet-go/common" "github.com/xitongsys/parquet-go/layout" "github.com/xitongsys/parquet-go/marshal" - "github.com/xitongsys/parquet-go/source" - "github.com/xitongsys/parquet-go/schema" "github.com/xitongsys/parquet-go/parquet" + "github.com/xitongsys/parquet-go/schema" + "github.com/xitongsys/parquet-go/source" ) //ParquetWriter is a writer parquet file @@ -69,7 +69,7 @@ func NewParquetWriter(pFile source.ParquetFile, obj interface{}, np int64) (*Par err = res.SetSchemaHandlerFromJSON(sa) return res, err - }else{ + } else { if res.SchemaHandler, err = schema.NewSchemaHandlerFromStruct(obj); err != nil { return res, err } @@ -149,7 +149,7 @@ func (self *ParquetWriter) Write(src interface{}) error { } if self.CheckSizeCritical <= ln { - self.ObjSize = (self.ObjSize + common.SizeOf(val))/2 + 1 + self.ObjSize = (self.ObjSize+common.SizeOf(val))/2 + 1 } self.ObjsSize += self.ObjSize self.Objs = append(self.Objs, src) @@ -215,7 +215,8 @@ func (self *ParquetWriter) flushObjs() error { if err2 == nil { for name, table := range *tableMap { - if table.Info.Encoding == parquet.Encoding_PLAIN_DICTIONARY { + if table.Info.Encoding == parquet.Encoding_PLAIN_DICTIONARY || + table.Info.Encoding == parquet.Encoding_RLE_DICTIONARY { lock.Lock() if _, ok := self.DictRecs[name]; !ok { self.DictRecs[name] = layout.NewDictRec(table.Type) @@ -271,7 +272,7 @@ func (self *ParquetWriter) Flush(flag bool) error { //pages -> chunk chunkMap := make(map[string]*layout.Chunk) for name, pages := range self.PagesMapBuf { - if len(pages) > 0 && pages[0].Info.Encoding == parquet.Encoding_PLAIN_DICTIONARY { + if len(pages) > 0 && (pages[0].Info.Encoding == parquet.Encoding_PLAIN_DICTIONARY || pages[0].Info.Encoding == parquet.Encoding_RLE_DICTIONARY) { dictPage, _ := layout.DictRecToDictPage(self.DictRecs[name], int32(self.PageSize), self.CompressionType) tmp := append([]*layout.Page{dictPage}, pages...) chunkMap[name] = layout.PagesToDictChunk(tmp) From 522c7a9c85d208e2e85cdaa5255f41d95f040ddb Mon Sep 17 00:00:00 2001 From: xitongsys Date: Mon, 11 Nov 2019 09:58:16 +0000 Subject: [PATCH 3/3] fix block bug --- README.md | 2 +- layout/rowgroup.go | 7 ++++--- writer/writer.go | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index a5a9a768..6dac07a4 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# parquet-go v1.4.2 +# parquet-go v1.4.3 [![Travis Status for xitongsys/parquet-go](https://travis-ci.org/xitongsys/parquet-go.svg?branch=master&label=linux+build)](https://travis-ci.org/xitongsys/parquet-go) [![godoc for xitongsys/parquet-go](https://godoc.org/github.com/nathany/looper?status.svg)](http://godoc.org/github.com/xitongsys/parquet-go) diff --git a/layout/rowgroup.go b/layout/rowgroup.go index 3d53d326..5101958e 100644 --- a/layout/rowgroup.go +++ b/layout/rowgroup.go @@ -2,11 +2,11 @@ package layout import ( "errors" - + "github.com/xitongsys/parquet-go/common" - "github.com/xitongsys/parquet-go/source" - "github.com/xitongsys/parquet-go/schema" "github.com/xitongsys/parquet-go/parquet" + "github.com/xitongsys/parquet-go/schema" + "github.com/xitongsys/parquet-go/source" ) //RowGroup stores the RowGroup in parquet file @@ -77,6 +77,7 @@ func ReadRowGroup(rowGroupHeader *parquet.RowGroup, PFile source.ParquetFile, sc default: err = errors.New("unknown error") } + close(doneChan) } }() diff --git a/writer/writer.go b/writer/writer.go index 7c9efd5f..25011c01 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -203,11 +203,12 @@ func (self *ParquetWriter) flushObjs() error { default: err = errors.New("unknown error") } + close(doneChan) } + doneChan <- 0 }() if e <= b { - doneChan <- 0 return } @@ -234,7 +235,6 @@ func (self *ParquetWriter) flushObjs() error { err = err2 } - doneChan <- 0 }(int(bgn), int(end), c) }