PARQUET-805: Read Int96 into Arrow Timestamp(ns)
Author: Uwe L. Korn <uwelk@xhochy.com>

Closes apache#204 from xhochy/PARQUET-805 and squashes the following commits:

895dc30 [Uwe L. Korn] Add missing return type
a2f7f5b [Uwe L. Korn] Incorporate review
f2255a3 [Uwe L. Korn] PARQUET-805: Read Int96 into Arrow Timestamp(ns)

Change-Id: I0ee249383962f3cf260243adb17b7d572ec577be
xhochy authored and wesm committed Dec 20, 2016
1 parent 7752273 commit b50e626
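The core of the change is the conversion from Parquet's INT96 physical type, which stores Impala-style timestamps as nanoseconds within the day in the first eight bytes and a Julian day number in the last four, to Arrow's timestamp[ns]. The following is a minimal standalone sketch of that conversion, mirroring the impala_timestamp_to_nanoseconds helper added in reader.cc below; the struct and function names here are illustrative, not the parquet-cpp definitions.

#include <cstdint>
#include <cstring>

// Illustrative stand-in for parquet::Int96: three 32-bit words, where
// words 0-1 hold nanoseconds within the day and word 2 holds the Julian day.
struct Int96Stub {
  uint32_t value[3];
};

constexpr int64_t kJulianToUnixEpochDays = 2440588LL;  // Julian day of 1970-01-01
constexpr int64_t kNanosecondsInADay = 86400LL * 1000LL * 1000LL * 1000LL;

int64_t Int96ToUnixNanos(const Int96Stub& ts) {
  int64_t days_since_epoch =
      static_cast<int64_t>(ts.value[2]) - kJulianToUnixEpochDays;
  int64_t nanos_in_day;
  std::memcpy(&nanos_in_day, ts.value, sizeof(nanos_in_day));  // assumes little-endian layout
  return days_since_epoch * kNanosecondsInADay + nanos_in_day;
}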
Showing 4 changed files with 111 additions and 10 deletions.
52 changes: 52 additions & 0 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -391,6 +391,58 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
this->ReadAndCheckSingleColumnTable(values);
}

using TestInt96ParquetIO = TestParquetIO<::arrow::TimestampType>;

TEST_F(TestInt96ParquetIO, ReadIntoTimestamp) {
// This test explicitly tests the conversion from an Impala-style timestamp
// to a nanoseconds-since-epoch one.

// 2nd January 1970, 11:35min 145738543ns
Int96 day;
day.value[2] = 2440589l;
int64_t seconds = (11 * 60 + 35) * 60;
*(reinterpret_cast<int64_t*>(&(day.value))) =
seconds * 1000l * 1000l * 1000l + 145738543;
// Compute the corresponding nanosecond timestamp
struct tm datetime = {0};
datetime.tm_year = 70;
datetime.tm_mon = 0;
datetime.tm_mday = 2;
datetime.tm_hour = 11;
datetime.tm_min = 35;
struct tm epoch = {0};
epoch.tm_year = 70;
epoch.tm_mday = 1;
// Nanoseconds since the epoch
int64_t val = lrint(difftime(mktime(&datetime), mktime(&epoch))) * 1000000000;
val += 145738543;

std::vector<std::shared_ptr<schema::Node>> fields(
{schema::PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96)});
std::shared_ptr<schema::GroupNode> schema = std::static_pointer_cast<GroupNode>(
schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));

// We cannot write this column with Arrow, so we have to use the plain parquet-cpp API
// to write an Int96 file.
this->sink_ = std::make_shared<InMemoryOutputStream>();
auto writer = ParquetFileWriter::Open(this->sink_, schema);
RowGroupWriter* rg_writer = writer->AppendRowGroup(1);
ColumnWriter* c_writer = rg_writer->NextColumn();
auto typed_writer = dynamic_cast<TypedColumnWriter<Int96Type>*>(c_writer);
ASSERT_NE(typed_writer, nullptr);
typed_writer->WriteBatch(1, nullptr, nullptr, &day);
c_writer->Close();
rg_writer->Close();
writer->Close();

::arrow::TimestampBuilder builder(
default_memory_pool(), ::arrow::timestamp(::arrow::TimeUnit::NANO));
builder.Append(val);
std::shared_ptr<Array> values;
ASSERT_OK(builder.Finish(&values));
this->ReadAndCheckSingleColumnFile(values.get());
}

using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>;

TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) {
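The expected value in ReadIntoTimestamp can be checked by hand from the constants the test uses; the arithmetic below is a worked example, not part of the diff.

// Julian day 2440589 is one day after the Unix epoch day 2440588:
//   1 * 86400 s                       =  86400 s
// Time of day 11:35:
//   (11 * 60 + 35) * 60               =  41700 s
// Total seconds since 1970-01-01      = 128100 s
// In nanoseconds, plus the sub-second part:
//   128100 * 1000000000 + 145738543   = 128100145738543 ns
// which is exactly what lrint(difftime(mktime(&datetime), mktime(&epoch))) *
// 1000000000 + 145738543 yields for 1970-01-02 11:35:00.145738543.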
11 changes: 5 additions & 6 deletions cpp/src/parquet/arrow/arrow-schema-test.cc
@@ -47,6 +47,8 @@ const auto DOUBLE = std::make_shared<::arrow::DoubleType>();
const auto UTF8 = std::make_shared<::arrow::StringType>();
const auto TIMESTAMP_MS =
std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI);
const auto TIMESTAMP_NS =
std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::NANO);
// TODO: This requires parquet-cpp implementing the MICROS enum value
// const auto TIMESTAMP_US = std::make_shared<TimestampType>(TimestampType::Unit::MICRO);
const auto BINARY =
@@ -98,9 +100,9 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));

// parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
// ParquetType::INT64, LogicalType::TIMESTAMP_MICROS));
// arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_US, false));
parquet_fields.push_back(
PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96));
arrow_fields.push_back(std::make_shared<Field>("timestamp96", TIMESTAMP_NS, false));

parquet_fields.push_back(
PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
@@ -339,9 +341,6 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
TEST_F(TestConvertParquetSchema, UnsupportedThings) {
std::vector<NodePtr> unsupported_nodes;

unsupported_nodes.push_back(
PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96));

unsupported_nodes.push_back(PrimitiveNode::Make(
"int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE));

51 changes: 50 additions & 1 deletion cpp/src/parquet/arrow/reader.cc
@@ -18,6 +18,7 @@
#include "parquet/arrow/reader.h"

#include <algorithm>
#include <chrono>
#include <queue>
#include <string>
#include <vector>
@@ -44,6 +45,15 @@ using ParquetReader = parquet::ParquetFileReader;
namespace parquet {
namespace arrow {

constexpr int64_t kJulianToUnixEpochDays = 2440588L;
constexpr int64_t kNanosecondsInADay = 86400L * 1000L * 1000L * 1000L;

static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timestamp) {
int64_t days_since_epoch = impala_timestamp.value[2] - kJulianToUnixEpochDays;
int64_t nanoseconds = *(reinterpret_cast<const int64_t*>(&(impala_timestamp.value)));
return days_since_epoch * kNanosecondsInADay + nanoseconds;
}

template <typename ArrowType>
struct ArrowTypeTraits {
typedef ::arrow::NumericArray<ArrowType> array_type;
@@ -238,6 +248,15 @@ void FlatColumnReader::Impl::ReadNonNullableBatch(
valid_bits_idx_ += values_read;
}

template <>
void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
Int96* values, int64_t values_read) {
int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
for (int64_t i = 0; i < values_read; i++) {
out_ptr[i] = impala_timestamp_to_nanoseconds(values[i]);
}
}

template <>
void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
bool* values, int64_t values_read) {
@@ -265,6 +284,22 @@ void FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
}
}

template <>
void FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::TimestampType, Int96Type>(
const int16_t* def_levels, Int96* values, int64_t values_read, int64_t levels_read) {
auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
int values_idx = 0;
for (int64_t i = 0; i < levels_read; i++) {
if (def_levels[i] < descr_->max_definition_level()) {
null_count_++;
} else {
::arrow::BitUtil::SetBit(valid_bits_ptr_, valid_bits_idx_);
data_ptr[valid_bits_idx_] = impala_timestamp_to_nanoseconds(values[values_idx++]);
}
valid_bits_idx_++;
}
}

template <>
void FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>(
const int16_t* def_levels, bool* values, int64_t values_read, int64_t levels_read) {
@@ -518,7 +553,21 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>*
TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type)
case ::arrow::Type::TIMESTAMP: {
::arrow::TimestampType* timestamp_type =
static_cast<::arrow::TimestampType*>(field_->type.get());
switch (timestamp_type->unit) {
case ::arrow::TimeUnit::MILLI:
return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
break;
case ::arrow::TimeUnit::NANO:
return TypedReadBatch<::arrow::TimestampType, Int96Type>(batch_size, out);
break;
default:
return Status::NotImplemented("TimeUnit not supported");
}
break;
}
default:
return Status::NotImplemented(field_->type->ToString());
}
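NextBatch now dispatches on the Arrow timestamp unit: MILLI-unit columns continue to decode through the Int64Type path, NANO-unit columns decode through the new Int96Type path, and any other unit still returns Status::NotImplemented. A compact restatement of that dispatch, with hypothetical enum and function names rather than the Arrow or parquet-cpp declarations:

#include <stdexcept>

// Illustrative restatement of the new TIMESTAMP dispatch in NextBatch.
enum class Unit { MILLI, MICRO, NANO };
enum class PhysicalType { INT64, INT96 };

PhysicalType SourceTypeForTimestamp(Unit unit) {
  switch (unit) {
    case Unit::MILLI:
      return PhysicalType::INT64;   // INT64 + TIMESTAMP_MILLIS columns
    case Unit::NANO:
      return PhysicalType::INT96;   // Impala-style INT96 columns
    default:
      throw std::runtime_error("TimeUnit not supported");  // e.g. MICRO
  }
}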
7 changes: 4 additions & 3 deletions cpp/src/parquet/arrow/schema.cc
@@ -58,6 +58,8 @@ const auto DOUBLE = std::make_shared<::arrow::DoubleType>();
const auto UTF8 = std::make_shared<::arrow::StringType>();
const auto TIMESTAMP_MS =
std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI);
const auto TIMESTAMP_NS =
std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::NANO);
const auto BINARY =
std::make_shared<::arrow::ListType>(std::make_shared<::arrow::Field>("", UINT8));

@@ -162,9 +164,8 @@ Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
RETURN_NOT_OK(FromInt64(primitive, out));
break;
case ParquetType::INT96:
// TODO: Do we have that type in Arrow?
// type = TypePtr(new Int96Type());
return Status::NotImplemented("int96");
*out = TIMESTAMP_NS;
break;
case ParquetType::FLOAT:
*out = FLOAT;
break;
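With the schema.cc change, INT96 is no longer rejected during schema conversion; the net mapping is:

// Parquet physical/logical type     ->  Arrow type after conversion
//   INT96 (Impala-style timestamp)  ->  timestamp[ns]   (new in this commit)
//   INT64 + TIMESTAMP_MILLIS        ->  timestamp[ms]   (unchanged)
// Before this commit, FromPrimitive() returned Status::NotImplemented("int96")
// and the schema test listed INT96 under UnsupportedThings.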
