From b4241ea5a5aa78c3fb9e483b1e33b78541353b02 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Sat, 7 Jan 2023 23:44:05 +0800 Subject: [PATCH] GH-15239: [C++][Parquet] Parquet writer writes decimal as int32/64 --- .../parquet/arrow/arrow_reader_writer_test.cc | 70 ++++++++++++++++++- cpp/src/parquet/arrow/schema.cc | 8 ++- cpp/src/parquet/arrow/test_util.h | 18 ++--- cpp/src/parquet/column_writer.cc | 53 +++++++++++++- cpp/src/parquet/properties.h | 29 +++++++- 5 files changed, 162 insertions(+), 16 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 247681eec8096..f2cecbd2f37c2 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -25,7 +25,6 @@ #include #include -#include #include #include @@ -4727,5 +4726,74 @@ std::vector GenerateMapFilteredTestCases() { INSTANTIATE_TEST_SUITE_P(MapFilteredReads, TestNestedSchemaFilteredReader, ::testing::ValuesIn(GenerateMapFilteredTestCases())); +template +class TestIntegerAnnotateDecimalTypeParquetIO : public TestParquetIO { + public: + template + void WriteColumn(const std::shared_ptr& values) { + auto arrow_schema = ::arrow::schema({::arrow::field("a", values->type())}); + + parquet::WriterProperties::Builder builder; + // Enforce integer type to annotate decimal type + auto writer_properties = builder.enable_integer_annotate_decimal()->build(); + std::shared_ptr parquet_schema; + ASSERT_OK_NO_THROW(ToParquetSchema(arrow_schema.get(), *writer_properties, + *default_arrow_writer_properties(), + &parquet_schema)); + + this->sink_ = CreateOutputStream(); + auto schema_node = std::static_pointer_cast(parquet_schema->schema_root()); + + std::unique_ptr writer; + ASSERT_OK_NO_THROW(FileWriter::Make( + ::arrow::default_memory_pool(), + ParquetFileWriter::Open(this->sink_, schema_node, writer_properties), + arrow_schema, default_arrow_writer_properties(), &writer)); + ASSERT_OK_NO_THROW(writer->NewRowGroup(values->length())); + ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*values)); + ASSERT_OK_NO_THROW(writer->Close()); + } + + void ReadAndCheckSingleDecimalColumnFile(const Array& values) { + std::shared_ptr out; + std::unique_ptr reader; + this->ReaderFromSink(&reader); + this->ReadSingleColumnFile(std::move(reader), &out); + + // Reader always read values as DECIMAL128 type + ASSERT_EQ(out->type()->id(), ::arrow::Type::DECIMAL128); + + if (values.type()->id() == ::arrow::Type::DECIMAL128) { + AssertArraysEqual(values, *out); + } else { + auto& expected_values = dynamic_cast(values); + auto& read_values = dynamic_cast(*out); + ASSERT_EQ(expected_values.null_count(), read_values.null_count()); + ASSERT_EQ(0, read_values.null_count()); + ASSERT_EQ(expected_values.length(), read_values.length()); + for (int64_t i = 0; i < expected_values.length(); ++i) { + ASSERT_EQ(::arrow::Decimal256(expected_values.Value(i)).ToString(0), + ::arrow::Decimal128(read_values.Value(i)).ToString(0)); + } + } + } +}; + +typedef ::testing::Types< + DecimalWithPrecisionAndScale<1>, DecimalWithPrecisionAndScale<5>, + DecimalWithPrecisionAndScale<10>, DecimalWithPrecisionAndScale<18>, + Decimal256WithPrecisionAndScale<1>, Decimal256WithPrecisionAndScale<5>, + Decimal256WithPrecisionAndScale<10>, Decimal256WithPrecisionAndScale<18>> + DecimalTestTypes; + +TYPED_TEST_SUITE(TestIntegerAnnotateDecimalTypeParquetIO, DecimalTestTypes); + +TYPED_TEST(TestIntegerAnnotateDecimalTypeParquetIO, SingleDecimalColumn) { + std::shared_ptr values; + ASSERT_OK(NonNullArray(SMALL_SIZE, &values)); + ASSERT_NO_FATAL_FAILURE(this->WriteColumn(values)); + ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values)); +} + } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index a845e3c40cced..267b892e4b40d 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -354,11 +354,15 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, } break; case ArrowTypeId::DECIMAL128: case ArrowTypeId::DECIMAL256: { - type = ParquetType::FIXED_LEN_BYTE_ARRAY; const auto& decimal_type = static_cast(*field->type()); precision = decimal_type.precision(); scale = decimal_type.scale(); - length = DecimalType::DecimalSize(precision); + if (properties.integer_annotate_decimal() && 1 <= precision && precision <= 18) { + type = precision <= 9 ? ParquetType ::INT32 : ParquetType ::INT64; + } else { + type = ParquetType::FIXED_LEN_BYTE_ARRAY; + length = DecimalType::DecimalSize(precision); + } PARQUET_CATCH_NOT_OK(logical_type = LogicalType::Decimal(precision, scale)); } break; case ArrowTypeId::DATE32: diff --git a/cpp/src/parquet/arrow/test_util.h b/cpp/src/parquet/arrow/test_util.h index 495b6ebba270e..16c03130c9672 100644 --- a/cpp/src/parquet/arrow/test_util.h +++ b/cpp/src/parquet/arrow/test_util.h @@ -129,16 +129,14 @@ ::arrow::enable_if_fixed_size_binary NonNullArray( return builder.Finish(out); } +template static void random_decimals(int64_t n, uint32_t seed, int32_t precision, uint8_t* out) { auto gen = ::arrow::random::RandomArrayGenerator(seed); std::shared_ptr decimals; - int32_t byte_width = 0; - if (precision <= ::arrow::Decimal128Type::kMaxPrecision) { + if constexpr (byte_width == 16) { decimals = gen.Decimal128(::arrow::decimal128(precision, 0), n); - byte_width = ::arrow::Decimal128Type::kByteWidth; } else { decimals = gen.Decimal256(::arrow::decimal256(precision, 0), n); - byte_width = ::arrow::Decimal256Type::kByteWidth; } std::memcpy(out, decimals->data()->GetValues(1, 0), byte_width * n); } @@ -158,7 +156,8 @@ NonNullArray(size_t size, std::shared_ptr* out) { constexpr int32_t seed = 0; ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); - random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data()); + random_decimals<::arrow::Decimal128Type::kByteWidth>(size, seed, kDecimalPrecision, + out_buf->mutable_data()); RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size)); return builder.Finish(out); @@ -179,7 +178,8 @@ NonNullArray(size_t size, std::shared_ptr* out) { constexpr int32_t seed = 0; ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); - random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data()); + random_decimals<::arrow::Decimal256Type::kByteWidth>(size, seed, kDecimalPrecision, + out_buf->mutable_data()); RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size)); return builder.Finish(out); @@ -341,7 +341,8 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); - random_decimals(size, seed, precision, out_buf->mutable_data()); + random_decimals<::arrow::Decimal128Type::kByteWidth>(size, seed, precision, + out_buf->mutable_data()); ::arrow::Decimal128Builder builder(type); RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data())); @@ -367,7 +368,8 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); - random_decimals(size, seed, precision, out_buf->mutable_data()); + random_decimals<::arrow::Decimal256Type::kByteWidth>(size, seed, precision, + out_buf->mutable_data()); ::arrow::Decimal256Builder builder(type); RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data())); diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 9057cc8f2f3ed..0583282ce61f4 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -22,7 +22,6 @@ #include #include #include -#include #include #include @@ -41,6 +40,7 @@ #include "arrow/util/endian.h" #include "arrow/util/logging.h" #include "arrow/util/rle_encoding.h" +#include "arrow/util/type_traits.h" #include "arrow/visit_array_inline.h" #include "parquet/column_page.h" #include "parquet/encoding.h" @@ -1658,6 +1658,47 @@ struct SerializeFunctor { } }; +template +struct SerializeFunctor< + ParquetType, ArrowType, + ::arrow::enable_if_t<::arrow::is_decimal_type::value&& ::arrow::internal:: + IsOneOf::value>> { + using value_type = typename ParquetType::c_type; + + Status Serialize(const typename ::arrow::TypeTraits::ArrayType& array, + ArrowWriteContext* ctx, value_type* out) { + if (array.null_count() == 0) { + for (int64_t i = 0; i < array.length(); i++) { + out[i] = TransferValue(array.Value(i)); + } + } else { + for (int64_t i = 0; i < array.length(); i++) { + out[i] = + array.IsValid(i) ? TransferValue(array.Value(i)) : 0; + } + } + + return Status::OK(); + } + + template + value_type TransferValue(const uint8_t* in) const { + static_assert(byte_width == 16 || byte_width == 32, + "only 16 and 32 byte Decimals supported"); + value_type value = 0; + if constexpr (byte_width == 16) { + ::arrow::Decimal128 decimal_value(in); + PARQUET_THROW_NOT_OK(decimal_value.ToInteger(&value)); + } else { + ::arrow::Decimal256 decimal_value(in); + // Decimal256 does not provide ToInteger, but we are sure it fits in the target + // integer type. + value = static_cast(decimal_value.low_bits()); + } + return value; + } +}; + template <> struct SerializeFunctor { Status Serialize(const ::arrow::Time32Array& array, ArrowWriteContext*, int32_t* out) { @@ -1691,6 +1732,8 @@ Status TypedColumnWriterImpl::WriteArrowDense( WRITE_ZERO_COPY_CASE(DATE32, Date32Type, Int32Type) WRITE_SERIALIZE_CASE(DATE64, Date64Type, Int32Type) WRITE_SERIALIZE_CASE(TIME32, Time32Type, Int32Type) + WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int32Type) + WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int32Type) default: ARROW_UNSUPPORTED() } @@ -1861,6 +1904,8 @@ Status TypedColumnWriterImpl::WriteArrowDense( WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type) WRITE_ZERO_COPY_CASE(TIME64, Time64Type, Int64Type) WRITE_ZERO_COPY_CASE(DURATION, DurationType, Int64Type) + WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int64Type) + WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int64Type) default: ARROW_UNSUPPORTED(); } @@ -1979,7 +2024,11 @@ struct SerializeFunctor< // Requires a custom serializer because decimal in parquet are in big-endian // format. Thus, a temporary local buffer is required. template -struct SerializeFunctor> { +struct SerializeFunctor< + ParquetType, ArrowType, + ::arrow::enable_if_t< + ::arrow::is_decimal_type::value && + !::arrow::internal::IsOneOf::value>> { Status Serialize(const typename ::arrow::TypeTraits::ArrayType& array, ArrowWriteContext* ctx, FLBA* out) { AllocateScratch(array, ctx); diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 5a11f68c690f8..edb5e44f02ecc 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -199,7 +199,8 @@ class PARQUET_EXPORT WriterProperties { pagesize_(kDefaultDataPageSize), version_(ParquetVersion::PARQUET_2_4), data_page_version_(ParquetDataPageVersion::V1), - created_by_(DEFAULT_CREATED_BY) {} + created_by_(DEFAULT_CREATED_BY), + integer_annotate_decimal_(false) {} virtual ~Builder() {} /// Specify the memory pool for the writer. Default default_memory_pool. @@ -450,6 +451,22 @@ class PARQUET_EXPORT WriterProperties { return this->disable_statistics(path->ToDotString()); } + /// Enable integer type to annotate decimal type as below: + /// int32: 1 <= precision <= 9 + /// int64: 10 <= precision <= 18 + /// Default disabled. + Builder* enable_integer_annotate_decimal() { + integer_annotate_decimal_ = true; + return this; + } + + /// Disable integer type to annotate decimal type. + /// Default disabled. + Builder* disable_integer_annotate_decimal() { + integer_annotate_decimal_ = false; + return this; + } + /// \brief Build the WriterProperties with the builder parameters. /// \return The WriterProperties defined by the builder. std::shared_ptr build() { @@ -474,7 +491,8 @@ class PARQUET_EXPORT WriterProperties { return std::shared_ptr(new WriterProperties( pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_, pagesize_, version_, created_by_, std::move(file_encryption_properties_), - default_column_properties_, column_properties, data_page_version_)); + default_column_properties_, column_properties, data_page_version_, + integer_annotate_decimal_)); } private: @@ -486,6 +504,7 @@ class PARQUET_EXPORT WriterProperties { ParquetVersion::type version_; ParquetDataPageVersion data_page_version_; std::string created_by_; + bool integer_annotate_decimal_; std::shared_ptr file_encryption_properties_; @@ -516,6 +535,8 @@ class PARQUET_EXPORT WriterProperties { inline std::string created_by() const { return parquet_created_by_; } + inline bool integer_annotate_decimal() const { return integer_annotate_decimal_; } + inline Encoding::type dictionary_index_encoding() const { if (parquet_version_ == ParquetVersion::PARQUET_1_0) { return Encoding::PLAIN_DICTIONARY; @@ -584,7 +605,7 @@ class PARQUET_EXPORT WriterProperties { std::shared_ptr file_encryption_properties, const ColumnProperties& default_column_properties, const std::unordered_map& column_properties, - ParquetDataPageVersion data_page_version) + ParquetDataPageVersion data_page_version, bool integer_annotate_decimal) : pool_(pool), dictionary_pagesize_limit_(dictionary_pagesize_limit), write_batch_size_(write_batch_size), @@ -593,6 +614,7 @@ class PARQUET_EXPORT WriterProperties { parquet_data_page_version_(data_page_version), parquet_version_(version), parquet_created_by_(created_by), + integer_annotate_decimal_(integer_annotate_decimal), file_encryption_properties_(file_encryption_properties), default_column_properties_(default_column_properties), column_properties_(column_properties) {} @@ -605,6 +627,7 @@ class PARQUET_EXPORT WriterProperties { ParquetDataPageVersion parquet_data_page_version_; ParquetVersion::type parquet_version_; std::string parquet_created_by_; + bool integer_annotate_decimal_; std::shared_ptr file_encryption_properties_;