Skip to content

Commit

Permalink
Merge pull request #162 from xitongsys/dev
Browse files (browse the repository at this point in the history)
fix csv/json/column
  • Loading branch information
xitongsys authored Sep 10, 2019
2 parents defba21 + 116c1b7 commit 6d4d798
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 65 deletions.
2 changes: 1 addition & 1 deletion common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func StringToTag(tag string) *Tag {
mp.ValueFieldID = valInt32
case "name":
if mp.InName == "" {
mp.InName = val
mp.InName = HeadToUpper(val)
}
mp.ExName = val
case "inname":
Expand Down
20 changes: 10 additions & 10 deletions example/column_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,20 @@ func main() {
}
num = int(pr.GetNumRows())

pr.SkipRowsByPath("name", 5) //skip the first five rows
names, rls, dls = pr.ReadColumnByPath("name", num)
log.Println("name", names, rls, dls)
pr.SkipRowsByPath("parquet_go_root.name", 5) //skip the first five rows
names, rls, dls, err = pr.ReadColumnByPath("parquet_go_root.name", num)
log.Println("name", names, rls, dls, err)

classes, rls, dls = pr.ReadColumnByPath("class.list.element", num)
log.Println("class", classes, rls, dls)
classes, rls, dls, err = pr.ReadColumnByPath("parquet_go_root.class.list.element", num)
log.Println("class", classes, rls, dls, err)

scores_key, rls, dls = pr.ReadColumnByPath("score.key_value.key", num)
scores_value, rls, dls = pr.ReadColumnByPath("score.key_value.value", num)
log.Println("scores_key", scores_key)
log.Println("scores_value", scores_value)
scores_key, rls, dls, err = pr.ReadColumnByPath("parquet_go_root.score.key_value.key", num)
scores_value, rls, dls, err = pr.ReadColumnByPath("parquet_go_root.score.key_value.value", num)
log.Println("parquet_go_root.scores_key", scores_key, err)
log.Println("parquet_go_root.scores_value", scores_value, err)

pr.SkipRowsByIndex(2, 5) //skip the first five rows
ids, _, _ = pr.ReadColumnByIndex(2, num)
ids, _, _, _ = pr.ReadColumnByIndex(2, num)
log.Println(ids)

pr.ReadStop()
Expand Down
4 changes: 3 additions & 1 deletion example/json_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Student struct {

var jsonSchema string = `
{
"Tag": "name=parquet-go-root, repetitiontype=REQUIRED",
"Tag": "name=parquet_go_root, repetitiontype=REQUIRED",
"Fields": [
{"Tag": "name=name, inname=Name, type=UTF8, repetitiontype=REQUIRED"},
{"Tag": "name=age, inname=Age, type=INT32, repetitiontype=REQUIRED"},
Expand Down Expand Up @@ -159,10 +159,12 @@ func main() {
log.Println("Can't create parquet reader", err)
return
}
/*
if err = pr.SetSchemaHandlerFromJSON(jsonSchema); err != nil {
log.Println("Can't set schema from json", err)
return
}
*/

num = int(pr.GetNumRows())
for i := 0; i < num; i++ {
Expand Down
2 changes: 1 addition & 1 deletion marshal/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func MarshalCSV(records []interface{}, bgn int, end int, schemaHandler *schema.S
}

for i := 0; i < len(records[0].([]interface{})); i++ {
pathStr := schemaHandler.GetRootExName() + "." + schemaHandler.Infos[i+1].ExName
pathStr := schemaHandler.GetRootInName() + "." + schemaHandler.Infos[i+1].InName
res[pathStr] = layout.NewEmptyTable()
res[pathStr].Path = common.StrToPath(pathStr)
res[pathStr].MaxDefinitionLevel = 1
Expand Down
23 changes: 12 additions & 11 deletions marshal/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func MarshalJSON(ss []interface{}, bgn int, end int, schemaHandler *schema.Schem
keys := node.Val.MapKeys()

if info.Type == "MAP" { //real map
pathStr = pathStr + ".key_value"
pathStr = pathStr + ".Key_value"
if len(keys) <= 0 {
for key, table := range res {
if len(key) >= len(node.PathMap.Path) &&
Expand All @@ -104,7 +104,7 @@ func MarshalJSON(ss []interface{}, bgn int, end int, schemaHandler *schema.Schem
value := node.Val.MapIndex(key).Elem()

newNode := nodeBuf.GetNode()
newNode.PathMap = node.PathMap.Children["key_value"].Children["key"]
newNode.PathMap = node.PathMap.Children["Key_value"].Children["Key"]
newNode.Val = key
newNode.DL = node.DL + 1
if j == 0 {
Expand All @@ -115,7 +115,7 @@ func MarshalJSON(ss []interface{}, bgn int, end int, schemaHandler *schema.Schem
stack = append(stack, newNode)

newNode = nodeBuf.GetNode()
newNode.PathMap = node.PathMap.Children["key_value"].Children["value"]
newNode.PathMap = node.PathMap.Children["Key_value"].Children["Value"]
newNode.Val = value
newNode.DL = node.DL + 1
newPathStr := newNode.PathMap.Path // check again
Expand All @@ -134,17 +134,18 @@ func MarshalJSON(ss []interface{}, bgn int, end int, schemaHandler *schema.Schem
}

} else { //struct
keysMap := make(map[string]bool)
keysMap := make(map[string]int)
for j := 0; j < len(keys); j++ {
keysMap[keys[j].String()] = true
//ExName to InName
keysMap[common.HeadToUpper(keys[j].String())] = j
}
for key, _ := range node.PathMap.Children {
_, ok := keysMap[key]
if ok && node.Val.MapIndex(reflect.ValueOf(key)).Elem().IsValid() {

ki, ok := keysMap[key]

if ok && node.Val.MapIndex(keys[ki]).Elem().IsValid() {
newNode := nodeBuf.GetNode()
newNode.PathMap = node.PathMap.Children[key]
newNode.Val = node.Val.MapIndex(reflect.ValueOf(key)).Elem()
newNode.Val = node.Val.MapIndex(keys[ki]).Elem()
newNode.RL = node.RL
newNode.DL = node.DL
newPathStr := newNode.PathMap.Path
Expand Down Expand Up @@ -174,7 +175,7 @@ func MarshalJSON(ss []interface{}, bgn int, end int, schemaHandler *schema.Schem
ln := node.Val.Len()

if info.Type == "LIST" { //real LIST
pathStr = pathStr + ".list" + ".element"
pathStr = pathStr + ".List" + ".Element"
if ln <= 0 {
for key, table := range res {
if len(key) >= len(node.PathMap.Path) &&
Expand All @@ -189,7 +190,7 @@ func MarshalJSON(ss []interface{}, bgn int, end int, schemaHandler *schema.Schem

for j := ln - 1; j >= 0; j-- {
newNode := nodeBuf.GetNode()
newNode.PathMap = node.PathMap.Children["list"].Children["element"]
newNode.PathMap = node.PathMap.Children["List"].Children["Element"]
newNode.Val = node.Val.Index(j).Elem()
if j == 0 {
newNode.RL = node.RL
Expand Down
62 changes: 28 additions & 34 deletions reader/columnreader.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package reader

import (
"strings"
"fmt"

"github.com/xitongsys/parquet-go/source"
"github.com/xitongsys/parquet-go/schema"
Expand All @@ -17,44 +17,38 @@ func NewParquetColumnReader(pFile source.ParquetFile, np int64) (*ParquetReader,
}
res.ColumnBuffers = make(map[string]*ColumnBufferType)
res.SchemaHandler = schema.NewSchemaHandlerFromSchemaList(res.Footer.GetSchema())
res.RenameSchema()

/* open when need
for i := 0; i < len(res.schema.SchemaElements); i++ {
schema := res.schema.SchemaElements[i]
if schema.GetNumChildren() == 0 {
pathStr := res.schema.IndexMap[int32(i)]
if res.ColumnBuffers[pathStr], err = NewColumnBuffer(pFile, res.Footer, res.SchemaHandler, pathStr); err != nil {
return res, err
}
}
}
*/
return res, nil
}

func (self *ParquetReader) SkipRowsByPath(pathStr string, num int) {
if num <= 0 || len(pathStr) <= 0 {
return
}
rootName := self.SchemaHandler.GetRootInName()
if !strings.HasPrefix(pathStr, rootName) {
pathStr = rootName + "." + pathStr
func (self *ParquetReader) SkipRowsByPath(pathStr string, num int) error {
errPathNotFound := fmt.Errorf("path %v not found", pathStr)

pathStr, err := self.SchemaHandler.ConvertToInPathStr(pathStr)
if num <= 0 || len(pathStr) <= 0 || err != nil {
return err
}

if _, ok := self.SchemaHandler.MapIndex[pathStr]; !ok {
return
return errPathNotFound
}

if _, ok := self.ColumnBuffers[pathStr]; !ok {
var err error
if self.ColumnBuffers[pathStr], err = NewColumnBuffer(self.PFile, self.Footer, self.SchemaHandler, pathStr); err != nil {
return
return err
}
}

if cb, ok := self.ColumnBuffers[pathStr]; ok {
cb.SkipRows(int64(num))

} else{
return errPathNotFound
}

return nil
}

func (self *ParquetReader) SkipRowsByIndex(index int, num int) {
Expand All @@ -66,36 +60,36 @@ func (self *ParquetReader) SkipRowsByIndex(index int, num int) {
}

// ReadColumnByPath reads column by path in schema.
func (self *ParquetReader) ReadColumnByPath(pathStr string, num int) (values []interface{}, rls []int32, dls []int32) {
if num <= 0 || len(pathStr) <= 0 {
return []interface{}{}, []int32{}, []int32{}
}
rootName := self.SchemaHandler.GetRootInName()
if !strings.HasPrefix(pathStr, rootName) {
pathStr = rootName + "." + pathStr
}
func (self *ParquetReader) ReadColumnByPath(pathStr string, num int) (values []interface{}, rls []int32, dls []int32, err error) {
errPathNotFound := fmt.Errorf("path %v not found", pathStr)

pathStr, err = self.SchemaHandler.ConvertToInPathStr(pathStr)
if num <= 0 || len(pathStr) <= 0 || err != nil {
return []interface{}{}, []int32{}, []int32{}, err
}

if _, ok := self.SchemaHandler.MapIndex[pathStr]; !ok {
return []interface{}{}, []int32{}, []int32{}
return []interface{}{}, []int32{}, []int32{}, errPathNotFound
}

if _, ok := self.ColumnBuffers[pathStr]; !ok {
var err error
if self.ColumnBuffers[pathStr], err = NewColumnBuffer(self.PFile, self.Footer, self.SchemaHandler, pathStr); err != nil {
return []interface{}{}, []int32{}, []int32{}
return []interface{}{}, []int32{}, []int32{}, err
}
}

if cb, ok := self.ColumnBuffers[pathStr]; ok {
table, _ := cb.ReadRows(int64(num))
return table.Values, table.RepetitionLevels, table.DefinitionLevels
return table.Values, table.RepetitionLevels, table.DefinitionLevels, nil
}
return []interface{}{}, []int32{}, []int32{}
return []interface{}{}, []int32{}, []int32{}, errPathNotFound
}

// ReadColumnByIndex reads column by index. The index of first column is 0.
func (self *ParquetReader) ReadColumnByIndex(index int, num int) (values []interface{}, rls []int32, dls []int32) {
func (self *ParquetReader) ReadColumnByIndex(index int, num int) (values []interface{}, rls []int32, dls []int32, err error) {
if index >= len(self.SchemaHandler.ValueColumns) {
err = fmt.Errorf("index %v out of range %v", index, len(self.SchemaHandler.ValueColumns))
return
}
pathStr := self.SchemaHandler.ValueColumns[index]
Expand Down
1 change: 0 additions & 1 deletion reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func (self *ParquetReader) SetSchemaHandlerFromJSON(jsonSchema string) error {
return err
}

self.RenameSchema()
for i := 0; i < len(self.SchemaHandler.SchemaElements); i++ {
schemaElement := self.SchemaHandler.SchemaElements[i]
if schemaElement.GetNumChildren() == 0 {
Expand Down
4 changes: 2 additions & 2 deletions schema/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ func NewSchemaHandlerFromMetadata(mds []string) *SchemaHandler {
infos := make([]*common.Tag, 0)

rootSchema := parquet.NewSchemaElement()
rootSchema.Name = "parquet_go_root"
rootSchema.Name = "Parquet_go_root"
rootNumChildren := int32(len(mds))
rootSchema.NumChildren = &rootNumChildren
rt := parquet.FieldRepetitionType_REQUIRED
rootSchema.RepetitionType = &rt
schemaList = append(schemaList, rootSchema)

rootInfo := common.NewTag()
rootInfo.InName = "parquet_go_root"
rootInfo.InName = "Parquet_go_root"
rootInfo.ExName = "parquet_go_root"
rootInfo.RepetitionType = parquet.FieldRepetitionType_REQUIRED
infos = append(infos, rootInfo)
Expand Down
8 changes: 4 additions & 4 deletions schema/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewSchemaHandlerFromJSON(str string) (sh *SchemaHandler, err error) {
infos = append(infos, newInfo)

schema = parquet.NewSchemaElement()
schema.Name = "list"
schema.Name = "List"
rt2 := parquet.FieldRepetitionType_REPEATED
schema.RepetitionType = &rt2
var numField2 int32 = 1
Expand All @@ -88,7 +88,7 @@ func NewSchemaHandlerFromJSON(str string) (sh *SchemaHandler, err error) {

newInfo = common.NewTag()
common.DeepCopy(info, newInfo)
newInfo.InName, newInfo.ExName = "list", "list"
newInfo.InName, newInfo.ExName = "List", "list"
infos = append(infos, newInfo)

stack = append(stack, item.Fields[0])
Expand All @@ -109,7 +109,7 @@ func NewSchemaHandlerFromJSON(str string) (sh *SchemaHandler, err error) {
infos = append(infos, newInfo)

schema = parquet.NewSchemaElement()
schema.Name = "key_value"
schema.Name = "Key_value"
rt2 := parquet.FieldRepetitionType_REPEATED
schema.RepetitionType = &rt2
ct2 := parquet.ConvertedType_MAP_KEY_VALUE
Expand All @@ -120,7 +120,7 @@ func NewSchemaHandlerFromJSON(str string) (sh *SchemaHandler, err error) {

newInfo = common.NewTag()
common.DeepCopy(info, newInfo)
newInfo.InName, newInfo.ExName = "key_value", "key_value"
newInfo.InName, newInfo.ExName = "Key_value", "key_value"
infos = append(infos, newInfo)

stack = append(stack, item.Fields[1]) //put value
Expand Down

0 comments on commit 6d4d798

Please sign in to comment.