Skip to content

Commit

Permalink
PARQUET-812: Read BYTE_ARRAY with no logical type as arrow::BinaryArray
Browse files Browse the repository at this point in the history
Depends on ARROW-374. Need to merge that and update the thirdparty git hash

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes apache#206 from wesm/PARQUET-812 and squashes the following commits:

73fb8d0 [Wes McKinney] Update thirdparty arrow version
5db7908 [Wes McKinney] typo
fc0d559 [Wes McKinney] Read unadorned BYTE_ARRAY into arrow::BinaryArray

Change-Id: I9466e3eb082fee464e832b3106223ac48b0afb4a
  • Loading branch information
wesm authored and xhochy committed Dec 21, 2016
1 parent dc87328 commit 3b49dc9
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 93 deletions.
15 changes: 11 additions & 4 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,15 @@ struct test_traits<::arrow::StringType> {
static std::string const value;
};

template <>
struct test_traits<::arrow::BinaryType> {
static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
static constexpr LogicalType::type logical_enum = LogicalType::NONE;
static std::string const value;
};

const std::string test_traits<::arrow::StringType>::value("Test");
const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03");

template <typename T>
using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
Expand Down Expand Up @@ -247,15 +255,15 @@ class TestParquetIO : public ::testing::Test {
std::shared_ptr<InMemoryOutputStream> sink_;
};

// We habe separate tests for UInt32Type as this is currently the only type
// We have separate tests for UInt32Type as this is currently the only type
// where a roundtrip does not yield the identical Array structure.
// There we write an UInt32 Array but receive an Int64 Array as result for
// Parquet version 1.0.

typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType,
::arrow::StringType>
::arrow::StringType, ::arrow::BinaryType>
TestTypes;

TYPED_TEST_CASE(TestParquetIO, TestTypes);
Expand Down Expand Up @@ -504,8 +512,7 @@ using TestStringParquetIO = TestParquetIO<::arrow::StringType>;

TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
std::shared_ptr<Array> values;
::arrow::StringBuilder builder(
::arrow::default_memory_pool(), std::make_shared<::arrow::StringType>());
::arrow::StringBuilder builder(::arrow::default_memory_pool(), ::arrow::utf8());
for (size_t i = 0; i < SMALL_SIZE; i++) {
builder.Append("");
}
Expand Down
29 changes: 15 additions & 14 deletions cpp/src/parquet/arrow/arrow-schema-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,19 @@ namespace parquet {

namespace arrow {

const auto BOOL = std::make_shared<::arrow::BooleanType>();
const auto UINT8 = std::make_shared<::arrow::UInt8Type>();
const auto INT32 = std::make_shared<::arrow::Int32Type>();
const auto INT64 = std::make_shared<::arrow::Int64Type>();
const auto FLOAT = std::make_shared<::arrow::FloatType>();
const auto DOUBLE = std::make_shared<::arrow::DoubleType>();
const auto UTF8 = std::make_shared<::arrow::StringType>();
const auto TIMESTAMP_MS =
std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI);
const auto TIMESTAMP_NS =
std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::NANO);
const auto BOOL = ::arrow::boolean();
const auto UINT8 = ::arrow::uint8();
const auto INT32 = ::arrow::int32();
const auto INT64 = ::arrow::int64();
const auto FLOAT = ::arrow::float32();
const auto DOUBLE = ::arrow::float64();
const auto UTF8 = ::arrow::utf8();
const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI);
const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);

// TODO: This requires parquet-cpp implementing the MICROS enum value
// const auto TIMESTAMP_US = std::make_shared<TimestampType>(TimestampType::Unit::MICRO);
const auto BINARY =
std::make_shared<::arrow::ListType>(std::make_shared<Field>("", UINT8));
const auto BINARY = ::arrow::binary();
const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4);

class TestConvertParquetSchema : public ::testing::Test {
Expand Down Expand Up @@ -412,11 +410,14 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE));
arrow_fields.push_back(std::make_shared<Field>("double", DOUBLE));

// TODO: String types need to be clarified a bit more in the Arrow spec
parquet_fields.push_back(PrimitiveNode::Make(
"string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8));
arrow_fields.push_back(std::make_shared<Field>("string", UTF8));

parquet_fields.push_back(PrimitiveNode::Make(
"binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::NONE));
arrow_fields.push_back(std::make_shared<Field>("binary", BINARY));

ASSERT_OK(ConvertSchema(arrow_fields));

CheckFlatSchema(parquet_fields);
Expand Down
26 changes: 23 additions & 3 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "parquet/arrow/utils.h"

#include "arrow/api.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit-util.h"

using arrow::Array;
Expand Down Expand Up @@ -90,9 +91,13 @@ class FlatColumnReader::Impl {
virtual ~Impl() {}

Status NextBatch(int batch_size, std::shared_ptr<Array>* out);

template <typename ArrowType, typename ParquetType>
Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);

template <typename ArrowType>
Status ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>* out);

template <typename ArrowType>
Status InitDataBuffer(int batch_size);
template <typename ArrowType, typename ParquetType>
Expand Down Expand Up @@ -486,11 +491,13 @@ Status FlatColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>
}
}

template <>
Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
template <typename ArrowType>
Status FlatColumnReader::Impl::ReadByteArrayBatch(
int batch_size, std::shared_ptr<Array>* out) {
using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;

int values_to_read = batch_size;
::arrow::StringBuilder builder(pool_, field_->type);
BuilderType builder(pool_, field_->type);
while ((values_to_read > 0) && column_reader_) {
values_buffer_.Resize(values_to_read * sizeof(ByteArray));
if (descr_->max_definition_level() > 0) {
Expand Down Expand Up @@ -528,6 +535,18 @@ Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType
return builder.Finish(out);
}

template <>
Status FlatColumnReader::Impl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>(
int batch_size, std::shared_ptr<Array>* out) {
return ReadByteArrayBatch<::arrow::BinaryType>(batch_size, out);
}

template <>
Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
int batch_size, std::shared_ptr<Array>* out) {
return ReadByteArrayBatch<::arrow::StringType>(batch_size, out);
}

#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \
case ::arrow::Type::ENUM: \
return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \
Expand All @@ -553,6 +572,7 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>*
TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
TYPED_BATCH_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
case ::arrow::Type::TIMESTAMP: {
::arrow::TimestampType* timestamp_type =
static_cast<::arrow::TimestampType*>(field_->type.get());
Expand Down
58 changes: 20 additions & 38 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,8 @@ namespace parquet {

namespace arrow {

const auto BOOL = std::make_shared<::arrow::BooleanType>();
const auto UINT8 = std::make_shared<::arrow::UInt8Type>();
const auto INT8 = std::make_shared<::arrow::Int8Type>();
const auto UINT16 = std::make_shared<::arrow::UInt16Type>();
const auto INT16 = std::make_shared<::arrow::Int16Type>();
const auto UINT32 = std::make_shared<::arrow::UInt32Type>();
const auto INT32 = std::make_shared<::arrow::Int32Type>();
const auto UINT64 = std::make_shared<::arrow::UInt64Type>();
const auto INT64 = std::make_shared<::arrow::Int64Type>();
const auto FLOAT = std::make_shared<::arrow::FloatType>();
const auto DOUBLE = std::make_shared<::arrow::DoubleType>();
const auto UTF8 = std::make_shared<::arrow::StringType>();
const auto TIMESTAMP_MS =
std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI);
const auto TIMESTAMP_NS =
std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::NANO);
const auto BINARY =
std::make_shared<::arrow::ListType>(std::make_shared<::arrow::Field>("", UINT8));
const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI);
const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);

TypePtr MakeDecimalType(const PrimitiveNode* node) {
int precision = node->decimal_metadata().precision;
Expand All @@ -72,14 +56,14 @@ TypePtr MakeDecimalType(const PrimitiveNode* node) {
static Status FromByteArray(const PrimitiveNode* node, TypePtr* out) {
switch (node->logical_type()) {
case LogicalType::UTF8:
*out = UTF8;
*out = ::arrow::utf8();
break;
case LogicalType::DECIMAL:
*out = MakeDecimalType(node);
break;
default:
// BINARY
*out = BINARY;
*out = ::arrow::binary();
break;
}
return Status::OK();
Expand All @@ -88,7 +72,7 @@ static Status FromByteArray(const PrimitiveNode* node, TypePtr* out) {
static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) {
switch (node->logical_type()) {
case LogicalType::NONE:
*out = BINARY;
*out = ::arrow::binary();
break;
case LogicalType::DECIMAL:
*out = MakeDecimalType(node);
Expand All @@ -104,22 +88,22 @@ static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) {
static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
switch (node->logical_type()) {
case LogicalType::NONE:
*out = INT32;
*out = ::arrow::int32();
break;
case LogicalType::UINT_8:
*out = UINT8;
*out = ::arrow::uint8();
break;
case LogicalType::INT_8:
*out = INT8;
*out = ::arrow::int8();
break;
case LogicalType::UINT_16:
*out = UINT16;
*out = ::arrow::uint16();
break;
case LogicalType::INT_16:
*out = INT16;
*out = ::arrow::int16();
break;
case LogicalType::UINT_32:
*out = UINT32;
*out = ::arrow::uint32();
break;
case LogicalType::DECIMAL:
*out = MakeDecimalType(node);
Expand All @@ -134,10 +118,10 @@ static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
switch (node->logical_type()) {
case LogicalType::NONE:
*out = INT64;
*out = ::arrow::int64();
break;
case LogicalType::UINT_64:
*out = UINT64;
*out = ::arrow::uint64();
break;
case LogicalType::DECIMAL:
*out = MakeDecimalType(node);
Expand All @@ -155,7 +139,7 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
switch (primitive->physical_type()) {
case ParquetType::BOOLEAN:
*out = BOOL;
*out = ::arrow::boolean();
break;
case ParquetType::INT32:
RETURN_NOT_OK(FromInt32(primitive, out));
Expand All @@ -167,13 +151,12 @@ Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
*out = TIMESTAMP_NS;
break;
case ParquetType::FLOAT:
*out = FLOAT;
*out = ::arrow::float32();
break;
case ParquetType::DOUBLE:
*out = DOUBLE;
*out = ::arrow::float64();
break;
case ParquetType::BYTE_ARRAY:
// TODO: Do we have that type in Arrow?
RETURN_NOT_OK(FromByteArray(primitive, out));
break;
case ParquetType::FIXED_LEN_BYTE_ARRAY:
Expand Down Expand Up @@ -211,21 +194,21 @@ Status NodeToList(const GroupNode* group, TypePtr* out) {
// List of primitive type
std::shared_ptr<Field> item_field;
RETURN_NOT_OK(NodeToField(list_group->field(0), &item_field));
*out = std::make_shared<::arrow::ListType>(item_field);
*out = ::arrow::list(item_field);
} else {
// List of struct
std::shared_ptr<::arrow::DataType> inner_type;
RETURN_NOT_OK(StructFromGroup(list_group, &inner_type));
auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false);
*out = std::make_shared<::arrow::ListType>(item_field);
*out = ::arrow::list(item_field);
}
} else if (list_node->is_repeated()) {
// repeated primitive node
std::shared_ptr<::arrow::DataType> inner_type;
const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(list_node.get());
RETURN_NOT_OK(FromPrimitive(primitive, &inner_type));
auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false);
*out = std::make_shared<::arrow::ListType>(item_field);
*out = ::arrow::list(item_field);
} else {
return Status::NotImplemented(
"Non-repeated groups in a LIST-annotated group are not supported.");
Expand All @@ -247,7 +230,7 @@ Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) {
const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());
RETURN_NOT_OK(FromPrimitive(primitive, &inner_type));
auto item_field = std::make_shared<Field>(node->name(), inner_type, false);
type = std::make_shared<::arrow::ListType>(item_field);
type = ::arrow::list(item_field);
nullable = false;
} else if (node->is_group()) {
const GroupNode* group = static_cast<const GroupNode*>(node.get());
Expand Down Expand Up @@ -423,5 +406,4 @@ Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
}

} // namespace arrow

} // namespace parquet
21 changes: 13 additions & 8 deletions cpp/src/parquet/arrow/test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ using is_arrow_int = std::is_integral<typename ArrowType::c_type>;
template <typename ArrowType>
using is_arrow_string = std::is_same<ArrowType, ::arrow::StringType>;

template <typename ArrowType>
using is_arrow_binary = std::is_same<ArrowType, ::arrow::BinaryType>;

template <typename ArrowType>
using is_arrow_bool = std::is_same<ArrowType, ::arrow::BooleanType>;

Expand All @@ -62,10 +65,11 @@ typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NonNullArr
}

template <class ArrowType>
typename std::enable_if<is_arrow_string<ArrowType>::value, Status>::type NonNullArray(
size_t size, std::shared_ptr<Array>* out) {
::arrow::StringBuilder builder(
::arrow::default_memory_pool(), std::make_shared<::arrow::StringType>());
typename std::enable_if<
is_arrow_string<ArrowType>::value || is_arrow_binary<ArrowType>::value, Status>::type
NonNullArray(size_t size, std::shared_ptr<Array>* out) {
using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
BuilderType builder(::arrow::default_memory_pool(), std::make_shared<ArrowType>());
for (size_t i = 0; i < size; i++) {
builder.Append("test-string");
}
Expand Down Expand Up @@ -121,16 +125,17 @@ typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NullableAr

// This helper function only supports (size/2) nulls yet.
template <typename ArrowType>
typename std::enable_if<is_arrow_string<ArrowType>::value, Status>::type NullableArray(
size_t size, size_t num_nulls, std::shared_ptr<::arrow::Array>* out) {
typename std::enable_if<
is_arrow_string<ArrowType>::value || is_arrow_binary<ArrowType>::value, Status>::type
NullableArray(size_t size, size_t num_nulls, std::shared_ptr<::arrow::Array>* out) {
std::vector<uint8_t> valid_bytes(size, 1);

for (size_t i = 0; i < num_nulls; i++) {
valid_bytes[i * 2] = 0;
}

::arrow::StringBuilder builder(
::arrow::default_memory_pool(), std::make_shared<::arrow::StringType>());
using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
BuilderType builder(::arrow::default_memory_pool(), std::make_shared<ArrowType>());
for (size_t i = 0; i < size; i++) {
builder.Append("test-string");
}
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/parquet/arrow/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
namespace parquet {
namespace arrow {

#define PARQUET_CATCH_NOT_OK(s) \
try { \
(s); \
} catch (const ::parquet::ParquetException& e) { \
return ::arrow::Status::IOError(e.what()); \
#define PARQUET_CATCH_NOT_OK(s) \
try { \
(s); \
} catch (const ::parquet::ParquetException& e) { \
return ::arrow::Status::IOError(e.what()); \
}

#define PARQUET_IGNORE_NOT_OK(s) \
Expand Down
Loading

0 comments on commit 3b49dc9

Please sign in to comment.