Skip to content

Commit

Permalink
Gpugraph (PaddlePaddle#24)
Browse files Browse the repository at this point in the history
* load edges in parallel

* load nodes and edges in parallel

* add node- and edge-parallel loading

* comment out slot-feature parsing

* remove some logging

* add some logging

Co-authored-by: root <root@yq01-inf-hic-k8s-a100-ab2-0009.yq01.baidu.com>
  • Loading branch information
miaoli06 and root authored Jun 10, 2022
1 parent ccccb5f commit 3bde5e4
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 112 deletions.
195 changes: 95 additions & 100 deletions paddle/fluid/distributed/ps/table/common_graph_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,6 @@ int32_t GraphTable::get_nodes_ids_by_ranges(

// TODO: optimize to load all node_types in a single pass over the input files
// GraphTable::load_nodes -- loads node records of `node_type` from a
// ';'-separated list of files given in `path` into the feature shards.
// NOTE(review): this span is a GitHub diff rendering; lines from the old
// serial loader and the new thread-pool-parallel loader are interleaved,
// and the "Expand ... @@" markers stand in for collapsed, unseen lines.
int32_t GraphTable::load_nodes(const std::string &path, std::string node_type) {
// (old) serial implementation: entry log.
VLOG(2) << "Begin GraphTable::load_nodes() node_type[" << node_type << "]";
// `path` may name several input files, separated by ';'.
auto paths = paddle::string::split_string<std::string>(path, ";");
uint64_t count = 0;
uint64_t valid_count = 0;
Expand All @@ -1070,57 +1069,54 @@ int32_t GraphTable::load_nodes(const std::string &path, std::string node_type) {
}
// Resolve the feature-table index for this node type.
idx = feature_to_id[node_type];
}
// (old) serial loop over the input files.
for (auto path : paths) {
VLOG(2) << "Begin GraphTable::load_nodes(), path[" << path << "]";
std::ifstream file(path);
std::string line;
while (std::getline(file, line)) {
// Each line: node_type \t node_id [\t feature ...]
auto values = paddle::string::split_string<std::string>(line, "\t");
if (values.size() < 2) continue;
auto id = std::stoull(values[1]);

// Nodes are sharded by id; ids outside [shard_start, shard_end) are skipped.
size_t shard_id = id % shard_num;
if (shard_id >= shard_end || shard_id < shard_start) {
VLOG(4) << "will not load " << id << " from " << path

// (new) parallel implementation: enqueue one task per input file,
// round-robin across the load_node_edge_task_pool single-thread pools.
VLOG(0) << "Begin GraphTable::load_nodes() node_type[" << node_type << "]";
std::vector<std::future<int>> tasks;
for (size_t i = 0; i < paths.size(); i++) {
tasks.push_back(load_node_edge_task_pool[i % load_thread_num]->enqueue(
[&, i, idx, this]() -> int {
VLOG(2) << "Begin GraphTable::load_nodes(), path[" << paths[i] << "]";
std::ifstream file(paths[i]);
std::string line;
uint64_t local_count = 0;
while (std::getline(file, line)) {
auto values = paddle::string::split_string<std::string>(line, "\t");
if (values.size() < 2) continue;
// Only lines whose first column matches the requested node_type.
std::string nt = values[0];
if (nt != node_type) {
continue;
}

auto id = std::stoull(values[1]);
size_t shard_id = id % shard_num;
if (shard_id >= shard_end || shard_id < shard_start) {
VLOG(4) << "will not load " << id << " from " << path
<< ", please check id distribution";
continue;
}

// NOTE(review): `count` is captured by reference and updated from
// multiple tasks with no synchronization — this progress counter
// races under the new parallel loader; confirm if intentional.
if (count % 1000000 == 0) {
VLOG(0) << count << " nodes are loaded from filepath";
VLOG(0) << line;
}
count++;

// (old) serial version applied the node_type filter here, after counting.
std::string nt = values[0];
if (nt != node_type) {
continue;
}

size_t index = shard_id - shard_start;

// auto node = shards[index]->add_feature_node(id);
auto node = feature_shards[idx][index]->add_feature_node(id);
node->set_feature_size(feat_name[idx].size());

// (old) per-slice feature parsing, active in the serial version.
for (size_t slice = 2; slice < values.size(); slice++) {
auto feat = this->parse_feature(idx, values[slice]);
if (feat.first >= 0) {
node->set_feature(feat.first, feat.second);
} else {
VLOG(4) << "Node feature: " << values[slice]
<< " not in feature_map.";
continue;
}

// (new) register the node in its feature shard (shard-local index).
size_t index = shard_id - shard_start;
auto node = feature_shards[idx][index]->add_feature_node(id);
node->set_feature_size(feat_name[idx].size());

// (new) slot-feature parsing disabled until slot features are ready.
// TODO(huwei02): open it when slot fea ready
//for (size_t slice = 2; slice < values.size(); slice++) {
// auto feat = this->parse_feature(idx, values[slice]);
// if (feat.first >= 0) {
// node->set_feature(feat.first, feat.second);
// } else {
// VLOG(4) << "Node feature: " << values[slice]
// << " not in feature_map.";
// }
//}
local_count++;
}
valid_count++;
}
VLOG(2) << "End GraphTable::load_nodes(), node_type[" << node_type
<< "] node_num[" << valid_count << "]";
VLOG(0) << "node_type[" << node_type << "] loads " << local_count << " nodes from filepath->" << paths[i];
return 0;
}));
}

VLOG(0) << valid_count << "/" << count << " nodes in type " << node_type
<< " are loaded successfully in " << path << " node_type["
<< node_type << "]";
// (new) block until every per-file loading task has completed.
for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get();
VLOG(0) << "successfully load all node_type[" << node_type << "] data";
return 0;
}

// GraphTable::load_edges -- loads edges (optionally reversed / weighted) from
// a set of part files into edge_shards.
// NOTE(review): diff rendering; old serial loader and new thread-pool loader
// are interleaved below, and the "@@" markers hide collapsed lines.
Expand Down Expand Up @@ -1156,67 +1152,63 @@ int32_t GraphTable::load_edges(const std::string &path, bool reverse_edge,
uint64_t count = 0;
std::string sample_type = "random";
bool is_weighted = false;
uint64_t valid_count = 0;
// (old) serial loop over the input files.
for (auto path : paths) {
std::ifstream file(path);
std::string line;
while (std::getline(file, line)) {
// Each line: src_id \t dst_id [\t weight]
auto values = paddle::string::split_string<std::string>(line, "\t");
count++;
if (values.size() < 2) continue;
auto src_id = std::stoull(values[0]);
auto dst_id = std::stoull(values[1]);
if (reverse_edge) {
std::swap(src_id, dst_id);
}
float weight = 1;
// A third column marks the edge as weighted and switches sampling mode.
if (values.size() == 3) {
weight = std::stof(values[2]);
sample_type = "weighted";
is_weighted = true;
}

size_t src_shard_id = src_id % shard_num;

// (new) parallel implementation: one task per part file.
VLOG(0) << "Begin GraphTable::load_edges() edge_type[" << edge_type << "]";
std::vector<std::future<int>> tasks;
for (int i = 0; i < paths.size(); i++) {
tasks.push_back(load_node_edge_task_pool[i % load_thread_num]->enqueue(
[&, i, idx, this]() -> int {
uint64_t local_count = 0;
std::ifstream file(paths[i]);
std::string line;
// Derive the part number from the trailing "-<num>" of the file name;
// assumes part files are named like ".../xxx-<part_num>".
auto path_split = paddle::string::split_string<std::string>(paths[i], "/");
auto part_name_split = paddle::string::split_string<std::string>(path_split[path_split.size() - 1], "-");
auto part_num = std::stoull(part_name_split[part_name_split.size() - 1]);

while (std::getline(file, line)) {
auto values = paddle::string::split_string<std::string>(line, "\t");
local_count++;
if (values.size() < 2) continue;
auto src_id = std::stoull(values[0]);
auto dst_id = std::stoull(values[1]);
if (reverse_edge) {
std::swap(src_id, dst_id);
}
// (new) only keep edges whose src shard matches this part file —
// presumably data is pre-partitioned by shard; TODO confirm.
size_t src_shard_id = src_id % shard_num;
if (src_shard_id != (part_num % shard_num)) {
continue;
}

float weight = 1;
if (values.size() == 3) {
weight = std::stof(values[2]);
sample_type = "weighted";
is_weighted = true;
}

// Skip edges whose source id is owned by another server.
if (src_shard_id >= shard_end || src_shard_id < shard_start) {
VLOG(4) << "will not load " << src_id << " from " << path
if (src_shard_id >= shard_end || src_shard_id < shard_start) {
VLOG(4) << "will not load " << src_id << " from " << path
<< ", please check id distribution";
continue;
}

if (count % 1000000 == 0) {
VLOG(0) << count << " edges are loaded from filepath";
VLOG(0) << line;
continue;
}
// Insert the edge into its shard-local adjacency structure.
size_t index = src_shard_id - shard_start;
edge_shards[idx][index]->add_graph_node(src_id)->build_edges(is_weighted);
edge_shards[idx][index]->add_neighbor(src_id, dst_id, weight);
}

size_t index = src_shard_id - shard_start;
edge_shards[idx][index]->add_graph_node(src_id)->build_edges(is_weighted);
edge_shards[idx][index]->add_neighbor(src_id, dst_id, weight);
valid_count++;
// (old/new) SSD spill path: at search_level 2, dump loaded edges to SSD
// and clear the in-memory graph; the old trigger also checked
// count > fixed_load_edges.
#ifdef PADDLE_WITH_HETERPS
if (count > fixed_load_edges && search_level == 2) {
if (search_level == 2) {
dump_edges_to_ssd(idx);
VLOG(0) << "dumping edges to ssd, edge count is reset to 0";
clear_graph(idx);
count = 0;
}
#endif
}
}
VLOG(0) << valid_count << "/" << count << " edges are loaded successfully in "
<< path;

#ifdef PADDLE_WITH_HETERPS
if (search_level == 2) {
if (count > 0) {
dump_edges_to_ssd(idx);
VLOG(0) << "dumping edges to ssd, edge count is reset to 0";
clear_graph(idx);
count = 0;
}
return 0;
VLOG(0) << local_count << " edges are loaded from filepath->" << paths[i];
return 0;
}));
}
#endif
// (new) wait for all per-file loading tasks before finishing.
for (int j = 0; j < (int)tasks.size(); j++) tasks[j].get();
VLOG(0) << "successfully load all edge_type[" << edge_type << "] data";

#ifdef PADDLE_WITH_GPU_GRAPH
// To reduce memory overhead, CPU samplers won't be created in gpugraph.
Expand All @@ -1232,7 +1224,6 @@ int32_t GraphTable::load_edges(const std::string &path, bool reverse_edge,
}
}
#endif

return 0;
}

Expand Down Expand Up @@ -1747,6 +1738,10 @@ int32_t GraphTable::Initialize(const GraphParameter &graph) {
_shards_task_pool[i].reset(new ::ThreadPool(1));
_shards_task_rng_pool.push_back(paddle::framework::GetCPURandomEngine(0));
}
load_node_edge_task_pool.resize(load_thread_num);
for (size_t i = 0; i< load_node_edge_task_pool.size(); i++) {
load_node_edge_task_pool[i].reset(new ::ThreadPool(1));
}
auto graph_feature = graph.graph_feature();
auto node_types = graph.node_types();
auto edge_types = graph.edge_types();
Expand Down
2 changes: 2 additions & 0 deletions paddle/fluid/distributed/ps/table/common_graph_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@ class GraphTable : public Table {
// Per-edge-type / per-feature-type shard tables (outer: type idx, inner: shard).
std::vector<std::vector<GraphShard *>> edge_shards, feature_shards;
// Shard layout of this server: ids with id % shard_num in [shard_start, shard_end).
size_t shard_start, shard_end, server_num, shard_num_per_server, shard_num;
int task_pool_size_ = 24;
// Number of single-thread pools used to load node/edge files in parallel
// (files are assigned round-robin by `i % load_thread_num`).
int load_thread_num = 150;
const int random_sample_nodes_ranges = 3;

std::vector<std::vector<std::unordered_map<uint64_t, double>>> node_weight;
Expand All @@ -615,6 +616,7 @@ class GraphTable : public Table {

std::vector<std::shared_ptr<::ThreadPool>> _shards_task_pool;
std::vector<std::shared_ptr<std::mt19937_64>> _shards_task_rng_pool;
// Thread pools backing the parallel node/edge loaders (one thread each).
std::vector<std::shared_ptr<::ThreadPool>> load_node_edge_task_pool;
std::shared_ptr<ScaledLRU<SampleKey, SampleResult>> scaled_lru;
std::unordered_set<uint64_t> extra_nodes;
std::unordered_map<uint64_t, size_t> extra_nodes_to_thread_index;
Expand Down
26 changes: 14 additions & 12 deletions paddle/fluid/framework/data_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -475,18 +475,20 @@ void DatasetImpl<T>::LoadIntoMemory() {
}
cnt++;
}
for (auto& iter : node_to_id) {
int node_idx = iter.second;
auto gpu_graph_device_keys =
gpu_graph_ptr->get_all_feature_ids(1, node_idx, thread_num_);
for (size_t i = 0; i < gpu_graph_device_keys.size(); i++) {
VLOG(2) << "node type: " << node_idx << ", gpu_graph_device_keys[" << i
<< "] = " << gpu_graph_device_keys[i].size();
for (size_t j = 0; j < gpu_graph_device_keys[i].size(); j++) {
gpu_graph_total_keys_.push_back(gpu_graph_device_keys[i][j]);
}
}
}
//TODO(huwei02): open it when slot fea ready
//for (auto& iter : node_to_id) {
// int node_idx = iter.second;
// auto gpu_graph_device_keys =
// gpu_graph_ptr->get_all_feature_ids(1, node_idx, thread_num_);
// for (size_t i = 0; i < gpu_graph_device_keys.size(); i++) {
// VLOG(2) << "node type: " << node_idx << ", gpu_graph_device_keys[" << i
// << "] = " << gpu_graph_device_keys[i].size();
// for (size_t j = 0; j < gpu_graph_device_keys[i].size(); j++) {
// gpu_graph_total_keys_.push_back(gpu_graph_device_keys[i][j]);
// }
// }
//}

// FIX: trick for iterate edge table
for (auto& iter : edge_to_id) {
int edge_idx = iter.second;
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ void GraphGpuWrapper::init_search_level(int level) { search_level = level; }

void GraphGpuWrapper::init_service() {
table_proto.set_task_pool_size(24);
table_proto.set_shard_num(1000);
table_proto.set_search_level(search_level);
table_proto.set_table_name("cpu_graph_table_");
table_proto.set_use_cache(false);
Expand Down

0 comments on commit 3bde5e4

Please sign in to comment.