diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.cc b/paddle/fluid/distributed/fleet_executor/dist_model.cc
index add575cd0aaa2..2a6da1b437a1b 100644
--- a/paddle/fluid/distributed/fleet_executor/dist_model.cc
+++ b/paddle/fluid/distributed/fleet_executor/dist_model.cc
@@ -299,6 +299,10 @@ void DistModel::InsertCommOp(std::string tmp_var_name,
     ss << ep << ", ";
   }
   VLOG(3) << ss.str();
+  std::string endpoints_str = config_.current_endpoint;
+  for (const auto &peer : peer_endpoints) {
+    endpoints_str += "," + peer;
+  }
   if (config_.place == "GPU") {
     framework::VarDesc *new_var = block->Var(tmp_var_name);
     new_var->SetType(framework::proto::VarType::RAW);
@@ -319,6 +323,7 @@ void DistModel::InsertCommOp(std::string tmp_var_name,
     comm_init_op->SetAttr("rank", rank);
     comm_init_op->SetAttr("nranks", nranks);
     comm_init_op->SetAttr("ring_id", ring_id);
+    comm_init_op->SetAttr("endpoints", endpoints_str);
     comm_init_op->SetAttr("op_role",
                           static_cast<int>(framework::OpRole::kForward));
     comm_init_op->CheckAttrs();
@@ -342,6 +347,7 @@ void DistModel::InsertCommOp(std::string tmp_var_name,
     comm_init_op->SetAttr("rank", rank);
     comm_init_op->SetAttr("nranks", nranks);
     comm_init_op->SetAttr("ring_id", ring_id);
+    comm_init_op->SetAttr("endpoints", endpoints_str);
     comm_init_op->SetAttr("op_role",
                           static_cast<int>(framework::OpRole::kForward));
     comm_init_op->CheckAttrs();
@@ -365,6 +371,7 @@ void DistModel::InsertCommOp(std::string tmp_var_name,
     comm_init_op->SetAttr("rank", rank);
     comm_init_op->SetAttr("nranks", nranks);
     comm_init_op->SetAttr("ring_id", ring_id);
+    comm_init_op->SetAttr("endpoints", endpoints_str);
     comm_init_op->SetAttr("op_role",
                           static_cast<int>(framework::OpRole::kForward));
     comm_init_op->CheckAttrs();
diff --git a/paddle/fluid/inference/api/analysis_predictor.cc b/paddle/fluid/inference/api/analysis_predictor.cc
index 53fb3f9e73450..bf20bd73cc488 100644
--- a/paddle/fluid/inference/api/analysis_predictor.cc
+++ b/paddle/fluid/inference/api/analysis_predictor.cc
@@ -838,6 +838,10 @@ void AnalysisPredictor::InsertCommOp(
     ss << ep << ", ";
   }
   VLOG(3) << ss.str();
+  std::string endpoints_str = config_.dist_config().current_endpoint();
+  for (const auto &peer : peer_endpoints) {
+    endpoints_str += "," + peer;
+  }
   if (config_.use_gpu()) {
     framework::VarDesc *new_var = block->Var(tmp_var_name);
     new_var->SetType(framework::proto::VarType::RAW);
@@ -859,6 +863,7 @@ void AnalysisPredictor::InsertCommOp(
     comm_init_op->SetAttr("rank", rank);
     comm_init_op->SetAttr("nranks", nranks);
     comm_init_op->SetAttr("ring_id", ring_id);
+    comm_init_op->SetAttr("endpoints", endpoints_str);
     comm_init_op->SetAttr("op_role",
                           static_cast<int>(framework::OpRole::kForward));
     comm_init_op->CheckAttrs();
@@ -883,6 +888,7 @@ void AnalysisPredictor::InsertCommOp(
     comm_init_op->SetAttr("rank", rank);
     comm_init_op->SetAttr("nranks", nranks);
     comm_init_op->SetAttr("ring_id", ring_id);
+    comm_init_op->SetAttr("endpoints", endpoints_str);
     comm_init_op->SetAttr("op_role",
                           static_cast<int>(framework::OpRole::kForward));
     comm_init_op->CheckAttrs();
@@ -907,6 +913,7 @@ void AnalysisPredictor::InsertCommOp(
     comm_init_op->SetAttr("rank", rank);
     comm_init_op->SetAttr("nranks", nranks);
     comm_init_op->SetAttr("ring_id", ring_id);
+    comm_init_op->SetAttr("endpoints", endpoints_str);
     comm_init_op->SetAttr("op_role",
                           static_cast<int>(framework::OpRole::kForward));
     comm_init_op->CheckAttrs();
diff --git a/paddle/fluid/inference/utils/CMakeLists.txt b/paddle/fluid/inference/utils/CMakeLists.txt
index 7f0dcbbf79138..41a6c8ac3379c 100644
--- a/paddle/fluid/inference/utils/CMakeLists.txt
+++ b/paddle/fluid/inference/utils/CMakeLists.txt
@@ -19,6 +19,7 @@ cc_test_old(
   DEPS
   infer_io_utils
   fleet_executor
+  parallel_executor
   python)
 
 if(WITH_ONNXRUNTIME AND WIN32)
diff --git a/paddle/fluid/operators/collective/c_comm_init_op.cc b/paddle/fluid/operators/collective/c_comm_init_op.cc
index e3bf8a63805e4..9f34211a6169b 100644
--- a/paddle/fluid/operators/collective/c_comm_init_op.cc
+++ b/paddle/fluid/operators/collective/c_comm_init_op.cc
@@ -29,6 +29,15 @@ limitations under the License. */
 #include "paddle/fluid/platform/collective_helper.h"
 #endif
 
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
+#include "paddle/phi/core/distributed/nccl_comm_context.h"
+#endif
+
+#include "paddle/phi/core/distributed/auto_parallel/reshard_utils.h"
+#include "paddle/phi/core/distributed/comm_context_manager.h"
+#include "paddle/phi/core/distributed/store/store_utils.h"
+#include "paddle/phi/core/distributed/store/tcp_store.h"
+
 namespace paddle {
 namespace framework {
 class Scope;
@@ -95,8 +104,6 @@ class CCommInitOp : public framework::OperatorBase {
     PADDLE_ENFORCE_NOT_NULL(
         var, platform::errors::InvalidArgument("Input con not be empty."));
 
-    UniqueId* comm_id = var->GetMutable<UniqueId>();
-
     int nranks = Attr<int>("nranks");
     int rid = Attr<int>("ring_id");
 
@@ -105,6 +112,25 @@ class CCommInitOp : public framework::OperatorBase {
       device_id = Attr<int>("device_id");
     }
     int rank_id = Attr<int>("rank");
+#endif
+#if defined(PADDLE_WITH_NCCL)
+    const char* dynamic_static_unified_comm =
+        getenv("FLAGS_dynamic_static_unified_comm");
+    if (dynamic_static_unified_comm &&
+        std::string(dynamic_static_unified_comm) == "1") {
+      VLOG(3) << "#### use new comm lab ####";
+      auto store = phi::distributed::CreateOrGetGlobalTCPStore();
+      phi::distributed::CommContextManager::SetDeviceId(device_id);
+      std::string endpoints = Attr<std::string>("endpoints");
+      phi::distributed::CommContextManager::CreateNCCLCommContext(
+          store, std::to_string(rid), rank_id, nranks, endpoints);
+      return;
+    }
+#endif
+#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \
+    defined(PADDLE_WITH_XPU_BKCL)
+    VLOG(3) << "#### use old comm lab ####";
+    UniqueId* comm_id = var->GetMutable<UniqueId>();
     CommContext::Instance().CreateComm(
         comm_id, nranks, rank_id, device_id, rid);
 #endif
@@ -131,6 +157,10 @@ Initialize collective communication context within this trainer
         .SetDefault(-1);
     AddAttr<int>("ring_id", "(int default 0) user specified ring id")
         .SetDefault(0);
+    AddAttr<std::string>("endpoints",
+                         "['trainer1_ip:port', 'trainer2_ip:port', ...] "
" + "list of other trainer endpoints") + .SetDefault(""); } }; diff --git a/paddle/fluid/operators/collective/c_gen_nccl_id_op.cc b/paddle/fluid/operators/collective/c_gen_nccl_id_op.cc index 6ed4d71eaf67f..3f6d6348ba925 100644 --- a/paddle/fluid/operators/collective/c_gen_nccl_id_op.cc +++ b/paddle/fluid/operators/collective/c_gen_nccl_id_op.cc @@ -66,18 +66,23 @@ class CGenNCCLIdOp : public framework::OperatorBase { }; std::string endpoint = Attr("endpoint"); - int server_fd = platform::SocketServer::GetInstance(endpoint).socket(); std::vector nccl_ids; nccl_ids.resize(1); - if (rank == 0) { - GenNCCLID(&nccl_ids); - std::vector endpoint_list = - Attr>("other_endpoints"); - platform::SendBroadCastCommID(endpoint_list, &nccl_ids, ring_id); - } else { - platform::RecvBroadCastCommID(server_fd, endpoint, &nccl_ids, ring_id); + const char* dynamic_static_unified_comm = + getenv("FLAGS_dynamic_static_unified_comm"); + if (!dynamic_static_unified_comm || + std::string(dynamic_static_unified_comm) != "1") { + int server_fd = platform::SocketServer::GetInstance(endpoint).socket(); + if (rank == 0) { + GenNCCLID(&nccl_ids); + std::vector endpoint_list = + Attr>("other_endpoints"); + platform::SendBroadCastCommID(endpoint_list, &nccl_ids, ring_id); + } else { + platform::RecvBroadCastCommID(server_fd, endpoint, &nccl_ids, ring_id); + } } CopyNCCLIDToVar(nccl_ids, func, scope); diff --git a/paddle/fluid/operators/collective/c_softmax_with_cross_entropy_op.cu b/paddle/fluid/operators/collective/c_softmax_with_cross_entropy_op.cu index 344dcd36e5235..9f3394a7f54c1 100644 --- a/paddle/fluid/operators/collective/c_softmax_with_cross_entropy_op.cu +++ b/paddle/fluid/operators/collective/c_softmax_with_cross_entropy_op.cu @@ -286,6 +286,8 @@ struct CSoftmaxWithCrossEntropyFunctor { comm->comm(), stream)); } + + // step 4, obtain exp(logit) eigen_softmax.device(*dev_ctx.eigen_device()) = eigen_softmax.exp(); // step 5, obtain sum_exp_logits diff --git a/python/paddle/distributed/fleet/meta_optimizers/common.py b/python/paddle/distributed/fleet/meta_optimizers/common.py index 15bd883e970be..9625e2481d400 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/common.py +++ b/python/paddle/distributed/fleet/meta_optimizers/common.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os import paddle from paddle.framework import core @@ -91,13 +92,16 @@ def _init_communicator( ): # if current_endpoint is None, it means just for sync, # no group is created. 
+ endpoints_str = ",".join(endpoints) if current_endpoint: nranks = len(endpoints) other_endpoints = endpoints[:] other_endpoints.remove(current_endpoint) if rank == 0 and wait_port: - wait_server_ready(other_endpoints) + use_new_comm = os.getenv("FLAGS_dynamic_static_unified_comm", "0") + if use_new_comm not in [1, "1", "True", "true"]: + wait_server_ready(other_endpoints) def _add_sync_by_allreduce(block): sync_var = block.create_var( @@ -168,6 +172,7 @@ def _add_sync_by_allreduce(block): 'nranks': nranks, 'rank': rank, 'ring_id': ring_id, + 'endpoints': endpoints_str, OP_ROLE_KEY: OpRole.Forward, }, ) @@ -192,6 +197,7 @@ def _add_sync_by_allreduce(block): 'nranks': nranks, 'rank': rank, 'ring_id': ring_id, + 'endpoints': endpoints_str, OP_ROLE_KEY: OpRole.Forward, }, ) diff --git a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py index 987e3eb135917..be6aad3208d09 100755 --- a/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py +++ b/python/paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py @@ -716,7 +716,9 @@ def minimize_impl( self._recreate_not_persist_param_as_var() self._dump_program_for_debug() - self._wait() + use_new_comm = os.getenv("FLAGS_dynamic_static_unified_comm", "0") + if use_new_comm not in ["1", "True", "true"]: + self._wait() return optimize_ops, params_grads def _init_pair_comm(self, pair, ring_id): diff --git a/python/paddle/distributed/ps/utils/collective_transpiler.py b/python/paddle/distributed/ps/utils/collective_transpiler.py index 99bb76a3b315b..7f398842fd701 100644 --- a/python/paddle/distributed/ps/utils/collective_transpiler.py +++ b/python/paddle/distributed/ps/utils/collective_transpiler.py @@ -125,6 +125,7 @@ def _init_communicator( wait_port, has_multitrainer=False, ): + endpoints_str = ",".join(endpoints) nranks = len(endpoints) other_endpoints = endpoints[:] other_endpoints.remove(current_endpoint) @@ -161,6 +162,7 @@ def _init_communicator( 'nranks': nranks, 'rank': rank, 'ring_id': ring_id, + 'endpoints': endpoints_str, self.op_role_key: OpRole.Forward, }, ) @@ -190,6 +192,7 @@ def _init_communicator( 'nranks': nranks, 'rank': rank, 'ring_id': ring_id, + 'endpoints': endpoints_str, self.op_role_key: OpRole.Forward, }, ) @@ -234,6 +237,7 @@ def _init_communicator( 'nranks': nranks, 'rank': rank, 'ring_id': ring_id, + 'endpoints': endpoints_str, self.op_role_key: OpRole.Forward, }, ) diff --git a/python/paddle/distributed/transpiler/collective.py b/python/paddle/distributed/transpiler/collective.py index e76238e02af43..bc9281efd1f50 100644 --- a/python/paddle/distributed/transpiler/collective.py +++ b/python/paddle/distributed/transpiler/collective.py @@ -123,6 +123,7 @@ def _init_communicator( wait_port, has_multitrainer=False, ): + endpoints_str = ",".join(endpoints) nranks = len(endpoints) other_endpoints = endpoints[:] other_endpoints.remove(current_endpoint) @@ -158,6 +159,7 @@ def _init_communicator( 'nranks': nranks, 'rank': rank, 'ring_id': ring_id, + 'endpoints': endpoints_str, self.op_role_key: OpRole.Forward, }, ) @@ -198,6 +200,7 @@ def _init_communicator( 'nranks': nranks, 'rank': rank, 'ring_id': ring_id, + 'endpoints': endpoints_str, self.op_role_key: OpRole.Forward, }, ) @@ -229,6 +232,7 @@ def _init_communicator( 'nranks': nranks, 'rank': rank, 'ring_id': ring_id, + 'endpoints': endpoints_str, self.op_role_key: OpRole.Forward, }, ) diff --git a/python/paddle/hapi/model.py b/python/paddle/hapi/model.py index 
index 8ff1a557673af..8ca5712a3036c 100644
--- a/python/paddle/hapi/model.py
+++ b/python/paddle/hapi/model.py
@@ -122,6 +122,7 @@ def init_communicator(
 ):
     if nranks < 2:
         return
+    endpoints_str = ",".join(endpoints)
     other_endpoints = endpoints[:]
     other_endpoints.remove(current_endpoint)
     block = program.global_block()
@@ -153,6 +154,7 @@ def init_communicator(
                 'nranks': nranks,
                 'rank': rank,
                 'ring_id': 0,
+                'endpoints': endpoints_str,
             },
         )
     elif core.is_compiled_with_xpu():
@@ -181,6 +183,7 @@ def init_communicator(
                 'nranks': nranks,
                 'rank': rank,
                 'ring_id': 0,
+                'endpoints': endpoints_str,
             },
         )
     elif (
@@ -212,6 +215,7 @@ def init_communicator(
                 'nranks': nranks,
                 'rank': rank,
                 'ring_id': 0,
+                'endpoints': endpoints_str,
             },
         )
 
diff --git a/python/paddle/incubate/optimizer/distributed_fused_lamb.py b/python/paddle/incubate/optimizer/distributed_fused_lamb.py
index fe6442756e3c7..210cffdcb606b 100644
--- a/python/paddle/incubate/optimizer/distributed_fused_lamb.py
+++ b/python/paddle/incubate/optimizer/distributed_fused_lamb.py
@@ -77,7 +77,12 @@ def init_communicator(block, rank, ranks, ring_id):
         type='c_comm_init',
         inputs={'X': comm_id_var},
         outputs={},
-        attrs={'nranks': len(ranks), 'rank': local_rank, 'ring_id': ring_id},
+        attrs={
+            'nranks': len(ranks),
+            'rank': local_rank,
+            'ring_id': ring_id,
+            'endpoints': ','.join(eps),
+        },
     )
     tmp_var = block.create_var(name=unique_name.generate('tmp'))
     block.append_op(
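
For reference, a minimal standalone sketch of the graph-building pattern the Python changes above follow: the full endpoint list is joined into a comma-separated string and attached to c_comm_init as the new 'endpoints' attribute, which the operator only reads when FLAGS_dynamic_static_unified_comm=1 selects the new communication-context path (otherwise the legacy ncclUniqueId broadcast is used). It assumes a with-NCCL build of Paddle; the helper name and sample endpoints are illustrative and not part of this patch.

    import paddle
    from paddle.framework import core
    from paddle.utils import unique_name

    paddle.enable_static()

    def insert_comm_init(block, rank, endpoints, ring_id=0):
        # Generate the NCCL unique id variable (still needed by the legacy path).
        comm_id_var = block.create_var(
            name=unique_name.generate('comm_id'),
            persistable=True,
            type=core.VarDesc.VarType.RAW,
        )
        block.append_op(
            type='c_gen_nccl_id',
            inputs={},
            outputs={'Out': comm_id_var},
            attrs={
                'rank': rank,
                'endpoint': endpoints[rank],
                'other_endpoints': endpoints[:rank] + endpoints[rank + 1:],
                'ring_id': ring_id,
            },
        )
        # c_comm_init now also carries the comma-separated endpoint list,
        # so the new comm-context path can build its TCP-store based context.
        block.append_op(
            type='c_comm_init',
            inputs={'X': comm_id_var},
            outputs={},
            attrs={
                'nranks': len(endpoints),
                'rank': rank,
                'ring_id': ring_id,
                'endpoints': ",".join(endpoints),
            },
        )

    startup = paddle.static.Program()
    insert_comm_init(
        startup.global_block(),
        rank=0,
        endpoints=["127.0.0.1:6170", "127.0.0.1:6171"],
    )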