Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PARQUET-1482: [C++] Add branch to TypedRecordReader::ReadNewPage for … #3312

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 109 additions & 76 deletions cpp/src/parquet/arrow/record_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,16 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {

DecoderType* current_decoder_;

// Initialize repetition and definition level decoders on the next data page.
template <typename PageType>
int64_t InitializeLevelDecoders(const std::shared_ptr<PageType> page,
const Encoding::type repetition_level_encoding,
const Encoding::type definition_level_encoding);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const is not needed with Encoding::type


template <typename PageType>
void InitializeDataDecoder(const std::shared_ptr<PageType> page,
const int64_t levels_bytes);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const not needed with int64


// Advance to the next data page
bool ReadNewPage() override;

Expand Down Expand Up @@ -717,11 +727,96 @@ inline void TypedRecordReader<DType>::ConfigureDictionary(const DictionaryPage*
DCHECK(current_decoder_);
}

// If the data page includes repetition and definition levels, we
// initialize the level decoders and return the number of encoded level bytes.
// The return value helps determine the number of bytes in the encoded data.
template <typename DType>
template <typename PageType>
rdmello marked this conversation as resolved.
Show resolved Hide resolved
int64_t TypedRecordReader<DType>::InitializeLevelDecoders(
const std::shared_ptr<PageType> page, const Encoding::type repetition_level_encoding,
const Encoding::type definition_level_encoding) {
// Read a data page.
num_buffered_values_ = page->num_values();

// Have not decoded any values from the data page yet
num_decoded_values_ = 0;

const uint8_t* buffer = page->data();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only data() and num_values() are used here. We should have a common base class for DataPageV1 and DataPageV2 instead of having this template

int64_t levels_byte_size = 0;

// Data page Layout: Repetition Levels - Definition Levels - encoded values.
// Levels are encoded as rle or bit-packed.
// Init repetition levels
if (descr_->max_repetition_level() > 0) {
int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
repetition_level_encoding, descr_->max_repetition_level(),
static_cast<int>(num_buffered_values_), buffer);
buffer += rep_levels_bytes;
levels_byte_size += rep_levels_bytes;
}
// TODO figure a way to set max_definition_level_ to 0
// if the initial value is invalid

// Init definition levels
if (descr_->max_definition_level() > 0) {
int64_t def_levels_bytes = definition_level_decoder_.SetData(
definition_level_encoding, descr_->max_definition_level(),
static_cast<int>(num_buffered_values_), buffer);
levels_byte_size += def_levels_bytes;
}

return levels_byte_size;
}

// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding.
template <typename DType>
template <typename PageType>
void TypedRecordReader<DType>::InitializeDataDecoder(const std::shared_ptr<PageType> page,
const int64_t levels_byte_size) {
const uint8_t* buffer = page->data() + levels_byte_size;
const int64_t data_size = page->size() - levels_byte_size;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.


Encoding::type encoding = page->encoding();

if (IsDictionaryIndexEncoding(encoding)) {
encoding = Encoding::RLE_DICTIONARY;
}

auto it = decoders_.find(static_cast<int>(encoding));
if (it != decoders_.end()) {
DCHECK(it->second.get() != nullptr);
if (encoding == Encoding::RLE_DICTIONARY) {
DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
}
current_decoder_ = it->second.get();
} else {
switch (encoding) {
case Encoding::PLAIN: {
auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
current_decoder_ = decoder.get();
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::RLE_DICTIONARY:
throw ParquetException("Dictionary page must be before data page.");

case Encoding::DELTA_BINARY_PACKED:
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
case Encoding::DELTA_BYTE_ARRAY:
ParquetException::NYI("Unsupported encoding");

default:
throw ParquetException("Unknown encoding type.");
}
}
current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
static_cast<int>(data_size));
}

template <typename DType>
bool TypedRecordReader<DType>::ReadNewPage() {
// Loop until we find the next data page.
const uint8_t* buffer;

while (true) {
current_page_ = pager_->NextPage();
if (!current_page_) {
Expand All @@ -733,80 +828,18 @@ bool TypedRecordReader<DType>::ReadNewPage() {
ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
continue;
} else if (current_page_->type() == PageType::DATA_PAGE) {
const DataPage* page = static_cast<const DataPage*>(current_page_.get());

// Read a data page.
num_buffered_values_ = page->num_values();

// Have not decoded any values from the data page yet
num_decoded_values_ = 0;

buffer = page->data();

// If the data page includes repetition and definition levels, we
// initialize the level decoder and subtract the encoded level bytes from
// the page size to determine the number of bytes in the encoded data.
int64_t data_size = page->size();

// Data page Layout: Repetition Levels - Definition Levels - encoded values.
// Levels are encoded as rle or bit-packed.
// Init repetition levels
if (descr_->max_repetition_level() > 0) {
int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
page->repetition_level_encoding(), descr_->max_repetition_level(),
static_cast<int>(num_buffered_values_), buffer);
buffer += rep_levels_bytes;
data_size -= rep_levels_bytes;
}
// TODO figure a way to set max_definition_level_ to 0
// if the initial value is invalid

// Init definition levels
if (descr_->max_definition_level() > 0) {
int64_t def_levels_bytes = definition_level_decoder_.SetData(
page->definition_level_encoding(), descr_->max_definition_level(),
static_cast<int>(num_buffered_values_), buffer);
buffer += def_levels_bytes;
data_size -= def_levels_bytes;
}

// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding.
Encoding::type encoding = page->encoding();

if (IsDictionaryIndexEncoding(encoding)) {
encoding = Encoding::RLE_DICTIONARY;
}

auto it = decoders_.find(static_cast<int>(encoding));
if (it != decoders_.end()) {
DCHECK(it->second.get() != nullptr);
if (encoding == Encoding::RLE_DICTIONARY) {
DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
}
current_decoder_ = it->second.get();
} else {
switch (encoding) {
case Encoding::PLAIN: {
auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
current_decoder_ = decoder.get();
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::RLE_DICTIONARY:
throw ParquetException("Dictionary page must be before data page.");

case Encoding::DELTA_BINARY_PACKED:
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
case Encoding::DELTA_BYTE_ARRAY:
ParquetException::NYI("Unsupported encoding");

default:
throw ParquetException("Unknown encoding type.");
}
}
current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
static_cast<int>(data_size));
const auto page = std::static_pointer_cast<DataPage>(current_page_);
const int64_t levels_byte_size = InitializeLevelDecoders(
page, page->repetition_level_encoding(), page->definition_level_encoding());
InitializeDataDecoder(page, levels_byte_size);
return true;
} else if (current_page_->type() == PageType::DATA_PAGE_V2) {
const auto page = std::static_pointer_cast<DataPageV2>(current_page_);
// Repetition and definition levels are always encoded using RLE encoding
// in the DataPageV2 format.
const int64_t levels_byte_size =
InitializeLevelDecoders(page, Encoding::RLE, Encoding::RLE);
InitializeDataDecoder(page, levels_byte_size);
return true;
} else {
// We don't know what this page type is. We're allowed to skip non-data
Expand Down
46 changes: 46 additions & 0 deletions cpp/src/parquet/file-deserialize-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,21 @@ class TestPageSerde : public ::testing::Test {
ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get()));
}

void WriteDataPageHeaderV2(int max_serialized_len = 1024, int32_t uncompressed_size = 0,
int32_t compressed_size = 0) {
// Simplifying writing serialized data page V2 headers which may or may not
// have meaningful data associated with them

// Serialize the Page header
page_header_.__set_data_page_header_v2(data_page_header_v2_);
page_header_.uncompressed_page_size = uncompressed_size;
page_header_.compressed_page_size = compressed_size;
page_header_.type = format::PageType::DATA_PAGE_V2;

ThriftSerializer serializer;
ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get()));
}

void ResetStream() { out_stream_.reset(new InMemoryOutputStream); }

void EndStream() { out_buffer_ = out_stream_->GetBuffer(); }
Expand All @@ -101,6 +116,7 @@ class TestPageSerde : public ::testing::Test {
std::unique_ptr<PageReader> page_reader_;
format::PageHeader page_header_;
format::DataPageHeader data_page_header_;
format::DataPageHeaderV2 data_page_header_v2_;
};

void CheckDataPageHeader(const format::DataPageHeader expected, const Page* page) {
Expand All @@ -120,6 +136,24 @@ void CheckDataPageHeader(const format::DataPageHeader expected, const Page* page
}
}

// Overload for DataPageV2 tests.
void CheckDataPageHeader(const format::DataPageHeaderV2 expected, const Page* page) {
ASSERT_EQ(PageType::DATA_PAGE_V2, page->type());

const DataPageV2* data_page = static_cast<const DataPageV2*>(page);
ASSERT_EQ(expected.num_values, data_page->num_values());
ASSERT_EQ(expected.num_nulls, data_page->num_nulls());
ASSERT_EQ(expected.num_rows, data_page->num_rows());
ASSERT_EQ(expected.encoding, data_page->encoding());
ASSERT_EQ(expected.definition_levels_byte_length,
data_page->definition_levels_byte_length());
ASSERT_EQ(expected.repetition_levels_byte_length,
data_page->repetition_levels_byte_length());
ASSERT_EQ(expected.is_compressed, data_page->is_compressed());

// TODO: Tests for DataPageHeaderV2 statistics.
}

TEST_F(TestPageSerde, DataPage) {
format::PageHeader out_page_header;

Expand All @@ -134,6 +168,18 @@ TEST_F(TestPageSerde, DataPage) {
ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
}

// Serializes a DataPageV2 header, reads it back through the page reader and
// verifies that the resulting page matches the header that was written.
TEST_F(TestPageSerde, DataPageV2) {
  const int32_t num_rows = 4444;
  // Populate the V2 header rather than the V1 data_page_header_: the V2 header
  // is what WriteDataPageHeaderV2 serializes and what the check below compares
  // against. Setting the V1 header would leave the V2 header at its default
  // and make the num_values comparison trivial.
  data_page_header_v2_.num_values = num_rows;

  ASSERT_NO_FATAL_FAILURE(WriteDataPageHeaderV2());
  InitSerializedPageReader(num_rows);
  std::shared_ptr<Page> current_page = page_reader_->NextPage();
  // Stop here with a test failure if no page was produced, instead of passing
  // a null pointer on to the header check.
  ASSERT_TRUE(current_page != nullptr);
  ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_v2_, current_page.get()));
}

TEST_F(TestPageSerde, TestLargePageHeaders) {
int stats_size = 256 * 1024; // 256 KB
AddDummyStats(stats_size, data_page_header_);
Expand Down