Skip to content

Commit

Permalink
fix slicing of repeated pages (xitongsys#190)
Browse files Browse the repository at this point in the history
  • Loading branch information
Achille authored May 24, 2022
1 parent a6b77f5 commit e054303
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 28 deletions.
45 changes: 45 additions & 0 deletions buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math/rand"
"reflect"
"sort"
"strconv"
"testing"
"testing/quick"
"time"
Expand Down Expand Up @@ -508,6 +509,50 @@ func TestBufferRoundtripNestedRepeatedPointer(t *testing.T) {
}
}

// TestBufferSeekToRow writes two rows containing nested repeated fields into
// a buffer, flushes the buffer to an in-memory parquet file as a single row
// group, then seeks the reader to the second row and verifies that the row
// read back matches the one that was written.
func TestBufferSeekToRow(t *testing.T) {
	type B struct {
		I int
		C []string
	}
	type A struct {
		B []B
	}

	buffer := parquet.NewBuffer()
	var objs []A
	for i := 0; i < 2; i++ {
		o := A{
			B: []B{
				{I: i, C: []string{"foo", strconv.Itoa(i)}},
				{I: i + 1, C: []string{"bar", strconv.Itoa(i + 1)}},
			},
		}
		// Fail at the point of the write: a silently dropped row would
		// otherwise only show up later as a confusing seek/read failure.
		if err := buffer.Write(&o); err != nil {
			t.Fatal(err)
		}
		objs = append(objs, o)
	}

	buf := new(bytes.Buffer)
	w := parquet.NewWriter(buf)
	if _, err := w.WriteRowGroup(buffer); err != nil {
		t.Fatal(err)
	}
	if err := w.Flush(); err != nil {
		t.Fatal(err)
	}
	if err := w.Close(); err != nil {
		t.Fatal(err)
	}

	file := bytes.NewReader(buf.Bytes())
	r := parquet.NewReader(file)

	// Seek past the first row so the reader has to position itself in the
	// middle of the repeated column pages.
	i := 1
	o := new(A)
	if err := r.SeekToRow(int64(i)); err != nil {
		t.Fatal(err)
	}
	if err := r.Read(o); err != nil {
		t.Fatal(err)
	}
	if !reflect.DeepEqual(*o, objs[i]) {
		// Print the dereferenced value so want and got use the same format.
		t.Errorf("row mismatch at row index %d: want=%v got=%v", i, objs[i], *o)
	}
}

type benchmarkBufferRowType struct {
ID [16]byte `parquet:"id,uuid"`
Value float64 `parquet:"value"`
Expand Down
32 changes: 20 additions & 12 deletions page.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,27 +444,35 @@ func (page *repeatedPage) Slice(i, j int64) BufferedPage {
panic(errPageBoundsOutOfRange(i, j, numRows))
}

rowIndex0 := int64(0)
rowIndex1 := int64(len(page.repetitionLevels))
rowIndex2 := int64(len(page.repetitionLevels))
rowIndex0 := 0
rowIndex1 := len(page.repetitionLevels)
rowIndex2 := len(page.repetitionLevels)

for k, def := range page.repetitionLevels {
if def != page.maxRepetitionLevel {
if rowIndex0 == i {
rowIndex1 = int64(k)
if def == 0 {
if rowIndex0 == int(i) {
rowIndex1 = k
break
}
if rowIndex0 == j {
rowIndex2 = int64(k)
rowIndex0++
}
}

for k, def := range page.repetitionLevels[rowIndex1:] {
if def == 0 {
if rowIndex0 == int(j) {
rowIndex2 = rowIndex1 + k
break
}
rowIndex0++
}
}

numNulls1 := int64(countLevelsNotEqual(page.definitionLevels[:rowIndex1], page.maxDefinitionLevel))
numNulls2 := int64(countLevelsNotEqual(page.definitionLevels[rowIndex1:rowIndex2], page.maxDefinitionLevel))
numNulls1 := countLevelsNotEqual(page.definitionLevels[:rowIndex1], page.maxDefinitionLevel)
numNulls2 := countLevelsNotEqual(page.definitionLevels[rowIndex1:rowIndex2], page.maxDefinitionLevel)

i = rowIndex1 - numNulls1
j = rowIndex2 - (numNulls1 + numNulls2)
i = int64(rowIndex1 - numNulls1)
j = int64(rowIndex2 - (numNulls1 + numNulls2))

return newRepeatedPage(
page.base.Slice(i, j),
Expand Down
23 changes: 8 additions & 15 deletions print.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ func PrintRowGroup(w io.Writer, rowGroup RowGroup) error {

rowbuf := make([]Row, defaultRowBufferSize)
cells := make([]string, 0, len(columns))
parts := make([]string, 0)
rows := rowGroup.Rows()
defer rows.Close()

Expand All @@ -233,25 +232,19 @@ func PrintRowGroup(w io.Writer, rowGroup RowGroup) error {
for _, row := range rowbuf[:n] {
cells = cells[:0]

for i := 0; i < len(row); {
j := i + 1
for _, value := range row {
columnIndex := value.Column()

for j < len(row) && row[j].Column() == row[i].Column() {
j++
for len(cells) <= columnIndex {
cells = append(cells, "")
}

if (j - i) == 1 {
cells = append(cells, row[i].String())
if cells[columnIndex] == "" {
cells[columnIndex] = value.String()
} else {
parts = parts[:0]
for k := i; k < j; k++ {
parts = append(parts, row[k].String())
}
alignment[len(cells)] = tablewriter.ALIGN_LEFT
cells = append(cells, strings.Join(parts, ","))
cells[columnIndex] += "," + value.String()
alignment[columnIndex] = tablewriter.ALIGN_LEFT
}

i = j
}

tw.Append(cells)
Expand Down
2 changes: 1 addition & 1 deletion row.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ func reconstructFuncOfRepeated(columnIndex int16, node Node) (int16, reconstruct

func reconstructRepeated(columnIndex, rowLength int16, levels levels, row Row, do func(levels, Row) (Row, error)) (Row, error) {
if !row.startsWith(columnIndex) {
return row, fmt.Errorf("row is missing repeated column %d", columnIndex)
return row, fmt.Errorf("row is missing repeated column %d: %+v", columnIndex, row)
}
if len(row) < int(rowLength) {
return row, fmt.Errorf("expected repeated column %d to have at least %d values but got %d", columnIndex, rowLength, len(row))
Expand Down

0 comments on commit e054303

Please sign in to comment.