Skip to content

Commit

Permalink
PARQUET-583: Parquet to Thrift schema conversion
Browse files Browse the repository at this point in the history
Depends on apache#86

Author: Uwe L. Korn <uwelk@xhochy.com>

Closes apache#87 from xhochy/parquet-583 and squashes the following commits:

9f3f050 [Uwe L. Korn] Incoperate feedback
86aed44 [Uwe L. Korn] PARQUET-583: Parquet to Thrift schema conversion

Change-Id: I0b255221ad1c5f47db6842b0141da27dd649ff8e
  • Loading branch information
xhochy authored and wesm committed Apr 19, 2016
1 parent 3b88573 commit 2f1002b
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 20 deletions.
39 changes: 39 additions & 0 deletions cpp/src/parquet/schema/converter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,45 @@ std::shared_ptr<SchemaDescriptor> FromParquet(const std::vector<SchemaElement>&
return descr;
}

void ToParquet(const GroupNode* schema, std::vector<format::SchemaElement>* out) {
SchemaFlattener flattener(schema, out);
flattener.Flatten();
}

class SchemaVisitor : public Node::ConstVisitor {
public:
explicit SchemaVisitor(std::vector<format::SchemaElement>* elements)
: elements_(elements) {}
virtual ~SchemaVisitor() {}

void Visit(const Node* node) override {
format::SchemaElement element;
node->ToParquet(&element);
// Override field_id here as we can get user-generated Nodes without a valid id
element.__set_field_id(elements_->size());
elements_->push_back(element);

if (node->is_group()) {
const GroupNode* group_node = static_cast<const GroupNode*>(node);
for (int i = 0; i < group_node->field_count(); ++i) {
group_node->field(i)->VisitConst(this);
}
}
}

private:
std::vector<format::SchemaElement>* elements_;
};

SchemaFlattener::SchemaFlattener(const GroupNode* schema,
std::vector<format::SchemaElement>* out)
: root_(schema), elements_(out) {}

void SchemaFlattener::Flatten() {
SchemaVisitor visitor(elements_);
root_->VisitConst(&visitor);
}

} // namespace schema

} // namespace parquet
4 changes: 3 additions & 1 deletion cpp/src/parquet/schema/converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,11 @@ class SchemaFlattener {
public:
SchemaFlattener(const GroupNode* schema, std::vector<format::SchemaElement>* out);

void Flatten();

private:
const GroupNode* root_;
std::vector<format::SchemaElement>* schema_;
std::vector<format::SchemaElement>* elements_;
};

} // namespace schema
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/parquet/schema/printer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ namespace parquet {

namespace schema {

class SchemaPrinter : public Node::Visitor {
class SchemaPrinter : public Node::ConstVisitor {
public:
explicit SchemaPrinter(std::ostream& stream, int indent_width) :
stream_(stream),
indent_(0),
indent_width_(2) {}

virtual void Visit(const Node* node);
void Visit(const Node* node) override;

private:
void Visit(const PrimitiveNode* node);
Expand Down Expand Up @@ -108,7 +108,7 @@ void SchemaPrinter::Visit(const GroupNode* node) {

indent_ += indent_width_;
for (int i = 0; i < node->field_count(); ++i) {
node->field(i)->Visit(this);
node->field(i)->VisitConst(this);
}
indent_ -= indent_width_;
Indent();
Expand Down
73 changes: 63 additions & 10 deletions cpp/src/parquet/schema/schema-converter-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,21 @@ bool check_for_parent_consistency(const GroupNode* node) {
TEST_F(TestSchemaConverter, NestedExample) {
SchemaElement elt;
std::vector<SchemaElement> elements;
elements.push_back(NewGroup(name_, FieldRepetitionType::REPEATED, 2));
elements.push_back(NewGroup(name_, FieldRepetitionType::REPEATED, 2, 0));

// A primitive one
elements.push_back(NewPrimitive("a", FieldRepetitionType::REQUIRED,
format::Type::INT32));
format::Type::INT32, 1));

// A group
elements.push_back(NewGroup("bag", FieldRepetitionType::OPTIONAL, 1));
elements.push_back(NewGroup("bag", FieldRepetitionType::OPTIONAL, 1, 2));

// 3-level list encoding, by hand
elt = NewGroup("b", FieldRepetitionType::REPEATED, 1);
elt = NewGroup("b", FieldRepetitionType::REPEATED, 1, 3);
elt.__set_converted_type(ConvertedType::LIST);
elements.push_back(elt);
elements.push_back(NewPrimitive("item", FieldRepetitionType::OPTIONAL,
format::Type::INT64));
format::Type::INT64, 4));

Convert(&elements[0], elements.size());

Expand Down Expand Up @@ -127,33 +127,86 @@ TEST_F(TestSchemaConverter, InvalidRoot) {

SchemaElement elements[2];
elements[0] = NewPrimitive("not-a-group", FieldRepetitionType::REQUIRED,
format::Type::INT32);
format::Type::INT32, 0);
ASSERT_THROW(Convert(elements, 2), ParquetException);

// While the Parquet spec indicates that the root group should have REPEATED
// repetition type, some implementations may return REQUIRED or OPTIONAL
// groups as the first element. These tests check that this is okay as a
// practicality matter.
elements[0] = NewGroup("not-repeated", FieldRepetitionType::REQUIRED, 1);
elements[0] = NewGroup("not-repeated", FieldRepetitionType::REQUIRED, 1, 0);
elements[1] = NewPrimitive("a", FieldRepetitionType::REQUIRED,
format::Type::INT32);
format::Type::INT32, 1);
Convert(elements, 2);

elements[0] = NewGroup("not-repeated", FieldRepetitionType::OPTIONAL, 1);
elements[0] = NewGroup("not-repeated", FieldRepetitionType::OPTIONAL, 1, 0);
Convert(elements, 2);
}

TEST_F(TestSchemaConverter, NotEnoughChildren) {
// Throw a ParquetException, but don't core dump or anything
SchemaElement elt;
std::vector<SchemaElement> elements;
elements.push_back(NewGroup(name_, FieldRepetitionType::REPEATED, 2));
elements.push_back(NewGroup(name_, FieldRepetitionType::REPEATED, 2, 0));
ASSERT_THROW(Convert(&elements[0], 1), ParquetException);
}

// ----------------------------------------------------------------------
// Schema tree flatten / unflatten

class TestSchemaFlatten : public ::testing::Test {
public:
void setUp() {
name_ = "parquet_schema";
}

void Flatten(const GroupNode* schema) {
ToParquet(schema, &elements_);
}

protected:
std::string name_;
std::vector<format::SchemaElement> elements_;
};

TEST_F(TestSchemaFlatten, NestedExample) {
SchemaElement elt;
std::vector<SchemaElement> elements;
elements.push_back(NewGroup(name_, FieldRepetitionType::REPEATED, 2, 0));

// A primitive one
elements.push_back(NewPrimitive("a", FieldRepetitionType::REQUIRED,
format::Type::INT32, 1));

// A group
elements.push_back(NewGroup("bag", FieldRepetitionType::OPTIONAL, 1, 2));

// 3-level list encoding, by hand
elt = NewGroup("b", FieldRepetitionType::REPEATED, 1, 3);
elt.__set_converted_type(ConvertedType::LIST);
elements.push_back(elt);
elements.push_back(NewPrimitive("item", FieldRepetitionType::OPTIONAL,
format::Type::INT64, 4));

// Construct the schema
NodeVector fields;
fields.push_back(Int32("a", Repetition::REQUIRED));

// 3-level list encoding
NodePtr item = Int64("item");
NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, LogicalType::LIST));
NodePtr bag(GroupNode::Make("bag", Repetition::OPTIONAL, {list}));
fields.push_back(bag);

NodePtr schema = GroupNode::Make(name_, Repetition::REPEATED, fields);

Flatten(static_cast<GroupNode*>(schema.get()));
ASSERT_EQ(elements_.size(), elements.size());
for (size_t i = 0; i < elements_.size(); i++) {
ASSERT_EQ(elements_[i], elements[i]);
}
}

} // namespace schema

} // namespace parquet
8 changes: 4 additions & 4 deletions cpp/src/parquet/schema/schema-types-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ TEST_F(TestPrimitiveNode, Attrs) {

TEST_F(TestPrimitiveNode, FromParquet) {
SchemaElement elt = NewPrimitive(name_, FieldRepetitionType::OPTIONAL,
format::Type::INT32);
format::Type::INT32, 0);
Convert(&elt);
ASSERT_EQ(name_, prim_node_->name());
ASSERT_EQ(id_, prim_node_->id());
Expand All @@ -121,7 +121,7 @@ TEST_F(TestPrimitiveNode, FromParquet) {
ASSERT_EQ(LogicalType::NONE, prim_node_->logical_type());

// Test a logical type
elt = NewPrimitive(name_, FieldRepetitionType::REQUIRED, format::Type::BYTE_ARRAY);
elt = NewPrimitive(name_, FieldRepetitionType::REQUIRED, format::Type::BYTE_ARRAY, 0);
elt.__set_converted_type(ConvertedType::UTF8);

Convert(&elt);
Expand All @@ -131,7 +131,7 @@ TEST_F(TestPrimitiveNode, FromParquet) {

// FIXED_LEN_BYTE_ARRAY
elt = NewPrimitive(name_, FieldRepetitionType::OPTIONAL,
format::Type::FIXED_LEN_BYTE_ARRAY);
format::Type::FIXED_LEN_BYTE_ARRAY, 0);
elt.__set_type_length(16);

Convert(&elt);
Expand All @@ -143,7 +143,7 @@ TEST_F(TestPrimitiveNode, FromParquet) {

// ConvertedType::Decimal
elt = NewPrimitive(name_, FieldRepetitionType::OPTIONAL,
format::Type::FIXED_LEN_BYTE_ARRAY);
format::Type::FIXED_LEN_BYTE_ARRAY, 0);
elt.__set_converted_type(ConvertedType::DECIMAL);
elt.__set_type_length(6);
elt.__set_scale(2);
Expand Down
10 changes: 8 additions & 2 deletions cpp/src/parquet/schema/test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,28 @@ namespace parquet {
namespace schema {

static inline SchemaElement NewPrimitive(const std::string& name,
FieldRepetitionType::type repetition, format::Type::type type) {
FieldRepetitionType::type repetition, format::Type::type type, int id = 0) {
SchemaElement result;
result.__set_name(name);
result.__set_repetition_type(repetition);
result.__set_type(type);
result.__set_num_children(0);
result.__set_field_id(id);
// Set default (non-set) values
result.__set_type_length(-1);
result.__set_precision(-1);
result.__set_scale(-1);

return result;
}

static inline SchemaElement NewGroup(const std::string& name,
FieldRepetitionType::type repetition, int num_children) {
FieldRepetitionType::type repetition, int num_children, int id = 0) {
SchemaElement result;
result.__set_name(name);
result.__set_repetition_type(repetition);
result.__set_num_children(num_children);
result.__set_field_id(id);

return result;
}
Expand Down
37 changes: 37 additions & 0 deletions cpp/src/parquet/schema/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ void PrimitiveNode::Visit(Node::Visitor* visitor) {
visitor->Visit(this);
}

void PrimitiveNode::VisitConst(Node::ConstVisitor* visitor) const {
visitor->Visit(this);
}

// ----------------------------------------------------------------------
// Group node

Expand Down Expand Up @@ -232,6 +236,10 @@ void GroupNode::Visit(Node::Visitor* visitor) {
visitor->Visit(this);
}

void GroupNode::VisitConst(Node::ConstVisitor* visitor) const {
visitor->Visit(this);
}

// ----------------------------------------------------------------------
// Node construction from Parquet metadata

Expand Down Expand Up @@ -280,6 +288,35 @@ std::unique_ptr<Node> PrimitiveNode::FromParquet(const void* opaque_element,
return std::unique_ptr<Node>(result.release());
}

void GroupNode::ToParquet(void* opaque_element) const {
format::SchemaElement* element =
static_cast<format::SchemaElement*>(opaque_element);
element->__set_name(name_);
element->__set_num_children(field_count());
element->__set_repetition_type(ToThrift(repetition_));
if (logical_type_ != LogicalType::NONE) {
element->__set_converted_type(ToThrift(logical_type_));
}
// FIXME: SchemaFlattener does this for us: element->__set_field_id(id_);
}

void PrimitiveNode::ToParquet(void* opaque_element) const {
format::SchemaElement* element =
static_cast<format::SchemaElement*>(opaque_element);

element->__set_name(name_);
element->__set_num_children(0);
element->__set_repetition_type(ToThrift(repetition_));
if (logical_type_ != LogicalType::NONE) {
element->__set_converted_type(ToThrift(logical_type_));
}
element->__set_type(ToThrift(physical_type_));
// FIXME: SchemaFlattener does this for us: element->__set_field_id(id_);
element->__set_type_length(type_length_);
element->__set_precision(decimal_metadata_.precision);
element->__set_scale(decimal_metadata_.scale);
}

} // namespace schema

} // namespace parquet
15 changes: 15 additions & 0 deletions cpp/src/parquet/schema/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,26 @@ class Node {
return parent_;
}

// ToParquet returns an opaque void* to avoid exporting
// parquet::SchemaElement into the public API
virtual void ToParquet(void* opaque_element) const = 0;

// Node::Visitor abstract class for walking schemas with the visitor pattern
class Visitor {
public:
virtual ~Visitor() {}

virtual void Visit(Node* node) = 0;
};
class ConstVisitor {
public:
virtual ~ConstVisitor() {}

virtual void Visit(const Node* node) = 0;
};

virtual void Visit(Visitor* visitor) = 0;
virtual void VisitConst(ConstVisitor* visitor) const = 0;

protected:
friend class GroupNode;
Expand Down Expand Up @@ -224,7 +235,9 @@ class PrimitiveNode : public Node {
return decimal_metadata_;
}

void ToParquet(void* opaque_element) const override;
virtual void Visit(Visitor* visitor);
void VisitConst(ConstVisitor* visitor) const override;

private:
PrimitiveNode(const std::string& name, Repetition::type repetition,
Expand Down Expand Up @@ -278,7 +291,9 @@ class GroupNode : public Node {
return fields_.size();
}

void ToParquet(void* opaque_element) const override;
virtual void Visit(Visitor* visitor);
void VisitConst(ConstVisitor* visitor) const override;

private:
GroupNode(const std::string& name, Repetition::type repetition,
Expand Down

0 comments on commit 2f1002b

Please sign in to comment.