Skip to content

Commit

Permalink
apacheGH-15239: [C++][Parquet] Parquet writer writes decimal as int32/64
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac committed Jan 7, 2023
1 parent 21d6374 commit b4241ea
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 16 deletions.
70 changes: 69 additions & 1 deletion cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

#include <cstdint>
#include <functional>
#include <iostream>
#include <sstream>
#include <vector>

Expand Down Expand Up @@ -4727,5 +4726,74 @@ std::vector<NestedFilterTestCase> GenerateMapFilteredTestCases() {
INSTANTIATE_TEST_SUITE_P(MapFilteredReads, TestNestedSchemaFilteredReader,
::testing::ValuesIn(GenerateMapFilteredTestCases()));

// Test fixture that writes a single decimal column with the
// integer_annotate_decimal writer property enabled, then reads it back and
// compares the result against the values that were written.
template <typename TestType>
class TestIntegerAnnotateDecimalTypeParquetIO : public TestParquetIO<TestType> {
public:
// Writes `values` as the only column (field "a") of a new file in this->sink_.
// The file holds one row group of values->length() rows with one column chunk.
// Every step is wrapped in ASSERT_OK_NO_THROW, so callers should invoke this
// through ASSERT_NO_FATAL_FAILURE to notice a failed step.
template <typename ArrayType>
void WriteColumn(const std::shared_ptr<ArrayType>& values) {
auto arrow_schema = ::arrow::schema({::arrow::field("a", values->type())});

parquet::WriterProperties::Builder builder;
// Ask the writer to annotate decimals on integer physical types
auto writer_properties = builder.enable_integer_annotate_decimal()->build();
std::shared_ptr<SchemaDescriptor> parquet_schema;
ASSERT_OK_NO_THROW(ToParquetSchema(arrow_schema.get(), *writer_properties,
*default_arrow_writer_properties(),
&parquet_schema));

this->sink_ = CreateOutputStream();
auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());

// The same writer_properties are used for the schema conversion above and
// for the file writer below.
std::unique_ptr<FileWriter> writer;
ASSERT_OK_NO_THROW(FileWriter::Make(
::arrow::default_memory_pool(),
ParquetFileWriter::Open(this->sink_, schema_node, writer_properties),
arrow_schema, default_arrow_writer_properties(), &writer));
ASSERT_OK_NO_THROW(writer->NewRowGroup(values->length()));
ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*values));
ASSERT_OK_NO_THROW(writer->Close());
}

// Reads the single column back (presumably from the sink filled by
// WriteColumn -- see ReaderFromSink) and checks it against `values`.
// `values` must be a Decimal128 or a Decimal256 array; any other type makes
// the dynamic_cast below throw.
void ReadAndCheckSingleDecimalColumnFile(const Array& values) {
std::shared_ptr<Array> out;
std::unique_ptr<FileReader> reader;
this->ReaderFromSink(&reader);
this->ReadSingleColumnFile(std::move(reader), &out);

// The reader always returns the values as DECIMAL128
ASSERT_EQ(out->type()->id(), ::arrow::Type::DECIMAL128);

if (values.type()->id() == ::arrow::Type::DECIMAL128) {
// Same type on both sides: compare the arrays directly.
AssertArraysEqual(values, *out);
} else {
// Decimal256 input comes back as Decimal128, so the arrays cannot be
// compared directly; compare each value through its ToString(0) form.
auto& expected_values = dynamic_cast<const ::arrow::Decimal256Array&>(values);
auto& read_values = dynamic_cast<const ::arrow::Decimal128Array&>(*out);
ASSERT_EQ(expected_values.null_count(), read_values.null_count());
// The element-wise loop below does not consult validity, so require no nulls.
ASSERT_EQ(0, read_values.null_count());
ASSERT_EQ(expected_values.length(), read_values.length());
for (int64_t i = 0; i < expected_values.length(); ++i) {
ASSERT_EQ(::arrow::Decimal256(expected_values.Value(i)).ToString(0),
::arrow::Decimal128(read_values.Value(i)).ToString(0));
}
}
}
};

// Type list driving TestIntegerAnnotateDecimalTypeParquetIO: the 128-bit and
// 256-bit decimal test types, each instantiated with 1, 5, 10 and 18
// (presumably the precision -- confirm against DecimalWithPrecisionAndScale).
using DecimalTestTypes = ::testing::Types<
    DecimalWithPrecisionAndScale<1>, DecimalWithPrecisionAndScale<5>,
    DecimalWithPrecisionAndScale<10>, DecimalWithPrecisionAndScale<18>,
    Decimal256WithPrecisionAndScale<1>, Decimal256WithPrecisionAndScale<5>,
    Decimal256WithPrecisionAndScale<10>, Decimal256WithPrecisionAndScale<18>>;

// Run the fixture once for every type listed in DecimalTestTypes.
TYPED_TEST_SUITE(TestIntegerAnnotateDecimalTypeParquetIO, DecimalTestTypes);

// Round trip of a non-null decimal array of SMALL_SIZE values: write it with
// integer annotation enabled, then read it back and compare.
TYPED_TEST(TestIntegerAnnotateDecimalTypeParquetIO, SingleDecimalColumn) {
std::shared_ptr<Array> values;
ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
// The helpers use ASSERT_* internally; ASSERT_NO_FATAL_FAILURE stops the test
// here if one of those inner assertions failed.
ASSERT_NO_FATAL_FAILURE(this->WriteColumn(values));
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values));
}

} // namespace arrow
} // namespace parquet
8 changes: 6 additions & 2 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,15 @@ Status FieldToNode(const std::string& name, const std::shared_ptr<Field>& field,
} break;
case ArrowTypeId::DECIMAL128:
case ArrowTypeId::DECIMAL256: {
type = ParquetType::FIXED_LEN_BYTE_ARRAY;
const auto& decimal_type = static_cast<const ::arrow::DecimalType&>(*field->type());
precision = decimal_type.precision();
scale = decimal_type.scale();
length = DecimalType::DecimalSize(precision);
if (properties.integer_annotate_decimal() && 1 <= precision && precision <= 18) {
type = precision <= 9 ? ParquetType ::INT32 : ParquetType ::INT64;
} else {
type = ParquetType::FIXED_LEN_BYTE_ARRAY;
length = DecimalType::DecimalSize(precision);
}
PARQUET_CATCH_NOT_OK(logical_type = LogicalType::Decimal(precision, scale));
} break;
case ArrowTypeId::DATE32:
Expand Down
18 changes: 10 additions & 8 deletions cpp/src/parquet/arrow/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,14 @@ ::arrow::enable_if_fixed_size_binary<ArrowType, Status> NonNullArray(
return builder.Finish(out);
}

/// \brief Fill `out` with `n` randomly generated decimal values.
///
/// \tparam byte_width width in bytes of one decimal value; 16 generates
///         Decimal128 values and 32 generates Decimal256 values
/// \param n number of values to generate
/// \param seed seed passed to the random array generator
/// \param precision precision of the generated decimal type (scale is 0)
/// \param out destination buffer; must hold at least byte_width * n bytes
template <int32_t byte_width>
static void random_decimals(int64_t n, uint32_t seed, int32_t precision, uint8_t* out) {
  // The width is a compile-time choice made by the caller; it is no longer
  // derived from `precision` at runtime.
  static_assert(byte_width == 16 || byte_width == 32,
                "only 16 and 32 byte Decimals supported");
  auto gen = ::arrow::random::RandomArrayGenerator(seed);
  std::shared_ptr<Array> decimals;
  if constexpr (byte_width == 16) {
    decimals = gen.Decimal128(::arrow::decimal128(precision, 0), n);
  } else {
    decimals = gen.Decimal256(::arrow::decimal256(precision, 0), n);
  }
  // Copy the raw bytes of buffer 1 of the generated array into the output.
  std::memcpy(out, decimals->data()->GetValues<uint8_t>(1, 0), byte_width * n);
}
Expand All @@ -158,7 +156,8 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
constexpr int32_t seed = 0;

ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width));
random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data());
random_decimals<::arrow::Decimal128Type::kByteWidth>(size, seed, kDecimalPrecision,
out_buf->mutable_data());

RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size));
return builder.Finish(out);
Expand All @@ -179,7 +178,8 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
constexpr int32_t seed = 0;

ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width));
random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data());
random_decimals<::arrow::Decimal256Type::kByteWidth>(size, seed, kDecimalPrecision,
out_buf->mutable_data());

RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size));
return builder.Finish(out);
Expand Down Expand Up @@ -341,7 +341,8 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed,

ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width));

random_decimals(size, seed, precision, out_buf->mutable_data());
random_decimals<::arrow::Decimal128Type::kByteWidth>(size, seed, precision,
out_buf->mutable_data());

::arrow::Decimal128Builder builder(type);
RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data()));
Expand All @@ -367,7 +368,8 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed,

ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width));

random_decimals(size, seed, precision, out_buf->mutable_data());
random_decimals<::arrow::Decimal256Type::kByteWidth>(size, seed, precision,
out_buf->mutable_data());

::arrow::Decimal256Builder builder(type);
RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data()));
Expand Down
53 changes: 51 additions & 2 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <cstring>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

Expand All @@ -41,6 +40,7 @@
#include "arrow/util/endian.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding.h"
#include "arrow/util/type_traits.h"
#include "arrow/visit_array_inline.h"
#include "parquet/column_page.h"
#include "parquet/encoding.h"
Expand Down Expand Up @@ -1658,6 +1658,47 @@ struct SerializeFunctor<Int32Type, ::arrow::Date64Type> {
}
};

template <typename ParquetType, typename ArrowType>
struct SerializeFunctor<
ParquetType, ArrowType,
::arrow::enable_if_t<::arrow::is_decimal_type<ArrowType>::value&& ::arrow::internal::
IsOneOf<ParquetType, Int32Type, Int64Type>::value>> {
using value_type = typename ParquetType::c_type;

Status Serialize(const typename ::arrow::TypeTraits<ArrowType>::ArrayType& array,
ArrowWriteContext* ctx, value_type* out) {
if (array.null_count() == 0) {
for (int64_t i = 0; i < array.length(); i++) {
out[i] = TransferValue<ArrowType::kByteWidth>(array.Value(i));
}
} else {
for (int64_t i = 0; i < array.length(); i++) {
out[i] =
array.IsValid(i) ? TransferValue<ArrowType::kByteWidth>(array.Value(i)) : 0;
}
}

return Status::OK();
}

template <int byte_width>
value_type TransferValue(const uint8_t* in) const {
static_assert(byte_width == 16 || byte_width == 32,
"only 16 and 32 byte Decimals supported");
value_type value = 0;
if constexpr (byte_width == 16) {
::arrow::Decimal128 decimal_value(in);
PARQUET_THROW_NOT_OK(decimal_value.ToInteger(&value));
} else {
::arrow::Decimal256 decimal_value(in);
// Decimal256 does not provide ToInteger, but we are sure it fits in the target
// integer type.
value = static_cast<value_type>(decimal_value.low_bits());
}
return value;
}
};

template <>
struct SerializeFunctor<Int32Type, ::arrow::Time32Type> {
Status Serialize(const ::arrow::Time32Array& array, ArrowWriteContext*, int32_t* out) {
Expand Down Expand Up @@ -1691,6 +1732,8 @@ Status TypedColumnWriterImpl<Int32Type>::WriteArrowDense(
WRITE_ZERO_COPY_CASE(DATE32, Date32Type, Int32Type)
WRITE_SERIALIZE_CASE(DATE64, Date64Type, Int32Type)
WRITE_SERIALIZE_CASE(TIME32, Time32Type, Int32Type)
WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int32Type)
WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int32Type)
default:
ARROW_UNSUPPORTED()
}
Expand Down Expand Up @@ -1861,6 +1904,8 @@ Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(
WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type)
WRITE_ZERO_COPY_CASE(TIME64, Time64Type, Int64Type)
WRITE_ZERO_COPY_CASE(DURATION, DurationType, Int64Type)
WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int64Type)
WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int64Type)
default:
ARROW_UNSUPPORTED();
}
Expand Down Expand Up @@ -1979,7 +2024,11 @@ struct SerializeFunctor<
// Requires a custom serializer because decimals in Parquet are stored in
// big-endian format. Thus, a temporary local buffer is required.
template <typename ParquetType, typename ArrowType>
struct SerializeFunctor<ParquetType, ArrowType, ::arrow::enable_if_decimal<ArrowType>> {
struct SerializeFunctor<
ParquetType, ArrowType,
::arrow::enable_if_t<
::arrow::is_decimal_type<ArrowType>::value &&
!::arrow::internal::IsOneOf<ParquetType, Int32Type, Int64Type>::value>> {
Status Serialize(const typename ::arrow::TypeTraits<ArrowType>::ArrayType& array,
ArrowWriteContext* ctx, FLBA* out) {
AllocateScratch(array, ctx);
Expand Down
29 changes: 26 additions & 3 deletions cpp/src/parquet/properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ class PARQUET_EXPORT WriterProperties {
pagesize_(kDefaultDataPageSize),
version_(ParquetVersion::PARQUET_2_4),
data_page_version_(ParquetDataPageVersion::V1),
created_by_(DEFAULT_CREATED_BY) {}
created_by_(DEFAULT_CREATED_BY),
integer_annotate_decimal_(false) {}
virtual ~Builder() {}

/// Specify the memory pool for the writer. Default default_memory_pool.
Expand Down Expand Up @@ -450,6 +451,22 @@ class PARQUET_EXPORT WriterProperties {
return this->disable_statistics(path->ToDotString());
}

/// \brief Write decimals of small precision with an integer physical type.
///
/// When enabled, the decimal logical type annotates an integer as below:
///   int32: 1 <= precision <= 9
///   int64: 10 <= precision <= 18
/// Default disabled.
/// \return this builder, so that calls can be chained.
Builder* enable_integer_annotate_decimal() {
integer_annotate_decimal_ = true;
return this;
}

/// \brief Do not use integer physical types to annotate decimals.
///
/// This is the state a newly constructed builder starts in.
/// \return this builder, so that calls can be chained.
Builder* disable_integer_annotate_decimal() {
integer_annotate_decimal_ = false;
return this;
}

/// \brief Build the WriterProperties with the builder parameters.
/// \return The WriterProperties defined by the builder.
std::shared_ptr<WriterProperties> build() {
Expand All @@ -474,7 +491,8 @@ class PARQUET_EXPORT WriterProperties {
return std::shared_ptr<WriterProperties>(new WriterProperties(
pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
pagesize_, version_, created_by_, std::move(file_encryption_properties_),
default_column_properties_, column_properties, data_page_version_));
default_column_properties_, column_properties, data_page_version_,
integer_annotate_decimal_));
}

private:
Expand All @@ -486,6 +504,7 @@ class PARQUET_EXPORT WriterProperties {
ParquetVersion::type version_;
ParquetDataPageVersion data_page_version_;
std::string created_by_;
bool integer_annotate_decimal_;

std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;

Expand Down Expand Up @@ -516,6 +535,8 @@ class PARQUET_EXPORT WriterProperties {

inline std::string created_by() const { return parquet_created_by_; }

/// \brief Whether integer physical types are used to annotate decimals, as
/// chosen through Builder::enable_integer_annotate_decimal() or
/// Builder::disable_integer_annotate_decimal().
inline bool integer_annotate_decimal() const { return integer_annotate_decimal_; }

inline Encoding::type dictionary_index_encoding() const {
if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
return Encoding::PLAIN_DICTIONARY;
Expand Down Expand Up @@ -584,7 +605,7 @@ class PARQUET_EXPORT WriterProperties {
std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
const ColumnProperties& default_column_properties,
const std::unordered_map<std::string, ColumnProperties>& column_properties,
ParquetDataPageVersion data_page_version)
ParquetDataPageVersion data_page_version, bool integer_annotate_decimal)
: pool_(pool),
dictionary_pagesize_limit_(dictionary_pagesize_limit),
write_batch_size_(write_batch_size),
Expand All @@ -593,6 +614,7 @@ class PARQUET_EXPORT WriterProperties {
parquet_data_page_version_(data_page_version),
parquet_version_(version),
parquet_created_by_(created_by),
integer_annotate_decimal_(integer_annotate_decimal),
file_encryption_properties_(file_encryption_properties),
default_column_properties_(default_column_properties),
column_properties_(column_properties) {}
Expand All @@ -605,6 +627,7 @@ class PARQUET_EXPORT WriterProperties {
ParquetDataPageVersion parquet_data_page_version_;
ParquetVersion::type parquet_version_;
std::string parquet_created_by_;
bool integer_annotate_decimal_;

std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;

Expand Down

0 comments on commit b4241ea

Please sign in to comment.