GH-15042: [C++][Parquet] Update stats on subsequent batches of dictionaries #15179

Merged (4 commits) on Jan 11, 2023
Changes from 2 commits
69 changes: 69 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -80,6 +80,7 @@ using arrow::DataType;
using arrow::Datum;
using arrow::DecimalType;
using arrow::default_memory_pool;
using arrow::DictionaryArray;
using arrow::ListArray;
using arrow::PrimitiveArray;
using arrow::ResizableBuffer;
@@ -4138,6 +4139,74 @@ TEST_P(TestArrowWriteDictionary, Statistics) {
INSTANTIATE_TEST_SUITE_P(WriteDictionary, TestArrowWriteDictionary,
::testing::Values(ParquetDataPageVersion::V1,
ParquetDataPageVersion::V2));

TEST_P(TestArrowWriteDictionary, StatisticsUnifiedDictionary) {
// Two chunks, with a shared dictionary
std::shared_ptr<::arrow::Table> table;
std::shared_ptr<::arrow::DataType> dict_type =
::arrow::dictionary(::arrow::int32(), ::arrow::utf8());
std::shared_ptr<::arrow::Schema> schema =
::arrow::schema({::arrow::field("values", dict_type)});
{
// It's important there are no duplicate values in the dictionary, otherwise
// we trigger the WriteDense() code path which side-steps dictionary encoding.
std::shared_ptr<::arrow::Array> test_dictionary =
ArrayFromJSON(::arrow::utf8(), R"(["b", "c", "d", "a"])");
std::vector<std::shared_ptr<::arrow::Array>> test_indices = {
ArrayFromJSON(::arrow::int32(),
R"([0, null, 3, 0, null, 3])"), // ["b", null "a", "b", null, "a"]
ArrayFromJSON(
::arrow::int32(),
R"([0, 3, null, 0, null, 1])")}; // ["b", "c", null, "b", "c", null]
Member:

Suggested change:
- R"([0, 3, null, 0, null, 1])")}; // ["b", "c", null, "b", "c", null]
+ R"([0, 1, null, 0, 1, null])")}; // ["b", "c", null, "b", "c", null]

I like what you have in the comment because then the min/max of row group 0 / chunk 0 is different from row group 0 / chunk 1. Right now it looks like your indices don't match your comment and we have:

// ["b", null, "a", "b", null, "c"]

This leads to a/b being the min/max in stats0, but a/b is also the min/max of each chunk that feeds stats0. To reproduce the bug, I think we want what you have in the comment, which would mean chunk 0 is a/b and chunk 1 is b/c, so stats0 should be a/c.
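To make that concrete, here is a tiny standalone sketch (my own illustration, not part of this PR) comparing what row group 0 would contain in the two cases. It decodes by hand against the dictionary ["b", "c", "d", "a"], with row group 0 holding all of chunk 0 plus the first three rows of chunk 1 (the 9/3 split the test uses):

```cpp
#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

// Print the min/max of the non-null values that land in row group 0.
static void PrintMinMax(const std::string& label, const std::vector<std::string>& values) {
  const auto [min_it, max_it] = std::minmax_element(values.begin(), values.end());
  std::cout << label << ": min=" << *min_it << " max=" << *max_it << "\n";
}

int main() {
  // Chunk 0 contributes ["b", "a", "b", "a"] either way; the last two entries are
  // the non-null values from chunk 1 rows 0-2.
  PrintMinMax("as written (chunk 1 rows 0-2 = [b, a, null])", {"b", "a", "b", "a", "b", "a"});
  PrintMinMax("suggested  (chunk 1 rows 0-2 = [b, c, null])", {"b", "a", "b", "a", "b", "c"});
  // Prints min=a max=b for the first case and min=a max=c for the second.
}
```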

Member (Author):

You are correct; I updated the data but forgot the comment. I did a weird thing where the chunks are 6/6 but the row groups are 9/3, anticipating that it would hit more interesting conditions in the writer (but maybe this is unnecessary).

Member (Author):

So I like the data as is. The first row group contains the first chunk plus the first three rows of the next chunk. Then the second group contains the last three elements of the second chunk. I've updated the comment so it is accurate again.

Member:

So I think (but could be wrong) this would lead to three calls to WriteArrowDictionary:

Call #1: (no previous dictionary) min=a, max=b, nulls=2
Call #2: (previous dictionary is equal) min=a, max=b, nulls=1
Call #3: (no previous dictionary) min=b, max=c, nulls=1

So if the bug was still in place, and it was using the first chunk to determine row-group statistics, it would still get the correct answer in this case.

Admittedly, the null count would still be wrong (it would report 2 nulls for stats0), so the test case itself wouldn't pass with the old code. But I think it would get further than it should.
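Following that breakdown, a short standalone sketch (mine, not from the PR) that decodes the two chunks of this revision against ["b", "c", "d", "a"] and reports min/max/null count for each of the three slices described above:

```cpp
#include <algorithm>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

using Rows = std::vector<std::optional<std::string>>;

// Report min, max, and null count for the slice one call would see.
static void Describe(const std::string& label, const Rows& rows) {
  std::vector<std::string> values;
  int nulls = 0;
  for (const auto& row : rows) {
    if (row) {
      values.push_back(*row);
    } else {
      ++nulls;
    }
  }
  const auto [min_it, max_it] = std::minmax_element(values.begin(), values.end());
  std::cout << label << ": min=" << *min_it << " max=" << *max_it
            << " nulls=" << nulls << "\n";
}

int main() {
  const Rows chunk0 = {"b", std::nullopt, "a", "b", std::nullopt, "a"};
  const Rows chunk1 = {"b", "a", std::nullopt, "b", std::nullopt, "c"};

  Describe("call 1 (chunk 0 -> row group 0)", chunk0);
  Describe("call 2 (chunk 1 rows 0-2 -> row group 0)",
           Rows(chunk1.begin(), chunk1.begin() + 3));
  Describe("call 3 (chunk 1 rows 3-5 -> row group 1)",
           Rows(chunk1.begin() + 3, chunk1.end()));
  // Prints a/b with 2 nulls, a/b with 1 null, and b/c with 1 null respectively.
}
```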

Member (Author):

Ah yes, I see now. You are correct (just verified in lldb). I will change it so calls 1 and 2 have a different max.

(Also funny to realize how much PR 1, 2, and 3 of this repo have been mentioned 🤣 )


::arrow::ArrayVector chunks = {
std::make_shared<DictionaryArray>(dict_type, test_indices[0], test_dictionary),
std::make_shared<DictionaryArray>(dict_type, test_indices[1], test_dictionary),
};
std::shared_ptr<ChunkedArray> arr = std::make_shared<ChunkedArray>(chunks, dict_type);
table = ::arrow::Table::Make(schema, {arr});
}

std::shared_ptr<::arrow::ResizableBuffer> serialized_data = AllocateBuffer();
auto out_stream = std::make_shared<::arrow::io::BufferOutputStream>(serialized_data);
{
// Will write data as two row groups, one with 9 rows and one with 3.
std::shared_ptr<WriterProperties> writer_properties =
WriterProperties::Builder()
.max_row_group_length(9)
->data_page_version(this->GetParquetDataPageVersion())
->write_batch_size(3)
->data_pagesize(3)
->build();
std::unique_ptr<FileWriter> writer;
ASSERT_OK_AND_ASSIGN(
writer, FileWriter::Open(*schema, ::arrow::default_memory_pool(), out_stream,
writer_properties, default_arrow_writer_properties()));
ASSERT_OK(writer->WriteTable(*table, std::numeric_limits<int64_t>::max()));
ASSERT_OK(writer->Close());
ASSERT_OK(out_stream->Close());
}

auto buffer_reader = std::make_shared<::arrow::io::BufferReader>(serialized_data);
std::unique_ptr<ParquetFileReader> parquet_reader =
ParquetFileReader::Open(std::move(buffer_reader));
// Check row group statistics
std::shared_ptr<FileMetaData> metadata = parquet_reader->metadata();
ASSERT_EQ(metadata->num_row_groups(), 2);
ASSERT_EQ(metadata->RowGroup(0)->num_rows(), 9);
ASSERT_EQ(metadata->RowGroup(1)->num_rows(), 3);
auto stats0 = metadata->RowGroup(0)->ColumnChunk(0)->statistics();
auto stats1 = metadata->RowGroup(1)->ColumnChunk(0)->statistics();
ASSERT_EQ(stats0->num_values(), 6);
ASSERT_EQ(stats1->num_values(), 2);
ASSERT_EQ(stats0->null_count(), 3);
ASSERT_EQ(stats1->null_count(), 1);
ASSERT_EQ(stats0->EncodeMin(), "a");
ASSERT_EQ(stats1->EncodeMin(), "b");
ASSERT_EQ(stats0->EncodeMax(), "b");
ASSERT_EQ(stats1->EncodeMax(), "c");
}

// ----------------------------------------------------------------------
// Tests for directly reading DictionaryArray

68 changes: 43 additions & 25 deletions cpp/src/parquet/column_writer.cc
@@ -1479,6 +1479,43 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
     value_offset += batch_num_spaced_values;
   };
 
+  auto update_stats = [&]() {
+    // TODO(PARQUET-2068) This approach may make two copies. First, a copy of the
+    // indices array to a (hopefully smaller) referenced indices array. Second, a copy
+    // of the values array to a (probably not smaller) referenced values array.
+    //
+    // Once the MinMax kernel supports all data types we should use that kernel instead
+    // as it does not make any copies.
+    ::arrow::compute::ExecContext exec_ctx(ctx->memory_pool);
+    exec_ctx.set_use_threads(false);
+
+    std::shared_ptr<::arrow::Array> referenced_dictionary;
+    // If dictionary is the same dictionary we already have, just use that
+    if (preserved_dictionary_ && preserved_dictionary_ == dictionary) {
+      referenced_dictionary = preserved_dictionary_;
+    } else {
+      PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
+                              ::arrow::compute::Unique(*indices, &exec_ctx));
+
+      // On first run, we might be able to re-use the existing dictionary
+      if (referenced_indices.length() == dictionary->length()) {
+        referenced_dictionary = dictionary;
+      } else {
+        PARQUET_ASSIGN_OR_THROW(
+            ::arrow::Datum referenced_dictionary_datum,
+            ::arrow::compute::Take(dictionary, referenced_indices,
+                                   ::arrow::compute::TakeOptions(/*boundscheck=*/false),
+                                   &exec_ctx));
+        referenced_dictionary = referenced_dictionary_datum.make_array();
+      }
+    }
+
+    int64_t non_null_count = indices->length() - indices->null_count();
+    page_statistics_->IncrementNullCount(num_levels - non_null_count);
+    page_statistics_->IncrementNumValues(non_null_count);
+    page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
+  };
+
   // Handle seeing dictionary for the first time
   if (!preserved_dictionary_) {
     // It's a new dictionary. Call PutDictionary and keep track of it
@@ -1493,37 +1530,18 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
     }
 
     if (page_statistics_ != nullptr) {
-      // TODO(PARQUET-2068) This approach may make two copies. First, a copy of the
-      // indices array to a (hopefully smaller) referenced indices array. Second, a copy
-      // of the values array to a (probably not smaller) referenced values array.
-      //
-      // Once the MinMax kernel supports all data types we should use that kernel instead
-      // as it does not make any copies.
-      ::arrow::compute::ExecContext exec_ctx(ctx->memory_pool);
-      exec_ctx.set_use_threads(false);
-      PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
-                              ::arrow::compute::Unique(*indices, &exec_ctx));
-      std::shared_ptr<::arrow::Array> referenced_dictionary;
-      if (referenced_indices.length() == dictionary->length()) {
-        referenced_dictionary = dictionary;
-      } else {
-        PARQUET_ASSIGN_OR_THROW(
-            ::arrow::Datum referenced_dictionary_datum,
-            ::arrow::compute::Take(dictionary, referenced_indices,
-                                   ::arrow::compute::TakeOptions(/*boundscheck=*/false),
-                                   &exec_ctx));
-        referenced_dictionary = referenced_dictionary_datum.make_array();
-      }
-      int64_t non_null_count = indices->length() - indices->null_count();
-      page_statistics_->IncrementNullCount(num_levels - non_null_count);
-      page_statistics_->IncrementNumValues(non_null_count);
-      page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
+      update_stats();
     }
     preserved_dictionary_ = dictionary;
   } else if (!dictionary->Equals(*preserved_dictionary_)) {
     // Dictionary has changed
     PARQUET_CATCH_NOT_OK(FallbackToPlainEncoding());
     return WriteDense();
+  } else {
+    // Dictionary is same, but we need to update stats
+    if (page_statistics_ != nullptr) {
+      update_stats();
+    }
   }
 
   PARQUET_CATCH_NOT_OK(
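For readers who want to see the statistics path in isolation, the sketch below (my own illustration, not code from this PR) mirrors what the update_stats lambda does on each batch: reduce the indices with Unique, gather only the referenced dictionary values with Take, and derive the null/value counts from the indices. It assumes the Arrow C++ headers <arrow/api.h> and <arrow/compute/api.h> are available; error handling is collapsed to early returns and ValueOrDie() for brevity.

```cpp
#include <iostream>
#include <memory>

#include <arrow/api.h>
#include <arrow/compute/api.h>

int main() {
  // dictionary = ["b", "c", "d", "a"], indices = [0, null, 3, 0, null, 3]
  arrow::StringBuilder dict_builder;
  if (!dict_builder.AppendValues({"b", "c", "d", "a"}).ok()) return 1;
  std::shared_ptr<arrow::Array> dictionary = dict_builder.Finish().ValueOrDie();

  arrow::Int32Builder index_builder;
  if (!index_builder.AppendValues({0, 0, 3, 0, 0, 3},
                                  {true, false, true, true, false, true}).ok()) return 1;
  std::shared_ptr<arrow::Array> indices = index_builder.Finish().ValueOrDie();

  // Reduce the indices to the distinct values this batch actually references...
  std::shared_ptr<arrow::Array> referenced_indices =
      arrow::compute::Unique(*indices).ValueOrDie();
  // ...and gather only those dictionary entries; a null index yields a null slot,
  // which the min/max computation skips.
  std::shared_ptr<arrow::Array> referenced_dictionary =
      arrow::compute::Take(arrow::Datum(dictionary), arrow::Datum(referenced_indices))
          .ValueOrDie()
          .make_array();

  // The null/value counts come from the indices, not from the deduplicated values.
  const int64_t non_null_count = indices->length() - indices->null_count();
  std::cout << "referenced dictionary values: " << referenced_dictionary->ToString() << "\n"
            << "non-null values: " << non_null_count
            << ", nulls: " << indices->null_count() << "\n";
}
```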