PARQUET-1482: [C++] Add branch to TypedRecordReader::ReadNewPage for PageType::DATA_PAGE_V2 to address incompatibility with parquetjs.

**Tests**

This commit does not include tests yet; I am working on them and would appreciate initial feedback on the code changes in the meantime. Testing this issue may require an actual file generated by `parquetjs`, so would adding `feeds1kMicros.parquet` from the JIRA task to the parquet-testing repository be an option?

**Description**

`parquetjs` seems to be writing Parquet V2 files with [`DataPageV2`](https://github.com/apache/parquet-format/blob/e93dd628d90aa076745558998f0bf5d9c262bf22/src/main/thrift/parquet.thrift#L529) pages, while `parquet-cpp` writes Parquet V2 files with [`DataPage`](https://github.com/apache/parquet-format/blob/e93dd628d90aa076745558998f0bf5d9c262bf22/src/main/thrift/parquet.thrift#L492) pages.

Since `TypedRecordReader::ReadNewPage()` only had a branch for `PageType::DATA_PAGE`, the reader would return without reading any data for records that have `DATA_PAGE_V2` pages. This explains the behavior observed in [PARQUET-1482](https://issues.apache.org/jira/projects/PARQUET/issues/PARQUET-1482?filter=allopenissues).
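The failure mode can be illustrated with a minimal sketch. The `Page` struct and both `CountValues...` functions are hypothetical stand-ins rather than parquet-cpp code; only the branching on page type mirrors the behavior described above.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical, simplified stand-ins for the parquet-cpp page types.
enum class PageType { DATA_PAGE, DICTIONARY_PAGE, DATA_PAGE_V2 };

struct Page {
  PageType type;
  int32_t num_values;
};

// Mimics the pre-fix branching: only DATA_PAGE counts as data, and any
// page type without a branch is skipped.
int64_t CountValuesBeforeFix(const std::vector<Page>& pages) {
  int64_t total = 0;
  for (const Page& page : pages) {
    assert(page.num_values >= 0);
    if (page.type == PageType::DATA_PAGE) {
      total += page.num_values;
    }
    // DICTIONARY_PAGE and DATA_PAGE_V2 fall through and contribute nothing.
  }
  return total;
}

// Mimics the post-fix branching: DATA_PAGE_V2 is handled like DATA_PAGE.
int64_t CountValuesAfterFix(const std::vector<Page>& pages) {
  int64_t total = 0;
  for (const Page& page : pages) {
    assert(page.num_values >= 0);
    if (page.type == PageType::DATA_PAGE ||
        page.type == PageType::DATA_PAGE_V2) {
      total += page.num_values;
    }
  }
  return total;
}
```

For a column chunk made up only of `DATA_PAGE_V2` pages, the first function finds no values at all, which matches the symptom reported in the JIRA issue.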

This commit adds a new else-if branch for the `DataPageV2` case in `TypedRecordReader::ReadNewPage()`. Because the `DataPageV2` branch needs to reuse the code from the `DataPage` case, I refactored the repetition/definition level decoder initialization and the data decoder initialization into two new methods of the `TypedRecordReader` class. Both the `DataPage` and `DataPageV2` branches in `TypedRecordReader::ReadNewPage()` now call these methods.
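The two methods are connected by a byte count: the level-decoder method returns the number of bytes occupied by the encoded levels, and the data-decoder method uses that count to locate the encoded values in the page buffer. The sketch below shows only this arithmetic; `PageRegions` and `SplitPage` are hypothetical names, and the byte counts in the usage note are made up for illustration.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical result type: where the encoded values start within the page
// buffer and how many bytes they occupy.
struct PageRegions {
  int64_t data_offset;
  int64_t data_size;
};

// Page layout: repetition levels, then definition levels, then encoded values.
// A column with no repetition or definition levels passes 0 for that count.
PageRegions SplitPage(int64_t page_size, int64_t rep_levels_bytes,
                      int64_t def_levels_bytes) {
  const int64_t levels_byte_size = rep_levels_bytes + def_levels_bytes;
  assert(levels_byte_size <= page_size);
  // The values begin right after the levels and run to the end of the page.
  return {levels_byte_size, page_size - levels_byte_size};
}
```

For example, a 100-byte page with 10 bytes of repetition levels and 20 bytes of definition levels leaves 70 bytes of encoded values starting at offset 30.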

An alternative implementation with a smaller diff is possible: `DataPage` and `DataPageV2` could share a single else-if branch by casting the `shared_ptr<Page>` to a pointer to a common derived type. However, the `Page` superclass has no `encoding()` or `num_values()` methods, so this would require adding a common superclass for `DataPage` and `DataPageV2` that defines them. I held back because I was hesitant to modify the `Page` class hierarchy in this commit.
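A rough sketch of that common-superclass idea follows. It is heavily simplified and should not be read as the actual parquet-cpp declarations: buffers, statistics and level encodings are omitted, the constructors are reduced to two arguments, and `BufferedValues` is a hypothetical helper standing in for a shared branch.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>

enum class PageType { DATA_PAGE, DICTIONARY_PAGE, DATA_PAGE_V2 };
enum class Encoding { PLAIN, RLE_DICTIONARY };

class Page {
 public:
  explicit Page(PageType type) : type_(type) {}
  PageType type() const { return type_; }

 private:
  PageType type_;
};

// Common base holding the attributes both data page versions share.
class DataPage : public Page {
 public:
  int32_t num_values() const { return num_values_; }
  Encoding encoding() const { return encoding_; }

 protected:
  DataPage(PageType type, int32_t num_values, Encoding encoding)
      : Page(type), num_values_(num_values), encoding_(encoding) {}

 private:
  int32_t num_values_;
  Encoding encoding_;
};

class DataPageV1 : public DataPage {
 public:
  DataPageV1(int32_t num_values, Encoding encoding)
      : DataPage(PageType::DATA_PAGE, num_values, encoding) {}
};

class DataPageV2 : public DataPage {
 public:
  DataPageV2(int32_t num_values, Encoding encoding)
      : DataPage(PageType::DATA_PAGE_V2, num_values, encoding) {}
};

// Hypothetical shared branch: both versions are read through the base class.
int32_t BufferedValues(const std::shared_ptr<Page>& page) {
  assert(page != nullptr);
  if (page->type() == PageType::DATA_PAGE ||
      page->type() == PageType::DATA_PAGE_V2) {
    return std::static_pointer_cast<DataPage>(page)->num_values();
  }
  return 0;  // Not a data page.
}
```

The downcast is safe only because the page type is checked first; a version-specific accessor such as the level encodings of a V1 page would still need a cast to the concrete class.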

Author: Wes McKinney <wesm+git@apache.org>
Author: rdmello <rylan.dmello@mathworks.com>
Author: Rylan Dmello <rdmello@users.noreply.github.com>

Closes #3312 from rdmello/parquet_1482 and squashes the following commits:

c5cb0f3 <Wes McKinney> Add DataPage base class for DataPageV1 and DataPageV2
8df8328 <rdmello> PARQUET-1482: Adding basic unit test for DataPageV2 serialization and deserialization.
9df3222 <Rylan Dmello> PARQUET-1482: Add branch to TypedRecordReader::ReadNewPage for PageType::DATA_PAGE_V2 to address incompatibility with parquetjs.
wesm committed Mar 6, 2019
1 parent b77b662 commit c16eccb
Showing 6 changed files with 198 additions and 123 deletions.
186 changes: 107 additions & 79 deletions cpp/src/parquet/arrow/record_reader.cc
@@ -48,7 +48,7 @@ namespace BitUtil = ::arrow::BitUtil;

// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
// encoding.
static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
static bool IsDictionaryIndexEncoding(Encoding::type e) {
return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
}

@@ -86,7 +86,7 @@ class RecordReader::RecordReaderImpl {

virtual ~RecordReaderImpl() = default;

virtual int64_t ReadRecordData(const int64_t num_records) = 0;
virtual int64_t ReadRecordData(int64_t num_records) = 0;

// Returns true if there are still values in this column.
bool HasNext() {
@@ -494,7 +494,7 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
}

// Return number of logical records read
int64_t ReadRecordData(const int64_t num_records) override {
int64_t ReadRecordData(int64_t num_records) override {
// Conservative upper bound
const int64_t possible_num_values =
std::max(num_records, levels_written_ - levels_position_);
@@ -580,6 +580,13 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {

DecoderType* current_decoder_;

// Initialize repetition and definition level decoders on the next data page.
int64_t InitializeLevelDecoders(const DataPage& page,
Encoding::type repetition_level_encoding,
Encoding::type definition_level_encoding);

void InitializeDataDecoder(const DataPage& page, int64_t levels_bytes);

// Advance to the next data page
bool ReadNewPage() override;

@@ -717,11 +724,94 @@ inline void TypedRecordReader<DType>::ConfigureDictionary(const DictionaryPage*
DCHECK(current_decoder_);
}

// If the data page includes repetition and definition levels, we
// initialize the level decoders and return the number of encoded level bytes.
// The return value helps determine the number of bytes in the encoded data.
template <typename DType>
int64_t TypedRecordReader<DType>::InitializeLevelDecoders(
const DataPage& page, Encoding::type repetition_level_encoding,
Encoding::type definition_level_encoding) {
// Read a data page.
num_buffered_values_ = page.num_values();

// Have not decoded any values from the data page yet
num_decoded_values_ = 0;

const uint8_t* buffer = page.data();
int64_t levels_byte_size = 0;

// Data page Layout: Repetition Levels - Definition Levels - encoded values.
// Levels are encoded as rle or bit-packed.
// Init repetition levels
if (descr_->max_repetition_level() > 0) {
int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
repetition_level_encoding, descr_->max_repetition_level(),
static_cast<int>(num_buffered_values_), buffer);
buffer += rep_levels_bytes;
levels_byte_size += rep_levels_bytes;
}
// TODO figure a way to set max_definition_level_ to 0
// if the initial value is invalid

// Init definition levels
if (descr_->max_definition_level() > 0) {
int64_t def_levels_bytes = definition_level_decoder_.SetData(
definition_level_encoding, descr_->max_definition_level(),
static_cast<int>(num_buffered_values_), buffer);
levels_byte_size += def_levels_bytes;
}

return levels_byte_size;
}

// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding.
template <typename DType>
void TypedRecordReader<DType>::InitializeDataDecoder(const DataPage& page,
int64_t levels_byte_size) {
const uint8_t* buffer = page.data() + levels_byte_size;
const int64_t data_size = page.size() - levels_byte_size;

Encoding::type encoding = page.encoding();

if (IsDictionaryIndexEncoding(encoding)) {
encoding = Encoding::RLE_DICTIONARY;
}

auto it = decoders_.find(static_cast<int>(encoding));
if (it != decoders_.end()) {
DCHECK(it->second.get() != nullptr);
if (encoding == Encoding::RLE_DICTIONARY) {
DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
}
current_decoder_ = it->second.get();
} else {
switch (encoding) {
case Encoding::PLAIN: {
auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
current_decoder_ = decoder.get();
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::RLE_DICTIONARY:
throw ParquetException("Dictionary page must be before data page.");

case Encoding::DELTA_BINARY_PACKED:
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
case Encoding::DELTA_BYTE_ARRAY:
ParquetException::NYI("Unsupported encoding");

default:
throw ParquetException("Unknown encoding type.");
}
}
current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
static_cast<int>(data_size));
}

template <typename DType>
bool TypedRecordReader<DType>::ReadNewPage() {
// Loop until we find the next data page.
const uint8_t* buffer;

while (true) {
current_page_ = pager_->NextPage();
if (!current_page_) {
@@ -733,80 +823,18 @@ bool TypedRecordReader<DType>::ReadNewPage() {
ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
continue;
} else if (current_page_->type() == PageType::DATA_PAGE) {
const DataPage* page = static_cast<const DataPage*>(current_page_.get());

// Read a data page.
num_buffered_values_ = page->num_values();

// Have not decoded any values from the data page yet
num_decoded_values_ = 0;

buffer = page->data();

// If the data page includes repetition and definition levels, we
// initialize the level decoder and subtract the encoded level bytes from
// the page size to determine the number of bytes in the encoded data.
int64_t data_size = page->size();

// Data page Layout: Repetition Levels - Definition Levels - encoded values.
// Levels are encoded as rle or bit-packed.
// Init repetition levels
if (descr_->max_repetition_level() > 0) {
int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
page->repetition_level_encoding(), descr_->max_repetition_level(),
static_cast<int>(num_buffered_values_), buffer);
buffer += rep_levels_bytes;
data_size -= rep_levels_bytes;
}
// TODO figure a way to set max_definition_level_ to 0
// if the initial value is invalid

// Init definition levels
if (descr_->max_definition_level() > 0) {
int64_t def_levels_bytes = definition_level_decoder_.SetData(
page->definition_level_encoding(), descr_->max_definition_level(),
static_cast<int>(num_buffered_values_), buffer);
buffer += def_levels_bytes;
data_size -= def_levels_bytes;
}

// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding.
Encoding::type encoding = page->encoding();

if (IsDictionaryIndexEncoding(encoding)) {
encoding = Encoding::RLE_DICTIONARY;
}

auto it = decoders_.find(static_cast<int>(encoding));
if (it != decoders_.end()) {
DCHECK(it->second.get() != nullptr);
if (encoding == Encoding::RLE_DICTIONARY) {
DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
}
current_decoder_ = it->second.get();
} else {
switch (encoding) {
case Encoding::PLAIN: {
auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
current_decoder_ = decoder.get();
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::RLE_DICTIONARY:
throw ParquetException("Dictionary page must be before data page.");

case Encoding::DELTA_BINARY_PACKED:
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
case Encoding::DELTA_BYTE_ARRAY:
ParquetException::NYI("Unsupported encoding");

default:
throw ParquetException("Unknown encoding type.");
}
}
current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
static_cast<int>(data_size));
const auto page = std::static_pointer_cast<DataPageV1>(current_page_);
const int64_t levels_byte_size = InitializeLevelDecoders(
*page, page->repetition_level_encoding(), page->definition_level_encoding());
InitializeDataDecoder(*page, levels_byte_size);
return true;
} else if (current_page_->type() == PageType::DATA_PAGE_V2) {
const auto page = std::static_pointer_cast<DataPageV2>(current_page_);
// Repetition and definition levels are always encoded using RLE encoding
// in the DataPageV2 format.
const int64_t levels_byte_size =
InitializeLevelDecoders(*page, Encoding::RLE, Encoding::RLE);
InitializeDataDecoder(*page, levels_byte_size);
return true;
} else {
// We don't know what this page type is. We're allowed to skip non-data
53 changes: 27 additions & 26 deletions cpp/src/parquet/column_page.h
@@ -59,45 +59,54 @@ class Page {
PageType::type type_;
};

/// \brief Base type for DataPageV1 and DataPageV2 including common attributes
class DataPage : public Page {
public:
DataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding,
int32_t num_values() const { return num_values_; }
Encoding::type encoding() const { return encoding_; }
const EncodedStatistics& statistics() const { return statistics_; }

protected:
DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding,
const EncodedStatistics& statistics = EncodedStatistics())
: Page(buffer, PageType::DATA_PAGE),
: Page(buffer, type),
num_values_(num_values),
encoding_(encoding),
definition_level_encoding_(definition_level_encoding),
repetition_level_encoding_(repetition_level_encoding),
statistics_(statistics) {}

int32_t num_values() const { return num_values_; }
int32_t num_values_;
Encoding::type encoding_;
EncodedStatistics statistics_;
};

Encoding::type encoding() const { return encoding_; }
class DataPageV1 : public DataPage {
public:
DataPageV1(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding,
const EncodedStatistics& statistics = EncodedStatistics())
: DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, statistics),
definition_level_encoding_(definition_level_encoding),
repetition_level_encoding_(repetition_level_encoding) {}

Encoding::type repetition_level_encoding() const { return repetition_level_encoding_; }

Encoding::type definition_level_encoding() const { return definition_level_encoding_; }

const EncodedStatistics& statistics() const { return statistics_; }

private:
int32_t num_values_;
Encoding::type encoding_;
Encoding::type definition_level_encoding_;
Encoding::type repetition_level_encoding_;
EncodedStatistics statistics_;
};

class CompressedDataPage : public DataPage {
class CompressedDataPage : public DataPageV1 {
public:
CompressedDataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding, int64_t uncompressed_size,
const EncodedStatistics& statistics = EncodedStatistics())
: DataPage(buffer, num_values, encoding, definition_level_encoding,
repetition_level_encoding, statistics),
: DataPageV1(buffer, num_values, encoding, definition_level_encoding,
repetition_level_encoding, statistics),
uncompressed_size_(uncompressed_size) {}

int64_t uncompressed_size() const { return uncompressed_size_; }
@@ -106,40 +115,32 @@ class CompressedDataPage : public DataPage {
int64_t uncompressed_size_;
};

class DataPageV2 : public Page {
class DataPageV2 : public DataPage {
public:
DataPageV2(const std::shared_ptr<Buffer>& buffer, int32_t num_values, int32_t num_nulls,
int32_t num_rows, Encoding::type encoding,
int32_t definition_levels_byte_length, int32_t repetition_levels_byte_length,
bool is_compressed = false)
: Page(buffer, PageType::DATA_PAGE_V2),
num_values_(num_values),
: DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding),
num_nulls_(num_nulls),
num_rows_(num_rows),
encoding_(encoding),
definition_levels_byte_length_(definition_levels_byte_length),
repetition_levels_byte_length_(repetition_levels_byte_length),
is_compressed_(is_compressed) {}

int32_t num_values() const { return num_values_; }

int32_t num_nulls() const { return num_nulls_; }

int32_t num_rows() const { return num_rows_; }

Encoding::type encoding() const { return encoding_; }

int32_t definition_levels_byte_length() const { return definition_levels_byte_length_; }

int32_t repetition_levels_byte_length() const { return repetition_levels_byte_length_; }

bool is_compressed() const { return is_compressed_; }

private:
int32_t num_values_;
int32_t num_nulls_;
int32_t num_rows_;
Encoding::type encoding_;
int32_t definition_levels_byte_length_;
int32_t repetition_levels_byte_length_;
bool is_compressed_;
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_reader-test.cc
@@ -332,7 +332,7 @@ TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {

shared_ptr<DictionaryPage> dict_page =
std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(
shared_ptr<DataPageV1> data_page = MakeDataPage<Int32Type>(
&descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
pages_.push_back(dict_page);
pages_.push_back(data_page);