Skip to content

Commit

Permalink
PARQUET-1482: [C++] Add branch to TypedRecordReader::ReadNewPage for …
Browse files Browse the repository at this point in the history
…PageType::DATA_PAGE_V2 to address incompatibility with parquetjs. (#9)

PARQUET-1482: [C++] Add branch to TypedRecordReader::ReadNewPage for PageType::DATA_PAGE_V2 to address incompatibility with parquetjs.

Tests

This commit doesn't include tests; I am working on them now. I may need to use an actual file generated by parquetjs to test this issue, so I wonder if adding feeds1kMicros.parquet from the JIRA task to the parquet-testing repository is an option.

Description

parquetjs seems to be writing Parquet V2 files with DataPageV2 pages, while parquet-cpp writes Parquet V2 files with DataPage pages.

Since TypedRecordReader::ReadNewPage() only had a branch for PageType::DATA_PAGE, the reader would return without reading any data for records that have DATA_PAGE_V2 pages. This explains the behavior observed in PARQUET-1482.

This commit adds a new else-if branch for the DataPageV2 case in TypedRecordReader::ReadNewPage(). Since the DataPageV2 branch needed to reuse the code from the DataPage case, I refactored the repetition/definition level decoder initialization and the data decoder initialization into two new methods in the TypedRecordReader class. These new methods are now called by both the DataPage and DataPageV2 initialization branches in TypedRecordReader::ReadNewPage().

An alternative implementation (with a smaller diff) is possible: share the same else-if branch between DataPage and DataPageV2 using a pointer-to-derived shared_ptr<Page>. However, since the Page superclass doesn't have the necessary encoding() or num_values() methods, I would need to add a common superclass to both DataPage and DataPageV2 that defines these methods. I didn't do this because I was hesitant to modify the Page class hierarchy for this commit.
  • Loading branch information
rdmello committed Jan 5, 2019
1 parent fba4f32 commit d158118
Showing 1 changed file with 108 additions and 75 deletions.
183 changes: 108 additions & 75 deletions cpp/src/parquet/arrow/record_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,16 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {

DecoderType* current_decoder_;

// Initialize repetition and definition level decoders on the next data page.
//
// Templated on the page type so that any page class exposing num_values() and
// data() can be passed in. The level encodings are supplied by the caller
// instead of being read from the page.
//
// As a side effect this records the page's value count as the number of
// buffered values and resets the count of decoded values to zero.
//
// Returns the total number of bytes occupied by the encoded levels at the
// start of the page data.
template <typename PageType>
int64_t InitializeLevelDecoders(const std::shared_ptr<PageType> page,
const Encoding::type repetition_level_encoding,
const Encoding::type definition_level_encoding);

// Select the value decoder matching the page's encoding (creating it if this is
// the first page seen with that encoding) and point it at the encoded values,
// which start levels_bytes bytes into the page data.
template <typename PageType>
void InitializeDataDecoder(const std::shared_ptr<PageType> page,
const int64_t levels_bytes);

// Advance to the next data page.
// Dictionary pages encountered along the way are consumed to configure the
// dictionary decoder. Returns true once a DATA_PAGE or DATA_PAGE_V2 page has
// been set up for decoding.
bool ReadNewPage() override;

Expand Down Expand Up @@ -724,11 +734,95 @@ inline void TypedRecordReader<DType>::ConfigureDictionary(const DictionaryPage*
current_decoder_ = decoders_[encoding].get();
}

// Prepare the repetition/definition level decoders for a freshly read data
// page and report how many bytes of the page the encoded levels occupy, so the
// caller can locate the encoded values that follow them.
//
// Page layout: repetition levels, then definition levels, then encoded values.
// A level stream is only present when the column's maximum level is non-zero.
template <typename DType>
template <typename PageType>
int64_t TypedRecordReader<DType>::InitializeLevelDecoders(
    const std::shared_ptr<PageType> page, const Encoding::type repetition_level_encoding,
    const Encoding::type definition_level_encoding) {
  // Start of a new page: remember how many values it holds; none decoded yet.
  num_buffered_values_ = page->num_values();
  num_decoded_values_ = 0;

  const int value_count = static_cast<int>(num_buffered_values_);
  const uint8_t* const page_data = page->data();

  // Running offset into the page; doubles as the total size of the levels.
  int64_t consumed = 0;

  // Repetition levels come first.
  const auto max_rep_level = descr_->max_repetition_level();
  if (max_rep_level > 0) {
    consumed += repetition_level_decoder_.SetData(repetition_level_encoding, max_rep_level,
                                                  value_count, page_data + consumed);
  }

  // TODO figure a way to set max_definition_level_ to 0
  // if the initial value is invalid

  // Definition levels follow immediately after the repetition levels.
  const auto max_def_level = descr_->max_definition_level();
  if (max_def_level > 0) {
    consumed += definition_level_decoder_.SetData(definition_level_encoding, max_def_level,
                                                  value_count, page_data + consumed);
  }

  return consumed;
}

// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding, then hand it the page's encoded values.
//
// \param page data page whose values are to be decoded
// \param levels_byte_size number of bytes at the start of the page data taken
//   up by the encoded levels; the encoded values begin right after them
// \throws ParquetException if the levels claim more bytes than the page holds,
//   if a dictionary-encoded page arrives before any dictionary decoder has
//   been registered, or if the encoding is unsupported or unknown
template <typename DType>
template <typename PageType>
void TypedRecordReader<DType>::InitializeDataDecoder(const std::shared_ptr<PageType> page,
                                                     const int64_t levels_byte_size) {
  const int64_t data_size = page->size() - levels_byte_size;

  // A corrupt or truncated page can report more level bytes than the page
  // contains. Reject it before forming a pointer past the end of the page and
  // before a negative size is narrowed to int for the decoder.
  if (data_size < 0) {
    throw ParquetException("Page smaller than size of encoded levels");
  }

  const uint8_t* buffer = page->data() + levels_byte_size;

  Encoding::type encoding = page->encoding();

  // All dictionary-index encodings share the single RLE_DICTIONARY decoder.
  if (IsDictionaryIndexEncoding(encoding)) {
    encoding = Encoding::RLE_DICTIONARY;
  }

  auto it = decoders_.find(static_cast<int>(encoding));
  if (it != decoders_.end()) {
    // Reuse the decoder created for an earlier page with this encoding.
    if (encoding == Encoding::RLE_DICTIONARY) {
      DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
    }
    current_decoder_ = it->second.get();
  } else {
    switch (encoding) {
      case Encoding::PLAIN: {
        std::shared_ptr<DecoderType> decoder(new PlainDecoder<DType>(descr_));
        decoders_[static_cast<int>(encoding)] = decoder;
        current_decoder_ = decoder.get();
        break;
      }
      case Encoding::RLE_DICTIONARY:
        // No decoder registered for the dictionary encoding, so no dictionary
        // page was seen before this data page.
        throw ParquetException("Dictionary page must be before data page.");

      case Encoding::DELTA_BINARY_PACKED:
      case Encoding::DELTA_LENGTH_BYTE_ARRAY:
      case Encoding::DELTA_BYTE_ARRAY:
        ParquetException::NYI("Unsupported encoding");

      default:
        throw ParquetException("Unknown encoding type.");
    }
  }
  current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
                            static_cast<int>(data_size));
}

template <typename DType>
bool TypedRecordReader<DType>::ReadNewPage() {
// Loop until we find the next data page.
const uint8_t* buffer;

while (true) {
current_page_ = pager_->NextPage();
if (!current_page_) {
Expand All @@ -740,79 +834,18 @@ bool TypedRecordReader<DType>::ReadNewPage() {
ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
continue;
} else if (current_page_->type() == PageType::DATA_PAGE) {
const DataPage* page = static_cast<const DataPage*>(current_page_.get());

// Read a data page.
num_buffered_values_ = page->num_values();

// Have not decoded any values from the data page yet
num_decoded_values_ = 0;

buffer = page->data();

// If the data page includes repetition and definition levels, we
// initialize the level decoder and subtract the encoded level bytes from
// the page size to determine the number of bytes in the encoded data.
int64_t data_size = page->size();

// Data page Layout: Repetition Levels - Definition Levels - encoded values.
// Levels are encoded as rle or bit-packed.
// Init repetition levels
if (descr_->max_repetition_level() > 0) {
int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
page->repetition_level_encoding(), descr_->max_repetition_level(),
static_cast<int>(num_buffered_values_), buffer);
buffer += rep_levels_bytes;
data_size -= rep_levels_bytes;
}
// TODO figure a way to set max_definition_level_ to 0
// if the initial value is invalid

// Init definition levels
if (descr_->max_definition_level() > 0) {
int64_t def_levels_bytes = definition_level_decoder_.SetData(
page->definition_level_encoding(), descr_->max_definition_level(),
static_cast<int>(num_buffered_values_), buffer);
buffer += def_levels_bytes;
data_size -= def_levels_bytes;
}

// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding.
Encoding::type encoding = page->encoding();

if (IsDictionaryIndexEncoding(encoding)) {
encoding = Encoding::RLE_DICTIONARY;
}

auto it = decoders_.find(static_cast<int>(encoding));
if (it != decoders_.end()) {
if (encoding == Encoding::RLE_DICTIONARY) {
DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
}
current_decoder_ = it->second.get();
} else {
switch (encoding) {
case Encoding::PLAIN: {
std::shared_ptr<DecoderType> decoder(new PlainDecoder<DType>(descr_));
decoders_[static_cast<int>(encoding)] = decoder;
current_decoder_ = decoder.get();
break;
}
case Encoding::RLE_DICTIONARY:
throw ParquetException("Dictionary page must be before data page.");

case Encoding::DELTA_BINARY_PACKED:
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
case Encoding::DELTA_BYTE_ARRAY:
ParquetException::NYI("Unsupported encoding");

default:
throw ParquetException("Unknown encoding type.");
}
}
current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
static_cast<int>(data_size));
const auto page = std::static_pointer_cast<DataPage>(current_page_);
const int64_t levels_byte_size = InitializeLevelDecoders(
page, page->repetition_level_encoding(), page->definition_level_encoding());
InitializeDataDecoder(page, levels_byte_size);
return true;
} else if (current_page_->type() == PageType::DATA_PAGE_V2) {
const auto page = std::static_pointer_cast<DataPageV2>(current_page_);
// Repetition and definition levels are always encoded using RLE encoding
// in the DataPageV2 format.
const int64_t levels_byte_size =
InitializeLevelDecoders(page, Encoding::RLE, Encoding::RLE);
InitializeDataDecoder(page, levels_byte_size);
return true;
} else {
// We don't know what this page type is. We're allowed to skip non-data
Expand Down

0 comments on commit d158118

Please sign in to comment.