From 475ad1a02d01f76015b8bce0b2e9f825c105a13d Mon Sep 17 00:00:00 2001
From: "Korn, Uwe"
Date: Tue, 7 Mar 2017 15:21:51 -0500
Subject: [PATCH] PARQUET-890: Support I/O of DATE columns in parquet_arrow

Also fixes a bug when reading INT96 timestamps.

Author: Korn, Uwe

Closes #266 from xhochy/PARQUET-890 and squashes the following commits:

8481c2c [Korn, Uwe] ninja lint
666d41b [Korn, Uwe] PARQUET-890: Support I/O of DATE columns in parquet_arrow

Change-Id: I0c4db73db72211c5bc59f260692eafed21d3c5a8
---
 .../parquet/arrow/arrow-reader-writer-test.cc |  13 ++-
 cpp/src/parquet/arrow/arrow-schema-test.cc    |  11 +-
 cpp/src/parquet/arrow/reader.cc               |  49 +++++++-
 cpp/src/parquet/arrow/schema.cc               |  18 ++-
 cpp/src/parquet/arrow/test-util.h             |  49 +++++++-
 cpp/src/parquet/arrow/writer.cc               | 106 ++++++++++++------
 6 files changed, 200 insertions(+), 46 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 2dfdbd2de549a..a0a39f12aa9b9 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -148,6 +148,15 @@ struct test_traits<::arrow::TimestampType> {
 
 const int64_t test_traits<::arrow::TimestampType>::value(14695634030000);
 
+template <>
+struct test_traits<::arrow::DateType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::DATE;
+  static int64_t const value;
+};
+
+const int64_t test_traits<::arrow::DateType>::value(14688000000000);
+
 template <>
 struct test_traits<::arrow::FloatType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
@@ -309,8 +318,8 @@ class TestParquetIO : public ::testing::Test {
 
 typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
     ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
-    ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType,
-    ::arrow::StringType, ::arrow::BinaryType>
+    ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::DateType, ::arrow::FloatType,
+    ::arrow::DoubleType, ::arrow::StringType, ::arrow::BinaryType>
     TestTypes;
 
 TYPED_TEST_CASE(TestParquetIO, TestTypes);

diff --git a/cpp/src/parquet/arrow/arrow-schema-test.cc b/cpp/src/parquet/arrow/arrow-schema-test.cc
index 43e57d8e87e1e..8db792f28e1e6 100644
--- a/cpp/src/parquet/arrow/arrow-schema-test.cc
+++ b/cpp/src/parquet/arrow/arrow-schema-test.cc
@@ -98,6 +98,10 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
       ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
   arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
 
+  parquet_fields.push_back(PrimitiveNode::Make(
+      "date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE));
+  arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date(), false));
+
   parquet_fields.push_back(
       PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96));
   arrow_fields.push_back(std::make_shared<Field>("timestamp96", TIMESTAMP_NS, false));
@@ -339,9 +343,6 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
 TEST_F(TestConvertParquetSchema, UnsupportedThings) {
   std::vector<NodePtr> unsupported_nodes;
 
-  unsupported_nodes.push_back(PrimitiveNode::Make(
-      "int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE));
-
   for (const NodePtr& node : unsupported_nodes) {
     ASSERT_RAISES(NotImplemented, ConvertSchema({node}));
   }
@@ -394,6 +395,10 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64)); arrow_fields.push_back(std::make_shared("int64", INT64, false)); + parquet_fields.push_back(PrimitiveNode::Make( + "date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE)); + arrow_fields.push_back(std::make_shared("date", ::arrow::date(), false)); + parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS)); arrow_fields.push_back(std::make_shared("timestamp", TIMESTAMP_MS, false)); diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index df34d4c65b7b3..73f6d87323a44 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -361,7 +361,7 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Typ PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch( values_to_read, nullptr, nullptr, values, &values_read)); - int64_t* out_ptr = reinterpret_cast(data_buffer_ptr_); + int64_t* out_ptr = reinterpret_cast(data_buffer_ptr_) + valid_bits_idx_; for (int64_t i = 0; i < values_read; i++) { *out_ptr++ = impala_timestamp_to_nanoseconds(values[i]); } @@ -370,6 +370,24 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Typ return Status::OK(); } +template <> +Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::DateType, Int32Type>( + TypedColumnReader* reader, int64_t values_to_read, int64_t* levels_read) { + RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false)); + auto values = reinterpret_cast(values_buffer_.mutable_data()); + int64_t values_read; + PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch( + values_to_read, nullptr, nullptr, values, &values_read)); + + int64_t* out_ptr = reinterpret_cast(data_buffer_ptr_) + valid_bits_idx_; + for (int64_t i = 0; i < values_read; i++) { + *out_ptr++ = static_cast(values[i]) * 86400000; + } + valid_bits_idx_ += values_read; + + return Status::OK(); +} + template <> Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>( TypedColumnReader* reader, int64_t values_to_read, @@ -463,6 +481,30 @@ Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>( return Status::OK(); } +template <> +Status ColumnReader::Impl::ReadNullableBatch<::arrow::DateType, Int32Type>( + TypedColumnReader* reader, int16_t* def_levels, int16_t* rep_levels, + int64_t values_to_read, int64_t* levels_read, int64_t* values_read) { + RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false)); + auto values = reinterpret_cast(values_buffer_.mutable_data()); + int64_t null_count; + PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(values_to_read, def_levels, rep_levels, + values, valid_bits_ptr_, valid_bits_idx_, levels_read, values_read, &null_count)); + + auto data_ptr = reinterpret_cast(data_buffer_ptr_); + INIT_BITSET(valid_bits_ptr_, valid_bits_idx_); + for (int64_t i = 0; i < *values_read; i++) { + if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) { + data_ptr[valid_bits_idx_ + i] = static_cast(values[i]) * 86400000; + } + READ_NEXT_BITSET(valid_bits_ptr_); + } + null_count_ += null_count; + valid_bits_idx_ += *values_read; + + return Status::OK(); +} + template <> Status ColumnReader::Impl::ReadNullableBatch<::arrow::BooleanType, BooleanType>( TypedColumnReader* reader, int16_t* def_levels, int16_t* rep_levels, @@ -843,6 +885,7 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* out 
       TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
       TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
       TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
+      TYPED_BATCH_CASE(DATE, ::arrow::DateType, Int32Type)
       TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
       TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
       TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
@@ -865,7 +908,9 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
         break;
       }
       default:
-        return Status::NotImplemented(field_->type->ToString());
+        std::stringstream ss;
+        ss << "No support for reading columns of type " << field_->type->ToString();
+        return Status::NotImplemented(ss.str());
     }
   }

diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index 65e3381c65051..0c336d959ef79 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -77,7 +77,10 @@ static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) {
       *out = MakeDecimalType(node);
       break;
     default:
-      return Status::NotImplemented("unhandled type");
+      std::stringstream ss;
+      ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+         << " for fixed-length binary array";
+      return Status::NotImplemented(ss.str());
       break;
   }
 
@@ -104,11 +107,17 @@ static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
     case LogicalType::UINT_32:
       *out = ::arrow::uint32();
       break;
+    case LogicalType::DATE:
+      *out = ::arrow::date();
+      break;
     case LogicalType::DECIMAL:
       *out = MakeDecimalType(node);
       break;
     default:
-      return Status::NotImplemented("Unhandled logical type for int32");
+      std::stringstream ss;
+      ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+         << " for INT32";
+      return Status::NotImplemented(ss.str());
       break;
   }
   return Status::OK();
@@ -129,7 +138,10 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
       *out = TIMESTAMP_MS;
       break;
     default:
-      return Status::NotImplemented("Unhandled logical type for int64");
+      std::stringstream ss;
+      ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+         << " for INT64";
+      return Status::NotImplemented(ss.str());
       break;
   }
   return Status::OK();

diff --git a/cpp/src/parquet/arrow/test-util.h b/cpp/src/parquet/arrow/test-util.h
index bfc9ce1890a5d..07f1f282ba6f8 100644
--- a/cpp/src/parquet/arrow/test-util.h
+++ b/cpp/src/parquet/arrow/test-util.h
@@ -33,6 +33,9 @@ using is_arrow_float = std::is_floating_point<typename ArrowType::c_type>;
 template <typename ArrowType>
 using is_arrow_int = std::is_integral<typename ArrowType::c_type>;
 
+template <typename ArrowType>
+using is_arrow_date = std::is_same<ArrowType, ::arrow::DateType>;
+
 template <typename ArrowType>
 using is_arrow_string = std::is_same<ArrowType, ::arrow::StringType>;
 
@@ -53,10 +56,27 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullArray(
 }
 
 template <typename ArrowType>
-typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NonNullArray(
+typename std::enable_if<
+    is_arrow_int<ArrowType>::value && !is_arrow_date<ArrowType>::value, Status>::type
+NonNullArray(size_t size, std::shared_ptr<Array>* out) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+
+  // Passing data type so this will work with TimestampType too
+  ::arrow::NumericBuilder<ArrowType> builder(
+      ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size());
+  return builder.Finish(out);
+}
+
+template <typename ArrowType>
+typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NonNullArray(
     size_t size, std::shared_ptr<Array>* out) {
   std::vector<typename ArrowType::c_type> values;
   ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  for (size_t i = 0; i < size; i++) {
+    values[i] *= 86400000;
+  }
 
   // Passing data type so this will work with TimestampType too
   ::arrow::NumericBuilder<ArrowType> builder(
@@ -107,13 +127,38 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NullableArray(
 
 // This helper function only supports (size/2) nulls.
 template <typename ArrowType>
-typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NullableArray(
+typename std::enable_if<
+    is_arrow_int<ArrowType>::value && !is_arrow_date<ArrowType>::value, Status>::type
+NullableArray(size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
+  std::vector<typename ArrowType::c_type> values;
+
+  // Seed is random in Arrow right now
+  (void)seed;
+  ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  // Passing data type so this will work with TimestampType too
+  ::arrow::NumericBuilder<ArrowType> builder(
+      ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return builder.Finish(out);
+}
+
+template <typename ArrowType>
+typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NullableArray(
     size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
   std::vector<typename ArrowType::c_type> values;
 
   // Seed is random in Arrow right now
   (void)seed;
   ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  for (size_t i = 0; i < size; i++) {
+    values[i] *= 86400000;
+  }
   std::vector<uint8_t> valid_bytes(size, 1);
 
   for (size_t i = 0; i < num_nulls; i++) {

diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 90e037f2ba185..6d2f9c08143b2 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -256,40 +256,17 @@ class FileWriter::Impl {
   Status TypedWriteBatch(ColumnWriter* writer, const std::shared_ptr<Array>& data,
       int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels);
 
+  template <typename ParquetType, typename ArrowType>
+  Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values,
+      int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
+      const typename ArrowType::c_type* data_ptr);
+
   template <typename ParquetType, typename ArrowType>
   Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values,
       int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
       const uint8_t* valid_bits, int64_t valid_bits_offset,
       const typename ArrowType::c_type* data_ptr);
 
-  // TODO(uwe): Same code as in reader.cc the only difference is the name of the temporary
-  // buffer
-  template <typename InType, typename OutType>
-  struct can_copy_ptr {
-    static constexpr bool value =
-        std::is_same<InType, OutType>::value ||
-        (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
-            (sizeof(InType) == sizeof(OutType)));
-  };
-
-  template <typename InType, typename OutType,
-      typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
-  Status ConvertPhysicalType(const InType* in_ptr, int64_t, const OutType** out_ptr) {
-    *out_ptr = reinterpret_cast<const OutType*>(in_ptr);
-    return Status::OK();
-  }
-
-  template <typename InType, typename OutType,
-      typename std::enable_if<!can_copy_ptr<InType, OutType>::value>::type* = nullptr>
-  Status ConvertPhysicalType(
-      const InType* in_ptr, int64_t length, const OutType** out_ptr) {
-    RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(OutType)));
-    OutType* mutable_out_ptr = reinterpret_cast<OutType*>(data_buffer_.mutable_data());
-    std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
-    *out_ptr = mutable_out_ptr;
-    return Status::OK();
-  }
-
   Status WriteColumnChunk(const Array& data);
 
   Status Close();
@@ -323,7 +300,6 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
     const std::shared_ptr<Array>& array, int64_t num_levels, const int16_t* def_levels,
     const int16_t* rep_levels) {
   using ArrowCType = typename ArrowType::c_type;
-  using ParquetCType = typename ParquetType::c_type;
 
   auto data = static_cast<PrimitiveArray*>(array.get());
   auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data());
@@ -331,11 +307,8 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
   if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
     // no nulls, just dump the data
-    const ParquetCType* data_writer_ptr = nullptr;
-    RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
-        data_ptr + data->offset(), array->length(), &data_writer_ptr)));
-    PARQUET_CATCH_NOT_OK(
-        writer->WriteBatch(num_levels, def_levels, rep_levels, data_writer_ptr));
+    RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(writer, array->length(),
+        num_levels, def_levels, rep_levels, data_ptr + data->offset())));
   } else {
     const uint8_t* valid_bits = data->null_bitmap_data();
     RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer, data->length(),
@@ -346,6 +319,49 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
   return Status::OK();
 }
 
+template <typename ParquetType, typename ArrowType>
+Status FileWriter::Impl::WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer,
+    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, const typename ArrowType::c_type* data_ptr) {
+  using ParquetCType = typename ParquetType::c_type;
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
+  auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
+  std::copy(data_ptr, data_ptr + num_values, buffer_ptr);
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+  return Status::OK();
+}
+
+template <>
+Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::DateType>(
+    TypedColumnWriter<Int32Type>* writer, int64_t num_values, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels, const int64_t* data_ptr) {
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
+  auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+  for (int i = 0; i < num_values; i++) {
+    buffer_ptr[i] = static_cast<int32_t>(data_ptr[i] / 86400000);
+  }
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+  return Status::OK();
+}
+
+#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                     \
+  template <>                                                                          \
+  Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>(              \
+      TypedColumnWriter<ParquetType> * writer, int64_t num_values, int64_t num_levels, \
+      const int16_t* def_levels, const int16_t* rep_levels, const CType* data_ptr) {   \
+    PARQUET_CATCH_NOT_OK(                                                              \
+        writer->WriteBatch(num_levels, def_levels, rep_levels, data_ptr));             \
+    return Status::OK();                                                               \
+  }
+
+NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
+NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
+NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
+NONNULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
+NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
+
 template <typename ParquetType, typename ArrowType>
 Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writer,
     int64_t num_values, int64_t num_levels, const int16_t* def_levels,
@@ -368,6 +384,27 @@ Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writer,
   return Status::OK();
 }
 
+template <>
+Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::DateType>(
+    TypedColumnWriter<Int32Type>* writer, int64_t num_values, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+    int64_t valid_bits_offset, const int64_t* data_ptr) {
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
+  auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+  INIT_BITSET(valid_bits, valid_bits_offset);
+  for (int i = 0; i < num_values; i++) {
+    if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+      // Convert from milliseconds into days since the epoch
+      buffer_ptr[i] = static_cast<int32_t>(data_ptr[i] / 86400000);
+    }
+    READ_NEXT_BITSET(valid_bits);
+  }
+  PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
+      num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
+
+  return Status::OK();
+}
+
 #define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                        \
   template <>                                                                          \
   Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>(                 \
@@ -519,6 +556,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
       WRITE_BATCH_CASE(INT16, Int16Type, Int32Type)
      WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type)
       WRITE_BATCH_CASE(INT32, Int32Type, Int32Type)
+      WRITE_BATCH_CASE(DATE, DateType, Int32Type)
       WRITE_BATCH_CASE(INT64, Int64Type, Int64Type)
       WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type)
       WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type)
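
A note on the conversions in this patch: Parquet's DATE logical type stores days
since the Unix epoch as an INT32, while the Arrow date type targeted here stores
milliseconds since the epoch, so the reader multiplies by 86400000
(24 * 60 * 60 * 1000) and the writer divides by it. The division truncates, so the
round trip is only lossless for day-aligned millisecond values. Below is a minimal
standalone sketch of the two directions; the helper names are illustrative only and
are not part of the patch or of the parquet_arrow API.

    #include <cstdint>
    #include <iostream>

    // Milliseconds per day: 24 * 60 * 60 * 1000, the constant used throughout the patch.
    constexpr int64_t kMillisecondsPerDay = 86400000;

    // Writer direction: Arrow date (milliseconds since epoch) -> Parquet DATE (INT32 days).
    int32_t DateMillisToDays(int64_t millis_since_epoch) {
      return static_cast<int32_t>(millis_since_epoch / kMillisecondsPerDay);
    }

    // Reader direction: Parquet DATE (INT32 days since epoch) -> Arrow date (milliseconds).
    int64_t DateDaysToMillis(int32_t days_since_epoch) {
      return static_cast<int64_t>(days_since_epoch) * kMillisecondsPerDay;
    }

    int main() {
      // 14688000000000 is the day-aligned value used by test_traits<::arrow::DateType>
      // above; it corresponds to exactly 170000 days, so the round trip is lossless.
      const int64_t arrow_value = 14688000000000;
      const int32_t parquet_value = DateMillisToDays(arrow_value);
      std::cout << parquet_value << "\n";                    // 170000
      std::cout << DateDaysToMillis(parquet_value) << "\n";  // 14688000000000
      return 0;
    }

The INT96 fix in reader.cc is independent of the DATE work: offsetting out_ptr by
valid_bits_idx_ makes each ReadBatch call append after the values already decoded,
where previously every batch overwrote the start of the output buffer.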