diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 247681eec8096..e885873deb7d2 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -80,6 +80,7 @@ using arrow::DataType;
 using arrow::Datum;
 using arrow::DecimalType;
 using arrow::default_memory_pool;
+using arrow::DictionaryArray;
 using arrow::ListArray;
 using arrow::PrimitiveArray;
 using arrow::ResizableBuffer;
@@ -4138,6 +4139,74 @@ TEST_P(TestArrowWriteDictionary, Statistics) {
 INSTANTIATE_TEST_SUITE_P(WriteDictionary, TestArrowWriteDictionary,
                          ::testing::Values(ParquetDataPageVersion::V1,
                                            ParquetDataPageVersion::V2));
+
+TEST_P(TestArrowWriteDictionary, StatisticsUnifiedDictionary) {
+  // Two chunks, with a shared dictionary
+  std::shared_ptr<::arrow::Table> table;
+  std::shared_ptr<::arrow::DataType> dict_type =
+      ::arrow::dictionary(::arrow::int32(), ::arrow::utf8());
+  std::shared_ptr<::arrow::Schema> schema =
+      ::arrow::schema({::arrow::field("values", dict_type)});
+  {
+    // It's important there are no duplicate values in the dictionary, otherwise
+    // we trigger the WriteDense() code path which side-steps dictionary encoding.
+    std::shared_ptr<::arrow::Array> test_dictionary =
+        ArrayFromJSON(::arrow::utf8(), R"(["b", "c", "d", "a"])");
+    std::vector<std::shared_ptr<::arrow::Array>> test_indices = {
+        ArrayFromJSON(::arrow::int32(),
+                      R"([3, null, 3, 3, null, 3])"),  // ["a", null, "a", "a", null, "a"]
+        ArrayFromJSON(
+            ::arrow::int32(),
+            R"([0, 3, null, 0, null, 1])")};  // ["b", "a", null, "b", null, "c"]
+
+    ::arrow::ArrayVector chunks = {
+        std::make_shared<DictionaryArray>(dict_type, test_indices[0], test_dictionary),
+        std::make_shared<DictionaryArray>(dict_type, test_indices[1], test_dictionary),
+    };
+    std::shared_ptr<ChunkedArray> arr =
+        std::make_shared<ChunkedArray>(chunks, dict_type);
+    table = ::arrow::Table::Make(schema, {arr});
+  }
+
+  std::shared_ptr<::arrow::ResizableBuffer> serialized_data = AllocateBuffer();
+  auto out_stream = std::make_shared<::arrow::io::BufferOutputStream>(serialized_data);
+  {
+    // Will write data as two row groups, one with 9 rows and one with 3.
+    std::shared_ptr<WriterProperties> writer_properties =
+        WriterProperties::Builder()
+            .max_row_group_length(9)
+            ->data_page_version(this->GetParquetDataPageVersion())
+            ->write_batch_size(3)
+            ->data_pagesize(3)
+            ->build();
+    std::unique_ptr<FileWriter> writer;
+    ASSERT_OK_AND_ASSIGN(
+        writer, FileWriter::Open(*schema, ::arrow::default_memory_pool(), out_stream,
+                                 writer_properties, default_arrow_writer_properties()));
+    ASSERT_OK(writer->WriteTable(*table, std::numeric_limits<int64_t>::max()));
+    ASSERT_OK(writer->Close());
+    ASSERT_OK(out_stream->Close());
+  }
+
+  auto buffer_reader = std::make_shared<::arrow::io::BufferReader>(serialized_data);
+  std::unique_ptr<ParquetFileReader> parquet_reader =
+      ParquetFileReader::Open(std::move(buffer_reader));
+  // Check row group statistics
+  std::shared_ptr<FileMetaData> metadata = parquet_reader->metadata();
+  ASSERT_EQ(metadata->num_row_groups(), 2);
+  ASSERT_EQ(metadata->RowGroup(0)->num_rows(), 9);
+  ASSERT_EQ(metadata->RowGroup(1)->num_rows(), 3);
+  auto stats0 = metadata->RowGroup(0)->ColumnChunk(0)->statistics();
+  auto stats1 = metadata->RowGroup(1)->ColumnChunk(0)->statistics();
+  ASSERT_EQ(stats0->num_values(), 6);
+  ASSERT_EQ(stats1->num_values(), 2);
+  ASSERT_EQ(stats0->null_count(), 3);
+  ASSERT_EQ(stats1->null_count(), 1);
+  ASSERT_EQ(stats0->EncodeMin(), "a");
+  ASSERT_EQ(stats1->EncodeMin(), "b");
+  ASSERT_EQ(stats0->EncodeMax(), "b");
+  ASSERT_EQ(stats1->EncodeMax(), "c");
+}
 
 // ----------------------------------------------------------------------
 // Tests for directly reading DictionaryArray
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 9057cc8f2f3ed..16f6c36fac336 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -1479,6 +1479,43 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
     value_offset += batch_num_spaced_values;
   };
 
+  auto update_stats = [&]() {
+    // TODO(PARQUET-2068) This approach may make two copies. First, a copy of the
+    // indices array to a (hopefully smaller) referenced indices array. Second, a copy
+    // of the values array to a (probably not smaller) referenced values array.
+    //
+    // Once the MinMax kernel supports all data types we should use that kernel instead
+    // as it does not make any copies.
+    ::arrow::compute::ExecContext exec_ctx(ctx->memory_pool);
+    exec_ctx.set_use_threads(false);
+
+    std::shared_ptr<::arrow::Array> referenced_dictionary;
+    // If dictionary is the same dictionary we already have, just use that
+    if (preserved_dictionary_ && preserved_dictionary_ == dictionary) {
+      referenced_dictionary = preserved_dictionary_;
+    } else {
+      PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
+                              ::arrow::compute::Unique(*indices, &exec_ctx));
+
+      // On first run, we might be able to re-use the existing dictionary
+      if (referenced_indices.length() == dictionary->length()) {
+        referenced_dictionary = dictionary;
+      } else {
+        PARQUET_ASSIGN_OR_THROW(
+            ::arrow::Datum referenced_dictionary_datum,
+            ::arrow::compute::Take(dictionary, referenced_indices,
+                                   ::arrow::compute::TakeOptions(/*boundscheck=*/false),
+                                   &exec_ctx));
+        referenced_dictionary = referenced_dictionary_datum.make_array();
+      }
+    }
+
+    int64_t non_null_count = indices->length() - indices->null_count();
+    page_statistics_->IncrementNullCount(num_levels - non_null_count);
+    page_statistics_->IncrementNumValues(non_null_count);
+    page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
+  };
+
   // Handle seeing dictionary for the first time
   if (!preserved_dictionary_) {
     // It's a new dictionary. Call PutDictionary and keep track of it
@@ -1493,37 +1530,18 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
     }
 
     if (page_statistics_ != nullptr) {
-      // TODO(PARQUET-2068) This approach may make two copies. First, a copy of the
-      // indices array to a (hopefully smaller) referenced indices array. Second, a copy
-      // of the values array to a (probably not smaller) referenced values array.
-      //
-      // Once the MinMax kernel supports all data types we should use that kernel instead
-      // as it does not make any copies.
-      ::arrow::compute::ExecContext exec_ctx(ctx->memory_pool);
-      exec_ctx.set_use_threads(false);
-      PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
-                              ::arrow::compute::Unique(*indices, &exec_ctx));
-      std::shared_ptr<::arrow::Array> referenced_dictionary;
-      if (referenced_indices.length() == dictionary->length()) {
-        referenced_dictionary = dictionary;
-      } else {
-        PARQUET_ASSIGN_OR_THROW(
-            ::arrow::Datum referenced_dictionary_datum,
-            ::arrow::compute::Take(dictionary, referenced_indices,
-                                   ::arrow::compute::TakeOptions(/*boundscheck=*/false),
-                                   &exec_ctx));
-        referenced_dictionary = referenced_dictionary_datum.make_array();
-      }
-      int64_t non_null_count = indices->length() - indices->null_count();
-      page_statistics_->IncrementNullCount(num_levels - non_null_count);
-      page_statistics_->IncrementNumValues(non_null_count);
-      page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
+      update_stats();
     }
     preserved_dictionary_ = dictionary;
   } else if (!dictionary->Equals(*preserved_dictionary_)) {
     // Dictionary has changed
     PARQUET_CATCH_NOT_OK(FallbackToPlainEncoding());
     return WriteDense();
+  } else {
+    // Dictionary is the same, but we still need to update the statistics
+    if (page_statistics_ != nullptr) {
+      update_stats();
+    }
   }
 
   PARQUET_CATCH_NOT_OK(
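
Note on the change above: the heart of update_stats() is a Unique-then-Take step that shrinks the (possibly much larger, shared) dictionary down to just the values a batch actually references, so the page min/max reflect the data written rather than the whole dictionary. The following self-contained sketch reproduces that step with the public Arrow compute API. It is illustrative only, not code from the patch: the arrays mirror the test data above, and RunSketch/main are hypothetical scaffolding.

#include <arrow/api.h>
#include <arrow/compute/api.h>

#include <iostream>
#include <memory>

arrow::Status RunSketch() {
  // Dictionary ["b", "c", "d", "a"], as in the test above. Indices [3, null, 0]
  // reference only slots 3 and 0, i.e. the values "a" and "b".
  arrow::StringBuilder dict_builder;
  ARROW_RETURN_NOT_OK(dict_builder.AppendValues({"b", "c", "d", "a"}));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> dictionary, dict_builder.Finish());

  arrow::Int32Builder index_builder;
  ARROW_RETURN_NOT_OK(index_builder.Append(3));
  ARROW_RETURN_NOT_OK(index_builder.AppendNull());
  ARROW_RETURN_NOT_OK(index_builder.Append(0));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> indices, index_builder.Finish());

  // Single-threaded context, mirroring exec_ctx.set_use_threads(false) in the patch.
  arrow::compute::ExecContext exec_ctx;
  exec_ctx.set_use_threads(false);

  // Step 1: deduplicate the indices. A null survives Unique, which is harmless
  // here because statistics min/max updates skip null slots.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> unique_indices,
                        arrow::compute::Unique(indices, &exec_ctx));

  // Step 2: gather only the referenced dictionary values. boundscheck=false
  // matches the patch and is safe when the indices are known to be in range.
  ARROW_ASSIGN_OR_RAISE(
      arrow::Datum referenced,
      arrow::compute::Take(dictionary, unique_indices,
                           arrow::compute::TakeOptions(/*boundscheck=*/false), &exec_ctx));

  // Prints ["a", null, "b"]: min "a" and max "b", instead of the min "a" and
  // max "d" that a naive scan of the full dictionary would produce.
  std::cout << referenced.make_array()->ToString() << std::endl;
  return arrow::Status::OK();
}

int main() {
  arrow::Status st = RunSketch();
  if (!st.ok()) {
    std::cerr << st.ToString() << std::endl;
    return 1;
  }
  return 0;
}

Why the new else-branch matters: the table in the test has 12 rows split into row groups of 9 and 3, and with write_batch_size(3) each row group is written as several batches that all share one preserved dictionary. Before this change, the Unique/Take statistics update ran only when a dictionary was first seen, so later batches against the same preserved dictionary contributed nothing to min/max. Routing every batch through update_stats() yields the per-row-group expectations in the test: row group 0 sees four "a" and two nulls from chunk one plus "b", "a", null from chunk two (num_values 6, null_count 3, min "a", max "b"), while row group 1 sees "b", null, "c" (num_values 2, null_count 1, min "b", max "c").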