Skip to content

Commit

Permalink
dedup feature, opt parse feature (PaddlePaddle#25)
Browse files Browse the repository at this point in the history
* dedup feature, opt parse feature

* remove debug code

Co-authored-by: root <root@yq01-inf-hic-k8s-a100-ab2-0009.yq01.baidu.com>
  • Loading branch information
huwei02 and root authored Jun 11, 2022
1 parent 3bde5e4 commit 68c2cc4
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 64 deletions.
101 changes: 57 additions & 44 deletions paddle/fluid/distributed/ps/table/common_graph_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -965,12 +965,18 @@ GraphNode *GraphShard::add_graph_node(Node *node) {
}
return (GraphNode *)bucket[node_location[id]];
}
FeatureNode *GraphShard::add_feature_node(uint64_t id) {

FeatureNode *GraphShard::add_feature_node(uint64_t id, bool is_overlap) {
if (node_location.find(id) == node_location.end()) {
node_location[id] = bucket.size();
bucket.push_back(new FeatureNode(id));
return (FeatureNode *)bucket[node_location[id]];
}
if (is_overlap) {
return (FeatureNode *)bucket[node_location[id]];
}
return (FeatureNode *)bucket[node_location[id]];

return NULL;
}

void GraphShard::add_neighbor(uint64_t id, uint64_t dst_id, float weight) {
Expand Down Expand Up @@ -1075,15 +1081,14 @@ int32_t GraphTable::load_nodes(const std::string &path, std::string node_type) {
for (size_t i = 0; i < paths.size(); i++) {
tasks.push_back(load_node_edge_task_pool[i % load_thread_num]->enqueue(
[&, i, idx, this]() -> int {
VLOG(2) << "Begin GraphTable::load_nodes(), path[" << paths[i] << "]";
VLOG(0) << "Begin GraphTable::load_nodes(), path[" << paths[i] << "]";
std::ifstream file(paths[i]);
std::string line;
uint64_t local_count = 0;
while (std::getline(file, line)) {
auto values = paddle::string::split_string<std::string>(line, "\t");
if (values.size() < 2) continue;
std::string nt = values[0];
if (nt != node_type) {
if (values[0] != node_type) {
continue;
}

Expand All @@ -1096,20 +1101,14 @@ int32_t GraphTable::load_nodes(const std::string &path, std::string node_type) {
}

size_t index = shard_id - shard_start;
auto node = feature_shards[idx][index]->add_feature_node(id);
node->set_feature_size(feat_name[idx].size());

// TODO(huwei02): open it when slot fea ready
//for (size_t slice = 2; slice < values.size(); slice++) {
// auto feat = this->parse_feature(idx, values[slice]);
// if (feat.first >= 0) {
// node->set_feature(feat.first, feat.second);
// } else {
// VLOG(4) << "Node feature: " << values[slice]
// << " not in feature_map.";
// }
//}
local_count++;
auto node = feature_shards[idx][index]->add_feature_node(id, false);
if (node != NULL) {
node->set_feature_size(feat_name[idx].size());
for (size_t slice = 2; slice < values.size(); slice++) {
parse_feature(idx, values[slice], node);
}
local_count++;
}
}
VLOG(0) << "node_type[" << node_type << "] loads " << local_count << " nodes from filepath->" << paths[i];
return 0;
Expand Down Expand Up @@ -1503,42 +1502,56 @@ int32_t GraphTable::set_node_feat(
return 0;
}

std::pair<int32_t, std::string> GraphTable::parse_feature(
int idx, std::string feat_str) {
void string_vector_2_string(std::vector<std::string>::iterator strs_begin,
std::vector<std::string>::iterator strs_end, char delim, std::string* output) {
size_t i = 0;
for (std::vector<std::string>::iterator iter = strs_begin; iter != strs_end; ++iter) {
if (i > 0) {
*output += delim;
}

*output += *iter;
++i;
}
}

int GraphTable::parse_feature(int idx, const std::string& feat_str,
FeatureNode* node) {
// Return (feat_id, btyes) if name are in this->feat_name, else return (-1,
// "")
auto fields =
std::vector<std::string> fields =
paddle::string::split_string<std::string>(feat_str, feature_separator_);
if (feat_id_map[idx].count(fields[0])) {
// if (this->feat_id_map.count(fields[0])) {
int32_t id = this->feat_id_map[idx][fields[0]];
auto it = feat_id_map[idx].find(fields[0]);
if (it != feat_id_map[idx].end()) {
int32_t id = it->second;
std::string* fea_ptr = node->mutable_feature(id);
std::string dtype = this->feat_dtype[idx][id];
std::vector<std::string> values(fields.begin() + 1, fields.end());
if (dtype == "feasign") {
return std::make_pair<int32_t, std::string>(
int32_t(id), paddle::string::join_strings(values, ' '));
string_vector_2_string(fields.begin() + 1, fields.end(), ' ', fea_ptr);
return 0;
} else if (dtype == "string") {
return std::make_pair<int32_t, std::string>(
int32_t(id), paddle::string::join_strings(values, ' '));
string_vector_2_string(fields.begin() + 1, fields.end(), ' ', fea_ptr);
return 0;
} else if (dtype == "float32") {
return std::make_pair<int32_t, std::string>(
int32_t(id), FeatureNode::parse_value_to_bytes<float>(values));
FeatureNode::parse_value_to_bytes<float>(fields.begin() + 1, fields.end(), fea_ptr);
return 0;
} else if (dtype == "float64") {
return std::make_pair<int32_t, std::string>(
int32_t(id), FeatureNode::parse_value_to_bytes<double>(values));
FeatureNode::parse_value_to_bytes<double>(fields.begin() + 1, fields.end(), fea_ptr);
return 0;
} else if (dtype == "int32") {
return std::make_pair<int32_t, std::string>(
int32_t(id), FeatureNode::parse_value_to_bytes<int32_t>(values));
FeatureNode::parse_value_to_bytes<int32_t>(fields.begin() + 1, fields.end(), fea_ptr);
return 0;
} else if (dtype == "int64") {
return std::make_pair<int32_t, std::string>(
int32_t(id), FeatureNode::parse_value_to_bytes<uint64_t>(values));
FeatureNode::parse_value_to_bytes<uint64_t>(fields.begin() + 1, fields.end(), fea_ptr);
return 0;
}
} else {
VLOG(2) << "feature_name[" << fields[0]
<< "] is not in feat_id_map, ntype_id[" << idx
<< "] feat_id_map_size[" << feat_id_map.size() << "]";
}
return std::make_pair<int32_t, std::string>(-1, "");

return -1;
}

std::vector<std::vector<uint64_t>> GraphTable::get_all_id(int type_id, int slice_num) {
Expand Down Expand Up @@ -1588,9 +1601,9 @@ std::vector<std::vector<uint64_t>> GraphTable::get_all_id(int type_id, int idx,
return res;
}

std::vector<std::vector<uint64_t>> GraphTable::get_all_feature_ids(int type_id, int idx,
int slice_num) {
std::vector<std::vector<uint64_t>> res(slice_num);
int GraphTable::get_all_feature_ids(int type_id, int idx, int slice_num,
std::vector<std::vector<uint64_t>>* output) {
output->resize(slice_num);
auto &search_shards = type_id == 0 ? edge_shards[idx] : feature_shards[idx];
std::vector<std::future<std::vector<uint64_t>>> tasks;
for (int i = 0; i < search_shards.size(); i++) {
Expand All @@ -1604,9 +1617,9 @@ std::vector<std::vector<uint64_t>> GraphTable::get_all_feature_ids(int type_id,
}
for (size_t i = 0; i < tasks.size(); i++) {
auto ids = tasks[i].get();
for (auto &id : ids) res[id % slice_num].push_back(id);
for (auto &id : ids) (*output)[id % slice_num].push_back(id);
}
return res;
return 0;
}

int32_t GraphTable::pull_graph_list(int type_id, int idx, int start,
Expand Down
15 changes: 8 additions & 7 deletions paddle/fluid/distributed/ps/table/common_graph_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,17 @@ class GraphShard {
std::vector<uint64_t> get_all_feature_ids() {
// TODO by huwei02, dedup
std::vector<uint64_t> total_res;
std::set<uint64_t> res;
for (int i = 0; i < (int)bucket.size(); i++) {
std::vector<uint64_t> res;
res.push_back(bucket[i]->get_feature_ids(&res));
res.clear();
bucket[i]->get_feature_ids(&res);
total_res.insert(total_res.end(), res.begin(), res.end());
}
return total_res;
}
GraphNode *add_graph_node(uint64_t id);
GraphNode *add_graph_node(Node *node);
FeatureNode *add_feature_node(uint64_t id);
FeatureNode *add_feature_node(uint64_t id, bool is_overlap = true);
Node *find_node(uint64_t id);
void delete_node(uint64_t id);
void clear();
Expand Down Expand Up @@ -488,8 +489,8 @@ class GraphTable : public Table {
std::vector<std::vector<uint64_t>> get_all_id(int type, int slice_num);
std::vector<std::vector<uint64_t>> get_all_id(int type, int idx,
int slice_num);
std::vector<std::vector<uint64_t>> get_all_feature_ids(int type, int idx,
int slice_num);
int get_all_feature_ids(int type, int idx,
int slice_num, std::vector<std::vector<uint64_t>>* output);
int32_t load_nodes(const std::string &path, std::string node_type);

int32_t add_graph_node(int idx, std::vector<uint64_t> &id_list,
Expand Down Expand Up @@ -526,8 +527,8 @@ class GraphTable : public Table {
}
virtual uint32_t get_thread_pool_index_by_shard_index(uint64_t shard_index);
virtual uint32_t get_thread_pool_index(uint64_t node_id);
virtual std::pair<int32_t, std::string> parse_feature(int idx,
std::string feat_str);
virtual int parse_feature(int idx, const std::string& feat_str,
FeatureNode* node);

virtual int32_t get_node_feat(int idx, const std::vector<uint64_t> &node_ids,
const std::vector<std::string> &feature_names,
Expand Down
38 changes: 31 additions & 7 deletions paddle/fluid/distributed/ps/table/graph/graph_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <memory>
#include <sstream>
#include <vector>
#include <set>
#include "glog/logging.h"
#include "paddle/fluid/distributed/ps/table/graph/graph_weighted_sampler.h"
#include "paddle/fluid/string/string_helper.h"
Expand Down Expand Up @@ -50,13 +51,13 @@ class Node {
virtual void to_buffer(char *buffer, bool need_feature);
virtual void recover_from_buffer(char *buffer);
virtual std::string get_feature(int idx) { return std::string(""); }
virtual int get_feature_ids(std::vector<uint64_t> *res) const {
virtual int get_feature_ids(std::set<uint64_t> *res) const {
return 0;
}
virtual int get_feature_ids(int slot_idx, std::vector<uint64_t> *res) const {
return 0;
}
virtual void set_feature(int idx, std::string str) {}
virtual void set_feature(int idx, const std::string& str) {}
virtual void set_feature_size(int size) {}
virtual int get_feature_size() { return 0; }
virtual size_t get_neighbor_size() { return 0; }
Expand Down Expand Up @@ -105,9 +106,8 @@ class FeatureNode : public Node {
}
}

virtual int get_feature_ids(std::vector<uint64_t> *res) const {
virtual int get_feature_ids(std::set<uint64_t> *res) const {
PADDLE_ENFORCE_NOT_NULL(res);
res->clear();
errno = 0;
for (auto& feature_item: feature) {
const char *feat_str = feature_item.c_str();
Expand All @@ -117,7 +117,7 @@ class FeatureNode : public Node {
PADDLE_ENFORCE_EQ(field.empty(), false);
uint64_t feasign = strtoull(field.c_str(), &head_ptr, 10);
PADDLE_ENFORCE_EQ(field.c_str() + field.length(), head_ptr);
res->push_back(feasign);
res->insert(feasign);
}
}
PADDLE_ENFORCE_EQ(errno, 0);
Expand All @@ -143,7 +143,14 @@ class FeatureNode : public Node {
return 0;
}

virtual void set_feature(int idx, std::string str) {
virtual std::string* mutable_feature(int idx) {
if (idx >= (int)this->feature.size()) {
this->feature.resize(idx + 1);
}
return &(this->feature[idx]);
}

virtual void set_feature(int idx, const std::string& str) {
if (idx >= (int)this->feature.size()) {
this->feature.resize(idx + 1);
}
Expand All @@ -165,6 +172,22 @@ class FeatureNode : public Node {
return std::string(buffer, Tsize);
}

template <typename T>
static void parse_value_to_bytes(std::vector<std::string>::iterator feat_str_begin,
std::vector<std::string>::iterator feat_str_end,
std::string* output) {
T v;
size_t feat_str_size = feat_str_end - feat_str_begin;
size_t Tsize = sizeof(T) * feat_str_size;
char buffer[Tsize] = {'\0'};
for (size_t i = 0; i < feat_str_size; i++) {
std::stringstream ss(*(feat_str_begin + i));
ss >> v;
std::memcpy(buffer + sizeof(T) * i, (char *)&v, sizeof(T));
}
output->assign(buffer);
}

template <typename T>
static std::vector<T> parse_bytes_to_array(std::string feat_str) {
T v;
Expand All @@ -179,8 +202,9 @@ class FeatureNode : public Node {
return out;
}

protected:
protected:
std::vector<std::string> feature;
};

} // namespace distributed
} // namespace paddle
7 changes: 3 additions & 4 deletions paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,10 @@ std::vector<std::vector<uint64_t>> GraphGpuWrapper::get_all_id(int type,
->cpu_graph_table_->get_all_id(type, idx, slice_num);
}

std::vector<std::vector<uint64_t>> GraphGpuWrapper::get_all_feature_ids(int type,
int idx,
int slice_num) {
int GraphGpuWrapper::get_all_feature_ids(int type, int idx, int slice_num,
std::vector<std::vector<uint64_t>>* output) {
return ((GpuPsGraphTable *)graph_table)
->cpu_graph_table_->get_all_feature_ids(type, idx, slice_num);
->cpu_graph_table_->get_all_feature_ids(type, idx, slice_num, output);
}

void GraphGpuWrapper::set_up_types(std::vector<std::string> &edge_types,
Expand Down
4 changes: 2 additions & 2 deletions paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ class GraphGpuWrapper {
std::vector<std::vector<uint64_t>> get_all_id(int type, int slice_num);
std::vector<std::vector<uint64_t>> get_all_id(int type, int idx,
int slice_num);
std::vector<std::vector<uint64_t>> get_all_feature_ids(int type, int idx,
int slice_num);
int get_all_feature_ids(int type, int idx, int slice_num,
std::vector<std::vector<uint64_t>>* output);
NodeQueryResult query_node_list(int gpu_id, int idx, int start,
int query_size);
NeighborSampleResult graph_neighbor_sample_v3(NeighborSampleQuery q,
Expand Down

0 comments on commit 68c2cc4

Please sign in to comment.