Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-34949: [C++][Parquet] Enable page index by columns #35230

Merged
merged 1 commit into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <cstdint>
#include <functional>
#include <set>
#include <sstream>
#include <vector>

Expand Down Expand Up @@ -5235,7 +5236,8 @@ class ParquetPageIndexRoundTripTest : public ::testing::Test {
ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish());
}

void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages) {
void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages,
const std::set<int>& expect_columns_without_index = {}) {
auto read_properties = default_arrow_reader_properties();
auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_));

Expand All @@ -5255,7 +5257,12 @@ class ParquetPageIndexRoundTripTest : public ::testing::Test {
column_indexes_.emplace_back(column_index.get());

auto offset_index = row_group_index_reader->GetOffsetIndex(col);
CheckOffsetIndex(offset_index.get(), expect_num_pages, &offset_lower_bound);
if (expect_columns_without_index.find(col) !=
expect_columns_without_index.cend()) {
ASSERT_EQ(offset_index, nullptr);
} else {
CheckOffsetIndex(offset_index.get(), expect_num_pages, &offset_lower_bound);
}
}
}
}
Expand Down Expand Up @@ -5425,5 +5432,32 @@ TEST_F(ParquetPageIndexRoundTripTest, DoubleWithNaNs) {
/* Page with only NaN values does not have column index built */}));
}

// Round-trips a three-column table where the page index is enabled by default,
// re-enabled explicitly for c0 and disabled explicitly for c1, then checks that
// only c1 comes back without an offset index and with an empty column index.
TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
  auto table_schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()),
                                       ::arrow::field("c1", ::arrow::int64()),
                                       ::arrow::field("c2", ::arrow::int64())});

  WriterProperties::Builder props_builder;
  props_builder.enable_write_page_index();      /* enable by default */
  props_builder.enable_write_page_index("c0");  /* enable c0 explicitly */
  props_builder.disable_write_page_index("c1"); /* disable c1 explicitly */
  auto properties = props_builder.build();

  auto table = ::arrow::TableFromJSON(table_schema, {R"([[0, 1, 2]])"});
  WriteFile(properties, table);

  // Column ordinal 1 (c1) is the only one expected to lack an offset index.
  ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/1,
                  /*expect_columns_without_index=*/{1});

  const ColumnIndexObject c0_expected{
      /*null_pages=*/{false}, /*min_values=*/{encode_int64(0)},
      /*max_values=*/{encode_int64(0)}, BoundaryOrder::Ascending,
      /*null_counts=*/{0}};
  const ColumnIndexObject c1_expected{/* page index of c1 is disabled */};
  const ColumnIndexObject c2_expected{
      /*null_pages=*/{false}, /*min_values=*/{encode_int64(2)},
      /*max_values=*/{encode_int64(2)}, BoundaryOrder::Ascending,
      /*null_counts=*/{0}};

  EXPECT_THAT(column_indexes_,
              ::testing::ElementsAre(c0_expected, c1_expected, c2_expected));
}

} // namespace arrow
} // namespace parquet
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1368,7 +1368,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<

bool pages_change_on_record_boundaries() const {
return properties_->data_page_version() == ParquetDataPageVersion::V2 ||
properties_->write_page_index();
properties_->page_index_enabled(descr_->path());
}

private:
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
auto data_encryptor =
file_encryptor_ ? file_encryptor_->GetColumnDataEncryptor(path->ToDotString())
: nullptr;
auto ci_builder = page_index_builder_
auto ci_builder = page_index_builder_ && properties_->page_index_enabled(path)
? page_index_builder_->GetColumnIndexBuilder(column_ordinal)
: nullptr;
auto oi_builder = page_index_builder_
auto oi_builder = page_index_builder_ && properties_->page_index_enabled(path)
? page_index_builder_->GetOffsetIndexBuilder(column_ordinal)
: nullptr;
std::unique_ptr<PageWriter> pager = PageWriter::Open(
Expand Down Expand Up @@ -283,10 +283,10 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
auto data_encryptor =
file_encryptor_ ? file_encryptor_->GetColumnDataEncryptor(path->ToDotString())
: nullptr;
auto ci_builder = page_index_builder_
auto ci_builder = page_index_builder_ && properties_->page_index_enabled(path)
? page_index_builder_->GetColumnIndexBuilder(column_ordinal)
: nullptr;
auto oi_builder = page_index_builder_
auto oi_builder = page_index_builder_ && properties_->page_index_enabled(path)
? page_index_builder_->GetOffsetIndexBuilder(column_ordinal)
: nullptr;
std::unique_ptr<PageWriter> pager = PageWriter::Open(
Expand Down Expand Up @@ -505,7 +505,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
}
}

if (properties_->write_page_index()) {
if (properties_->page_index_enabled()) {
page_index_builder_ = PageIndexBuilder::Make(&schema_);
}
}
Expand Down
81 changes: 60 additions & 21 deletions cpp/src/parquet/properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,20 +138,23 @@ static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;
static constexpr bool DEFAULT_IS_PAGE_INDEX_ENABLED = false;

class PARQUET_EXPORT ColumnProperties {
public:
/// \brief Construct a set of per-column writer properties.
///
/// Every argument is stored in the member of the same name. The compression
/// level is not a parameter; it is initialized from
/// Codec::UseDefaultCompressionLevel().
///
/// \param encoding value stored as the column's encoding
/// \param codec value stored as the column's compression codec
/// \param dictionary_enabled value stored as the dictionary-enabled flag
/// \param statistics_enabled value stored as the statistics-enabled flag
/// \param max_stats_size value stored as the maximum statistics size
/// \param page_index_enabled value stored as the page-index-enabled flag
ColumnProperties(Encoding::type encoding = DEFAULT_ENCODING,
                 Compression::type codec = DEFAULT_COMPRESSION_TYPE,
                 bool dictionary_enabled = DEFAULT_IS_DICTIONARY_ENABLED,
                 bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED,
                 size_t max_stats_size = DEFAULT_MAX_STATISTICS_SIZE,
                 bool page_index_enabled = DEFAULT_IS_PAGE_INDEX_ENABLED)
    : encoding_(encoding),
      codec_(codec),
      dictionary_enabled_(dictionary_enabled),
      statistics_enabled_(statistics_enabled),
      max_stats_size_(max_stats_size),
      compression_level_(Codec::UseDefaultCompressionLevel()),
      // Honor the caller's argument; initializing from
      // DEFAULT_IS_PAGE_INDEX_ENABLED here would silently discard it.
      page_index_enabled_(page_index_enabled) {}

void set_encoding(Encoding::type encoding) { encoding_ = encoding; }

Expand All @@ -173,6 +176,10 @@ class PARQUET_EXPORT ColumnProperties {
compression_level_ = compression_level;
}

void set_page_index_enabled(bool page_index_enabled) {
page_index_enabled_ = page_index_enabled;
}

Encoding::type encoding() const { return encoding_; }

Compression::type compression() const { return codec_; }
Expand All @@ -185,13 +192,16 @@ class PARQUET_EXPORT ColumnProperties {

int compression_level() const { return compression_level_; }

bool page_index_enabled() const { return page_index_enabled_; }

private:
Encoding::type encoding_;
Compression::type codec_;
bool dictionary_enabled_;
bool statistics_enabled_;
size_t max_stats_size_;
int compression_level_;
bool page_index_enabled_;
};

class PARQUET_EXPORT WriterProperties {
Expand All @@ -208,8 +218,7 @@ class PARQUET_EXPORT WriterProperties {
data_page_version_(ParquetDataPageVersion::V1),
created_by_(DEFAULT_CREATED_BY),
store_decimal_as_integer_(false),
page_checksum_enabled_(false),
write_page_index_(false) {}
page_checksum_enabled_(false) {}
virtual ~Builder() {}

/// Specify the memory pool for the writer. Default default_memory_pool.
Expand Down Expand Up @@ -502,28 +511,46 @@ class PARQUET_EXPORT WriterProperties {
return this;
}

/// Enable writing page index.
/// Enable writing page index in general for all columns. Default disabled.
///
/// Page index contains statistics for data pages and can be used to skip pages
/// when scanning data in ordered and unordered columns.
///
/// Please check the link below for more details:
/// https://github.com/apache/parquet-format/blob/master/PageIndex.md
///
/// Default disabled.
Builder* enable_write_page_index() {
write_page_index_ = true;
default_column_properties_.set_page_index_enabled(true);
return this;
}

/// Disable writing page index.
///
/// Default disabled.
/// Disable writing page index in general for all columns. Default disabled.
Builder* disable_write_page_index() {
write_page_index_ = false;
default_column_properties_.set_page_index_enabled(false);
return this;
}

/// Enable writing page index for the column whose dotted path is `path`.
/// The flag is recorded in the builder's per-path map, replacing any value
/// previously recorded for the same path. Default disabled.
Builder* enable_write_page_index(const std::string& path) {
  bool& enabled_for_path = page_index_enabled_[path];
  enabled_for_path = true;
  return this;
}

/// Enable writing page index for the column identified by `path`.
/// Forwards to the string overload using `path->ToDotString()`. Default disabled.
Builder* enable_write_page_index(const std::shared_ptr<schema::ColumnPath>& path) {
  const auto dotted_path = path->ToDotString();
  return enable_write_page_index(dotted_path);
}

/// Disable writing page index for the column whose dotted path is `path`.
/// The flag is recorded in the builder's per-path map, replacing any value
/// previously recorded for the same path. Default disabled.
Builder* disable_write_page_index(const std::string& path) {
  bool& enabled_for_path = page_index_enabled_[path];
  enabled_for_path = false;
  return this;
}

/// Disable writing page index for the column identified by `path`.
/// Forwards to the string overload using `path->ToDotString()`. Default disabled.
Builder* disable_write_page_index(const std::shared_ptr<schema::ColumnPath>& path) {
  const auto dotted_path = path->ToDotString();
  return disable_write_page_index(dotted_path);
}

/// \brief Build the WriterProperties with the builder parameters.
/// \return The WriterProperties defined by the builder.
std::shared_ptr<WriterProperties> build() {
Expand All @@ -544,13 +571,14 @@ class PARQUET_EXPORT WriterProperties {
get(item.first).set_dictionary_enabled(item.second);
for (const auto& item : statistics_enabled_)
get(item.first).set_statistics_enabled(item.second);
for (const auto& item : page_index_enabled_)
get(item.first).set_page_index_enabled(item.second);

return std::shared_ptr<WriterProperties>(new WriterProperties(
pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
pagesize_, version_, created_by_, page_checksum_enabled_,
std::move(file_encryption_properties_), default_column_properties_,
column_properties, data_page_version_, store_decimal_as_integer_,
write_page_index_));
column_properties, data_page_version_, store_decimal_as_integer_));
}

private:
Expand All @@ -564,7 +592,6 @@ class PARQUET_EXPORT WriterProperties {
std::string created_by_;
bool store_decimal_as_integer_;
bool page_checksum_enabled_;
bool write_page_index_;

std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;

Expand All @@ -575,6 +602,7 @@ class PARQUET_EXPORT WriterProperties {
std::unordered_map<std::string, int32_t> codecs_compression_level_;
std::unordered_map<std::string, bool> dictionary_enabled_;
std::unordered_map<std::string, bool> statistics_enabled_;
std::unordered_map<std::string, bool> page_index_enabled_;
};

inline MemoryPool* memory_pool() const { return pool_; }
Expand All @@ -599,8 +627,6 @@ class PARQUET_EXPORT WriterProperties {

inline bool page_checksum_enabled() const { return page_checksum_enabled_; }

inline bool write_page_index() const { return write_page_index_; }

inline Encoding::type dictionary_index_encoding() const {
if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
return Encoding::PLAIN_DICTIONARY;
Expand Down Expand Up @@ -648,6 +674,22 @@ class PARQUET_EXPORT WriterProperties {
return column_properties(path).max_statistics_size();
}

/// Return whether the page index is enabled in the properties that
/// `column_properties(path)` yields for the given column path.
bool page_index_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
  const auto& props_for_column = column_properties(path);
  return props_for_column.page_index_enabled();
}

/// Return true if the page index is enabled in the default column properties
/// or in at least one entry of the per-column properties; false otherwise.
bool page_index_enabled() const {
  bool any_enabled = default_column_properties_.page_index_enabled();
  // Stop scanning as soon as one enabled column is found.
  for (auto it = column_properties_.begin();
       !any_enabled && it != column_properties_.end(); ++it) {
    any_enabled = it->second.page_index_enabled();
  }
  return any_enabled;
}

inline FileEncryptionProperties* file_encryption_properties() const {
return file_encryption_properties_.get();
}
Expand All @@ -669,8 +711,7 @@ class PARQUET_EXPORT WriterProperties {
std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
const ColumnProperties& default_column_properties,
const std::unordered_map<std::string, ColumnProperties>& column_properties,
ParquetDataPageVersion data_page_version, bool store_short_decimal_as_integer,
bool write_page_index)
ParquetDataPageVersion data_page_version, bool store_short_decimal_as_integer)
: pool_(pool),
dictionary_pagesize_limit_(dictionary_pagesize_limit),
write_batch_size_(write_batch_size),
Expand All @@ -681,7 +722,6 @@ class PARQUET_EXPORT WriterProperties {
parquet_created_by_(created_by),
store_decimal_as_integer_(store_short_decimal_as_integer),
page_checksum_enabled_(page_write_checksum_enabled),
write_page_index_(write_page_index),
file_encryption_properties_(file_encryption_properties),
default_column_properties_(default_column_properties),
column_properties_(column_properties) {}
Expand All @@ -696,7 +736,6 @@ class PARQUET_EXPORT WriterProperties {
std::string parquet_created_by_;
bool store_decimal_as_integer_;
bool page_checksum_enabled_;
bool write_page_index_;

std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;

Expand Down