From f3f3b590e8bc581dc8d47d9d949c2ea9ecfdcfe3 Mon Sep 17 00:00:00 2001 From: ybc Date: Mon, 5 Apr 2021 20:07:28 -0700 Subject: [PATCH 1/5] Add condition variable to control the loop --- bluefog/common/global_state.h | 8 +++ bluefog/common/operations.cc | 116 +++++++++++++++------------------- bluefog/common/tensor_queue.h | 36 ++++++----- test/torch_ops_test.py | 8 ++- 4 files changed, 86 insertions(+), 82 deletions(-) diff --git a/bluefog/common/global_state.h b/bluefog/common/global_state.h index 2be439a3..3d3f84c3 100644 --- a/bluefog/common/global_state.h +++ b/bluefog/common/global_state.h @@ -54,6 +54,14 @@ struct BluefogGlobalState { // Whether collective context has been completed on the background thread. std::atomic_bool initialization_done{false}; + // Condition variable and its mutex for main loop in communication thread. + std::condition_variable loop_cv; + std::mutex loop_mutex; + + // Under negotiation, the entries sends to master first and wait until it + // returns ok to run. This variable keeps the records of that. + std::atomic_int unfinished_enqueued_entries{0}; + // Timeline writer. Timeline timeline; diff --git a/bluefog/common/operations.cc b/bluefog/common/operations.cc index bb6940cb..b9136717 100644 --- a/bluefog/common/operations.cc +++ b/bluefog/common/operations.cc @@ -438,7 +438,7 @@ bool IsSameList (std::shared_ptr> n1, std::shared_ptrsize() != n2->size()) return false; // The order matters as well. - for (int i = 0; i < n1->size(); i++) { + for (size_t i = 0; i < n1->size(); i++) { if (n1->at(i) != n2->at(i)) { return false; } @@ -853,7 +853,9 @@ void PerformOperationWithFusion(std::vector& entries) { void NegotiateOfRequestOfMaster(BluefogGlobalState& state, std::deque& message_queue_buffer, bool& should_change_topo, - bool& should_shut_down) { + bool& should_shut_down) { + state.unfinished_enqueued_entries.fetch_add(message_queue_buffer.size()); + std::vector ready_to_reduce; RequestList message_list; message_list.set_shutdown(should_shut_down); @@ -1035,6 +1037,7 @@ void NegotiateOfRequestOfMaster(BluefogGlobalState& state, } else { PerformOperation(nego_entries); } + state.unfinished_enqueued_entries.fetch_sub(nego_entries.size()); } // Check for stalled tensors. @@ -1049,6 +1052,7 @@ void NegotiateOfRequestOfSlave(BluefogGlobalState& state, std::deque& message_queue_buffer, bool& should_change_topo, bool& should_shut_down) { + state.unfinished_enqueued_entries.fetch_add(message_queue_buffer.size()); std::string encoded_message; RequestList message_list; message_list.set_shutdown(state.shut_down); @@ -1057,6 +1061,7 @@ void NegotiateOfRequestOfSlave(BluefogGlobalState& state, message_list.add_request(message_queue_buffer.front()); message_queue_buffer.pop_front(); } + RequestList::SerializeToString(message_list, encoded_message); int encoded_message_length = (int)encoded_message.length() + 1; MPI_Gather(&encoded_message_length, 1, MPI_INT, nullptr, 1, MPI_INT, @@ -1084,6 +1089,7 @@ void NegotiateOfRequestOfSlave(BluefogGlobalState& state, } else { PerformOperation(nego_entries); } + state.unfinished_enqueued_entries.fetch_sub(nego_entries.size()); } if (response_list.shutdown()) { @@ -1097,6 +1103,8 @@ void NegotiateOfRequestOfSlave(BluefogGlobalState& state, void NegotiationOfRequest(BluefogGlobalState& state, std::deque& message_queue_buffer, bool& should_change_topo, bool& should_shut_down) { + // TODO(ybc) should_change_topo has no effect after condition variable refactor. + // Just keep it for a while. will remove. if (bluefog_rank() == COORDINATE_RANK) { NegotiateOfRequestOfMaster(state, message_queue_buffer, should_change_topo, should_shut_down); @@ -1111,6 +1119,15 @@ bool RunLoopOnce(BluefogGlobalState& state) { bool should_shut_down = state.shut_down; bool should_change_topo = state.setting_topology; + std::unique_lock lk(state.loop_mutex); + // The real mutex for queue is the on under TensorQueue. + state.loop_cv.wait(lk, [&state] { + // When we requesting shut_down, or any unfinished entries waiting in the + // negotiation we should not wait. + return state.shut_down || (state.unfinished_enqueued_entries > 0) || + (state.tensor_queue.size() > 0); + }); + // This delay determines thread frequency and MPI message latency auto sleep_duration = state.last_cycle_start + @@ -1131,7 +1148,7 @@ bool RunLoopOnce(BluefogGlobalState& state) { std::vector entries; auto IsRequestConvertToEntryDirectly = [](const Request& request) -> bool { return global_skip_negotiate_stage || - (request.request_type() != Request::ALLREDUCE && + (request.request_type() != Request::ALLREDUCE && request.request_type() != Request::ALLGATHER && request.request_type() != Request::BROADCAST && request.request_type() != Request::NEIGHBOR_ALLREDUCE && @@ -1150,7 +1167,8 @@ bool RunLoopOnce(BluefogGlobalState& state) { std::remove_if(message_queue_buffer.begin(), message_queue_buffer.end(), IsRequestConvertToEntryDirectly), message_queue_buffer.end()); - + + lk.unlock(); // Never hold the mutex when there is remote function. PerformOperation(entries); // For the rest requests, they needs to coordinate and neogiate. @@ -1163,20 +1181,6 @@ bool RunLoopOnce(BluefogGlobalState& state) { NegotiationOfRequest(state, message_queue_buffer, should_change_topo, should_shut_down); } - // Seperate the setting topology and negotiate communnication. - // TODO(ybc) Use conditional variable and mutex to re-implement this. - if (should_change_topo) { - bluefog_global.ready_to_setting_topology = true; - while (!bluefog_global.setting_topology_done) { - std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION); - } - bluefog_global.ready_to_setting_topology = false; - // Wait for main thread reset. - while (bluefog_global.setting_topology_done) { - std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION); - } - } - return !should_shut_down; } @@ -1215,6 +1219,7 @@ void bluefog_init() { InitializeBluefogOnce(); } void bluefog_shutdown() { if (bluefog_global.background_thread.joinable()) { bluefog_global.shut_down = true; + bluefog_global.loop_cv.notify_all(); bluefog_global.background_thread.join(); // Reset the initialization flag to allow restarting with bluefog_init(...) //bluefog_global.initialize_flag.clear(); @@ -1290,36 +1295,21 @@ int bluefog_set_topology(int indegree, const int* sources, int outdegree, return -1; } #endif - bluefog_global.setting_topology = true; - while (!bluefog_global.ready_to_setting_topology.load()) { - std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION); - } - bluefog_global.tensor_queue.LockTensorQueue(); - if (bluefog_global.tensor_queue.size() > 0) { - BFLOG(ERROR) - << "Cannot set the topology because there are unfinished MPI ops."; - bluefog_global.tensor_queue.UnlockTensorQueue(); - return -1; - } - - bool mpi_result = bluefog_global.controller->SetTopology( - indegree, sources, outdegree, destinations); + bool mpi_result; + // When we change the topology, there should be no entries being processed at + // same time. + { + std::lock_guard lk(bluefog_global.loop_mutex); + mpi_result = bluefog_global.controller->SetTopology( + indegree, sources, outdegree, destinations); #if HAVE_NCCL && NCCL_MINOR < 7 - if (mpi_result && nccl_context.is_initialized) { - bluefog_global.nccl_controller->DestroyPeerCommunicators(); - bluefog_global.nccl_controller->InitPeerCommunicators(); - } + if (mpi_result && nccl_context.is_initialized) { + bluefog_global.nccl_controller->DestroyPeerCommunicators(); + bluefog_global.nccl_controller->InitPeerCommunicators(); + } #endif - bluefog_global.tensor_queue.UnlockTensorQueue(); - - bluefog_global.setting_topology = false; - bluefog_global.setting_topology_done = true; - // Wait for the background thread receive the setting_topology_done and - // close the ready_to_setting_topology epoch. - while (bluefog_global.ready_to_setting_topology) { - std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION); } - bluefog_global.setting_topology_done = false; + bluefog_global.loop_cv.notify_one(); return mpi_result; } @@ -1447,6 +1437,7 @@ Status EnqueueTensorAllreduce(std::shared_ptr tensor, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); + bluefog_global.loop_cv.notify_one(); return status; } @@ -1482,6 +1473,7 @@ Status EnqueueTensorBroadcast(std::shared_ptr tensor, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); + bluefog_global.loop_cv.notify_one(); return status; } @@ -1516,6 +1508,7 @@ Status EnqueueTensorAllgather(std::shared_ptr tensor, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); + bluefog_global.loop_cv.notify_one(); return status; } @@ -1560,6 +1553,7 @@ Status EnqueueTensorNeighborAllgather(std::shared_ptr tensor, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); + bluefog_global.loop_cv.notify_one(); return status; } @@ -1611,6 +1605,7 @@ Status EnqueueTensorNeighborAllreduce(std::shared_ptr tensor, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); + bluefog_global.loop_cv.notify_one(); return status; } @@ -1651,6 +1646,7 @@ Status EnqueueTensorPairGossip(std::shared_ptr tensor, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); + bluefog_global.loop_cv.notify_one(); return status; } @@ -1683,6 +1679,7 @@ Status EnqueueTensorWindowCreate( return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); + bluefog_global.loop_cv.notify_one(); return status; } @@ -1707,6 +1704,7 @@ Status EnqueueTensorWindowFree(const std::string& name, int device, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); + bluefog_global.loop_cv.notify_one(); return status; } @@ -1740,6 +1738,7 @@ Status EnqueueTensorWindowPut(std::shared_ptr tensor, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); + bluefog_global.loop_cv.notify_one(); return status; } @@ -1771,6 +1770,7 @@ Status EnqueueTensorWindowAccumulate( return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); + bluefog_global.loop_cv.notify_one(); return status; } @@ -1798,6 +1798,7 @@ Status EnqueueTensorWindowGet(const std::string& name, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); + bluefog_global.loop_cv.notify_one(); return status; } @@ -2040,28 +2041,15 @@ void SetSkipNegotiateStageState(bool value) { if (value == global_skip_negotiate_stage) { return; } - if (value) { - // From running negotiate to skipping negotiate, we need to properly turn - // off negotiate stage. Otherwise, it may hang the processes. Use setting - // topology flag to suspend the negotiate stage then skip it. - bluefog_global.setting_topology = true; - while (!bluefog_global.ready_to_setting_topology.load()) { - std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION); - } - - global_skip_negotiate_stage = value; - bluefog_global.setting_topology = false; - bluefog_global.setting_topology_done = true; - // Wait for the background thread receive the setting_topology_done and - // close the ready_to_setting_topology epoch. - while (bluefog_global.ready_to_setting_topology) { - std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION); - } - bluefog_global.setting_topology_done = false; - } else { + // From running negotiate to skipping negotiate, we need to properly turn + // off negotiate stage. Otherwise, it may hang the processes. Use setting + // topology flag to suspend the negotiate stage then skip it. + { + std::lock_guard lk(bluefog_global.loop_mutex); global_skip_negotiate_stage = value; } + bluefog_global.loop_cv.notify_one(); } bool GetSkipNegotiateStageState() { diff --git a/bluefog/common/tensor_queue.h b/bluefog/common/tensor_queue.h index a0302496..de476743 100644 --- a/bluefog/common/tensor_queue.h +++ b/bluefog/common/tensor_queue.h @@ -46,15 +46,20 @@ class TensorQueue { void PushMessageToQueue(Request& message); - // Used when setting Topology, which require the tensor queue should be empty always. + // Used when setting Topology, which require the tensor queue should be empty + // always. inline void LockTensorQueue() { mutex_.lock(); } inline void UnlockTensorQueue() { mutex_.unlock(); } - inline size_t size() { return message_queue_.size(); } + inline size_t size() { + std::lock_guard guard(mutex_); + return message_queue_.size(); + } protected: // Tensors waiting to be processed. - // Key is based upon the message name since tensor_name in table entry for win ops - // is for window and we need to add "win_put."/"win_create." before it in message. + // Key is based upon the message name since tensor_name in table entry for win + // ops is for window and we need to add "win_put."/"win_create." before it in + // message. std::unordered_map tensor_table_; // Queue of MPI requests waiting to be sent to the coordinator node. @@ -65,8 +70,8 @@ class TensorQueue { mutable std::mutex mutex_; }; -// Encapsulates the process of creating and destroying fusion buffers as the requested -// threshold is changed. +// Encapsulates the process of creating and destroying fusion buffers as the +// requested threshold is changed. class FusionBufferManager { public: // Initializes a buffer of the given threshold size if not already cached. @@ -82,10 +87,11 @@ class FusionBufferManager { std::function on_start_init, std::function on_end_init); - // Initializes a buffer of the given threshold size times MPI size if not already cached. - // There is one constraint to noticed here. We need WeightBuffer is always larger than - // (size-1)*fusion Buffer since we don't want to tensor being able to put into the fusion - // buffer but not able to put into weightbuffer. + // Initializes a buffer of the given threshold size times MPI size if not + // already cached. There is one constraint to noticed here. We need + // WeightBuffer is always larger than (size-1)*fusion Buffer since we don't + // want to tensor being able to put into the fusion buffer but not able to put + // into weightbuffer. // // Args: // threshold: Size of the buffer in bytes. @@ -94,9 +100,7 @@ class FusionBufferManager { // context: Framework used to create the buffer and associate it. // on_start_init: Callback on starting buffer initialization. // on_end_init: Callback on completing buffer initialization. - Status InitializeWeightBuffer(int64_t threshold, - int world_size, - int device, + Status InitializeWeightBuffer(int64_t threshold, int world_size, int device, std::shared_ptr context, std::function on_start_init, std::function on_end_init); @@ -104,7 +108,8 @@ class FusionBufferManager { // Returns the buffer associated with the given device and framework, or null. std::shared_ptr GetBuffer(int device); - // Returns the weight buffer associated with the given device and framework, or null. + // Returns the weight buffer associated with the given device and framework, + // or null. std::shared_ptr GetWeightBuffer(int device); private: @@ -112,7 +117,8 @@ class FusionBufferManager { std::unordered_map, int64_t>> tensor_fusion_buffers_; - // Memory buffers for Tensor Fusion with dst weight. They are keyed by device ID. + // Memory buffers for Tensor Fusion with dst weight. They are keyed by device + // ID. std::unordered_map, int64_t>> weight_tensor_fusion_buffers_; }; diff --git a/test/torch_ops_test.py b/test/torch_ops_test.py index 974a1aca..c8c8ca5a 100644 --- a/test/torch_ops_test.py +++ b/test/torch_ops_test.py @@ -171,6 +171,7 @@ def test_allreduce_sum(self): dims = [1, 2, 3] for dtype, dim in itertools.product(dtypes, dims): + torch.manual_seed(123456) tensor = torch.FloatTensor(*([23] * dim)).random_(-100, 100) tensor = self.cast_and_place(tensor, dtype) name = "allreduce_tensor_{}_{}".format(dim, dtype) @@ -1207,7 +1208,7 @@ def test_neighbor_allgather_dynamic_variable_size(self): torch.ByteTensor, torch.CharTensor, torch.ShortTensor, torch.HalfTensor] if TEST_ON_GPU: dtypes += [torch.cuda.FloatTensor, torch.cuda.DoubleTensor] - + # Connect to all other ranks neighbor_ranks = [i for i in range(size) if i != rank] dims = [1, 2, 3] @@ -1222,10 +1223,11 @@ def test_neighbor_allgather_dynamic_variable_size(self): tensor = torch.FloatTensor( *([tensor_sizes[rank]] + [17] * (dim - 1))).fill_(1).mul_(rank) tensor = self.cast_and_place(tensor, dtype) - gathered = bf.neighbor_allgather(tensor, dst_ranks=neighbor_ranks, src_ranks=neighbor_ranks) + gathered = bf.neighbor_allgather( + tensor, dst_ranks=neighbor_ranks, src_ranks=neighbor_ranks) tensor, gathered = self.convert_cpu_fp16_to_fp32(tensor, gathered) - tensor_sizes[rank] = 0 # remove self-size since neighbor_allgather does not include self. + tensor_sizes[rank] = 0 # remove self since neighbor_allgather does not include self expected_size = sum(tensor_sizes) assert list(gathered.shape) == [expected_size] + [17] * (dim - 1) From 8cb9afd8cfb0c7c4e28f053c065970c4205d460c Mon Sep 17 00:00:00 2001 From: ybc Date: Mon, 5 Apr 2021 20:14:43 -0700 Subject: [PATCH 2/5] Minor update on topology_setting in global_state --- bluefog/common/global_state.h | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/bluefog/common/global_state.h b/bluefog/common/global_state.h index 3d3f84c3..9aa171b4 100644 --- a/bluefog/common/global_state.h +++ b/bluefog/common/global_state.h @@ -92,10 +92,8 @@ struct BluefogGlobalState { FusionBufferManager fusion_buffer; // Because setting topology happens in the main thread instead of communication - // thread. Following three variables are to sync between them. + // thread. Not really used since the condition variable refactor. std::atomic_bool setting_topology{false}; - std::atomic_bool setting_topology_done{false}; - std::atomic_bool ready_to_setting_topology{false}; // Only exists on the coordinator node (rank zero). Maintains a vector of // requests to allreduce every tensor (keyed by tensor name). From c92a710312e125edc1f887d50f07d46ffc7ed6db Mon Sep 17 00:00:00 2001 From: ybc Date: Mon, 5 Apr 2021 20:22:06 -0700 Subject: [PATCH 3/5] Add missing header --- bluefog/common/global_state.h | 1 + bluefog/common/mpi_context.cc | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/bluefog/common/global_state.h b/bluefog/common/global_state.h index 9aa171b4..2683ad14 100644 --- a/bluefog/common/global_state.h +++ b/bluefog/common/global_state.h @@ -18,6 +18,7 @@ #define BLUEFOG_COMMON_GLOBAL_STATE_H #include +#include #include #include #include diff --git a/bluefog/common/mpi_context.cc b/bluefog/common/mpi_context.cc index 7a68ce39..485a4a50 100644 --- a/bluefog/common/mpi_context.cc +++ b/bluefog/common/mpi_context.cc @@ -75,7 +75,7 @@ bool WindowManager::InitializeMutexWin(const MPI_Comm& mpi_comm) { std::vector WindowManager::GetVersionMemoryCopy() { return version_mem_; } void WindowManager::resetVersionWinMem(int initialValue /*=0*/) { - for (int i = 0; i < version_mem_.size(); i++) { + for (int i = 0; i < (int) version_mem_.size(); i++) { version_mem_[i] = initialValue; } } From dca33a88d6a10ace23670e93a8f6a41dec88d1de Mon Sep 17 00:00:00 2001 From: ybc Date: Mon, 5 Apr 2021 21:11:19 -0700 Subject: [PATCH 4/5] Change cv.wait to cv.wait_for 10 seconds --- bluefog/common/operations.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bluefog/common/operations.cc b/bluefog/common/operations.cc index b9136717..578291e0 100644 --- a/bluefog/common/operations.cc +++ b/bluefog/common/operations.cc @@ -1121,7 +1121,7 @@ bool RunLoopOnce(BluefogGlobalState& state) { std::unique_lock lk(state.loop_mutex); // The real mutex for queue is the on under TensorQueue. - state.loop_cv.wait(lk, [&state] { + state.loop_cv.wait_for(lk, std::chrono::seconds(10), [&state] { // When we requesting shut_down, or any unfinished entries waiting in the // negotiation we should not wait. return state.shut_down || (state.unfinished_enqueued_entries > 0) || From fd82d65e76af7f18a73762de04c7bac22b6cda7d Mon Sep 17 00:00:00 2001 From: ybc Date: Sat, 10 Apr 2021 18:47:24 -0700 Subject: [PATCH 5/5] Address comment and remove adjusting resetVersionWinMem in ibfrun --- bluefog/common/mpi_context.cc | 2 +- bluefog/run/interactive_run.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/bluefog/common/mpi_context.cc b/bluefog/common/mpi_context.cc index 485a4a50..e5ef0e6c 100644 --- a/bluefog/common/mpi_context.cc +++ b/bluefog/common/mpi_context.cc @@ -75,7 +75,7 @@ bool WindowManager::InitializeMutexWin(const MPI_Comm& mpi_comm) { std::vector WindowManager::GetVersionMemoryCopy() { return version_mem_; } void WindowManager::resetVersionWinMem(int initialValue /*=0*/) { - for (int i = 0; i < (int) version_mem_.size(); i++) { + for (size_t i = 0; i < version_mem_.size(); i++) { version_mem_[i] = initialValue; } } diff --git a/bluefog/run/interactive_run.py b/bluefog/run/interactive_run.py index ccfc0c79..6682c8e6 100644 --- a/bluefog/run/interactive_run.py +++ b/bluefog/run/interactive_run.py @@ -406,7 +406,8 @@ def handler(signum, frame): signal.signal(signal.SIGINT, handler) env = os.environ.copy() - env['BLUEFOG_CYCLE_TIME'] = str(20) # Increase the cycle time + # No longer needed after using condition variable. + # env['BLUEFOG_CYCLE_TIME'] = str(20) # Increase the cycle time # action of stop if args.action == "stop":