Skip to content

Commit

Permalink
PARQUET-890: Support I/O of DATE columns in parquet_arrow
Browse files Browse the repository at this point in the history
Also fixes a bug on reading INT96 timestamps.

Author: Korn, Uwe <Uwe.Korn@blue-yonder.com>

Closes apache#266 from xhochy/PARQUET-890 and squashes the following commits:

8481c2c [Korn, Uwe] ninja lint
666d41b [Korn, Uwe] PARQUET-890: Support I/O of DATE columns in parquet_arrow

Change-Id: I0c4db73db72211c5bc59f260692eafed21d3c5a8
  • Loading branch information
xhochy authored and wesm committed Mar 7, 2017
1 parent 9ca26c7 commit fb325c3
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 46 deletions.
13 changes: 11 additions & 2 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ struct test_traits<::arrow::TimestampType> {

const int64_t test_traits<::arrow::TimestampType>::value(14695634030000);

template <>
struct test_traits<::arrow::DateType> {
static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
static constexpr LogicalType::type logical_enum = LogicalType::DATE;
static int64_t const value;
};

const int64_t test_traits<::arrow::DateType>::value(14688000000000);

template <>
struct test_traits<::arrow::FloatType> {
static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
Expand Down Expand Up @@ -309,8 +318,8 @@ class TestParquetIO : public ::testing::Test {

typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType,
::arrow::StringType, ::arrow::BinaryType>
::arrow::Int64Type, ::arrow::TimestampType, ::arrow::DateType, ::arrow::FloatType,
::arrow::DoubleType, ::arrow::StringType, ::arrow::BinaryType>
TestTypes;

TYPED_TEST_CASE(TestParquetIO, TestTypes);
Expand Down
11 changes: 8 additions & 3 deletions cpp/src/parquet/arrow/arrow-schema-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));

parquet_fields.push_back(PrimitiveNode::Make(
"date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE));
arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date(), false));

parquet_fields.push_back(
PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96));
arrow_fields.push_back(std::make_shared<Field>("timestamp96", TIMESTAMP_NS, false));
Expand Down Expand Up @@ -339,9 +343,6 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
TEST_F(TestConvertParquetSchema, UnsupportedThings) {
std::vector<NodePtr> unsupported_nodes;

unsupported_nodes.push_back(PrimitiveNode::Make(
"int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE));

for (const NodePtr& node : unsupported_nodes) {
ASSERT_RAISES(NotImplemented, ConvertSchema({node}));
}
Expand Down Expand Up @@ -394,6 +395,10 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false));

parquet_fields.push_back(PrimitiveNode::Make(
"date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE));
arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date(), false));

parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
Expand Down
49 changes: 47 additions & 2 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Typ
PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch(
values_to_read, nullptr, nullptr, values, &values_read));

int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_;
for (int64_t i = 0; i < values_read; i++) {
*out_ptr++ = impala_timestamp_to_nanoseconds(values[i]);
}
Expand All @@ -370,6 +370,24 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Typ
return Status::OK();
}

template <>
Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::DateType, Int32Type>(
TypedColumnReader<Int32Type>* reader, int64_t values_to_read, int64_t* levels_read) {
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
int64_t values_read;
PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch(
values_to_read, nullptr, nullptr, values, &values_read));

int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_;
for (int64_t i = 0; i < values_read; i++) {
*out_ptr++ = static_cast<int64_t>(values[i]) * 86400000;
}
valid_bits_idx_ += values_read;

return Status::OK();
}

template <>
Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
TypedColumnReader<BooleanType>* reader, int64_t values_to_read,
Expand Down Expand Up @@ -463,6 +481,30 @@ Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
return Status::OK();
}

template <>
Status ColumnReader::Impl::ReadNullableBatch<::arrow::DateType, Int32Type>(
TypedColumnReader<Int32Type>* reader, int16_t* def_levels, int16_t* rep_levels,
int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
int64_t null_count;
PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(values_to_read, def_levels, rep_levels,
values, valid_bits_ptr_, valid_bits_idx_, levels_read, values_read, &null_count));

auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
INIT_BITSET(valid_bits_ptr_, valid_bits_idx_);
for (int64_t i = 0; i < *values_read; i++) {
if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
data_ptr[valid_bits_idx_ + i] = static_cast<int64_t>(values[i]) * 86400000;
}
READ_NEXT_BITSET(valid_bits_ptr_);
}
null_count_ += null_count;
valid_bits_idx_ += *values_read;

return Status::OK();
}

template <>
Status ColumnReader::Impl::ReadNullableBatch<::arrow::BooleanType, BooleanType>(
TypedColumnReader<BooleanType>* reader, int16_t* def_levels, int16_t* rep_levels,
Expand Down Expand Up @@ -843,6 +885,7 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
TYPED_BATCH_CASE(DATE, ::arrow::DateType, Int32Type)
TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
Expand All @@ -865,7 +908,9 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
break;
}
default:
return Status::NotImplemented(field_->type->ToString());
std::stringstream ss;
ss << "No support for reading columns of type " << field_->type->ToString();
return Status::NotImplemented(ss.str());
}
}

Expand Down
18 changes: 15 additions & 3 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) {
*out = MakeDecimalType(node);
break;
default:
return Status::NotImplemented("unhandled type");
std::stringstream ss;
ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
<< " for fixed-length binary array";
return Status::NotImplemented(ss.str());
break;
}

Expand All @@ -104,11 +107,17 @@ static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
case LogicalType::UINT_32:
*out = ::arrow::uint32();
break;
case LogicalType::DATE:
*out = ::arrow::date();
break;
case LogicalType::DECIMAL:
*out = MakeDecimalType(node);
break;
default:
return Status::NotImplemented("Unhandled logical type for int32");
std::stringstream ss;
ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
<< " for INT32";
return Status::NotImplemented(ss.str());
break;
}
return Status::OK();
Expand All @@ -129,7 +138,10 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
*out = TIMESTAMP_MS;
break;
default:
return Status::NotImplemented("Unhandled logical type for int64");
std::stringstream ss;
ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
<< " for INT64";
return Status::NotImplemented(ss.str());
break;
}
return Status::OK();
Expand Down
49 changes: 47 additions & 2 deletions cpp/src/parquet/arrow/test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ using is_arrow_float = std::is_floating_point<typename ArrowType::c_type>;
template <typename ArrowType>
using is_arrow_int = std::is_integral<typename ArrowType::c_type>;

template <typename ArrowType>
using is_arrow_date = std::is_same<ArrowType, ::arrow::DateType>;

template <typename ArrowType>
using is_arrow_string = std::is_same<ArrowType, ::arrow::StringType>;

Expand All @@ -53,10 +56,27 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullA
}

template <class ArrowType>
typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NonNullArray(
typename std::enable_if<
is_arrow_int<ArrowType>::value && !is_arrow_date<ArrowType>::value, Status>::type
NonNullArray(size_t size, std::shared_ptr<Array>* out) {
std::vector<typename ArrowType::c_type> values;
::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);

// Passing data type so this will work with TimestampType too
::arrow::NumericBuilder<ArrowType> builder(
::arrow::default_memory_pool(), std::make_shared<ArrowType>());
builder.Append(values.data(), values.size());
return builder.Finish(out);
}

template <class ArrowType>
typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NonNullArray(
size_t size, std::shared_ptr<Array>* out) {
std::vector<typename ArrowType::c_type> values;
::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
for (size_t i = 0; i < size; i++) {
values[i] *= 86400000;
}

// Passing data type so this will work with TimestampType too
::arrow::NumericBuilder<ArrowType> builder(
Expand Down Expand Up @@ -107,13 +127,38 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type Nullable

// This helper function only supports (size/2) nulls.
template <typename ArrowType>
typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NullableArray(
typename std::enable_if<
is_arrow_int<ArrowType>::value && !is_arrow_date<ArrowType>::value, Status>::type
NullableArray(size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
std::vector<typename ArrowType::c_type> values;

// Seed is random in Arrow right now
(void)seed;
::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
std::vector<uint8_t> valid_bytes(size, 1);

for (size_t i = 0; i < num_nulls; i++) {
valid_bytes[i * 2] = 0;
}

// Passing data type so this will work with TimestampType too
::arrow::NumericBuilder<ArrowType> builder(
::arrow::default_memory_pool(), std::make_shared<ArrowType>());
builder.Append(values.data(), values.size(), valid_bytes.data());
return builder.Finish(out);
}

template <typename ArrowType>
typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NullableArray(
size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
std::vector<typename ArrowType::c_type> values;

// Seed is random in Arrow right now
(void)seed;
::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
for (size_t i = 0; i < size; i++) {
values[i] *= 86400000;
}
std::vector<uint8_t> valid_bytes(size, 1);

for (size_t i = 0; i < num_nulls; i++) {
Expand Down
Loading

0 comments on commit fb325c3

Please sign in to comment.