Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback #14603

Merged
merged 14 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ class SerializedPageReader : public PageReader {

void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; }

bool set_skip_page_callback(
std::function<bool(const DataPageStats*)> skip_page_callback) override {
skip_page_callback_ = skip_page_callback;
return true;
}

private:
void UpdateDecryption(const std::shared_ptr<Decryptor>& decryptor, int8_t module_type,
const std::string& page_aad);
Expand Down Expand Up @@ -304,6 +310,8 @@ class SerializedPageReader : public PageReader {
std::string data_page_header_aad_;
// Encryption
std::shared_ptr<ResizableBuffer> decryption_buffer_;
// Callback that decides if we should skip a page or not.
std::function<bool(const DataPageStats*)> skip_page_callback_;
};

void SerializedPageReader::InitDecryption() {
Expand Down Expand Up @@ -377,6 +385,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
}
}
}

// Advance the stream offset
PARQUET_THROW_NOT_OK(stream_->Advance(header_size));

Expand All @@ -386,6 +395,28 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
throw ParquetException("Invalid page header");
}

const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
const bool is_data_page =
page_type == PageType::DATA_PAGE || page_type == PageType::DATA_PAGE_V2;

// Once we have the header, we will call the skip_page_call_back_ to
// determine if we should be skipping this page. If yes, we will advance the
// stream to the next page.
if (skip_page_callback_ && is_data_page) {
std::variant<format::DataPageHeader, format::DataPageHeaderV2> data_page_header;
if (page_type == PageType::DATA_PAGE) {
data_page_header = current_page_header_.data_page_header;
} else {
data_page_header = current_page_header_.data_page_header_v2;
}
std::unique_ptr<DataPageStats> data_page_stats =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one other thing to consider is whther statistics are correct:

bool ApplicationVersion::HasCorrectStatistics(Type::type col_type,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the caller should check that the stats are valid before setting a callback function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made that clear in the comments for setting the callback function.

DataPageStats::Make(&data_page_header);
if (skip_page_callback_(data_page_stats.get())) {
PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
return NextPage();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it pay to refactor this a little bit so we can use iteration instead of recursion? (I think it would be good to avoid stack overflow for poorly constructed files that we filter out most pages for)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
}

if (crypto_ctx_.data_decryptor != nullptr) {
UpdateDecryption(crypto_ctx_.data_decryptor, encryption::kDictionaryPage,
data_page_aad_);
Expand All @@ -411,8 +442,8 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
page_buffer = decryption_buffer_;
}

const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);

// There is only one dictionary page per column, and that is the very first
// page before all the data pages.
if (page_type == PageType::DICTIONARY_PAGE) {
crypto_ctx_.start_decrypt_with_dictionary_page = false;
const format::DictionaryPageHeader& dict_header =
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/types.h"
#include "parquet/metadata.h"

namespace arrow {

Expand Down Expand Up @@ -115,6 +116,19 @@ class PARQUET_EXPORT PageReader {
bool always_compressed = false,
const CryptoContext* ctx = NULLPTR);

// @returns: true if the skip callback was successfully set. Returns false
// if the callback was not set or not supported.
// If supported, NextPage() will use this callback to determine if it should
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there contexts where this wouldn't be supported? Why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not have a concrete example. Since this is an abstract class, I tried to make it a more general case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we don't have a failure case here in mind, then lets either make the method void (YAGNI) if we think there is a possible error, then Status should be returned. Similar question is does this need to be virtual?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// return or skip and move to the next page. If the callback function returns
// true the page must be skipped. The callback will be called only if the page
// type is DATA_PAGE or DATA_PAGE_V2. Dictionary pages must be read
// regardless.
// \note API EXPERIMENTAL
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some description of what happens when the file does not contain page statistics (e.g. because the writer did not write them)? Will the filter simply not be called? Or will it be called with some kind of "empty" statistics?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I added a is_stats_set field that should be checked before using the filled encoded_stats.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I removed the is_stats_set, it will return nullptr for encoded_statistics if there are no statistics in the header. I added that to the comments.

virtual bool set_skip_page_callback(
std::function<bool(const DataPageStats*)> skip_page_callback) {
return false;
}

// @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
// containing new Page otherwise
virtual std::shared_ptr<Page> NextPage() = 0;
Expand Down
186 changes: 185 additions & 1 deletion cpp/src/parquet/file_deserialize_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include "parquet/test_util.h"
#include "parquet/thrift_internal.h"
#include "parquet/types.h"

#include "arrow/io/memory.h"
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
Expand All @@ -38,6 +37,7 @@
namespace parquet {

using ::arrow::io::BufferReader;
using ::parquet::DataPageStats;

// Adds page statistics occupying a certain amount of bytes (for testing very
// large page headers)
Expand Down Expand Up @@ -177,6 +177,190 @@ TEST_F(TestPageSerde, DataPageV1) {
ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
}

// Templated test class to test both format::DataPageHeader and
// format::DataPageHeaderV2.
template <typename T>
class SkipPageTest : public TestPageSerde {
public:
const int kNumPages = 10;
void WriteStream();

protected:
std::vector<T> data_page_headers_;
int total_rows_ = 0;
};

template <>
void SkipPageTest<format::DataPageHeader>::WriteStream() {
for (int i = 0; i < kNumPages; ++i) {
// Vary the number of rows to produce different headers.
int32_t num_rows = i + 100;
total_rows_ += num_rows;
int data_size = i + 1024;
this->data_page_header_.__set_num_values(num_rows);
this->data_page_header_.statistics.__set_min_value("A");
this->data_page_header_.statistics.__set_max_value("Z");
this->data_page_header_.statistics.__set_null_count(0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be nice to vary the statistics a bit from page to page?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

this->data_page_header_.__isset.statistics = true;
ASSERT_NO_FATAL_FAILURE(
this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
data_page_headers_.push_back(this->data_page_header_);
// Also write data, to make sure we skip the data correctly.
std::vector<uint8_t> faux_data(data_size);
ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
}
this->EndStream();
}

template <>
void SkipPageTest<format::DataPageHeaderV2>::WriteStream() {
for (int i = 0; i < kNumPages; ++i) {
// Vary the number of rows to produce different headers.
int32_t num_rows = i + 100;
total_rows_ += num_rows;
int data_size = i + 1024;
this->data_page_header_v2_.__set_num_values(num_rows);
this->data_page_header_v2_.__set_num_rows(num_rows);
this->data_page_header_v2_.statistics.__set_min_value("A");
this->data_page_header_v2_.statistics.__set_max_value("Z");
this->data_page_header_v2_.statistics.__set_null_count(0);
this->data_page_header_v2_.__isset.statistics = true;
ASSERT_NO_FATAL_FAILURE(
this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
data_page_headers_.push_back(this->data_page_header_v2_);
// Also write data, to make sure we skip the data correctly.
std::vector<uint8_t> faux_data(data_size);
ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
}
this->EndStream();
}

using DataPageHeaderTypes =
::testing::Types<format::DataPageHeader, format::DataPageHeaderV2>;
TYPED_TEST_SUITE(SkipPageTest, DataPageHeaderTypes);

// Test the accessors for DataPageStats for the different data page types.
TYPED_TEST(SkipPageTest, TestAccessors) {
this->WriteStream();
std::unique_ptr<DataPageStats> data_page_stats;
std::variant<format::DataPageHeader, format::DataPageHeaderV2> data_page_header;
if (std::is_same_v<TypeParam, format::DataPageHeader>) {
data_page_header = this->data_page_header_;
data_page_stats = DataPageStats::Make(&data_page_header);
ASSERT_EQ(data_page_stats->num_values(), 109);
ASSERT_THROW(data_page_stats->num_rows(), ParquetException);
ASSERT_EQ(data_page_stats->null_count(), 0);
ASSERT_EQ(data_page_stats->min_value(), "A");
ASSERT_EQ(data_page_stats->max_value(), "Z");
} else {
data_page_header = this->data_page_header_v2_;
data_page_stats = DataPageStats::Make(&data_page_header);
ASSERT_EQ(data_page_stats->num_values(), 109);
ASSERT_EQ(data_page_stats->num_rows(), 109);
ASSERT_EQ(data_page_stats->null_count(), 0);
ASSERT_EQ(data_page_stats->min_value(), "A");
ASSERT_EQ(data_page_stats->max_value(), "Z");
}
}

// Creates a number of pages and skips some of them with the skip page callback.
TYPED_TEST(SkipPageTest, TestSkipPageByCallback) {
this->WriteStream();

{ // Read all pages.
auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
this->page_reader_ =
PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);

// This callback will always return false.
auto read_all_pages = [](const DataPageStats* stats) -> bool { return false; };

this->page_reader_->set_skip_page_callback(read_all_pages);
for (int i = 0; i < this->kNumPages; ++i) {
std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
ASSERT_NE(current_page, nullptr);
ASSERT_NO_FATAL_FAILURE(
CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
}
ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
}
{ // Skip all pages.
auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
this->page_reader_ =
PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);

auto skip_all_pages = [](const DataPageStats* stats) -> bool { return true; };

this->page_reader_->set_skip_page_callback(skip_all_pages);
std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
}

{ // Skip every other page.
auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
this->page_reader_ =
PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);

// Skip pages with even number of values.
auto skip_all_pages = [](const DataPageStats* stats) -> bool {
if (stats->num_values() % 2 == 0) return true;
return false;
};

this->page_reader_->set_skip_page_callback(skip_all_pages);

for (int i = 0; i < this->kNumPages; ++i) {
// Only pages with odd number of values are read.
if (i % 2 != 0) {
std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
ASSERT_NE(current_page, nullptr);
ASSERT_NO_FATAL_FAILURE(
CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
}
}
// We should have exhausted reading the pages by reading the odd pages only.
ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
}
}

// Test that we do not skip dictionary pages.
TEST(SkipCallBack, DoesNotSkipDictionaryPages) {
std::shared_ptr<::arrow::io::BufferOutputStream> out_stream = CreateOutputStream();
ThriftSerializer serializer;
// Write a data page.
format::DataPageHeader data_page_header;
data_page_header.__set_num_values(100);
format::PageHeader data_page;
data_page.__set_data_page_header(data_page_header);
data_page.type = format::PageType::DATA_PAGE;
ASSERT_NO_THROW(serializer.Serialize(&data_page, out_stream.get()));
// Write a dictionary page.
format::DictionaryPageHeader dictionary_page_header;
dictionary_page_header.__set_num_values(100);
format::PageHeader dict_page;
dict_page.__set_dictionary_page_header(dictionary_page_header);
dict_page.type = format::PageType::DICTIONARY_PAGE;
ASSERT_NO_THROW(serializer.Serialize(&dict_page, out_stream.get()));
// Write another data page.
ASSERT_NO_THROW(serializer.Serialize(&data_page, out_stream.get()));
std::shared_ptr<Buffer> out_buffer;
PARQUET_ASSIGN_OR_THROW(out_buffer, out_stream->Finish());

// Try to read it back while asking for all data pages to be skipped.
auto stream = std::make_shared<::arrow::io::BufferReader>(out_buffer);
std::unique_ptr<PageReader> page_reader =
PageReader::Open(stream, /*num_rows=*/100, Compression::UNCOMPRESSED);
auto skip_all_pages = [](const DataPageStats* stats) -> bool { return true; };

page_reader->set_skip_page_callback(skip_all_pages);
// The first data page is skipped, so we are now at the dictionary page.
std::shared_ptr<Page> current_page = page_reader->NextPage();
ASSERT_NE(current_page, nullptr);
ASSERT_EQ(current_page->type(), PageType::DICTIONARY_PAGE);
// The data page after dictionary page is skipped.
ASSERT_EQ(page_reader->NextPage(), nullptr);
}

TEST_F(TestPageSerde, DataPageV2) {
int stats_size = 512;
const int32_t num_rows = 4444;
Expand Down
Loading