Skip to content

Commit

Permalink
Merge pull request #183 from xitongsys/dev
Browse files Browse the repository at this point in the history
1.4.3
  • Loading branch information
xitongsys committed Nov 11, 2019
2 parents 7fc2d91 + 522c7a9 commit 197c897
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 18 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# parquet-go v1.4.2
# parquet-go v1.4.3
[![Travis Status for xitongsys/parquet-go](https://travis-ci.org/xitongsys/parquet-go.svg?branch=master&label=linux+build)](https://travis-ci.org/xitongsys/parquet-go)
[![godoc for xitongsys/parquet-go](https://godoc.org/github.com/nathany/looper?status.svg)](http://godoc.org/github.com/xitongsys/parquet-go)

Expand Down
6 changes: 4 additions & 2 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ func StringToTag(tag string) *Tag {
}
case "encoding":
switch strings.ToLower(val) {
case "plain":
mp.Encoding = parquet.Encoding_PLAIN
case "plain":
mp.Encoding = parquet.Encoding_PLAIN
case "rle":
mp.Encoding = parquet.Encoding_RLE
case "delta_binary_packed":
Expand All @@ -169,6 +169,8 @@ func StringToTag(tag string) *Tag {
mp.Encoding = parquet.Encoding_DELTA_BYTE_ARRAY
case "plain_dictionary":
mp.Encoding = parquet.Encoding_PLAIN_DICTIONARY
case "rle_dictionary":
mp.Encoding = parquet.Encoding_RLE_DICTIONARY
default:
panic(fmt.Errorf("Unknown encoding type: '%v'", val))
}
Expand Down
5 changes: 3 additions & 2 deletions layout/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"github.com/apache/thrift/lib/go/thrift"
"github.com/xitongsys/parquet-go/common"
"github.com/xitongsys/parquet-go/encoding"
"github.com/xitongsys/parquet-go/types"
"github.com/xitongsys/parquet-go/schema"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/schema"
"github.com/xitongsys/parquet-go/types"
)

//Chunk stores the ColumnChunk in parquet file
Expand Down Expand Up @@ -107,6 +107,7 @@ func PagesToDictChunk(pages []*Page) *Chunk {
metaData.Encodings = append(metaData.Encodings, parquet.Encoding_BIT_PACKED)
metaData.Encodings = append(metaData.Encodings, parquet.Encoding_PLAIN)
metaData.Encodings = append(metaData.Encodings, parquet.Encoding_PLAIN_DICTIONARY)
metaData.Encodings = append(metaData.Encodings, parquet.Encoding_RLE_DICTIONARY)

metaData.Codec = pages[1].CompressType
metaData.NumValues = numValues
Expand Down
2 changes: 1 addition & 1 deletion layout/page.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ func ReadDataPageValues(bytesReader *bytes.Reader, encodingMethod parquet.Encodi
if encodingMethod == parquet.Encoding_PLAIN {
return encoding.ReadPlain(bytesReader, dataType, cnt, bitWidth)

} else if encodingMethod == parquet.Encoding_PLAIN_DICTIONARY {
} else if encodingMethod == parquet.Encoding_PLAIN_DICTIONARY || encodingMethod == parquet.Encoding_RLE_DICTIONARY {
b, err := bytesReader.ReadByte()
if err != nil {
return res, err
Expand Down
7 changes: 4 additions & 3 deletions layout/rowgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package layout

import (
"errors"

"github.com/xitongsys/parquet-go/common"
"github.com/xitongsys/parquet-go/source"
"github.com/xitongsys/parquet-go/schema"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/schema"
"github.com/xitongsys/parquet-go/source"
)

//RowGroup stores the RowGroup in parquet file
Expand Down Expand Up @@ -77,6 +77,7 @@ func ReadRowGroup(rowGroupHeader *parquet.RowGroup, PFile source.ParquetFile, sc
default:
err = errors.New("unknown error")
}
close(doneChan)
}
}()

Expand Down
19 changes: 10 additions & 9 deletions writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ package writer
import (
"context"
"encoding/binary"
"errors"
"reflect"
"sync"
"errors"

"github.com/apache/thrift/lib/go/thrift"
"github.com/xitongsys/parquet-go/common"
"github.com/xitongsys/parquet-go/layout"
"github.com/xitongsys/parquet-go/marshal"
"github.com/xitongsys/parquet-go/source"
"github.com/xitongsys/parquet-go/schema"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/schema"
"github.com/xitongsys/parquet-go/source"
)

//ParquetWriter is a writer parquet file
Expand Down Expand Up @@ -69,7 +69,7 @@ func NewParquetWriter(pFile source.ParquetFile, obj interface{}, np int64) (*Par
err = res.SetSchemaHandlerFromJSON(sa)
return res, err

}else{
} else {
if res.SchemaHandler, err = schema.NewSchemaHandlerFromStruct(obj); err != nil {
return res, err
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func (self *ParquetWriter) Write(src interface{}) error {
}

if self.CheckSizeCritical <= ln {
self.ObjSize = (self.ObjSize + common.SizeOf(val))/2 + 1
self.ObjSize = (self.ObjSize+common.SizeOf(val))/2 + 1
}
self.ObjsSize += self.ObjSize
self.Objs = append(self.Objs, src)
Expand Down Expand Up @@ -203,19 +203,21 @@ func (self *ParquetWriter) flushObjs() error {
default:
err = errors.New("unknown error")
}
close(doneChan)
}
doneChan <- 0
}()

if e <= b {
doneChan <- 0
return
}

tableMap, err2 := self.MarshalFunc(self.Objs, b, e, self.SchemaHandler)

if err2 == nil {
for name, table := range *tableMap {
if table.Info.Encoding == parquet.Encoding_PLAIN_DICTIONARY {
if table.Info.Encoding == parquet.Encoding_PLAIN_DICTIONARY ||
table.Info.Encoding == parquet.Encoding_RLE_DICTIONARY {
lock.Lock()
if _, ok := self.DictRecs[name]; !ok {
self.DictRecs[name] = layout.NewDictRec(table.Type)
Expand All @@ -233,7 +235,6 @@ func (self *ParquetWriter) flushObjs() error {
err = err2
}

doneChan <- 0
}(int(bgn), int(end), c)
}

Expand Down Expand Up @@ -271,7 +272,7 @@ func (self *ParquetWriter) Flush(flag bool) error {
//pages -> chunk
chunkMap := make(map[string]*layout.Chunk)
for name, pages := range self.PagesMapBuf {
if len(pages) > 0 && pages[0].Info.Encoding == parquet.Encoding_PLAIN_DICTIONARY {
if len(pages) > 0 && (pages[0].Info.Encoding == parquet.Encoding_PLAIN_DICTIONARY || pages[0].Info.Encoding == parquet.Encoding_RLE_DICTIONARY) {
dictPage, _ := layout.DictRecToDictPage(self.DictRecs[name], int32(self.PageSize), self.CompressionType)
tmp := append([]*layout.Page{dictPage}, pages...)
chunkMap[name] = layout.PagesToDictChunk(tmp)
Expand Down

0 comments on commit 197c897

Please sign in to comment.