Skip to content

Commit

Permalink
Reorg fragment loader and basic fragment loaders. (#90)
Browse files Browse the repository at this point in the history
  • Loading branch information
luoxiaojian authored Dec 4, 2020
1 parent 42d1984 commit 74c574e
Show file tree
Hide file tree
Showing 8 changed files with 1,399 additions and 1,064 deletions.
19 changes: 19 additions & 0 deletions modules/basic/ds/arrow_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,25 @@ limitations under the License.

namespace vineyard {

std::shared_ptr<arrow::Table> ConcatenateTables(
std::vector<std::shared_ptr<arrow::Table>>& tables) {
if (tables.size() == 1) {
return tables[0];
}
auto col_names = tables[0]->ColumnNames();
for (size_t i = 1; i < tables.size(); ++i) {
#if defined(ARROW_VERSION) && ARROW_VERSION < 17000
CHECK_ARROW_ERROR(tables[i]->RenameColumns(col_names, &tables[i]));
#else
CHECK_ARROW_ERROR_AND_ASSIGN(tables[i],
tables[i]->RenameColumns(col_names));
#endif
}
std::shared_ptr<arrow::Table> table;
CHECK_ARROW_ERROR_AND_ASSIGN(table, arrow::ConcatenateTables(tables));
return table;
}

std::shared_ptr<arrow::DataType> FromAnyType(AnyType type) {
switch (type) {
case AnyType::Int32:
Expand Down
54 changes: 54 additions & 0 deletions modules/graph/fragment/arrow_fragment_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "client/client.h"

Expand Down Expand Up @@ -142,6 +143,59 @@ class ArrowFragmentGroupBuilder : public vineyard::ObjectBuilder {
std::unordered_map<fid_t, uint64_t> fragment_locations_;
};

inline boost::leaf::result<ObjectID> ConstructFragmentGroup(
Client& client, ObjectID frag_id, const grape::CommSpec& comm_spec) {
ObjectID group_object_id;
uint64_t instance_id = client.instance_id();

if (comm_spec.worker_id() == 0) {
std::vector<uint64_t> gathered_instance_ids(comm_spec.worker_num());
std::vector<ObjectID> gathered_object_ids(comm_spec.worker_num());

MPI_Gather(&instance_id, sizeof(uint64_t), MPI_CHAR,
&gathered_instance_ids[0], sizeof(uint64_t), MPI_CHAR, 0,
comm_spec.comm());

MPI_Gather(&frag_id, sizeof(ObjectID), MPI_CHAR, &gathered_object_ids[0],
sizeof(ObjectID), MPI_CHAR, 0, comm_spec.comm());

ArrowFragmentGroupBuilder builder;
builder.set_total_frag_num(comm_spec.fnum());
auto fragment =
std::dynamic_pointer_cast<ArrowFragmentBase>(client.GetObject(frag_id));
auto& meta = fragment->meta();

builder.set_vertex_label_num(
meta.GetKeyValue<typename ArrowFragmentBase::label_id_t>(
"vertex_label_num"));
builder.set_edge_label_num(
meta.GetKeyValue<typename ArrowFragmentBase::label_id_t>(
"edge_label_num"));
for (fid_t i = 0; i < comm_spec.fnum(); ++i) {
builder.AddFragmentObject(
i, gathered_object_ids[comm_spec.FragToWorker(i)],
gathered_instance_ids[comm_spec.FragToWorker(i)]);
}

auto group_object =
std::dynamic_pointer_cast<ArrowFragmentGroup>(builder.Seal(client));
group_object_id = group_object->id();
VY_OK_OR_RAISE(client.Persist(group_object_id));

MPI_Bcast(&group_object_id, sizeof(ObjectID), MPI_CHAR, 0,
comm_spec.comm());
} else {
MPI_Gather(&instance_id, sizeof(uint64_t), MPI_CHAR, NULL, sizeof(uint64_t),
MPI_CHAR, 0, comm_spec.comm());
MPI_Gather(&frag_id, sizeof(ObjectID), MPI_CHAR, NULL, sizeof(ObjectID),
MPI_CHAR, 0, comm_spec.comm());

MPI_Bcast(&group_object_id, sizeof(ObjectID), MPI_CHAR, 0,
comm_spec.comm());
}
return group_object_id;
}

} // namespace vineyard

#endif // MODULES_GRAPH_FRAGMENT_ARROW_FRAGMENT_GROUP_H_
Loading

0 comments on commit 74c574e

Please sign in to comment.