add slot attr for push sparse op (#44422)
* add slot attr for push sparse op

* add pybind

* remove fleet

* add unittest

* fix
zhaocaibei123 authored Jul 21, 2022
1 parent 1a7f2de commit 85c6937
Showing 13 changed files with 100 additions and 36 deletions.
23 changes: 11 additions & 12 deletions paddle/fluid/distributed/ps/wrapper/fleet.cc
@@ -529,10 +529,12 @@ void FleetWrapper::PushSparseFromTensorAsync(
     uint64_t padding_id,
     platform::Place place,
     std::vector<const LoDTensor*>* inputs,
+    std::vector<int>& slots,
     const LoDTensor* shows,
     const LoDTensor* clks,
     std::vector<LoDTensor*>* outputs,
     bool use_cvm_op) {
+  CHECK(slots.size() == inputs->size());
   int batch_size = -1;
   bool batch_size_consist = true;
   for (auto* input : *inputs) {
@@ -568,8 +570,8 @@ void FleetWrapper::PushSparseFromTensorAsync(
   // TODO(zhaocaibei123): check type of show/clk is int? float? uint64?
   // const long int* show_tensor = shows->data<int64_t>();
   // const long int* clk_tensor = clks->data<int64_t>();
-  const int64_t* show_tensor = shows->data<int64_t>();
-  const int64_t* clk_tensor = clks->data<int64_t>();
+  const float* show_tensor = shows->data<float>();
+  const float* clk_tensor = clks->data<float>();
 
   for (size_t index = 0; index < inputs->size(); ++index) {
     framework::LoDTensor* g_tensor = outputs->at(index);
@@ -603,15 +605,14 @@ void FleetWrapper::PushSparseFromTensorAsync(
         push_keys.emplace_back(real_id);
         if (use_cvm_op) {
           push_values.emplace_back(fea_dim + 1);
-          push_values.back()[0] = 2;  // TODO(zhaocaibei123): slot
+          push_values.back()[0] = static_cast<float>(slots[index]);
           float* data = push_values.back().data() + 1;
           memcpy(data, g + output_len, sizeof(float) * fea_dim);
         } else {
           push_values.emplace_back(fea_dim + 3);
           // slot show clk grad... consistent with CtrCommonPushValue defined
-          // in
-          // ctr_accessor.h
-          push_values.back()[0] = 2;  // TODO(zhaocaibei123): slot
+          // in ctr_accessor.h
+          push_values.back()[0] = static_cast<float>(slots[index]);
           push_values.back()[1] =
               (i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
           push_values.back()[2] =
@@ -631,18 +632,16 @@ void FleetWrapper::PushSparseFromTensorAsync(
         push_keys.emplace_back(real_id);
         if (use_cvm_op) {
           push_values.emplace_back(fea_dim + 1);
-          push_values.back()[0] = 2;  // TODO(zhaocaibei123): slot
+          push_values.back()[0] = static_cast<float>(slots[index]);
           float* data = push_values.back().data() + 1;
           memcpy(data, g + output_len, sizeof(float) * fea_dim);
         } else {
           push_values.emplace_back(fea_dim + 3);
           // slot show clk grad... consistent with CtrCommonPushValue defined in
           // ctr_accessor.h
-          push_values.back()[0] = 2;  // TODO(zhaocaibei123): slot
-          push_values.back()[1] =
-              (i >= show_size ? 1 : static_cast<float>(show_tensor[i]));
-          push_values.back()[2] =
-              (i >= clk_size ? 0 : static_cast<float>(clk_tensor[i]));
+          push_values.back()[0] = static_cast<float>(slots[index]);
+          push_values.back()[1] = (i >= show_size ? 1 : show_tensor[i]);
+          push_values.back()[2] = (i >= clk_size ? 0 : clk_tensor[i]);
           float* data = push_values.back().data() + 3;
           memcpy(data, g + output_len, sizeof(float) * fea_dim);
         }
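
For reference, in the non-CVM branch each pushed value follows the CtrCommonPushValue layout [slot, show, click, grad...] from ctr_accessor.h. A minimal standalone sketch of that packing (the helper name PackPushValue and its free-standing form are illustrative, not code from this commit):

    #include <cstring>
    #include <vector>

    // Pack one push value as [slot, show, click, grad[0..fea_dim)].
    std::vector<float> PackPushValue(int slot, float show, float click,
                                     const float* grad, int fea_dim) {
      std::vector<float> value(fea_dim + 3);
      value[0] = static_cast<float>(slot);  // previously hard-coded to 2
      value[1] = show;
      value[2] = click;
      std::memcpy(value.data() + 3, grad, sizeof(float) * fea_dim);
      return value;
    }
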
2 changes: 2 additions & 0 deletions paddle/fluid/distributed/ps/wrapper/fleet.h
@@ -190,11 +190,13 @@ class FleetWrapper {
       const std::vector<std::string>& input_names,
       std::vector<const LoDTensor*>* inputs,    // NOLINT
       std::vector<const LoDTensor*>* outputs);  // NOLINT
+
   void PushSparseFromTensorAsync(const uint64_t table_id,
                                  int fea_dim,
                                  uint64_t padding_id,
                                  platform::Place place,
                                  std::vector<const LoDTensor*>* inputs,
+                                 std::vector<int>& slots,  // NOLINT
                                  const LoDTensor* shows,
                                  const LoDTensor* clicks,
                                  std::vector<LoDTensor*>* outputs,
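
With the new parameter, a caller supplies one slot id per Ids tensor; the CHECK added in fleet.cc enforces the size match. A sketch of an updated call site (the surrounding variables are assumed context, not from this commit):

    // Assumed context: fleet, table_id, fea_dim, padding_id, place, inputs,
    // shows, clks, outputs, use_cvm_op are set up as before.
    std::vector<int> slots = {2, 3};  // one slot id per entry of `inputs`
    fleet->PushSparseFromTensorAsync(table_id, fea_dim, padding_id, place,
                                     &inputs, slots, shows, clks, &outputs,
                                     use_cvm_op);
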
14 changes: 7 additions & 7 deletions paddle/fluid/framework/data_feed.cc
@@ -309,7 +309,7 @@ void PrivateQueueDataFeed<T>::ReadThread() {
   std::string filename;
   while (PickOneFile(&filename)) {
     int err_no = 0;
-    fp_ = fs_open_read(filename, &err_no, pipe_command_);
+    fp_ = fs_open_read(filename, &err_no, pipe_command_, true);
     __fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
     T instance;
     while (ParseOneInstanceFromPipe(&instance)) {
@@ -538,7 +538,7 @@ void InMemoryDataFeed<T>::LoadIntoMemory() {
   } else {
 #endif
     int err_no = 0;
-    this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
+    this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_, true);
 #ifdef PADDLE_WITH_BOX_PS
   }
 #endif
@@ -574,7 +574,7 @@ void InMemoryDataFeed<T>::LoadIntoMemoryFromSo() {
     (defined PADDLE_WITH_PSLIB)
   VLOG(3) << "LoadIntoMemoryFromSo() begin, thread_id=" << thread_id_;
   int buf_len = 1024 * 1024 * 10;
-  char* buf = (char*)malloc(buf_len + 10);
+  char* buf = reinterpret_cast<char*>(malloc(buf_len + 10));
   auto ps_gpu_ptr = PSGPUWrapper::GetInstance();
 
   paddle::framework::CustomParser* parser =
@@ -681,7 +681,7 @@ void MultiSlotDataFeed::ReadThread() {
   std::string filename;
   while (PickOneFile(&filename)) {
     int err_no = 0;
-    fp_ = fs_open_read(filename, &err_no, pipe_command_);
+    fp_ = fs_open_read(filename, &err_no, pipe_command_, true);
     CHECK(fp_ != nullptr);
     __fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
     std::vector<MultiSlotType> instance;
@@ -2175,7 +2175,7 @@ void SlotRecordInMemoryDataFeed::LoadIntoMemoryByFile(void) {
                                  lines);
     } else {
       int err_no = 0;
-      this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
+      this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_, true);
 
       CHECK(this->fp_ != nullptr);
       __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
@@ -2265,7 +2265,7 @@ void SlotRecordInMemoryDataFeed::LoadIntoMemoryByLine(void) {
 
     do {
      int err_no = 0;
-      this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
+      this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_, true);
       CHECK(this->fp_ != nullptr);
       __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
       lines = line_reader.read_file(this->fp_.get(), line_func, lines);
@@ -2314,7 +2314,7 @@ void SlotRecordInMemoryDataFeed::LoadIntoMemoryByCommand(void) {
 
     do {
       int err_no = 0;
-      this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_);
+      this->fp_ = fs_open_read(filename, &err_no, this->pipe_command_, true);
       CHECK(this->fp_ != nullptr);
       __fsetlocking(&*(this->fp_), FSETLOCKING_BYCALLER);
 
2 changes: 1 addition & 1 deletion paddle/fluid/framework/data_set.cc
@@ -102,7 +102,7 @@ void DatasetImpl<T>::SetHdfsConfig(const std::string& fs_name,
   cmd += " -D fs.default.name=" + fs_name;
   cmd += " -D hadoop.job.ugi=" + fs_ugi;
   cmd += " -Ddfs.client.block.write.retries=15 -Ddfs.rpc.timeout=500000";
-  paddle::framework::hdfs_set_command(cmd);
+  paddle::framework::dataset_hdfs_set_command(cmd);
 }
 
 template <typename T>
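
Dataset files and model files may live on different AFS/HDFS clusters, so SetHdfsConfig now feeds the dataset-side command while hdfs_set_command remains for model I/O. A sketch of configuring both (the cluster addresses and ugi values are made up):

    // Model load/save cluster, used when fs_open_read is called with
    // read_data == false (the default).
    paddle::framework::hdfs_set_command(
        "hadoop fs -D fs.default.name=afs://model-cluster -D hadoop.job.ugi=user,passwd");
    // Training-data cluster, used by the data feeds (read_data == true).
    paddle::framework::dataset_hdfs_set_command(
        "hadoop fs -D fs.default.name=afs://data-cluster -D hadoop.job.ugi=user,passwd");
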
40 changes: 33 additions & 7 deletions paddle/fluid/framework/io/fs.cc
@@ -230,6 +230,20 @@ const std::string& hdfs_command() { return hdfs_command_internal(); }
 
 void hdfs_set_command(const std::string& x) { hdfs_command_internal() = x; }
 
+// dataset and model may be on different afs cluster
+static std::string& dataset_hdfs_command_internal() {
+  static std::string x = "hadoop fs";
+  return x;
+}
+
+const std::string& dataset_hdfs_command() {
+  return dataset_hdfs_command_internal();
+}
+
+void dataset_hdfs_set_command(const std::string& x) {
+  dataset_hdfs_command_internal() = x;
+}
+
 static std::string& customized_download_cmd_internal() {
   static std::string x = "";
   return x;
@@ -243,17 +257,28 @@ void set_download_command(const std::string& x) {
 
 std::shared_ptr<FILE> hdfs_open_read(std::string path,
                                      int* err_no,
-                                     const std::string& converter) {
+                                     const std::string& converter,
+                                     bool read_data) {
   if (download_cmd() != "") {  // use customized download command
     path = string::format_string(
         "%s \"%s\"", download_cmd().c_str(), path.c_str());
   } else {
     if (fs_end_with_internal(path, ".gz")) {
-      path = string::format_string(
-          "%s -text \"%s\"", hdfs_command().c_str(), path.c_str());
+      if (read_data) {
+        path = string::format_string(
+            "%s -text \"%s\"", dataset_hdfs_command().c_str(), path.c_str());
+      } else {
+        path = string::format_string(
+            "%s -text \"%s\"", hdfs_command().c_str(), path.c_str());
+      }
     } else {
-      path = string::format_string(
-          "%s -cat \"%s\"", hdfs_command().c_str(), path.c_str());
+      if (read_data) {
+        path = string::format_string(
+            "%s -cat \"%s\"", dataset_hdfs_command().c_str(), path.c_str());
+      } else {
+        path = string::format_string(
+            "%s -cat \"%s\"", hdfs_command().c_str(), path.c_str());
+      }
     }
   }
 
@@ -370,13 +395,14 @@ int fs_select_internal(const std::string& path) {
 
 std::shared_ptr<FILE> fs_open_read(const std::string& path,
                                    int* err_no,
-                                   const std::string& converter) {
+                                   const std::string& converter,
+                                   bool read_data) {
   switch (fs_select_internal(path)) {
     case 0:
       return localfs_open_read(path, converter);
 
     case 1:
-      return hdfs_open_read(path, err_no, converter);
+      return hdfs_open_read(path, err_no, converter, read_data);
 
     default:
       PADDLE_THROW(platform::errors::Unimplemented(
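
The four branches above combine two independent choices: which command prefix (dataset vs. model cluster) and which action (-text for .gz files, -cat otherwise). A condensed sketch of the same selection logic, not the committed code:

    const std::string& cmd = read_data ? dataset_hdfs_command() : hdfs_command();
    const char* action = fs_end_with_internal(path, ".gz") ? "-text" : "-cat";
    path = string::format_string(
        "%s %s \"%s\"", cmd.c_str(), action, path.c_str());
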
10 changes: 8 additions & 2 deletions paddle/fluid/framework/io/fs.h
@@ -64,13 +64,18 @@ extern const std::string& hdfs_command();
 
 extern void hdfs_set_command(const std::string& x);
 
+extern const std::string& dataset_hdfs_command();
+
+extern void dataset_hdfs_set_command(const std::string& x);
+
 extern const std::string& download_cmd();
 
 extern void set_download_command(const std::string& x);
 
 extern std::shared_ptr<FILE> hdfs_open_read(std::string path,
                                             int* err_no,
-                                            const std::string& converter);
+                                            const std::string& converter,
+                                            bool read_data);
 
 extern std::shared_ptr<FILE> hdfs_open_write(std::string path,
                                              int* err_no,
@@ -91,7 +96,8 @@ extern void hdfs_mv(const std::string& src, const std::string& dest);
 // aut-detect fs
 extern std::shared_ptr<FILE> fs_open_read(const std::string& path,
                                           int* err_no,
-                                          const std::string& converter);
+                                          const std::string& converter,
+                                          bool read_data = false);
 
 extern std::shared_ptr<FILE> fs_open_write(const std::string& path,
                                            int* err_no,
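
Because read_data defaults to false, existing callers keep routing through hdfs_command(), and only the data-feed call sites updated in data_feed.cc opt in. For example (the paths are illustrative):

    int err_no = 0;
    // Unchanged caller: resolved via hdfs_command() (model cluster).
    auto model_fp = paddle::framework::fs_open_read(
        "afs:/model/embedding.gz", &err_no, "");
    // Data feed: resolved via dataset_hdfs_command() (data cluster).
    auto data_fp = paddle::framework::fs_open_read(
        "afs:/data/part-00000.gz", &err_no, "", true);
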
13 changes: 13 additions & 0 deletions paddle/fluid/framework/io/test_fs.cc
@@ -45,5 +45,18 @@ TEST(FS, mv) {
   } catch (...) {
     VLOG(3) << "test hdfs_mv, catch expected errors of unknown prefix";
   }
+
+  try {
+    paddle::framework::dataset_hdfs_set_command(
+        "hadoop -D hadoop.job.ugi=anotherxxx fs -text");
+    int err_no = 0;
+    paddle::framework::hdfs_open_read("afs:/none.gz", &err_no, "", true);
+    paddle::framework::hdfs_open_read("afs:/none.gz", &err_no, "", false);
+    paddle::framework::hdfs_open_read("afs:/none", &err_no, "", true);
+    paddle::framework::hdfs_open_read("afs:/none", &err_no, "", false);
+  } catch (...) {
+    VLOG(3) << "test hdfs_open_read, catch expected errors of unknown path";
+  }
+
 #endif
 }
1 change: 1 addition & 0 deletions paddle/fluid/operators/lookup_table_op.cc
@@ -134,6 +134,7 @@ class LookupTableOpMaker : public framework::OpProtoAndCheckerMaker {
                "in the order of input variables for mapping")
       .SetDefault({});
   AddAttr<int>("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0);
+  AddAttr<int>("slot", "slot of id").SetDefault(0).AsExtra();
   AddAttr<bool>("grad_inplace",
                 "(boolean, default false) "
                 "If the grad op reuse the input's variable.")
1 change: 1 addition & 0 deletions paddle/fluid/operators/lookup_table_v2_op.cc
@@ -105,6 +105,7 @@ class LookupTableV2OpMaker : public framework::OpProtoAndCheckerMaker {
   AddAttr<int>("trainer_id", "trainer id from 0 ~ worker_num.")
       .SetDefault(0)
       .AsExtra();
+  AddAttr<int>("slot", "slot of id").SetDefault(0).AsExtra();
   AddAttr<std::vector<int64_t>>("height_sections",
                                 "Height for each output SelectedRows.")
       .SetDefault(std::vector<int64_t>({}))
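
Both lookup_table ops now carry a scalar "slot" extra attr (default 0), which ps_trainer_pass.py below collects from each fused op. A sketch of tagging an op from C++ (the OpDesc usage is illustrative; in practice the attr is set while building the program):

    #include "paddle/fluid/framework/op_desc.h"

    // Tag an embedding lookup with the slot id of the ids it consumes.
    paddle::framework::OpDesc op;
    op.SetType("lookup_table_v2");
    op.SetAttr("slot", 2);  // stays 0 if never set
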
5 changes: 5 additions & 0 deletions paddle/fluid/operators/pscore/distributed_push_sparse_op.cc
@@ -113,6 +113,11 @@ class DistributedPushSparseOpMaker : public framework::OpProtoAndCheckerMaker {
     AddAttr<bool>("use_cvm_op", "(boolean, default false) Use cvm op or not.")
         .SetDefault(false);
 
+    AddAttr<std::vector<int>>("slots",
+                              "[slot_id1, slot_id2] Slots array of Ids.")
+        .SetDefault({})
+        .AsExtra();
+
     AddComment(R"DOC(
 Lookup Tablel Prefetch Operator.
 This operator is used to perform lookup on parameter W,
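
The fused push op receives the per-input slot ids as a vector attr aligned with its Ids inputs, mirroring the scalar "slot" attr on the lookup ops. A sketch (values illustrative):

    #include <vector>
    #include "paddle/fluid/framework/op_desc.h"

    paddle::framework::OpDesc push_op;
    push_op.SetType("distributed_push_sparse");
    // One slot id per Ids input of the fused op.
    push_op.SetAttr("slots", std::vector<int>{2, 3});
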
3 changes: 3 additions & 0 deletions paddle/fluid/operators/pscore/distributed_push_sparse_op.h
@@ -33,6 +33,7 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
     auto table_id = context.Attr<int>("table_id");
     auto emb_dim = context.Attr<int>("size");
     auto use_cvm_op = context.Attr<bool>("use_cvm_op");
+    auto slots = context.Attr<std::vector<int>>("slots");
 
     auto inputs = context.MultiInput<framework::LoDTensor>("Ids");
     auto shows = context.Input<framework::LoDTensor>("Shows");
@@ -47,6 +48,7 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
         static_cast<uint64_t>(padding_idx),
         context.GetPlace(),
         &inputs,
+        slots,
         shows,
         clks,
         &outputs,
@@ -103,6 +105,7 @@ class DistributedPushSparseKernel : public framework::OpKernel<T> {
           static_cast<uint64_t>(padding_idx),
           context.GetPlace(),
           &tmp_input_vec,
+          slots,
           tmp_shows_tensor,
           tmp_clicks_tensor,
           &tmp_output_vec);
12 changes: 8 additions & 4 deletions python/paddle/distributed/passes/ps_trainer_pass.py
@@ -150,7 +150,7 @@ def _push_sparse_fuse(self, _program, push_sparse_ops, attrs, use_cvm_op):
             print('ShowClickEntry not configured, will not use')
             show = _program.global_block().create_var(
                 name="show",
-                dtype=core.VarDesc.VarType.INT64,
+                dtype=core.VarDesc.VarType.FP32,
                 persistable=False,
                 stop_gradient=True)
             _program.global_block()._insert_op(index=0,
@@ -165,7 +165,7 @@ def _push_sparse_fuse(self, _program, push_sparse_ops, attrs, use_cvm_op):
 
             clk = _program.global_block().create_var(
                 name="clk",
-                dtype=core.VarDesc.VarType.INT64,
+                dtype=core.VarDesc.VarType.FP32,
                 persistable=False,
                 stop_gradient=True)
             _program.global_block()._insert_op(index=0,
@@ -190,6 +190,9 @@ def _push_sparse_fuse(self, _program, push_sparse_ops, attrs, use_cvm_op):
             padding_idx = ops[0].attr("padding_idx")
             is_distributed = ops[0].attr("is_distributed")
             op_type = ops[0].type
+
+            slots = [op.attr("slot") for op in ops]
+            print('debug zcb slots: ', slots)
             outputs = [
                 _program.global_block().vars[op.input("Out@GRAD")[0]]
                 for op in ops
@@ -204,7 +207,7 @@ def _push_sparse_fuse(self, _program, push_sparse_ops, attrs, use_cvm_op):
                     'W': w,
                     "Outputs": outputs,
                     "Shows": show,
-                    "Clicks": clk
+                    "Clicks": clk,
                 },
                 outputs={"Outputs": outputs},
                 attrs={
@@ -213,7 +216,8 @@ def _push_sparse_fuse(self, _program, push_sparse_ops, attrs, use_cvm_op):
                     "padding_idx": padding_idx,
                     "table_id": table_id,
                     "size": self.emb_size[param],
-                    "use_cvm_op": use_cvm_op
+                    "use_cvm_op": use_cvm_op,
+                    "slots": slots
                 })
 
     def _pull_sparse_fuse(self, _program, pull_sparse_ops, attrs, send_ctx):