Skip to content

Commit

Permalink
ARROW-92: Arrow to Parquet Schema conversion
Browse files Browse the repository at this point in the history
My current WIP state. To make the actual schema conversion complete, we probably need the physical structure too as Arrow schemas only care about logical types whereas Parquet schema is about logical and physical types.

Author: Uwe L. Korn <uwelk@xhochy.com>

Closes #68 from xhochy/arrow-92 and squashes the following commits:

e3aa261 [Uwe L. Korn] Add macro to convert ParquetException to Status
9c5b085 [Uwe L. Korn] Include string
42ed0ea [Uwe L. Korn] Add struct conversion
38e68e5 [Uwe L. Korn] make format
9a6c876 [Uwe L. Korn] Add more types
8a0293e [Uwe L. Korn] ARROW-92: Arrow to Parquet Schema conversion
  • Loading branch information
xhochy authored and wesm committed May 1, 2016
1 parent 56514d9 commit 355f7c9
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 0 deletions.
75 changes: 75 additions & 0 deletions cpp/src/arrow/parquet/parquet-schema-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,81 @@ TEST_F(TestConvertParquetSchema, UnsupportedThings) {
}
}

class TestConvertArrowSchema : public ::testing::Test {
public:
virtual void SetUp() {}

void CheckFlatSchema(const std::vector<NodePtr>& nodes) {
NodePtr schema_node = GroupNode::Make("schema", Repetition::REPEATED, nodes);
const GroupNode* expected_schema_node =
static_cast<const GroupNode*>(schema_node.get());
const GroupNode* result_schema_node =
static_cast<const GroupNode*>(result_schema_->schema().get());

ASSERT_EQ(expected_schema_node->field_count(), result_schema_node->field_count());

for (int i = 0; i < expected_schema_node->field_count(); i++) {
auto lhs = result_schema_node->field(i);
auto rhs = expected_schema_node->field(i);
EXPECT_TRUE(lhs->Equals(rhs.get()));
}
}

Status ConvertSchema(const std::vector<std::shared_ptr<Field>>& fields) {
arrow_schema_ = std::make_shared<Schema>(fields);
return ToParquetSchema(arrow_schema_.get(), &result_schema_);
}

protected:
std::shared_ptr<Schema> arrow_schema_;
std::shared_ptr<::parquet::SchemaDescriptor> result_schema_;
};

TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;

parquet_fields.push_back(
PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN));
arrow_fields.push_back(std::make_shared<Field>("boolean", BOOL, false));

parquet_fields.push_back(
PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32));
arrow_fields.push_back(std::make_shared<Field>("int32", INT32, false));

parquet_fields.push_back(
PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false));

parquet_fields.push_back(
PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
arrow_fields.push_back(std::make_shared<Field>("float", FLOAT));

parquet_fields.push_back(
PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE));
arrow_fields.push_back(std::make_shared<Field>("double", DOUBLE));

// TODO: String types need to be clarified a bit more in the Arrow spec
parquet_fields.push_back(PrimitiveNode::Make(
"string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, LogicalType::UTF8));
arrow_fields.push_back(std::make_shared<Field>("string", UTF8));

ASSERT_OK(ConvertSchema(arrow_fields));

CheckFlatSchema(parquet_fields);
}

TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;

// TODO: Test Decimal Arrow -> Parquet conversion

ASSERT_OK(ConvertSchema(arrow_fields));

CheckFlatSchema(parquet_fields);
}

TEST(TestNodeConversion, DateAndTime) {}

} // namespace parquet
Expand Down
130 changes: 130 additions & 0 deletions cpp/src/arrow/parquet/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@

#include "arrow/parquet/schema.h"

#include <string>
#include <vector>

#include "parquet/api/schema.h"
#include "parquet/exception.h"

#include "arrow/types/decimal.h"
#include "arrow/types/string.h"
#include "arrow/util/status.h"

using parquet::ParquetException;
using parquet::Repetition;
using parquet::schema::Node;
using parquet::schema::NodePtr;
using parquet::schema::GroupNode;
Expand All @@ -36,6 +41,11 @@ namespace arrow {

namespace parquet {

#define PARQUET_CATCH_NOT_OK(s) \
try { \
(s); \
} catch (const ParquetException& e) { return Status::Invalid(e.what()); }

const auto BOOL = std::make_shared<BooleanType>();
const auto UINT8 = std::make_shared<UInt8Type>();
const auto INT32 = std::make_shared<Int32Type>();
Expand Down Expand Up @@ -182,6 +192,126 @@ Status FromParquetSchema(
return Status::OK();
}

Status StructToNode(const std::shared_ptr<StructType>& type, const std::string& name,
bool nullable, NodePtr* out) {
Repetition::type repetition = Repetition::REQUIRED;
if (nullable) { repetition = Repetition::OPTIONAL; }

std::vector<NodePtr> children(type->num_children());
for (int i = 0; i < type->num_children(); i++) {
RETURN_NOT_OK(FieldToNode(type->child(i), &children[i]));
}

*out = GroupNode::Make(name, repetition, children);
return Status::OK();
}

Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
LogicalType::type logical_type = LogicalType::NONE;
ParquetType::type type;
Repetition::type repetition = Repetition::REQUIRED;
if (field->nullable) { repetition = Repetition::OPTIONAL; }
int length = -1;

switch (field->type->type) {
// TODO:
// case Type::NA:
// break;
case Type::BOOL:
type = ParquetType::BOOLEAN;
break;
case Type::UINT8:
type = ParquetType::INT32;
logical_type = LogicalType::UINT_8;
break;
case Type::INT8:
type = ParquetType::INT32;
logical_type = LogicalType::INT_8;
break;
case Type::UINT16:
type = ParquetType::INT32;
logical_type = LogicalType::UINT_16;
break;
case Type::INT16:
type = ParquetType::INT32;
logical_type = LogicalType::INT_16;
break;
case Type::UINT32:
type = ParquetType::INT32;
logical_type = LogicalType::UINT_32;
break;
case Type::INT32:
type = ParquetType::INT32;
break;
case Type::UINT64:
type = ParquetType::INT64;
logical_type = LogicalType::UINT_64;
break;
case Type::INT64:
type = ParquetType::INT64;
break;
case Type::FLOAT:
type = ParquetType::FLOAT;
break;
case Type::DOUBLE:
type = ParquetType::DOUBLE;
break;
case Type::CHAR:
type = ParquetType::FIXED_LEN_BYTE_ARRAY;
logical_type = LogicalType::UTF8;
length = static_cast<CharType*>(field->type.get())->size;
break;
case Type::STRING:
type = ParquetType::BYTE_ARRAY;
logical_type = LogicalType::UTF8;
break;
case Type::BINARY:
type = ParquetType::BYTE_ARRAY;
break;
case Type::DATE:
type = ParquetType::INT32;
logical_type = LogicalType::DATE;
break;
case Type::TIMESTAMP:
type = ParquetType::INT64;
logical_type = LogicalType::TIMESTAMP_MILLIS;
break;
case Type::TIMESTAMP_DOUBLE:
type = ParquetType::INT64;
// This is specified as seconds since the UNIX epoch
// TODO: Converted type in Parquet?
// logical_type = LogicalType::TIMESTAMP_MILLIS;
break;
case Type::TIME:
type = ParquetType::INT64;
logical_type = LogicalType::TIME_MILLIS;
break;
case Type::STRUCT: {
auto struct_type = std::static_pointer_cast<StructType>(field->type);
return StructToNode(struct_type, field->name, field->nullable, out);
} break;
default:
// TODO: LIST, DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
return Status::NotImplemented("unhandled type");
}
*out = PrimitiveNode::Make(field->name, repetition, type, logical_type, length);
return Status::OK();
}

Status ToParquetSchema(
const Schema* arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out) {
std::vector<NodePtr> nodes(arrow_schema->num_fields());
for (int i = 0; i < arrow_schema->num_fields(); i++) {
RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), &nodes[i]));
}

NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
*out = std::make_shared<::parquet::SchemaDescriptor>();
PARQUET_CATCH_NOT_OK((*out)->Init(schema));

return Status::OK();
}

} // namespace parquet

} // namespace arrow
5 changes: 5 additions & 0 deletions cpp/src/arrow/parquet/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ Status NodeToField(const ::parquet::schema::NodePtr& node, std::shared_ptr<Field
Status FromParquetSchema(
const ::parquet::SchemaDescriptor* parquet_schema, std::shared_ptr<Schema>* out);

Status FieldToNode(const std::shared_ptr<Field>& field, ::parquet::schema::NodePtr* out);

Status ToParquetSchema(
const Schema* arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out);

} // namespace parquet

} // namespace arrow
Expand Down

0 comments on commit 355f7c9

Please sign in to comment.