From a704c6d2dd51480690aa6b6d6548e102f33a785e Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Mon, 2 Jan 2023 09:55:10 -0800
Subject: [PATCH 1/4] fix: update stats on subsequent batches of dictionaries

---
 .../parquet/arrow/arrow_reader_writer_test.cc | 69 +++++++++++++++++++
 cpp/src/parquet/column_writer.cc              | 59 +++++++++-------
 2 files changed, 103 insertions(+), 25 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 247681eec8096..faee01b11a8ed 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -80,6 +80,7 @@ using arrow::DataType;
 using arrow::Datum;
 using arrow::DecimalType;
 using arrow::default_memory_pool;
+using arrow::DictionaryArray;
 using arrow::ListArray;
 using arrow::PrimitiveArray;
 using arrow::ResizableBuffer;
@@ -4138,6 +4139,74 @@ TEST_P(TestArrowWriteDictionary, Statistics) {
 
 INSTANTIATE_TEST_SUITE_P(WriteDictionary, TestArrowWriteDictionary,
                          ::testing::Values(ParquetDataPageVersion::V1,
                                            ParquetDataPageVersion::V2));
+
+TEST_P(TestArrowWriteDictionary, StatisticsUnifiedDictionary) {
+  // Two chunks with a shared dictionary
+  std::shared_ptr<::arrow::Table> table;
+  std::shared_ptr<::arrow::DataType> dict_type =
+      ::arrow::dictionary(::arrow::int32(), ::arrow::utf8());
+  std::shared_ptr<::arrow::Schema> schema =
+      ::arrow::schema({::arrow::field("values", dict_type)});
+  {
+    // It's important there are no duplicate values in the dictionary; otherwise
+    // we trigger the WriteDense() code path, which side-steps dictionary encoding.
+    std::shared_ptr<::arrow::Array> test_dictionary =
+        ArrayFromJSON(::arrow::utf8(), R"(["b", "c", "d", "a"])");
+    std::vector<std::shared_ptr<::arrow::Array>> test_indices = {
+        ArrayFromJSON(::arrow::int32(),
+                      R"([0, null, 3, 0, null, 3])"),  // ["b", null, "a", "b", null, "a"]
+        ArrayFromJSON(
+            ::arrow::int32(),
+            R"([0, 3, null, 0, null, 1])")};  // ["b", "c", null, "b", "c", null]
+
+    ::arrow::ArrayVector chunks = {
+        std::make_shared<DictionaryArray>(dict_type, test_indices[0], test_dictionary),
+        std::make_shared<DictionaryArray>(dict_type, test_indices[1], test_dictionary),
+    };
+    std::shared_ptr<::arrow::ChunkedArray> arr =
+        std::make_shared<::arrow::ChunkedArray>(chunks, dict_type);
+    table = ::arrow::Table::Make(schema, {arr});
+  }
+
+  std::shared_ptr<::arrow::ResizableBuffer> serialized_data = AllocateBuffer();
+  auto out_stream = std::make_shared<::arrow::io::BufferOutputStream>(serialized_data);
+  {
+    // Will write data as two row groups, one with 9 rows and one with 3.
+    std::shared_ptr<WriterProperties> writer_properties =
+        WriterProperties::Builder()
+            .max_row_group_length(9)
+            ->data_page_version(this->GetParquetDataPageVersion())
+            ->write_batch_size(3)
+            ->data_pagesize(3)
+            ->build();
+    std::unique_ptr<FileWriter> writer;
+    ASSERT_OK_AND_ASSIGN(
+        writer, FileWriter::Open(*schema, ::arrow::default_memory_pool(), out_stream,
+                                 writer_properties, default_arrow_writer_properties()));
+    ASSERT_OK(writer->WriteTable(*table, std::numeric_limits<int64_t>::max()));
+    ASSERT_OK(writer->Close());
+    ASSERT_OK(out_stream->Close());
+  }
+
+  auto buffer_reader = std::make_shared<::arrow::io::BufferReader>(serialized_data);
+  std::unique_ptr<ParquetFileReader> parquet_reader =
+      ParquetFileReader::Open(std::move(buffer_reader));
+  // Check row group statistics
+  std::shared_ptr<FileMetaData> metadata = parquet_reader->metadata();
+  ASSERT_EQ(metadata->num_row_groups(), 2);
+  ASSERT_EQ(metadata->RowGroup(0)->num_rows(), 9);
+  ASSERT_EQ(metadata->RowGroup(1)->num_rows(), 3);
+  auto stats0 = metadata->RowGroup(0)->ColumnChunk(0)->statistics();
+  auto stats1 = metadata->RowGroup(1)->ColumnChunk(0)->statistics();
+  ASSERT_EQ(stats0->num_values(), 6);
+  ASSERT_EQ(stats1->num_values(), 2);
+  ASSERT_EQ(stats0->null_count(), 3);
+  ASSERT_EQ(stats1->null_count(), 1);
+  ASSERT_EQ(stats0->EncodeMin(), "a");
+  ASSERT_EQ(stats1->EncodeMin(), "b");
+  ASSERT_EQ(stats0->EncodeMax(), "b");
+  ASSERT_EQ(stats1->EncodeMax(), "c");
+}
+
 // ----------------------------------------------------------------------
 // Tests for directly reading DictionaryArray

diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 9057cc8f2f3ed..7e34d7bd1bca7 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -1479,6 +1479,34 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
     value_offset += batch_num_spaced_values;
   };
 
+  auto update_stats = [&]() {
+    // TODO(PARQUET-2068) This approach may make two copies. First, a copy of the
+    // indices array to a (hopefully smaller) referenced indices array. Second, a copy
+    // of the values array to a (probably not smaller) referenced values array.
+    //
+    // Once the MinMax kernel supports all data types we should use that kernel instead
+    // as it does not make any copies.
+    ::arrow::compute::ExecContext exec_ctx(ctx->memory_pool);
+    exec_ctx.set_use_threads(false);
+    PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
+                            ::arrow::compute::Unique(*indices, &exec_ctx));
+    std::shared_ptr<::arrow::Array> referenced_dictionary;
+    if (referenced_indices.length() == dictionary->length()) {
+      referenced_dictionary = dictionary;
+    } else {
+      PARQUET_ASSIGN_OR_THROW(
+          ::arrow::Datum referenced_dictionary_datum,
+          ::arrow::compute::Take(dictionary, referenced_indices,
+                                 ::arrow::compute::TakeOptions(/*boundscheck=*/false),
+                                 &exec_ctx));
+      referenced_dictionary = referenced_dictionary_datum.make_array();
+    }
+    int64_t non_null_count = indices->length() - indices->null_count();
+    page_statistics_->IncrementNullCount(num_levels - non_null_count);
+    page_statistics_->IncrementNumValues(non_null_count);
+    page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
+  };
+
   // Handle seeing dictionary for the first time
   if (!preserved_dictionary_) {
     // It's a new dictionary. Call PutDictionary and keep track of it
@@ -1493,37 +1521,18 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
     }
 
     if (page_statistics_ != nullptr) {
-      // TODO(PARQUET-2068) This approach may make two copies. First, a copy of the
-      // indices array to a (hopefully smaller) referenced indices array. Second, a copy
-      // of the values array to a (probably not smaller) referenced values array.
-      //
-      // Once the MinMax kernel supports all data types we should use that kernel instead
-      // as it does not make any copies.
-      ::arrow::compute::ExecContext exec_ctx(ctx->memory_pool);
-      exec_ctx.set_use_threads(false);
-      PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
-                              ::arrow::compute::Unique(*indices, &exec_ctx));
-      std::shared_ptr<::arrow::Array> referenced_dictionary;
-      if (referenced_indices.length() == dictionary->length()) {
-        referenced_dictionary = dictionary;
-      } else {
-        PARQUET_ASSIGN_OR_THROW(
-            ::arrow::Datum referenced_dictionary_datum,
-            ::arrow::compute::Take(dictionary, referenced_indices,
-                                   ::arrow::compute::TakeOptions(/*boundscheck=*/false),
-                                   &exec_ctx));
-        referenced_dictionary = referenced_dictionary_datum.make_array();
-      }
-      int64_t non_null_count = indices->length() - indices->null_count();
-      page_statistics_->IncrementNullCount(num_levels - non_null_count);
-      page_statistics_->IncrementNumValues(non_null_count);
-      page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
+      update_stats();
     }
     preserved_dictionary_ = dictionary;
   } else if (!dictionary->Equals(*preserved_dictionary_)) {
     // Dictionary has changed
     PARQUET_CATCH_NOT_OK(FallbackToPlainEncoding());
     return WriteDense();
+  } else {
+    // Dictionary is the same, but we still need to update the statistics.
+    if (page_statistics_ != nullptr) {
+      update_stats();
+    }
   }
 
   PARQUET_CATCH_NOT_OK(

From 07f205e85f8d2fded5eb6b1ef2e9a5d807b2b3de Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Wed, 4 Jan 2023 11:56:53 -0800
Subject: [PATCH 2/4] feat: add optimization

---
 cpp/src/parquet/column_writer.cc | 29 +++++++++++++++++++----------
 1 file changed, 19 insertions(+), 10 deletions(-)

diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 7e34d7bd1bca7..16f6c36fac336 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -1488,19 +1488,28 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
     // Once the MinMax kernel supports all data types we should use that kernel instead
     // as it does not make any copies.
     ::arrow::compute::ExecContext exec_ctx(ctx->memory_pool);
     exec_ctx.set_use_threads(false);
-    PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
-                            ::arrow::compute::Unique(*indices, &exec_ctx));
+
     std::shared_ptr<::arrow::Array> referenced_dictionary;
-    if (referenced_indices.length() == dictionary->length()) {
-      referenced_dictionary = dictionary;
+    // If the dictionary is the same one we already have, just use that.
+    if (preserved_dictionary_ && preserved_dictionary_ == dictionary) {
+      referenced_dictionary = preserved_dictionary_;
     } else {
-      PARQUET_ASSIGN_OR_THROW(
-          ::arrow::Datum referenced_dictionary_datum,
-          ::arrow::compute::Take(dictionary, referenced_indices,
-                                 ::arrow::compute::TakeOptions(/*boundscheck=*/false),
-                                 &exec_ctx));
-      referenced_dictionary = referenced_dictionary_datum.make_array();
+      PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
+                              ::arrow::compute::Unique(*indices, &exec_ctx));
+
+      // For the first batch, we might be able to re-use the existing dictionary.
+      if (referenced_indices.length() == dictionary->length()) {
+        referenced_dictionary = dictionary;
+      } else {
+        PARQUET_ASSIGN_OR_THROW(
+            ::arrow::Datum referenced_dictionary_datum,
+            ::arrow::compute::Take(dictionary, referenced_indices,
+                                   ::arrow::compute::TakeOptions(/*boundscheck=*/false),
+                                   &exec_ctx));
+        referenced_dictionary = referenced_dictionary_datum.make_array();
+      }
     }
+
     int64_t non_null_count = indices->length() - indices->null_count();
     page_statistics_->IncrementNullCount(num_levels - non_null_count);
     page_statistics_->IncrementNumValues(non_null_count);

From 20eab17842c74d5542dd85902ccf55f1e5678bb7 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Wed, 4 Jan 2023 13:17:20 -0800
Subject: [PATCH 3/4] update comment for data

---
 cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index faee01b11a8ed..dcdf0078307e4 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -4157,7 +4157,7 @@ TEST_P(TestArrowWriteDictionary, StatisticsUnifiedDictionary) {
                       R"([0, null, 3, 0, null, 3])"),  // ["b", null, "a", "b", null, "a"]
         ArrayFromJSON(
             ::arrow::int32(),
-            R"([0, 3, null, 0, null, 1])")};  // ["b", "c", null, "b", "c", null]
+            R"([0, 3, null, 0, null, 1])")};  // ["b", "a", null, "b", null, "c"]
 
     ::arrow::ArrayVector chunks = {
         std::make_shared<DictionaryArray>(dict_type, test_indices[0], test_dictionary),
         std::make_shared<DictionaryArray>(dict_type, test_indices[1], test_dictionary),

From 4e2f156d2ae5ecbbfa51bf4206909f03e8883b1e Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Wed, 4 Jan 2023 14:30:55 -0800
Subject: [PATCH 4/4] fix: make chunks have different max value

---
 cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index dcdf0078307e4..e885873deb7d2 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -4154,7 +4154,7 @@ TEST_P(TestArrowWriteDictionary, StatisticsUnifiedDictionary) {
         ArrayFromJSON(::arrow::utf8(), R"(["b", "c", "d", "a"])");
     std::vector<std::shared_ptr<::arrow::Array>> test_indices = {
         ArrayFromJSON(::arrow::int32(),
-                      R"([0, null, 3, 0, null, 3])"),  // ["b", null, "a", "b", null, "a"]
+                      R"([3, null, 3, 3, null, 3])"),  // ["a", null, "a", "a", null, "a"]
         ArrayFromJSON(
             ::arrow::int32(),
             R"([0, 3, null, 0, null, 1])")};  // ["b", "a", null, "b", null, "c"]