From 7d17a5b058efcddd3b425fa845b809aa07d2daa9 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Thu, 12 Jan 2023 02:19:13 +0800 Subject: [PATCH] GH-15239: [C++][Parquet] Parquet writer writes decimal as int32/64 (#15244) As the parquet [specs](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal) states, DECIMAL can be used to annotate the following types: - int32: for 1 <= precision <= 9 - int64: for 1 <= precision <= 18; precision < 10 will produce a warning - fixed_len_byte_array: precision is limited by the array size. Length n can store <= floor(log_10(2^(8*n - 1) - 1)) base-10 digits - binary: precision is not limited, but is required. The minimum number of bytes to store the unscaled value should be used. The aim of this patch is to provide a writer option to use int32 to annotate decimal when 1 <= precision <= 9 and int64 when 10 <= precision <= 18. * Closes: #15239 Authored-by: Gang Wu Signed-off-by: Will Jones --- .../parquet/arrow/arrow_reader_writer_test.cc | 98 ++++++++++++++++++- cpp/src/parquet/arrow/reader_internal.cc | 41 +++++--- cpp/src/parquet/arrow/schema.cc | 8 +- cpp/src/parquet/arrow/test_util.h | 18 ++-- cpp/src/parquet/column_writer.cc | 53 +++++++++- cpp/src/parquet/properties.h | 29 +++++- 6 files changed, 215 insertions(+), 32 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index e885873deb7d2..dd0099e38f7a7 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -25,7 +25,6 @@ #include #include -#include #include #include @@ -3432,6 +3431,25 @@ TEST(ArrowReadWrite, NestedRequiredOuterOptionalDecimal) { } } +TEST(ArrowReadWrite, Decimal256AsInt) { + using ::arrow::Decimal256; + using ::arrow::field; + + auto type = ::arrow::decimal256(8, 4); + + const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678", + "-9999.9999", "9999.9999"])"; + auto array = ::arrow::ArrayFromJSON(type, json); + auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array}); + + parquet::WriterProperties::Builder builder; + // Enforce integer type to annotate decimal type + auto writer_properties = builder.enable_integer_annotate_decimal()->build(); + auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build(); + + CheckConfiguredRoundtrip(table, table, writer_properties, props_store_schema); +} + class TestNestedSchemaRead : public ::testing::TestWithParam { protected: // make it *3 to make it easily divisible by 3 @@ -4796,5 +4814,83 @@ std::vector GenerateMapFilteredTestCases() { INSTANTIATE_TEST_SUITE_P(MapFilteredReads, TestNestedSchemaFilteredReader, ::testing::ValuesIn(GenerateMapFilteredTestCases())); +template +class TestIntegerAnnotateDecimalTypeParquetIO : public TestParquetIO { + public: + void WriteColumn(const std::shared_ptr& values) { + auto arrow_schema = ::arrow::schema({::arrow::field("a", values->type())}); + + parquet::WriterProperties::Builder builder; + // Enforce integer type to annotate decimal type + auto writer_properties = builder.enable_integer_annotate_decimal()->build(); + std::shared_ptr parquet_schema; + ASSERT_OK_NO_THROW(ToParquetSchema(arrow_schema.get(), *writer_properties, + *default_arrow_writer_properties(), + &parquet_schema)); + + this->sink_ = CreateOutputStream(); + auto schema_node = std::static_pointer_cast(parquet_schema->schema_root()); + + std::unique_ptr writer; + ASSERT_OK_NO_THROW(FileWriter::Make( + ::arrow::default_memory_pool(), + ParquetFileWriter::Open(this->sink_, schema_node, writer_properties), + arrow_schema, default_arrow_writer_properties(), &writer)); + ASSERT_OK_NO_THROW(writer->NewRowGroup(values->length())); + ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*values)); + ASSERT_OK_NO_THROW(writer->Close()); + } + + void ReadAndCheckSingleDecimalColumnFile(const Array& values) { + std::shared_ptr out; + std::unique_ptr reader; + this->ReaderFromSink(&reader); + this->ReadSingleColumnFile(std::move(reader), &out); + + // Reader always read values as DECIMAL128 type + ASSERT_EQ(out->type()->id(), ::arrow::Type::DECIMAL128); + + if (values.type()->id() == ::arrow::Type::DECIMAL128) { + AssertArraysEqual(values, *out); + } else { + auto& expected_values = dynamic_cast(values); + auto& read_values = dynamic_cast(*out); + ASSERT_EQ(expected_values.length(), read_values.length()); + ASSERT_EQ(expected_values.null_count(), read_values.null_count()); + ASSERT_EQ(expected_values.length(), read_values.length()); + for (int64_t i = 0; i < expected_values.length(); ++i) { + ASSERT_EQ(expected_values.IsNull(i), read_values.IsNull(i)); + if (!expected_values.IsNull(i)) { + ASSERT_EQ(::arrow::Decimal256(expected_values.Value(i)).ToString(0), + ::arrow::Decimal128(read_values.Value(i)).ToString(0)); + } + } + } + } +}; + +typedef ::testing::Types< + DecimalWithPrecisionAndScale<1>, DecimalWithPrecisionAndScale<5>, + DecimalWithPrecisionAndScale<10>, DecimalWithPrecisionAndScale<18>, + Decimal256WithPrecisionAndScale<1>, Decimal256WithPrecisionAndScale<5>, + Decimal256WithPrecisionAndScale<10>, Decimal256WithPrecisionAndScale<18>> + DecimalTestTypes; + +TYPED_TEST_SUITE(TestIntegerAnnotateDecimalTypeParquetIO, DecimalTestTypes); + +TYPED_TEST(TestIntegerAnnotateDecimalTypeParquetIO, SingleNonNullableDecimalColumn) { + std::shared_ptr values; + ASSERT_OK(NonNullArray(SMALL_SIZE, &values)); + ASSERT_NO_FATAL_FAILURE(this->WriteColumn(values)); + ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values)); +} + +TYPED_TEST(TestIntegerAnnotateDecimalTypeParquetIO, SingleNullableDecimalColumn) { + std::shared_ptr values; + ASSERT_OK(NullableArray(SMALL_SIZE, SMALL_SIZE / 2, kDefaultSeed, &values)); + ASSERT_NO_FATAL_FAILURE(this->WriteColumn(values)); + ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values)); +} + } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc index e428c206bfc2a..a294b712a7ce3 100644 --- a/cpp/src/parquet/arrow/reader_internal.cc +++ b/cpp/src/parquet/arrow/reader_internal.cc @@ -638,20 +638,15 @@ struct DecimalConverter { /// small enough to fit in less 4 bytes or less than 8 bytes, respectively. /// This function implements the conversion from int32 and int64 arrays to decimal arrays. template < - typename ParquetIntegerType, + typename DecimalArrayType, typename ParquetIntegerType, typename = ::arrow::enable_if_t::value || std::is_same::value>> static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, const std::shared_ptr& field, Datum* out) { // Decimal128 and Decimal256 are only Arrow constructs. Parquet does not // specifically distinguish between decimal byte widths. - // Decimal256 isn't relevant here because the Arrow-Parquet C++ bindings never - // write Decimal values as integers and if the decimal value can fit in an - // integer it is wasteful to use Decimal256. Put another way, the only - // way an integer column could be construed as Decimal256 is if an arrow - // schema was stored as metadata in the file indicating the column was - // Decimal256. The current Arrow-Parquet C++ bindings will never do this. - DCHECK(field->type()->id() == ::arrow::Type::DECIMAL128); + DCHECK(field->type()->id() == ::arrow::Type::DECIMAL128 || + field->type()->id() == ::arrow::Type::DECIMAL256); const int64_t length = reader->values_written(); @@ -674,16 +669,21 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool, // sign/zero extend int32_t values, otherwise a no-op const auto value = static_cast(values[i]); - ::arrow::Decimal128 decimal(value); - decimal.ToBytes(out_ptr); + if constexpr (std::is_same_v) { + ::arrow::Decimal128 decimal(value); + decimal.ToBytes(out_ptr); + } else { + ::arrow::Decimal256 decimal(value); + decimal.ToBytes(out_ptr); + } } if (reader->nullable_values() && field->nullable()) { std::shared_ptr is_valid = reader->ReleaseIsValid(); - *out = std::make_shared(field->type(), length, std::move(data), - is_valid, reader->null_count()); + *out = std::make_shared(field->type(), length, std::move(data), + is_valid, reader->null_count()); } else { - *out = std::make_shared(field->type(), length, std::move(data)); + *out = std::make_shared(field->type(), length, std::move(data)); } return Status::OK(); } @@ -776,11 +776,11 @@ Status TransferColumnData(RecordReader* reader, const std::shared_ptr& va case ::arrow::Type::DECIMAL128: { switch (descr->physical_type()) { case ::parquet::Type::INT32: { - auto fn = DecimalIntegerTransfer; + auto fn = DecimalIntegerTransfer; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; case ::parquet::Type::INT64: { - auto fn = &DecimalIntegerTransfer; + auto fn = &DecimalIntegerTransfer; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); } break; case ::parquet::Type::BYTE_ARRAY: { @@ -799,6 +799,14 @@ Status TransferColumnData(RecordReader* reader, const std::shared_ptr& va } break; case ::arrow::Type::DECIMAL256: switch (descr->physical_type()) { + case ::parquet::Type::INT32: { + auto fn = DecimalIntegerTransfer; + RETURN_NOT_OK(fn(reader, pool, value_field, &result)); + } break; + case ::parquet::Type::INT64: { + auto fn = &DecimalIntegerTransfer; + RETURN_NOT_OK(fn(reader, pool, value_field, &result)); + } break; case ::parquet::Type::BYTE_ARRAY: { auto fn = &TransferDecimal; RETURN_NOT_OK(fn(reader, pool, value_field, &result)); @@ -809,7 +817,8 @@ Status TransferColumnData(RecordReader* reader, const std::shared_ptr& va } break; default: return Status::Invalid( - "Physical type for decimal256 must be fixed length binary"); + "Physical type for decimal256 must be int32, int64, byte array, or fixed " + "length binary"); } break; diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc index a845e3c40cced..267b892e4b40d 100644 --- a/cpp/src/parquet/arrow/schema.cc +++ b/cpp/src/parquet/arrow/schema.cc @@ -354,11 +354,15 @@ Status FieldToNode(const std::string& name, const std::shared_ptr& field, } break; case ArrowTypeId::DECIMAL128: case ArrowTypeId::DECIMAL256: { - type = ParquetType::FIXED_LEN_BYTE_ARRAY; const auto& decimal_type = static_cast(*field->type()); precision = decimal_type.precision(); scale = decimal_type.scale(); - length = DecimalType::DecimalSize(precision); + if (properties.integer_annotate_decimal() && 1 <= precision && precision <= 18) { + type = precision <= 9 ? ParquetType ::INT32 : ParquetType ::INT64; + } else { + type = ParquetType::FIXED_LEN_BYTE_ARRAY; + length = DecimalType::DecimalSize(precision); + } PARQUET_CATCH_NOT_OK(logical_type = LogicalType::Decimal(precision, scale)); } break; case ArrowTypeId::DATE32: diff --git a/cpp/src/parquet/arrow/test_util.h b/cpp/src/parquet/arrow/test_util.h index 495b6ebba270e..16c03130c9672 100644 --- a/cpp/src/parquet/arrow/test_util.h +++ b/cpp/src/parquet/arrow/test_util.h @@ -129,16 +129,14 @@ ::arrow::enable_if_fixed_size_binary NonNullArray( return builder.Finish(out); } +template static void random_decimals(int64_t n, uint32_t seed, int32_t precision, uint8_t* out) { auto gen = ::arrow::random::RandomArrayGenerator(seed); std::shared_ptr decimals; - int32_t byte_width = 0; - if (precision <= ::arrow::Decimal128Type::kMaxPrecision) { + if constexpr (byte_width == 16) { decimals = gen.Decimal128(::arrow::decimal128(precision, 0), n); - byte_width = ::arrow::Decimal128Type::kByteWidth; } else { decimals = gen.Decimal256(::arrow::decimal256(precision, 0), n); - byte_width = ::arrow::Decimal256Type::kByteWidth; } std::memcpy(out, decimals->data()->GetValues(1, 0), byte_width * n); } @@ -158,7 +156,8 @@ NonNullArray(size_t size, std::shared_ptr* out) { constexpr int32_t seed = 0; ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); - random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data()); + random_decimals<::arrow::Decimal128Type::kByteWidth>(size, seed, kDecimalPrecision, + out_buf->mutable_data()); RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size)); return builder.Finish(out); @@ -179,7 +178,8 @@ NonNullArray(size_t size, std::shared_ptr* out) { constexpr int32_t seed = 0; ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); - random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data()); + random_decimals<::arrow::Decimal256Type::kByteWidth>(size, seed, kDecimalPrecision, + out_buf->mutable_data()); RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size)); return builder.Finish(out); @@ -341,7 +341,8 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); - random_decimals(size, seed, precision, out_buf->mutable_data()); + random_decimals<::arrow::Decimal128Type::kByteWidth>(size, seed, precision, + out_buf->mutable_data()); ::arrow::Decimal128Builder builder(type); RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data())); @@ -367,7 +368,8 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed, ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width)); - random_decimals(size, seed, precision, out_buf->mutable_data()); + random_decimals<::arrow::Decimal256Type::kByteWidth>(size, seed, precision, + out_buf->mutable_data()); ::arrow::Decimal256Builder builder(type); RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data())); diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 16f6c36fac336..7b5ca8810bcbe 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -22,7 +22,6 @@ #include #include #include -#include #include #include @@ -41,6 +40,7 @@ #include "arrow/util/endian.h" #include "arrow/util/logging.h" #include "arrow/util/rle_encoding.h" +#include "arrow/util/type_traits.h" #include "arrow/visit_array_inline.h" #include "parquet/column_page.h" #include "parquet/encoding.h" @@ -1676,6 +1676,47 @@ struct SerializeFunctor { } }; +template +struct SerializeFunctor< + ParquetType, ArrowType, + ::arrow::enable_if_t<::arrow::is_decimal_type::value&& ::arrow::internal:: + IsOneOf::value>> { + using value_type = typename ParquetType::c_type; + + Status Serialize(const typename ::arrow::TypeTraits::ArrayType& array, + ArrowWriteContext* ctx, value_type* out) { + if (array.null_count() == 0) { + for (int64_t i = 0; i < array.length(); i++) { + out[i] = TransferValue(array.Value(i)); + } + } else { + for (int64_t i = 0; i < array.length(); i++) { + out[i] = + array.IsValid(i) ? TransferValue(array.Value(i)) : 0; + } + } + + return Status::OK(); + } + + template + value_type TransferValue(const uint8_t* in) const { + static_assert(byte_width == 16 || byte_width == 32, + "only 16 and 32 byte Decimals supported"); + value_type value = 0; + if constexpr (byte_width == 16) { + ::arrow::Decimal128 decimal_value(in); + PARQUET_THROW_NOT_OK(decimal_value.ToInteger(&value)); + } else { + ::arrow::Decimal256 decimal_value(in); + // Decimal256 does not provide ToInteger, but we are sure it fits in the target + // integer type. + value = static_cast(decimal_value.low_bits()); + } + return value; + } +}; + template <> struct SerializeFunctor { Status Serialize(const ::arrow::Time32Array& array, ArrowWriteContext*, int32_t* out) { @@ -1709,6 +1750,8 @@ Status TypedColumnWriterImpl::WriteArrowDense( WRITE_ZERO_COPY_CASE(DATE32, Date32Type, Int32Type) WRITE_SERIALIZE_CASE(DATE64, Date64Type, Int32Type) WRITE_SERIALIZE_CASE(TIME32, Time32Type, Int32Type) + WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int32Type) + WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int32Type) default: ARROW_UNSUPPORTED() } @@ -1879,6 +1922,8 @@ Status TypedColumnWriterImpl::WriteArrowDense( WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type) WRITE_ZERO_COPY_CASE(TIME64, Time64Type, Int64Type) WRITE_ZERO_COPY_CASE(DURATION, DurationType, Int64Type) + WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int64Type) + WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int64Type) default: ARROW_UNSUPPORTED(); } @@ -1997,7 +2042,11 @@ struct SerializeFunctor< // Requires a custom serializer because decimal in parquet are in big-endian // format. Thus, a temporary local buffer is required. template -struct SerializeFunctor> { +struct SerializeFunctor< + ParquetType, ArrowType, + ::arrow::enable_if_t< + ::arrow::is_decimal_type::value && + !::arrow::internal::IsOneOf::value>> { Status Serialize(const typename ::arrow::TypeTraits::ArrayType& array, ArrowWriteContext* ctx, FLBA* out) { AllocateScratch(array, ctx); diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 5a11f68c690f8..edb5e44f02ecc 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -199,7 +199,8 @@ class PARQUET_EXPORT WriterProperties { pagesize_(kDefaultDataPageSize), version_(ParquetVersion::PARQUET_2_4), data_page_version_(ParquetDataPageVersion::V1), - created_by_(DEFAULT_CREATED_BY) {} + created_by_(DEFAULT_CREATED_BY), + integer_annotate_decimal_(false) {} virtual ~Builder() {} /// Specify the memory pool for the writer. Default default_memory_pool. @@ -450,6 +451,22 @@ class PARQUET_EXPORT WriterProperties { return this->disable_statistics(path->ToDotString()); } + /// Enable integer type to annotate decimal type as below: + /// int32: 1 <= precision <= 9 + /// int64: 10 <= precision <= 18 + /// Default disabled. + Builder* enable_integer_annotate_decimal() { + integer_annotate_decimal_ = true; + return this; + } + + /// Disable integer type to annotate decimal type. + /// Default disabled. + Builder* disable_integer_annotate_decimal() { + integer_annotate_decimal_ = false; + return this; + } + /// \brief Build the WriterProperties with the builder parameters. /// \return The WriterProperties defined by the builder. std::shared_ptr build() { @@ -474,7 +491,8 @@ class PARQUET_EXPORT WriterProperties { return std::shared_ptr(new WriterProperties( pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_, pagesize_, version_, created_by_, std::move(file_encryption_properties_), - default_column_properties_, column_properties, data_page_version_)); + default_column_properties_, column_properties, data_page_version_, + integer_annotate_decimal_)); } private: @@ -486,6 +504,7 @@ class PARQUET_EXPORT WriterProperties { ParquetVersion::type version_; ParquetDataPageVersion data_page_version_; std::string created_by_; + bool integer_annotate_decimal_; std::shared_ptr file_encryption_properties_; @@ -516,6 +535,8 @@ class PARQUET_EXPORT WriterProperties { inline std::string created_by() const { return parquet_created_by_; } + inline bool integer_annotate_decimal() const { return integer_annotate_decimal_; } + inline Encoding::type dictionary_index_encoding() const { if (parquet_version_ == ParquetVersion::PARQUET_1_0) { return Encoding::PLAIN_DICTIONARY; @@ -584,7 +605,7 @@ class PARQUET_EXPORT WriterProperties { std::shared_ptr file_encryption_properties, const ColumnProperties& default_column_properties, const std::unordered_map& column_properties, - ParquetDataPageVersion data_page_version) + ParquetDataPageVersion data_page_version, bool integer_annotate_decimal) : pool_(pool), dictionary_pagesize_limit_(dictionary_pagesize_limit), write_batch_size_(write_batch_size), @@ -593,6 +614,7 @@ class PARQUET_EXPORT WriterProperties { parquet_data_page_version_(data_page_version), parquet_version_(version), parquet_created_by_(created_by), + integer_annotate_decimal_(integer_annotate_decimal), file_encryption_properties_(file_encryption_properties), default_column_properties_(default_column_properties), column_properties_(column_properties) {} @@ -605,6 +627,7 @@ class PARQUET_EXPORT WriterProperties { ParquetDataPageVersion parquet_data_page_version_; ParquetVersion::type parquet_version_; std::string parquet_created_by_; + bool integer_annotate_decimal_; std::shared_ptr file_encryption_properties_;