From b50e626a60163b411bbd7a3d3e9d159d5601b8a1 Mon Sep 17 00:00:00 2001
From: "Uwe L. Korn"
Date: Tue, 20 Dec 2016 16:18:13 -0500
Subject: [PATCH] PARQUET-805: Read Int96 into Arrow Timestamp(ns)

Author: Uwe L. Korn

Closes #204 from xhochy/PARQUET-805 and squashes the following commits:

895dc30 [Uwe L. Korn] Add missing return type
a2f7f5b [Uwe L. Korn] Incorporate review
f2255a3 [Uwe L. Korn] PARQUET-805: Read Int96 into Arrow Timestamp(ns)

Change-Id: I0ee249383962f3cf260243adb17b7d572ec577be
---
 .../parquet/arrow/arrow-reader-writer-test.cc | 52 +++++++++++++++++++
 cpp/src/parquet/arrow/arrow-schema-test.cc    | 11 ++--
 cpp/src/parquet/arrow/reader.cc               | 51 +++++++++++++++++-
 cpp/src/parquet/arrow/schema.cc               |  7 +--
 4 files changed, 111 insertions(+), 10 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 25ba457967eee..a8a5db06995e8 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -391,6 +391,58 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
   this->ReadAndCheckSingleColumnTable(values);
 }
 
+using TestInt96ParquetIO = TestParquetIO<::arrow::TimestampType>;
+
+TEST_F(TestInt96ParquetIO, ReadIntoTimestamp) {
+  // This test explicitly tests the conversion from an Impala-style timestamp
+  // to a nanoseconds-since-epoch one.
+
+  // 2nd January 1970, 11:35min 145738543ns
+  Int96 day;
+  day.value[2] = 2440589l;
+  int64_t seconds = (11 * 60 + 35) * 60;
+  *(reinterpret_cast<int64_t*>(&(day.value))) =
+      seconds * 1000l * 1000l * 1000l + 145738543;
+  // Compute the corresponding nanosecond timestamp
+  struct tm datetime = {0};
+  datetime.tm_year = 70;
+  datetime.tm_mon = 0;
+  datetime.tm_mday = 2;
+  datetime.tm_hour = 11;
+  datetime.tm_min = 35;
+  struct tm epoch = {0};
+  epoch.tm_year = 70;
+  epoch.tm_mday = 1;
+  // Nanoseconds since the epoch
+  int64_t val = lrint(difftime(mktime(&datetime), mktime(&epoch))) * 1000000000;
+  val += 145738543;
+
+  std::vector<std::shared_ptr<schema::Node>> fields(
+      {schema::PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96)});
+  std::shared_ptr<GroupNode> schema = std::static_pointer_cast<GroupNode>(
+      schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+  // We cannot write this column with Arrow, so we have to use the plain parquet-cpp API
+  // to write an Int96 file.
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  auto writer = ParquetFileWriter::Open(this->sink_, schema);
+  RowGroupWriter* rg_writer = writer->AppendRowGroup(1);
+  ColumnWriter* c_writer = rg_writer->NextColumn();
+  auto typed_writer = dynamic_cast<TypedColumnWriter<Int96Type>*>(c_writer);
+  ASSERT_NE(typed_writer, nullptr);
+  typed_writer->WriteBatch(1, nullptr, nullptr, &day);
+  c_writer->Close();
+  rg_writer->Close();
+  writer->Close();
+
+  ::arrow::TimestampBuilder builder(
+      default_memory_pool(), ::arrow::timestamp(::arrow::TimeUnit::NANO));
+  builder.Append(val);
+  std::shared_ptr<Array> values;
+  ASSERT_OK(builder.Finish(&values));
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
 using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>;
 
 TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) {
diff --git a/cpp/src/parquet/arrow/arrow-schema-test.cc b/cpp/src/parquet/arrow/arrow-schema-test.cc
index 360680f5bab14..3d075617de259 100644
--- a/cpp/src/parquet/arrow/arrow-schema-test.cc
+++ b/cpp/src/parquet/arrow/arrow-schema-test.cc
@@ -47,6 +47,8 @@ const auto DOUBLE = std::make_shared<::arrow::DoubleType>();
 const auto UTF8 = std::make_shared<::arrow::StringType>();
 const auto TIMESTAMP_MS =
     std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI);
+const auto TIMESTAMP_NS =
+    std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::NANO);
 // TODO: This requires parquet-cpp implementing the MICROS enum value
 // const auto TIMESTAMP_US = std::make_shared<TimestampType>(TimestampType::Unit::MICRO);
 const auto BINARY =
     std::make_shared<::arrow::ListType>(std::make_shared<::arrow::Field>("", UINT8));
@@ -98,9 +100,9 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
       ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
   arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
 
-  // parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
-  // ParquetType::INT64, LogicalType::TIMESTAMP_MICROS));
-  // arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_US, false));
+  parquet_fields.push_back(
+      PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96));
+  arrow_fields.push_back(std::make_shared<Field>("timestamp96", TIMESTAMP_NS, false));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
@@ -339,9 +341,6 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
 
 TEST_F(TestConvertParquetSchema, UnsupportedThings) {
   std::vector<NodePtr> unsupported_nodes;
 
-  unsupported_nodes.push_back(
-      PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96));
-
   unsupported_nodes.push_back(PrimitiveNode::Make(
       "int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE));
 
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index f2d463976a580..2efa8066f34ba 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -18,6 +18,7 @@
 #include "parquet/arrow/reader.h"
 
 #include
+#include
 #include
 #include
 #include
@@ -44,6 +45,15 @@ using ParquetReader = parquet::ParquetFileReader;
 namespace parquet {
 namespace arrow {
 
+constexpr int64_t kJulianToUnixEpochDays = 2440588L;
+constexpr int64_t kNanosecondsInADay = 86400L * 1000L * 1000L * 1000L;
+
+static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timestamp) {
+  int64_t days_since_epoch = impala_timestamp.value[2] - kJulianToUnixEpochDays;
+  int64_t nanoseconds = *(reinterpret_cast<const int64_t*>(&(impala_timestamp.value)));
+  return days_since_epoch * kNanosecondsInADay + nanoseconds;
+}
+
 template <typename ArrowType>
 struct ArrowTypeTraits {
   typedef ::arrow::NumericArray<ArrowType> array_type;
@@ -238,6 +248,15 @@ void FlatColumnReader::Impl::ReadNonNullableBatch(
   valid_bits_idx_ += values_read;
 }
 
+template <>
+void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
+    Int96* values, int64_t values_read) {
+  int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
+  for (int64_t i = 0; i < values_read; i++) {
+    out_ptr[i] = impala_timestamp_to_nanoseconds(values[i]);
+  }
+}
+
 template <>
 void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
     bool* values, int64_t values_read) {
@@ -265,6 +284,22 @@ void FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
   }
 }
 
+template <>
+void FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::TimestampType, Int96Type>(
+    const int16_t* def_levels, Int96* values, int64_t values_read, int64_t levels_read) {
+  auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
+  int values_idx = 0;
+  for (int64_t i = 0; i < levels_read; i++) {
+    if (def_levels[i] < descr_->max_definition_level()) {
+      null_count_++;
+    } else {
+      ::arrow::BitUtil::SetBit(valid_bits_ptr_, valid_bits_idx_);
+      data_ptr[valid_bits_idx_] = impala_timestamp_to_nanoseconds(values[values_idx++]);
+    }
+    valid_bits_idx_++;
+  }
+}
+
 template <>
 void FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>(
     const int16_t* def_levels, bool* values, int64_t values_read, int64_t levels_read) {
@@ -518,7 +553,21 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>*
     TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
     TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
     TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
-    TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type)
+    case ::arrow::Type::TIMESTAMP: {
+      ::arrow::TimestampType* timestamp_type =
+          static_cast<::arrow::TimestampType*>(field_->type.get());
+      switch (timestamp_type->unit) {
+        case ::arrow::TimeUnit::MILLI:
+          return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
+          break;
+        case ::arrow::TimeUnit::NANO:
+          return TypedReadBatch<::arrow::TimestampType, Int96Type>(batch_size, out);
+          break;
+        default:
+          return Status::NotImplemented("TimeUnit not supported");
+      }
+      break;
+    }
     default:
       return Status::NotImplemented(field_->type->ToString());
   }
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index 2875dc68cb561..e578ec2626609 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -58,6 +58,8 @@ const auto DOUBLE = std::make_shared<::arrow::DoubleType>();
 const auto UTF8 = std::make_shared<::arrow::StringType>();
 const auto TIMESTAMP_MS =
     std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI);
+const auto TIMESTAMP_NS =
+    std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::NANO);
 const auto BINARY =
     std::make_shared<::arrow::ListType>(std::make_shared<::arrow::Field>("", UINT8));
@@ -162,9 +164,8 @@ Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
       RETURN_NOT_OK(FromInt64(primitive, out));
       break;
     case ParquetType::INT96:
-      // TODO: Do we have that type in Arrow?
-      // type = TypePtr(new Int96Type());
-      return Status::NotImplemented("int96");
+      *out = TIMESTAMP_NS;
+      break;
    case ParquetType::FLOAT:
      *out = FLOAT;
      break;
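
For reference, the conversion this patch implements follows the Impala Int96 convention: the first eight bytes hold nanoseconds within the day (little-endian), the last four bytes hold the Julian day number, and 2440588 is the Julian day of the Unix epoch. Below is a minimal, self-contained sketch of the same arithmetic using the values from TestInt96ParquetIO.ReadIntoTimestamp; the standalone Int96 struct, ToUnixNanoseconds helper, and main function are illustrative stand-ins, not part of the patch, and the example assumes a little-endian machine, matching the reinterpret_cast in reader.cc.

#include <cstdint>
#include <cstring>
#include <iostream>

// Illustrative stand-in for parquet::Int96: three 32-bit words, where the
// first two carry nanoseconds within the day and the third the Julian day.
struct Int96 {
  uint32_t value[3];
};

constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
constexpr int64_t kNanosecondsInADay = 86400LL * 1000LL * 1000LL * 1000LL;

// Same arithmetic as impala_timestamp_to_nanoseconds() in reader.cc:
// (julian_day - unix_epoch_julian_day) * ns_per_day + ns_within_day.
int64_t ToUnixNanoseconds(const Int96& ts) {
  int64_t days_since_epoch = static_cast<int64_t>(ts.value[2]) - kJulianToUnixEpochDays;
  int64_t nanoseconds = 0;
  std::memcpy(&nanoseconds, ts.value, sizeof(nanoseconds));  // first 8 bytes
  return days_since_epoch * kNanosecondsInADay + nanoseconds;
}

int main() {
  // The value used in the test: 2nd January 1970, 11:35:00 plus 145738543 ns.
  Int96 ts;
  ts.value[2] = 2440589;  // Julian day number of 1970-01-02
  int64_t ns_within_day = (11 * 60 + 35) * 60 * 1000000000LL + 145738543;
  std::memcpy(ts.value, &ns_within_day, sizeof(ns_within_day));
  // Prints 128100145738543: one day plus 11h35m after the epoch, plus the remainder.
  std::cout << ToUnixNanoseconds(ts) << std::endl;
  return 0;
}

The sketch uses std::memcpy rather than the patch's reinterpret_cast purely to stay well-defined under strict aliasing; on little-endian platforms the result is identical.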