fix lazy reading of dictionary pages when first column chunk page is skipped (xitongsys#195)

* fix lazy reading of dictionary pages when first column chunk page is skipped

* factor out more code, use fewer abstractions

* fix comment
Achille committed May 27, 2022
1 parent 0733add commit bf1e4db
Showing 4 changed files with 238 additions and 72 deletions.
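
For context, here is a minimal sketch of the situation the fix targets, modeled on the new tests in reader_test.go below: a dictionary-encoded column whose dictionary page sits at the start of the column chunk, and a reader that seeks past the first row before reading anything. The import path and the plain string field are assumptions for illustration; the committed tests use a package-local utf8string type.

```go
package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/segmentio/parquet-go" // assumed import path
)

// The ",dict" tag requests dictionary encoding, so the column chunk starts
// with a dictionary page followed by the data pages.
type row struct {
	Name string `parquet:",dict"`
}

func main() {
	// Write two rows to an in-memory parquet file.
	buf := new(bytes.Buffer)
	w := parquet.NewWriter(buf, parquet.SchemaOf(new(row)))
	for _, name := range []string{"foo1", "foo2"} {
		if err := w.Write(row{Name: name}); err != nil {
			log.Fatal(err)
		}
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	// Seek straight to the second row. Skipping the first page previously
	// also skipped the dictionary page, so a dictionary-encoded data page
	// had no dictionary to resolve its indexes against; with this fix the
	// dictionary is loaded lazily from the start of the column chunk.
	r := parquet.NewReader(bytes.NewReader(buf.Bytes()))
	if err := r.SeekToRow(1); err != nil {
		log.Fatal(err)
	}

	out := new(row)
	if err := r.Read(out); err != nil {
		log.Fatal(err)
	}
	fmt.Println(out.Name) // expected output: foo2
}
```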
15 changes: 10 additions & 5 deletions dictionary.go
@@ -1034,17 +1034,22 @@ type indexedPageValues struct {
}

func (r *indexedPageValues) ReadValues(values []Value) (n int, err error) {
var v Value
for n < len(values) && r.offset < len(r.page.values) {
v = r.page.typ.dict.Index(r.page.values[r.offset])
v.columnIndex = r.page.columnIndex
dict := r.page.typ.dict
pageValues := r.page.values
columnIndex := r.page.columnIndex

for n < len(values) && r.offset < len(pageValues) {
v := dict.Index(pageValues[r.offset])
v.columnIndex = columnIndex
values[n] = v
r.offset++
n++
}
if r.offset == len(r.page.values) {

if r.offset == len(pageValues) {
err = io.EOF
}

return n, err
}

7 changes: 5 additions & 2 deletions encoding.go
@@ -84,8 +84,11 @@ var (
)

func isDictionaryEncoding(encoding encoding.Encoding) bool {
enc := encoding.Encoding()
return enc == format.PlainDictionary || enc == format.RLEDictionary
return isDictionaryFormat(encoding.Encoding())
}

func isDictionaryFormat(encoding format.Encoding) bool {
return encoding == format.PlainDictionary || encoding == format.RLEDictionary
}

// LookupEncoding returns the parquet encoding associated with the given code.
179 changes: 114 additions & 65 deletions file.go
@@ -467,83 +467,23 @@ func (f *filePages) ReadPage() (Page, error) {
if err := f.decoder.Decode(header); err != nil {
return nil, err
}

if cap(f.dataPage.data) < int(header.CompressedPageSize) {
f.dataPage.data = make([]byte, header.CompressedPageSize)
} else {
f.dataPage.data = f.dataPage.data[:header.CompressedPageSize]
}

if cap(f.dataPage.values) < int(header.UncompressedPageSize) {
f.dataPage.values = make([]byte, 0, header.UncompressedPageSize)
}

if _, err := io.ReadFull(f.rbuf, f.dataPage.data); err != nil {
if err := f.readPage(header, f.dataPage, f.rbuf); err != nil {
return nil, err
}

if header.CRC != 0 {
headerChecksum := uint32(header.CRC)
bufferChecksum := crc32.ChecksumIEEE(f.dataPage.data)

if headerChecksum != bufferChecksum {
// The parquet specs indicate that corruption errors could be
// handled gracefully by skipping pages, tho this may not always
// be practical. Depending on how the pages are consumed,
// missing rows may cause unpredictable behaviors in algorithms.
//
// For now, we assume these errors to be fatal, but we may
// revisit later and improve error handling to be more resilient
// to data corruption.
return nil, fmt.Errorf("crc32 checksum mismatch in page %d of column %q: 0x%08X != 0x%08X: %w",
f.index,
f.columnPath(),
headerChecksum,
bufferChecksum,
ErrCorrupted,
)
}
}

var column = f.chunk.column
var page Page
var err error

switch header.Type {
case format.DataPageV2:
if header.DataPageHeaderV2 == nil {
err = ErrMissingPageHeader
} else {
page, err = column.decodeDataPageV2(DataPageHeaderV2{header.DataPageHeaderV2}, f.dataPage)
}

page, err = f.readDataPageV2(header)
case format.DataPage:
if header.DataPageHeader == nil {
err = ErrMissingPageHeader
} else {
page, err = column.decodeDataPageV1(DataPageHeaderV1{header.DataPageHeader}, f.dataPage)
}

page, err = f.readDataPageV1(header)
case format.DictionaryPage:
// Sometimes parquet files do not have the dictionary page offset
// recorded in the column metadata. We account for this by lazily
// checking whether the first page is a dictionary page.
if header.DictionaryPageHeader == nil {
err = ErrMissingPageHeader
} else if f.index > 0 {
err = ErrUnexpectedDictionaryPage
} else {
f.dictPage, _ = dictPagePool.Get().(*dictPage)
if f.dictPage == nil {
f.dictPage = new(dictPage)
}
f.dataPage.dictionary, err = column.decodeDictionary(
DictionaryPageHeader{header.DictionaryPageHeader},
f.dataPage,
f.dictPage,
)
}

// reading dictionary pages when we encounter them.
err = f.readDictionaryPage(header, f.dataPage)
default:
err = fmt.Errorf("cannot read values of type %s from page", header.Type)
}
@@ -575,6 +515,115 @@ func (f *filePages) ReadPage() (Page, error) {
}
}

func (f *filePages) readDictionary() error {
chunk := io.NewSectionReader(f.chunk.file, f.baseOffset, f.chunk.chunk.MetaData.TotalCompressedSize)
rbuf := acquireReadBuffer(chunk)
defer releaseReadBuffer(rbuf)

decoder := thrift.NewDecoder(f.protocol.NewReader(rbuf))
header := new(format.PageHeader)

if err := decoder.Decode(header); err != nil {
return err
}

page := acquireDataPage()
defer releaseDataPage(page)

if err := f.readPage(header, page, rbuf); err != nil {
return err
}

return f.readDictionaryPage(header, page)
}

func (f *filePages) readDictionaryPage(header *format.PageHeader, page *dataPage) (err error) {
if header.DictionaryPageHeader == nil {
return ErrMissingPageHeader
}
if f.index > 0 {
return ErrUnexpectedDictionaryPage
}
f.dictPage, _ = dictPagePool.Get().(*dictPage)
if f.dictPage == nil {
f.dictPage = new(dictPage)
}
f.dataPage.dictionary, err = f.chunk.column.decodeDictionary(
DictionaryPageHeader{header.DictionaryPageHeader},
page,
f.dictPage,
)
return err
}

func (f *filePages) readDataPageV1(header *format.PageHeader) (Page, error) {
if header.DataPageHeader == nil {
return nil, ErrMissingPageHeader
}
if isDictionaryFormat(header.DataPageHeader.Encoding) && f.dataPage.dictionary == nil {
if err := f.readDictionary(); err != nil {
return nil, err
}
}
return f.chunk.column.decodeDataPageV1(DataPageHeaderV1{header.DataPageHeader}, f.dataPage)
}

func (f *filePages) readDataPageV2(header *format.PageHeader) (Page, error) {
if header.DataPageHeaderV2 == nil {
return nil, ErrMissingPageHeader
}
if isDictionaryFormat(header.DataPageHeaderV2.Encoding) && f.dataPage.dictionary == nil {
// If the program seeked to a row past the first page, the dictionary
// page may not have been seen, in which case we have to lazily load it
// from the beginning of the column chunk.
if err := f.readDictionary(); err != nil {
return nil, err
}
}
return f.chunk.column.decodeDataPageV2(DataPageHeaderV2{header.DataPageHeaderV2}, f.dataPage)
}

func (f *filePages) readPage(header *format.PageHeader, page *dataPage, reader *bufio.Reader) error {
compressedPageSize, uncompressedPageSize := int(header.CompressedPageSize), int(header.UncompressedPageSize)

if cap(page.data) < compressedPageSize {
page.data = make([]byte, compressedPageSize)
} else {
page.data = page.data[:compressedPageSize]
}
if cap(page.values) < uncompressedPageSize {
page.values = make([]byte, 0, uncompressedPageSize)
}

if _, err := io.ReadFull(reader, page.data); err != nil {
return err
}

if header.CRC != 0 {
headerChecksum := uint32(header.CRC)
bufferChecksum := crc32.ChecksumIEEE(page.data)

if headerChecksum != bufferChecksum {
// The parquet specs indicate that corruption errors could be
// handled gracefully by skipping pages, tho this may not always
// be practical. Depending on how the pages are consumed,
// missing rows may cause unpredictable behaviors in algorithms.
//
// For now, we assume these errors to be fatal, but we may
// revisit later and improve error handling to be more resilient
// to data corruption.
return fmt.Errorf("crc32 checksum mismatch in page of column %q: want=0x%08X got=0x%08X: %w",
f.columnPath(),
headerChecksum,
bufferChecksum,
ErrCorrupted,
)
}
}

return nil
}

func (f *filePages) SeekToRow(rowIndex int64) (err error) {
if f.chunk == nil {
return io.ErrClosedPipe
109 changes: 109 additions & 0 deletions reader_test.go
@@ -352,3 +352,112 @@ func TestReaderSeekToRow(t *testing.T) {
}
}
}

func TestSeekToRowNoDict(t *testing.T) {
type rowType struct {
Name utf8string `parquet:","` // no dictionary encoding
}

// write samples to in-memory buffer
buf := new(bytes.Buffer)
schema := parquet.SchemaOf(new(rowType))
w := parquet.NewWriter(buf, schema)
sample := rowType{
Name: "foo1",
}
// write two rows
w.Write(sample)
sample.Name = "foo2"
w.Write(sample)
w.Close()

// create reader
r := parquet.NewReader(bytes.NewReader(buf.Bytes()))

// read second row
r.SeekToRow(1)
row := new(rowType)
err := r.Read(row)
if err != nil {
t.Fatalf("reading row: %v", err)
}
// fmt.Println(&sample, row)
if *row != sample {
t.Fatalf("read != write")
}
}

func TestSeekToRowReadAll(t *testing.T) {
type rowType struct {
Name utf8string `parquet:",dict"`
}

// write samples to in-memory buffer
buf := new(bytes.Buffer)
schema := parquet.SchemaOf(new(rowType))
w := parquet.NewWriter(buf, schema)
sample := rowType{
Name: "foo1",
}
// write two rows
w.Write(sample)
sample.Name = "foo2"
w.Write(sample)
w.Close()

// create reader
r := parquet.NewReader(bytes.NewReader(buf.Bytes()))

// read first row
r.SeekToRow(0)
row := new(rowType)
err := r.Read(row)
if err != nil {
t.Fatalf("reading row: %v", err)
}
// read second row
r.SeekToRow(1)
row = new(rowType)
err = r.Read(row)
if err != nil {
t.Fatalf("reading row: %v", err)
}
// fmt.Println(&sample, row)
if *row != sample {
t.Fatalf("read != write")
}
}

func TestSeekToRowDictReadSecond(t *testing.T) {
type rowType struct {
Name utf8string `parquet:",dict"`
}

// write samples to in-memory buffer
buf := new(bytes.Buffer)
schema := parquet.SchemaOf(new(rowType))
w := parquet.NewWriter(buf, schema)
sample := rowType{
Name: "foo1",
}
// write two rows
w.Write(sample)
sample.Name = "foo2"
w.Write(sample)
w.Close()

// create reader
r := parquet.NewReader(bytes.NewReader(buf.Bytes()))

// read second row
r.SeekToRow(1)
row := new(rowType)
err := r.Read(row)
if err != nil {
t.Fatalf("reading row: %v", err)
}
// fmt.Println(&sample, row)
if *row != sample {
t.Fatalf("read != write")
}
}
