Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback #14603

Merged
merged 14 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 71 additions & 28 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ EncodedStatistics ExtractStatsFromHeader(const H& header) {
return page_statistics;
}

void CheckNumValuesInHeader(int num_values) {
if (num_values < 0) {
throw ParquetException("Invalid page header (negative number of values)");
}
}

// ----------------------------------------------------------------------
// SerializedPageReader deserializes Thrift metadata and pages that have been
// assembled in a serialized stream for storing in a Parquet files
Expand Down Expand Up @@ -263,6 +269,11 @@ class SerializedPageReader : public PageReader {
int compressed_len, int uncompressed_len,
int levels_byte_len = 0);

// Returns true for non-data pages, and if we should skip based on
// data_page_filter_. Performs basic checks on values in the page header.
// Fills in data_page_statistics.
bool ShouldSkipPage(EncodedStatistics* data_page_statistics);

const ReaderProperties properties_;
std::shared_ptr<ArrowInputStream> stream_;

Expand Down Expand Up @@ -337,6 +348,55 @@ void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& de
}
}

bool SerializedPageReader::ShouldSkipPage(EncodedStatistics* data_page_statistics) {
const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
if (page_type == PageType::DATA_PAGE) {
const format::DataPageHeader& header = current_page_header_.data_page_header;
CheckNumValuesInHeader(header.num_values);
*data_page_statistics = ExtractStatsFromHeader(header);
seen_num_values_ += header.num_values;
if (data_page_filter_) {
const EncodedStatistics* filter_statistics =
data_page_statistics->is_set() ? data_page_statistics : nullptr;
DataPageStats data_page_stats(filter_statistics, header.num_values,
/*num_rows=*/std::nullopt);
if (data_page_filter_(data_page_stats)) {
return true;
}
}
} else if (page_type == PageType::DATA_PAGE_V2) {
const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
CheckNumValuesInHeader(header.num_values);
if (header.num_rows < 0) {
throw ParquetException("Invalid page header (negative number of rows)");
}
if (header.definition_levels_byte_length < 0 ||
header.repetition_levels_byte_length < 0) {
throw ParquetException("Invalid page header (negative levels byte length)");
}
*data_page_statistics = ExtractStatsFromHeader(header);
seen_num_values_ += header.num_values;
if (data_page_filter_) {
const EncodedStatistics* filter_statistics =
data_page_statistics->is_set() ? data_page_statistics : nullptr;
DataPageStats data_page_stats(filter_statistics, header.num_values,
header.num_rows);
if (data_page_filter_(data_page_stats)) {
return true;
}
}
} else if (page_type == PageType::DICTIONARY_PAGE) {
const format::DictionaryPageHeader& dict_header =
current_page_header_.dictionary_page_header;
CheckNumValuesInHeader(dict_header.num_values);
} else {
// We don't know what this page type is. We're allowed to skip non-data
// pages.
return true;
}
return false;
}

std::shared_ptr<Page> SerializedPageReader::NextPage() {
ThriftDeserializer deserializer(properties_);

Expand Down Expand Up @@ -386,6 +446,12 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
throw ParquetException("Invalid page header");
}

EncodedStatistics data_page_statistics;
if (ShouldSkipPage(&data_page_statistics)) {
PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
continue;
}

if (crypto_ctx_.data_decryptor != nullptr) {
UpdateDecryption(crypto_ctx_.data_decryptor, encryption::kDictionaryPage,
data_page_aad_);
Expand All @@ -411,19 +477,14 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
page_buffer = decryption_buffer_;
}

// Uncompress and construct the pages to return.
const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);

if (page_type == PageType::DICTIONARY_PAGE) {
crypto_ctx_.start_decrypt_with_dictionary_page = false;
const format::DictionaryPageHeader& dict_header =
current_page_header_.dictionary_page_header;

bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;
if (dict_header.num_values < 0) {
throw ParquetException("Invalid page header (negative number of values)");
}

// Uncompress if needed
page_buffer =
DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);

Expand All @@ -433,39 +494,22 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
} else if (page_type == PageType::DATA_PAGE) {
++page_ordinal_;
const format::DataPageHeader& header = current_page_header_.data_page_header;

if (header.num_values < 0) {
throw ParquetException("Invalid page header (negative number of values)");
}
EncodedStatistics page_statistics = ExtractStatsFromHeader(header);
seen_num_values_ += header.num_values;

// Uncompress if needed
page_buffer =
DecompressIfNeeded(std::move(page_buffer), compressed_len, uncompressed_len);

return std::make_shared<DataPageV1>(page_buffer, header.num_values,
LoadEnumSafe(&header.encoding),
LoadEnumSafe(&header.definition_level_encoding),
LoadEnumSafe(&header.repetition_level_encoding),
uncompressed_len, page_statistics);
uncompressed_len, data_page_statistics);
} else if (page_type == PageType::DATA_PAGE_V2) {
++page_ordinal_;
const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;

if (header.num_values < 0) {
throw ParquetException("Invalid page header (negative number of values)");
}
if (header.definition_levels_byte_length < 0 ||
header.repetition_levels_byte_length < 0) {
throw ParquetException("Invalid page header (negative levels byte length)");
}
// Arrow prior to 3.0.0 set is_compressed to false but still compressed.
bool is_compressed =
(header.__isset.is_compressed ? header.is_compressed : false) ||
always_compressed_;
EncodedStatistics page_statistics = ExtractStatsFromHeader(header);
seen_num_values_ += header.num_values;

// Uncompress if needed
int levels_byte_len;
Expand All @@ -484,11 +528,10 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
page_buffer, header.num_values, header.num_nulls, header.num_rows,
LoadEnumSafe(&header.encoding), header.definition_levels_byte_length,
header.repetition_levels_byte_length, uncompressed_len, is_compressed,
page_statistics);
data_page_statistics);
} else {
// We don't know what this page type is. We're allowed to skip non-data
// pages.
continue;
throw ParquetException(
"Internal error, we have already skipped non-data pages in ShouldSkipPage()");
}
}
return std::shared_ptr<Page>(nullptr);
Expand Down
40 changes: 40 additions & 0 deletions cpp/src/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "parquet/exception.h"
#include "parquet/level_conversion.h"
#include "parquet/metadata.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
Expand Down Expand Up @@ -55,6 +56,27 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
// 16 KB is the default expected page header size
static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;

// \brief DataPageStats stores encoded statistics and number of values/rows for
// a page.
struct PARQUET_EXPORT DataPageStats {
DataPageStats(const EncodedStatistics* encoded_statistics, int32_t num_values,
std::optional<int32_t> num_rows)
: encoded_statistics(encoded_statistics),
num_values(num_values),
num_rows(num_rows) {}

// Encoded statistics extracted from the page header.
// Nullptr if there are no statistics in the page header.
const EncodedStatistics* encoded_statistics;
// Number of values stored in the page. Filled for both V1 and V2 data pages.
// For repeated fields, this can be greater than number of rows. For
// non-repeated fields, this will be the same as the number of rows.
int32_t num_values;
// Number of rows stored in the page. std::nullopt for V1 data pages since
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe rephrase, is that this might not be available for v1 data pages. It still seems like it is possible to set if page indexes are written for v1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// num_rows is not stored in format::DataPageHeader.
std::optional<int32_t> num_rows;
};

class PARQUET_EXPORT LevelDecoder {
public:
LevelDecoder();
Expand Down Expand Up @@ -100,6 +122,8 @@ struct CryptoContext {
// Abstract page iterator interface. This way, we can feed column pages to the
// ColumnReader through whatever mechanism we choose
class PARQUET_EXPORT PageReader {
using DataPageFilter = std::function<bool(const DataPageStats&)>;

public:
virtual ~PageReader() = default;

Expand All @@ -115,11 +139,27 @@ class PARQUET_EXPORT PageReader {
bool always_compressed = false,
const CryptoContext* ctx = NULLPTR);

// If data_page_filter is present (not null), NextPage() will call the
// callback function exactly once per page in the order the pages appear in
// the column. If the callback function returns true the page will be
// skipped. The callback will be called only if the page type is DATA_PAGE or
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would read better if it was advertised as a page filter (returning true to include a page and false to exclude it).
Calling it a "skip page callback" may give the impression that it will only be called by the Skip APIs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, but this should then skip the page if the returned value is false, not true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like since it is a filter, it should return true if it is filtering? But I am not opposed to the other way around, especially if there is a convention for how it should be.

// DATA_PAGE_V2. Dictionary pages will not be skipped.
// Caller is responsible for checking that statistics are correct using
// ApplicationVersion::HasCorrectStatistics().
// \note API EXPERIMENTAL
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some description of what happens when the file does not contain page statistics (e.g. because the writer did not write them)? Will the filter simply not be called? Or will it be called with some kind of "empty" statistics?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I added a is_stats_set field that should be checked before using the filled encoded_stats.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I removed the is_stats_set, it will return nullptr for encoded_statistics if there are no statistics in the header. I added that to the comments.

void set_data_page_filter(DataPageFilter data_page_filter) {
data_page_filter_ = std::move(data_page_filter);
}

// @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
// containing new Page otherwise
virtual std::shared_ptr<Page> NextPage() = 0;

virtual void set_max_page_header_size(uint32_t size) = 0;

protected:
// Callback that decides if we should skip a page or not.
DataPageFilter data_page_filter_;
};

class PARQUET_EXPORT ColumnReader {
Expand Down
Loading