Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-15239: [C++][Parquet] Parquet writer writes decimal as int32/64 #15244

Merged
merged 3 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 97 additions & 1 deletion cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

#include <cstdint>
#include <functional>
#include <iostream>
#include <sstream>
#include <vector>

Expand Down Expand Up @@ -3431,6 +3430,25 @@ TEST(ArrowReadWrite, NestedRequiredOuterOptionalDecimal) {
}
}

TEST(ArrowReadWrite, Decimal256AsInt) {
using ::arrow::Decimal256;
using ::arrow::field;

auto type = ::arrow::decimal256(8, 4);

const char* json = R"(["1.0000", null, "-1.2345", "-1000.5678",
"-9999.9999", "9999.9999"])";
auto array = ::arrow::ArrayFromJSON(type, json);
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});

parquet::WriterProperties::Builder builder;
// Enforce integer type to annotate decimal type
auto writer_properties = builder.enable_integer_annotate_decimal()->build();
auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build();

CheckConfiguredRoundtrip(table, table, writer_properties, props_store_schema);
}

class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
protected:
// make it *3 to make it easily divisible by 3
Expand Down Expand Up @@ -4727,5 +4745,83 @@ std::vector<NestedFilterTestCase> GenerateMapFilteredTestCases() {
INSTANTIATE_TEST_SUITE_P(MapFilteredReads, TestNestedSchemaFilteredReader,
::testing::ValuesIn(GenerateMapFilteredTestCases()));

template <typename TestType>
class TestIntegerAnnotateDecimalTypeParquetIO : public TestParquetIO<TestType> {
public:
void WriteColumn(const std::shared_ptr<Array>& values) {
auto arrow_schema = ::arrow::schema({::arrow::field("a", values->type())});

parquet::WriterProperties::Builder builder;
// Enforce integer type to annotate decimal type
auto writer_properties = builder.enable_integer_annotate_decimal()->build();
std::shared_ptr<SchemaDescriptor> parquet_schema;
ASSERT_OK_NO_THROW(ToParquetSchema(arrow_schema.get(), *writer_properties,
*default_arrow_writer_properties(),
&parquet_schema));

this->sink_ = CreateOutputStream();
auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());

std::unique_ptr<FileWriter> writer;
ASSERT_OK_NO_THROW(FileWriter::Make(
::arrow::default_memory_pool(),
ParquetFileWriter::Open(this->sink_, schema_node, writer_properties),
arrow_schema, default_arrow_writer_properties(), &writer));
ASSERT_OK_NO_THROW(writer->NewRowGroup(values->length()));
ASSERT_OK_NO_THROW(writer->WriteColumnChunk(*values));
ASSERT_OK_NO_THROW(writer->Close());
}

void ReadAndCheckSingleDecimalColumnFile(const Array& values) {
std::shared_ptr<Array> out;
std::unique_ptr<FileReader> reader;
this->ReaderFromSink(&reader);
this->ReadSingleColumnFile(std::move(reader), &out);

// Reader always read values as DECIMAL128 type
ASSERT_EQ(out->type()->id(), ::arrow::Type::DECIMAL128);

if (values.type()->id() == ::arrow::Type::DECIMAL128) {
AssertArraysEqual(values, *out);
} else {
auto& expected_values = dynamic_cast<const ::arrow::Decimal256Array&>(values);
auto& read_values = dynamic_cast<const ::arrow::Decimal128Array&>(*out);
ASSERT_EQ(expected_values.length(), read_values.length());
ASSERT_EQ(expected_values.null_count(), read_values.null_count());
ASSERT_EQ(expected_values.length(), read_values.length());
for (int64_t i = 0; i < expected_values.length(); ++i) {
ASSERT_EQ(expected_values.IsNull(i), read_values.IsNull(i));
if (!expected_values.IsNull(i)) {
ASSERT_EQ(::arrow::Decimal256(expected_values.Value(i)).ToString(0),
::arrow::Decimal128(read_values.Value(i)).ToString(0));
}
}
}
}
};

typedef ::testing::Types<
DecimalWithPrecisionAndScale<1>, DecimalWithPrecisionAndScale<5>,
DecimalWithPrecisionAndScale<10>, DecimalWithPrecisionAndScale<18>,
Decimal256WithPrecisionAndScale<1>, Decimal256WithPrecisionAndScale<5>,
Decimal256WithPrecisionAndScale<10>, Decimal256WithPrecisionAndScale<18>>
DecimalTestTypes;

TYPED_TEST_SUITE(TestIntegerAnnotateDecimalTypeParquetIO, DecimalTestTypes);

TYPED_TEST(TestIntegerAnnotateDecimalTypeParquetIO, SingleNonNullableDecimalColumn) {
std::shared_ptr<Array> values;
ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
ASSERT_NO_FATAL_FAILURE(this->WriteColumn(values));
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values));
}

TYPED_TEST(TestIntegerAnnotateDecimalTypeParquetIO, SingleNullableDecimalColumn) {
std::shared_ptr<Array> values;
ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, SMALL_SIZE / 2, kDefaultSeed, &values));
ASSERT_NO_FATAL_FAILURE(this->WriteColumn(values));
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleDecimalColumnFile(*values));
}

} // namespace arrow
} // namespace parquet
41 changes: 25 additions & 16 deletions cpp/src/parquet/arrow/reader_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -638,20 +638,15 @@ struct DecimalConverter<DecimalArrayType, ByteArrayType> {
/// small enough to fit in less 4 bytes or less than 8 bytes, respectively.
/// This function implements the conversion from int32 and int64 arrays to decimal arrays.
template <
typename ParquetIntegerType,
typename DecimalArrayType, typename ParquetIntegerType,
typename = ::arrow::enable_if_t<std::is_same<ParquetIntegerType, Int32Type>::value ||
std::is_same<ParquetIntegerType, Int64Type>::value>>
static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
const std::shared_ptr<Field>& field, Datum* out) {
// Decimal128 and Decimal256 are only Arrow constructs. Parquet does not
// specifically distinguish between decimal byte widths.
// Decimal256 isn't relevant here because the Arrow-Parquet C++ bindings never
// write Decimal values as integers and if the decimal value can fit in an
// integer it is wasteful to use Decimal256. Put another way, the only
// way an integer column could be construed as Decimal256 is if an arrow
// schema was stored as metadata in the file indicating the column was
// Decimal256. The current Arrow-Parquet C++ bindings will never do this.
DCHECK(field->type()->id() == ::arrow::Type::DECIMAL128);
DCHECK(field->type()->id() == ::arrow::Type::DECIMAL128 ||
field->type()->id() == ::arrow::Type::DECIMAL256);

const int64_t length = reader->values_written();

Expand All @@ -674,16 +669,21 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
// sign/zero extend int32_t values, otherwise a no-op
const auto value = static_cast<int64_t>(values[i]);

::arrow::Decimal128 decimal(value);
decimal.ToBytes(out_ptr);
if constexpr (std::is_same_v<DecimalArrayType, Decimal128Array>) {
::arrow::Decimal128 decimal(value);
decimal.ToBytes(out_ptr);
} else {
::arrow::Decimal256 decimal(value);
decimal.ToBytes(out_ptr);
}
}

if (reader->nullable_values() && field->nullable()) {
std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
*out = std::make_shared<Decimal128Array>(field->type(), length, std::move(data),
is_valid, reader->null_count());
*out = std::make_shared<DecimalArrayType>(field->type(), length, std::move(data),
is_valid, reader->null_count());
} else {
*out = std::make_shared<Decimal128Array>(field->type(), length, std::move(data));
*out = std::make_shared<DecimalArrayType>(field->type(), length, std::move(data));
}
return Status::OK();
}
Expand Down Expand Up @@ -776,11 +776,11 @@ Status TransferColumnData(RecordReader* reader, const std::shared_ptr<Field>& va
case ::arrow::Type::DECIMAL128: {
switch (descr->physical_type()) {
case ::parquet::Type::INT32: {
auto fn = DecimalIntegerTransfer<Int32Type>;
auto fn = DecimalIntegerTransfer<Decimal128Array, Int32Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::INT64: {
auto fn = &DecimalIntegerTransfer<Int64Type>;
auto fn = &DecimalIntegerTransfer<Decimal128Array, Int64Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::BYTE_ARRAY: {
Expand All @@ -799,6 +799,14 @@ Status TransferColumnData(RecordReader* reader, const std::shared_ptr<Field>& va
} break;
case ::arrow::Type::DECIMAL256:
switch (descr->physical_type()) {
case ::parquet::Type::INT32: {
auto fn = DecimalIntegerTransfer<Decimal256Array, Int32Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::INT64: {
auto fn = &DecimalIntegerTransfer<Decimal256Array, Int64Type>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
} break;
case ::parquet::Type::BYTE_ARRAY: {
auto fn = &TransferDecimal<Decimal256Array, ByteArrayType>;
RETURN_NOT_OK(fn(reader, pool, value_field, &result));
Expand All @@ -809,7 +817,8 @@ Status TransferColumnData(RecordReader* reader, const std::shared_ptr<Field>& va
} break;
default:
return Status::Invalid(
"Physical type for decimal256 must be fixed length binary");
"Physical type for decimal256 must be int32, int64, byte array, or fixed "
"length binary");
}
break;

Expand Down
8 changes: 6 additions & 2 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,15 @@ Status FieldToNode(const std::string& name, const std::shared_ptr<Field>& field,
} break;
case ArrowTypeId::DECIMAL128:
case ArrowTypeId::DECIMAL256: {
type = ParquetType::FIXED_LEN_BYTE_ARRAY;
const auto& decimal_type = static_cast<const ::arrow::DecimalType&>(*field->type());
precision = decimal_type.precision();
scale = decimal_type.scale();
length = DecimalType::DecimalSize(precision);
if (properties.integer_annotate_decimal() && 1 <= precision && precision <= 18) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does c++ arrow support converting int32/int64 to decimal128 and decimal256?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean by that. This PR is how the data is encoded in the buffers, not what logical type is saved in the Parquet file. When read back to Arrow it will always come out as a Decimal, so there's no need for conversion.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does c++ arrow support converting int32/int64 to decimal128 and decimal256?

decimal128/decimal256 support construction from integers. But the parquet itself does not use an integer array to represent decimal values.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks

type = precision <= 9 ? ParquetType ::INT32 : ParquetType ::INT64;
} else {
type = ParquetType::FIXED_LEN_BYTE_ARRAY;
length = DecimalType::DecimalSize(precision);
}
PARQUET_CATCH_NOT_OK(logical_type = LogicalType::Decimal(precision, scale));
} break;
case ArrowTypeId::DATE32:
Expand Down
18 changes: 10 additions & 8 deletions cpp/src/parquet/arrow/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,14 @@ ::arrow::enable_if_fixed_size_binary<ArrowType, Status> NonNullArray(
return builder.Finish(out);
}

template <int32_t byte_width>
static void random_decimals(int64_t n, uint32_t seed, int32_t precision, uint8_t* out) {
auto gen = ::arrow::random::RandomArrayGenerator(seed);
std::shared_ptr<Array> decimals;
int32_t byte_width = 0;
if (precision <= ::arrow::Decimal128Type::kMaxPrecision) {
if constexpr (byte_width == 16) {
decimals = gen.Decimal128(::arrow::decimal128(precision, 0), n);
byte_width = ::arrow::Decimal128Type::kByteWidth;
} else {
decimals = gen.Decimal256(::arrow::decimal256(precision, 0), n);
byte_width = ::arrow::Decimal256Type::kByteWidth;
}
std::memcpy(out, decimals->data()->GetValues<uint8_t>(1, 0), byte_width * n);
}
Expand All @@ -158,7 +156,8 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
constexpr int32_t seed = 0;

ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width));
random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data());
random_decimals<::arrow::Decimal128Type::kByteWidth>(size, seed, kDecimalPrecision,
out_buf->mutable_data());

RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size));
return builder.Finish(out);
Expand All @@ -179,7 +178,8 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
constexpr int32_t seed = 0;

ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width));
random_decimals(size, seed, kDecimalPrecision, out_buf->mutable_data());
random_decimals<::arrow::Decimal256Type::kByteWidth>(size, seed, kDecimalPrecision,
out_buf->mutable_data());

RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size));
return builder.Finish(out);
Expand Down Expand Up @@ -341,7 +341,8 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed,

ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width));

random_decimals(size, seed, precision, out_buf->mutable_data());
random_decimals<::arrow::Decimal128Type::kByteWidth>(size, seed, precision,
out_buf->mutable_data());

::arrow::Decimal128Builder builder(type);
RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data()));
Expand All @@ -367,7 +368,8 @@ NullableArray(size_t size, size_t num_nulls, uint32_t seed,

ARROW_ASSIGN_OR_RAISE(auto out_buf, ::arrow::AllocateBuffer(size * byte_width));

random_decimals(size, seed, precision, out_buf->mutable_data());
random_decimals<::arrow::Decimal256Type::kByteWidth>(size, seed, precision,
out_buf->mutable_data());

::arrow::Decimal256Builder builder(type);
RETURN_NOT_OK(builder.AppendValues(out_buf->data(), size, valid_bytes.data()));
Expand Down
53 changes: 51 additions & 2 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <cstring>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

Expand All @@ -41,6 +40,7 @@
#include "arrow/util/endian.h"
#include "arrow/util/logging.h"
#include "arrow/util/rle_encoding.h"
#include "arrow/util/type_traits.h"
#include "arrow/visit_array_inline.h"
#include "parquet/column_page.h"
#include "parquet/encoding.h"
Expand Down Expand Up @@ -1658,6 +1658,47 @@ struct SerializeFunctor<Int32Type, ::arrow::Date64Type> {
}
};

template <typename ParquetType, typename ArrowType>
struct SerializeFunctor<
ParquetType, ArrowType,
::arrow::enable_if_t<::arrow::is_decimal_type<ArrowType>::value&& ::arrow::internal::
IsOneOf<ParquetType, Int32Type, Int64Type>::value>> {
using value_type = typename ParquetType::c_type;

Status Serialize(const typename ::arrow::TypeTraits<ArrowType>::ArrayType& array,
ArrowWriteContext* ctx, value_type* out) {
if (array.null_count() == 0) {
for (int64_t i = 0; i < array.length(); i++) {
out[i] = TransferValue<ArrowType::kByteWidth>(array.Value(i));
}
} else {
for (int64_t i = 0; i < array.length(); i++) {
out[i] =
array.IsValid(i) ? TransferValue<ArrowType::kByteWidth>(array.Value(i)) : 0;
}
}

return Status::OK();
}

template <int byte_width>
value_type TransferValue(const uint8_t* in) const {
static_assert(byte_width == 16 || byte_width == 32,
"only 16 and 32 byte Decimals supported");
value_type value = 0;
if constexpr (byte_width == 16) {
::arrow::Decimal128 decimal_value(in);
PARQUET_THROW_NOT_OK(decimal_value.ToInteger(&value));
} else {
::arrow::Decimal256 decimal_value(in);
// Decimal256 does not provide ToInteger, but we are sure it fits in the target
// integer type.
value = static_cast<value_type>(decimal_value.low_bits());
}
return value;
}
};

template <>
struct SerializeFunctor<Int32Type, ::arrow::Time32Type> {
Status Serialize(const ::arrow::Time32Array& array, ArrowWriteContext*, int32_t* out) {
Expand Down Expand Up @@ -1691,6 +1732,8 @@ Status TypedColumnWriterImpl<Int32Type>::WriteArrowDense(
WRITE_ZERO_COPY_CASE(DATE32, Date32Type, Int32Type)
WRITE_SERIALIZE_CASE(DATE64, Date64Type, Int32Type)
WRITE_SERIALIZE_CASE(TIME32, Time32Type, Int32Type)
WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int32Type)
WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int32Type)
default:
ARROW_UNSUPPORTED()
}
Expand Down Expand Up @@ -1861,6 +1904,8 @@ Status TypedColumnWriterImpl<Int64Type>::WriteArrowDense(
WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type)
WRITE_ZERO_COPY_CASE(TIME64, Time64Type, Int64Type)
WRITE_ZERO_COPY_CASE(DURATION, DurationType, Int64Type)
WRITE_SERIALIZE_CASE(DECIMAL128, Decimal128Type, Int64Type)
WRITE_SERIALIZE_CASE(DECIMAL256, Decimal256Type, Int64Type)
default:
ARROW_UNSUPPORTED();
}
Expand Down Expand Up @@ -1979,7 +2024,11 @@ struct SerializeFunctor<
// Requires a custom serializer because decimal in parquet are in big-endian
// format. Thus, a temporary local buffer is required.
template <typename ParquetType, typename ArrowType>
struct SerializeFunctor<ParquetType, ArrowType, ::arrow::enable_if_decimal<ArrowType>> {
struct SerializeFunctor<
ParquetType, ArrowType,
::arrow::enable_if_t<
::arrow::is_decimal_type<ArrowType>::value &&
!::arrow::internal::IsOneOf<ParquetType, Int32Type, Int64Type>::value>> {
Status Serialize(const typename ::arrow::TypeTraits<ArrowType>::ArrayType& array,
ArrowWriteContext* ctx, FLBA* out) {
AllocateScratch(array, ctx);
Expand Down
Loading