Skip to content

Commit

Permalink
GH-15239: [C++][Parquet] Parquet writer writes decimal as int32/64 (#…
Browse files Browse the repository at this point in the history
…15244)

As the parquet [specs](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#decimal) states, DECIMAL can be used to annotate the following types:
- int32: for 1 <= precision <= 9
- int64: for 1 <= precision <= 18; precision < 10 will produce a warning
- fixed_len_byte_array: precision is limited by the array size. Length n can store <= floor(log_10(2^(8*n - 1) - 1)) base-10 digits
- binary: precision is not limited, but is required. The minimum number of bytes to store the unscaled value should be used.

The aim of this patch is to provide a writer option to use int32 to annotate decimal when 1 <= precision <= 9 and int64 when 10 <= precision <= 18.
* Closes: #15239

Authored-by: Gang Wu <ustcwg@gmail.com>
Signed-off-by: Will Jones <willjones127@gmail.com>
  • Loading branch information
wgtmac authored Jan 11, 2023
1 parent 0da51b7 commit 7d17a5b
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 32 deletions.
98 changes: 97 additions & 1 deletion cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

#include <cstdint>
#include <functional>
#include <iostream>
#include <sstream>
#include <vector>

Expand Down Expand Up @@ -3432,6 +3431,25 @@ TEST(ArrowReadWrite, NestedRequiredOuterOptionalDecimal) {
}
}

TEST(ArrowReadWrite, Decimal256AsInt) {
using ::arrow::Decimal256;
using ::arrow::field;

auto type = ::arrow::decimal256(8, 4);

const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678",
"-9999.9999", "9999.9999"])";
auto array = ::arrow::ArrayFromJSON(type, json);
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});

parquet::WriterProperties::Builder builder;
// Enforce integer type to annotate decimal type
auto writer_properties = builder.enable_integer_annotate_decimal()->build();
auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build();

CheckConfiguredRoundtrip(table, table, writer_properties, props_store_schema);
}

class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
protected:
// make it *3 to make it easily divisible by 3
Expand Down Expand Up @@ -4796,5 +4814,83 @@ std::vector<NestedFilterTestCase> GenerateMapFilteredTestCases() {
INSTANTIATE_TEST_SUITE_P(MapFilteredReads, TestNestedSchemaFilteredReader,
::testing::ValuesIn(GenerateMapFilteredTestCases()));

template <typename TestType>
class TestIntegerAnnotateDecimalTypeParquetIO : public TestParquetIO<TestType> {
public:
void WriteColumn(const std::shared_ptr<Array>& values) {
auto arrow_schema = ::arrow::schema({::arrow::field("a", values->type())});

parquet::WriterProperties::Builder builder;
// Enforce integer type to annotate decimal type
auto writer_properties = builder.enable_integer_annotate_decimal()->build();
std::shared_ptr<SchemaDescriptor> parquet_schema;
ASSERT_OK_NO_THROW(ToParquetSchema(arrow_schema.get(), *writer_properties,
*default_arrow_writer_properties(),
&parquet_schema));

this->sink_ = CreateOutputStream();
auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());

std::unique_ptr<FileWriter> writer;
ASSERT_OK_NO_THROW(FileWriter::Make(
::arrow::default_memory_pool(),
ParquetFileWriter::Open(this->sink_, schema_node, writer_properties),
arrow_schema, default_arrow_writer_properties(), &writer));
ASSERT_OK_NO_THROW(writer->NewRowGroup(values->length()));
ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*values));
ASSERT_OK_NO_THROW(writer->Close());
}

void ReadAndCheckSingleDecimalColumnFile(const Array& values) {
std::shared_ptr<Array> out;
std::unique_ptr<FileReader> reader;
this->ReaderFromSink(&reader);
this->ReadSingleColumnFile(std::move(reader), &out);

// Reader always read values as DECIMAL128 type
ASSERT_EQ(out->type()->id(), ::arrow::Type::DECIMAL128);

if (values.type()->id() == ::arrow::Type::DECIMAL128) {
AssertArraysEqual(values, *out);
} else {
auto& expected_values = dynamic_cast<const ::arrow::Decimal256Array&>(values);
auto& read_values = dynamic_cast<const ::arrow::Decimal128Array&>(*out);
ASSERT_EQ(expected_values.length(), read_values.length());
ASSERT_EQ(expected_values.null_count(), read_values.null_count());
ASSERT_EQ(expected_values.length(), read_values.length());
for (int64_t i = 0; i < expected_values.length(); ++i) {
ASSERT_EQ(expected_values.IsNull(i), read_values.IsNull(i));
if (!expected_values.IsNull(i)) {
ASSERT_EQ(::arrow::Decimal256(expected_values.Value(i)).ToString(0),
::arrow::Decimal128(read_values.Value(i)).ToString(0));
}
}
}
}
};

typedef ::testing::Types<
DecimalWithPrecisionAndScale<1>, DecimalWithPrecisionAndScale<5>,
DecimalWithPrecisionAndScale<10>, DecimalWithPrecisionAndScale<18>,
Decimal256WithPrecisionAndScale<1>, Decimal256WithPrecisionAndScale<5>,
Decimal256WithPrecisionAndScale<10>, Decimal256WithPrecisionAndScale<18>>
DecimalTestTypes;

TYPED_TEST_SUITE(TestIntegerAnnotateDecimalTypeParquetIO, DecimalTestTypes);

TYPED_TEST(TestIntegerAnnotateDecimalTypeParquetIO, SingleNonNullableDecimalColumn) {
std::shared_ptr<Array> values;
ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
ASSERT_NO_FATAL_FAILURE(this->WriteColumn(values));
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values));
}

TYPED_TEST(TestIntegerAnnotateDecimalTypeParquetIO, SingleNullableDecimalColumn) {
std::shared_ptr<Array> values;
ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, SMALL_SIZE / 2, kDefaultSeed, &values));
ASSERT_NO_FATAL_FAILURE(this->WriteColumn(values));
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values));
}

} // namespace arrow
} // namespace parquet
41 changes: 25 additions & 16 deletions cpp/src/parquet/arrow/reader_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -638,20 +638,15 @@ struct DecimalConverter<DecimalArrayType, ByteArrayType> {
/// small enough to fit in less 4 bytes or less than 8 bytes, respectively.
/// This function implements the conversion from int32 and int64 arrays to decimal arrays.
template <
typename ParquetIntegerType,
typename DecimalArrayType, typename ParquetIntegerType,
typename = ::arrow::enable_if_t<std::is_same<ParquetIntegerType, Int32Type>::value ||
std::is_same<ParquetIntegerType, Int64Type>::value>>
static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
const std::shared_ptr<Field>& field, Datum* out) {
// Decimal128 and Decimal256 are only Arrow constructs. Parquet does not
// specifically distinguish between decimal byte widths.
// Decimal256 isn't relevant here because the Arrow-Parquet C++ bindings never
// write Decimal values as integers and if the decimal value can fit in an
// integer it is wasteful to use Decimal256. Put another way, the only
// way an integer column could be construed as Decimal256 is if an arrow
// schema was stored as metadata in the file indicating the column was
// Decimal256. The current Arrow-Parquet C++ bindings will never do this.
DCHECK(field->type()->id() == ::arrow::Type::DECIMAL128);
DCHECK(field->type()->id() == ::arrow::Type::DECIMAL128 ||
field->type()->id() == ::arrow::Type::DECIMAL256);

const int64_t length = reader->values_written();

Expand All @@ -674,16 +669,21 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
// sign/zero extend int32_t values, otherwise a no-op
const auto value = static_cast<int64_t>(values[i]);

::arrow::Decimal128 decimal(value);
decimal.ToBytes(out_ptr);
if constexpr (std::is_same_v<DecimalArrayType, Decimal128Array>) {
::arrow::Decimal128 decimal(value);
decimal.ToBytes(out_ptr);
} else {
::arrow::Decimal256 decimal(value);
decimal.ToBytes(out_ptr);
}
}

if (reader->nullable_values() && field->nullable()) {
std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
*out = std::make_shared<Decimal128Array>(field->type(), length, std::move(data),
is_valid, reader->null_count());
*out = std::make_shared<DecimalArrayType>(field->type(), length, std::move(data),
is_valid, reader->null_count());
} else {
*out = std::make_shared<Decimal128Array>(field->type(), length, std::move(data));
*out = std::make_shared<DecimalArrayType>(field->type(), length, std::move(data));
}
return Status::OK();
}
Expand Down Expand Up @@ -776,11 +776,11 @@ Status TransferColumnData(RecordReader* reader, const std::shared_ptr<Field>& va
case ::arrow::Type::DECIMAL128: {
switch (descr->physical_type()) {
case ::parquet::Type::INT32: {
auto fn = DecimalIntegerTransfer<Int32Type>;
auto fn = DecimalIntegerTransfer<Decimal128Array, Int32Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::INT64: {
auto fn = &DecimalIntegerTransfer<Int64Type>;
auto fn = &DecimalIntegerTransfer<Decimal128Array, Int64Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::BYTE_ARRAY: {
Expand All @@ -799,6 +799,14 @@ Status TransferColumnData(RecordReader* reader, const std::shared_ptr<Field>& va
} break;
case ::arrow::Type::DECIMAL256:
switch (descr->physical_type()) {
case ::parquet::Type::INT32: {
auto fn = DecimalIntegerTransfer<Decimal256Array, Int32Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::INT64: {
auto fn = &DecimalIntegerTransfer<Decimal256Array, Int64Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal256Array, ByteArrayType>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
Expand All @@ -809,7 +817,8 @@ Status TransferColumnData(RecordReader* reader, const std::shared_ptr<Field>& va
} break;
default:
return Status::Invalid(
"Physical type for decimal256 must be fixed length binary");
"Physical type for decimal256 must be int32, int64, byte array, or fixed "
"length binary");
}
break;

Expand Down
8 changes: 6 additions & 2 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,15 @@ Status FieldToNode(const std::string& name, const std::shared_ptr<Field>& field,
} break;
case ArrowTypeId::DECIMAL128:
case ArrowTypeId::DECIMAL256: {
type = ParquetType::FIXED_LEN_BYTE_ARRAY;
const auto& decimal_type = static_cast<const ::arrow::DecimalType&>(*field->type());
precision = decimal_type.precision();
scale = decimal_type.scale();
length = DecimalType::DecimalSize(precision);
if (properties.integer_annotate_decimal() && 1 <= precision && precision <= 18) {
type = precision <= 9 ? ParquetType ::INT32 : ParquetType ::INT64;
} else {
type = ParquetType::FIXED_LEN_BYTE_ARRAY;
length = DecimalType::DecimalSize(precision);
}
PARQUET_CATCH_NOT_OK(logical_type = LogicalType::Decimal(precision, scale));
} break;
case ArrowTypeId::DATE32:
Expand Down
18 changes: 10 additions & 8 deletions cpp/src/parquet/arrow/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,14 @@ ::arrow::enable_if_fixed_size_binary<ArrowType, Status> NonNullArray(
return builder.Finish(out);
}

template <int32_t byte_width>
static void random_decimals(int64_t n, uint32_t seed, int32_t precision, uint8_t* out) {
auto gen = ::arrow::random::RandomArrayGenerator(seed);
std::shared_ptr<Array> decimals;
int32_t byte_width = 0;
if (precision <= ::arrow::Decimal128Type::kMaxPrecision) {
if constexpr (byte_width == 16) {
decimals = gen.Decimal128(::arrow::decimal128(precision, 0), n);
byte_width = ::arrow::Decimal128Type::kByteWidth;
} else {
decimals = gen.Decimal256(::arrow::decimal256(precision, 0), n);
byte_width = ::arrow::Decimal256Type::kByteWidth;
}
std::memcpy(out, decimals->data()->GetValues<uint8_t>(1, 0), byte_width * n);
}
Expand All @@ -158,7 +156,8 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
constexpr int32_t seed = 0;

ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width));
random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data());
random_decimals<::arrow::Decimal128Type::kByteWidth>(size, seed, kDecimalPrecision,
out_buf->mutable_data());

RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size));
return builder.Finish(out);
Expand All @@ -179,7 +178,8 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
constexpr int32_t seed = 0;

ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width));
random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data());
random_decimals<::arrow::Decimal256Type::kByteWidth>(size, seed, kDecimalPrecision,
out_buf->mutable_data());

RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size));
return builder.Finish(out);
Expand Down Expand Up @@ -341,7 +341,8 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed,

ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width));

random_decimals(size, seed, precision, out_buf->mutable_data());
random_decimals<::arrow::Decimal128Type::kByteWidth>(size, seed, precision,
out_buf->mutable_data());

::arrow::Decimal128Builder builder(type);
RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data()));
Expand All @@ -367,7 +368,8 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed,

ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width));

random_decimals(size, seed, precision, out_buf->mutable_data());
random_decimals<::arrow::Decimal256Type::kByteWidth>(size, seed, precision,
out_buf->mutable_data());

::arrow::Decimal256Builder builder(type);
RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data()));
Expand Down
53 changes: 51 additions & 2 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <cstring>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

Expand All @@ -41,6 +40,7 @@
#include "arrow/util/endian.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding.h"
#include "arrow/util/type_traits.h"
#include "arrow/visit_array_inline.h"
#include "parquet/column_page.h"
#include "parquet/encoding.h"
Expand Down Expand Up @@ -1676,6 +1676,47 @@ struct SerializeFunctor<Int32Type, ::arrow::Date64Type> {
}
};

template <typename ParquetType, typename ArrowType>
struct SerializeFunctor<
ParquetType, ArrowType,
::arrow::enable_if_t<::arrow::is_decimal_type<ArrowType>::value&& ::arrow::internal::
IsOneOf<ParquetType, Int32Type, Int64Type>::value>> {
using value_type = typename ParquetType::c_type;

Status Serialize(const typename ::arrow::TypeTraits<ArrowType>::ArrayType& array,
ArrowWriteContext* ctx, value_type* out) {
if (array.null_count() == 0) {
for (int64_t i = 0; i < array.length(); i++) {
out[i] = TransferValue<ArrowType::kByteWidth>(array.Value(i));
}
} else {
for (int64_t i = 0; i < array.length(); i++) {
out[i] =
array.IsValid(i) ? TransferValue<ArrowType::kByteWidth>(array.Value(i)) : 0;
}
}

return Status::OK();
}

template <int byte_width>
value_type TransferValue(const uint8_t* in) const {
static_assert(byte_width == 16 || byte_width == 32,
"only 16 and 32 byte Decimals supported");
value_type value = 0;
if constexpr (byte_width == 16) {
::arrow::Decimal128 decimal_value(in);
PARQUET_THROW_NOT_OK(decimal_value.ToInteger(&value));
} else {
::arrow::Decimal256 decimal_value(in);
// Decimal256 does not provide ToInteger, but we are sure it fits in the target
// integer type.
value = static_cast<value_type>(decimal_value.low_bits());
}
return value;
}
};

template <>
struct SerializeFunctor<Int32Type, ::arrow::Time32Type> {
Status Serialize(const ::arrow::Time32Array& array, ArrowWriteContext*, int32_t* out) {
Expand Down Expand Up @@ -1709,6 +1750,8 @@ Status TypedColumnWriterImpl<Int32Type>::WriteArrowDense(
WRITE_ZERO_COPY_CASE(DATE32, Date32Type, Int32Type)
WRITE_SERIALIZE_CASE(DATE64, Date64Type, Int32Type)
WRITE_SERIALIZE_CASE(TIME32, Time32Type, Int32Type)
WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int32Type)
WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int32Type)
default:
ARROW_UNSUPPORTED()
}
Expand Down Expand Up @@ -1879,6 +1922,8 @@ Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(
WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type)
WRITE_ZERO_COPY_CASE(TIME64, Time64Type, Int64Type)
WRITE_ZERO_COPY_CASE(DURATION, DurationType, Int64Type)
WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int64Type)
WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int64Type)
default:
ARROW_UNSUPPORTED();
}
Expand Down Expand Up @@ -1997,7 +2042,11 @@ struct SerializeFunctor<
// Requires a custom serializer because decimal in parquet are in big-endian
// format. Thus, a temporary local buffer is required.
template <typename ParquetType, typename ArrowType>
struct SerializeFunctor<ParquetType, ArrowType, ::arrow::enable_if_decimal<ArrowType>> {
struct SerializeFunctor<
ParquetType, ArrowType,
::arrow::enable_if_t<
::arrow::is_decimal_type<ArrowType>::value &&
!::arrow::internal::IsOneOf<ParquetType, Int32Type, Int64Type>::value>> {
Status Serialize(const typename ::arrow::TypeTraits<ArrowType>::ArrayType& array,
ArrowWriteContext* ctx, FLBA* out) {
AllocateScratch(array, ctx);
Expand Down
Loading

0 comments on commit 7d17a5b

Please sign in to comment.