fix: update stats on subsequent batches of dictionaries
wjones127 committed Jan 3, 2023
1 parent ec9a8a3 commit 10e5a7c
Showing 3 changed files with 104 additions and 25 deletions.
69 changes: 69 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -80,6 +80,7 @@ using arrow::DataType;
using arrow::Datum;
using arrow::DecimalType;
using arrow::default_memory_pool;
using arrow::DictionaryArray;
using arrow::ListArray;
using arrow::PrimitiveArray;
using arrow::ResizableBuffer;
@@ -4138,6 +4139,74 @@ TEST_P(TestArrowWriteDictionary, Statistics) {
INSTANTIATE_TEST_SUITE_P(WriteDictionary, TestArrowWriteDictionary,
::testing::Values(ParquetDataPageVersion::V1,
ParquetDataPageVersion::V2));

TEST_P(TestArrowWriteDictionary, StatisticsUnifiedDictionary) {
// Two chunks, with a shared dictionary
std::shared_ptr<::arrow::Table> table;
std::shared_ptr<::arrow::DataType> dict_type =
::arrow::dictionary(::arrow::int32(), ::arrow::utf8());
std::shared_ptr<::arrow::Schema> schema =
::arrow::schema({::arrow::field("values", dict_type)});
{
// It's important there are no duplicate values in the dictionary, otherwise
// we trigger the WriteDense() code path which side-steps dictionary encoding.
std::shared_ptr<::arrow::Array> test_dictionary =
ArrayFromJSON(::arrow::utf8(), R"(["b", "c", "d", "a"])");
std::vector<std::shared_ptr<::arrow::Array>> test_indices = {
ArrayFromJSON(::arrow::int32(),
R"([0, null, 3, 0, null, 3])"), // ["b", null "a", "b", null, "a"]
ArrayFromJSON(
::arrow::int32(),
R"([0, 3, null, 0, null, 1])")}; // ["b", "c", null, "b", "c", null]

::arrow::ArrayVector chunks = {
std::make_shared<DictionaryArray>(dict_type, test_indices[0], test_dictionary),
std::make_shared<DictionaryArray>(dict_type, test_indices[1], test_dictionary),
};
std::shared_ptr<ChunkedArray> arr = std::make_shared<ChunkedArray>(chunks, dict_type);
table = ::arrow::Table::Make(schema, {arr});
}

std::shared_ptr<::arrow::ResizableBuffer> serialized_data = AllocateBuffer();
auto out_stream = std::make_shared<::arrow::io::BufferOutputStream>(serialized_data);
{
// Will write data as two row groups, one with 9 rows and one with 3.
std::shared_ptr<WriterProperties> writer_properties =
WriterProperties::Builder()
.max_row_group_length(9)
->data_page_version(this->GetParquetDataPageVersion())
->write_batch_size(3)
->data_pagesize(3)
->build();
std::unique_ptr<FileWriter> writer;
ASSERT_OK_AND_ASSIGN(
writer, FileWriter::Open(*schema, ::arrow::default_memory_pool(), out_stream,
writer_properties, default_arrow_writer_properties()));
ASSERT_OK(writer->WriteTable(*table, std::numeric_limits<int64_t>::max()));
ASSERT_OK(writer->Close());
ASSERT_OK(out_stream->Close());
}

auto buffer_reader = std::make_shared<::arrow::io::BufferReader>(serialized_data);
std::unique_ptr<ParquetFileReader> parquet_reader =
ParquetFileReader::Open(std::move(buffer_reader));
// Check row group statistics
std::shared_ptr<FileMetaData> metadata = parquet_reader->metadata();
ASSERT_EQ(metadata->num_row_groups(), 2);
ASSERT_EQ(metadata->RowGroup(0)->num_rows(), 9);
ASSERT_EQ(metadata->RowGroup(1)->num_rows(), 3);
auto stats0 = metadata->RowGroup(0)->ColumnChunk(0)->statistics();
auto stats1 = metadata->RowGroup(1)->ColumnChunk(0)->statistics();
ASSERT_EQ(stats0->num_values(), 6);
ASSERT_EQ(stats1->num_values(), 2);
ASSERT_EQ(stats0->null_count(), 3);
ASSERT_EQ(stats1->null_count(), 1);
ASSERT_EQ(stats0->EncodeMin(), "a");
ASSERT_EQ(stats1->EncodeMin(), "b");
ASSERT_EQ(stats0->EncodeMax(), "b");
ASSERT_EQ(stats1->EncodeMax(), "c");
}

// ----------------------------------------------------------------------
// Tests for directly reading DictionaryArray

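Note on the expected values asserted above (a derivation from the test data, not part of the commit): with max_row_group_length(9), the 12 rows are split so that the first row group holds all six rows of the first chunk plus the first three rows of the second, and the second row group holds the remaining three rows. Decoding the indices against the shared dictionary ["b", "c", "d", "a"] gives:

    row group 0: b, null, a, b, null, a, b, a, null   ->  6 values, 3 nulls, min "a", max "b"
    row group 1: b, null, c                           ->  2 values, 1 null,  min "b", max "c"

which is exactly what stats0 and stats1 are checked against.
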
59 changes: 34 additions & 25 deletions cpp/src/parquet/column_writer.cc
@@ -1479,6 +1479,34 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
value_offset += batch_num_spaced_values;
};

auto update_stats = [&]() {
// TODO(PARQUET-2068) This approach may make two copies. First, a copy of the
// indices array to a (hopefully smaller) referenced indices array. Second, a copy
// of the values array to a (probably not smaller) referenced values array.
//
// Once the MinMax kernel supports all data types we should use that kernel instead
// as it does not make any copies.
::arrow::compute::ExecContext exec_ctx(ctx->memory_pool);
exec_ctx.set_use_threads(false);
PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
::arrow::compute::Unique(*indices, &exec_ctx));
std::shared_ptr<::arrow::Array> referenced_dictionary;
if (referenced_indices.length() == dictionary->length()) {
referenced_dictionary = dictionary;
} else {
PARQUET_ASSIGN_OR_THROW(
::arrow::Datum referenced_dictionary_datum,
::arrow::compute::Take(dictionary, referenced_indices,
::arrow::compute::TakeOptions(/*boundscheck=*/false),
&exec_ctx));
referenced_dictionary = referenced_dictionary_datum.make_array();
}
int64_t non_null_count = indices->length() - indices->null_count();
page_statistics_->IncrementNullCount(num_levels - non_null_count);
page_statistics_->IncrementNumValues(non_null_count);
page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
};

// Handle seeing dictionary for the first time
if (!preserved_dictionary_) {
// It's a new dictionary. Call PutDictionary and keep track of it
@@ -1493,37 +1521,18 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
}

if (page_statistics_ != nullptr) {
// TODO(PARQUET-2068) This approach may make two copies. First, a copy of the
// indices array to a (hopefully smaller) referenced indices array. Second, a copy
// of the values array to a (probably not smaller) referenced values array.
//
// Once the MinMax kernel supports all data types we should use that kernel instead
// as it does not make any copies.
::arrow::compute::ExecContext exec_ctx(ctx->memory_pool);
exec_ctx.set_use_threads(false);
PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
::arrow::compute::Unique(*indices, &exec_ctx));
std::shared_ptr<::arrow::Array> referenced_dictionary;
if (referenced_indices.length() == dictionary->length()) {
referenced_dictionary = dictionary;
} else {
PARQUET_ASSIGN_OR_THROW(
::arrow::Datum referenced_dictionary_datum,
::arrow::compute::Take(dictionary, referenced_indices,
::arrow::compute::TakeOptions(/*boundscheck=*/false),
&exec_ctx));
referenced_dictionary = referenced_dictionary_datum.make_array();
}
int64_t non_null_count = indices->length() - indices->null_count();
page_statistics_->IncrementNullCount(num_levels - non_null_count);
page_statistics_->IncrementNumValues(non_null_count);
page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
update_stats();
}
preserved_dictionary_ = dictionary;
} else if (!dictionary->Equals(*preserved_dictionary_)) {
// Dictionary has changed
PARQUET_CATCH_NOT_OK(FallbackToPlainEncoding());
return WriteDense();
} else {
    // Dictionary is the same, but we still need to update the statistics
if (page_statistics_ != nullptr) {
update_stats();
}
}

PARQUET_CATCH_NOT_OK(
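For readers skimming the hunk above, the substance of the fix is the new final else branch: before this commit, the statistics code now factored into the update_stats() lambda ran only when a dictionary was preserved for the first time, so later batches that reused the same dictionary never contributed to the page statistics. A simplified restatement of the resulting control flow (paraphrased from the diff, not the literal source):

    if (!preserved_dictionary_) {
      // First dictionary seen: PutDictionary and related setup (collapsed in
      // the diff), then seed the page statistics from the referenced values.
      if (page_statistics_ != nullptr) {
        update_stats();
      }
      preserved_dictionary_ = dictionary;
    } else if (!dictionary->Equals(*preserved_dictionary_)) {
      // Dictionary changed between batches: fall back to plain encoding.
      PARQUET_CATCH_NOT_OK(FallbackToPlainEncoding());
      return WriteDense();
    } else {
      // Same dictionary as an earlier batch: previously statistics were
      // skipped here; now they are updated as well.
      if (page_statistics_ != nullptr) {
        update_stats();
      }
    }
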
1 change: 1 addition & 0 deletions cpp/src/parquet/statistics.cc
@@ -26,6 +26,7 @@
#include <utility>

#include "arrow/array.h"
#include "arrow/compute/api.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_run_reader.h"
