Add condition variable to control the loop (#88)
* Add condition variable to control the loop

* Minor update on topology_setting in global_state

* Add missing <condition_variable> header

* Change cv.wait to cv.wait_for with a 10-second timeout

* Address comment and remove adjusting resetVersionWinMem in ibfrun
BichengYing authored Apr 11, 2021
1 parent 96887b5 commit 36c073a
Showing 6 changed files with 91 additions and 87 deletions.
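
Before the per-file diffs, a minimal sketch (simplified stand-ins, not the actual bluefog code) of the pattern this commit adopts: the enqueue paths notify a condition variable, and the background loop waits on it with a wake-up predicate and a 10-second timeout instead of busy-waiting. The names LoopState, Enqueue, and the reduced RunLoopOnce below are hypothetical; loop_cv, loop_mutex, shut_down, unfinished_enqueued_entries, and tensor_queue mirror the members added in this commit.

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>

struct LoopState {
  std::mutex loop_mutex;
  std::condition_variable loop_cv;
  std::queue<int> tensor_queue;        // stand-in for the real TensorQueue
  bool shut_down = false;
  int unfinished_enqueued_entries = 0;
};

void Enqueue(LoopState& s, int entry) {
  {
    std::lock_guard<std::mutex> lk(s.loop_mutex);
    s.tensor_queue.push(entry);
  }
  s.loop_cv.notify_one();  // wake the background loop instead of letting it poll
}

bool RunLoopOnce(LoopState& s) {
  std::unique_lock<std::mutex> lk(s.loop_mutex);
  // Wake on shutdown, on entries still under negotiation, on new work,
  // or after at most 10 seconds (the cv.wait_for change in this commit).
  s.loop_cv.wait_for(lk, std::chrono::seconds(10), [&s] {
    return s.shut_down || s.unfinished_enqueued_entries > 0 ||
           !s.tensor_queue.empty();
  });
  lk.unlock();  // never hold the mutex across long-running (remote) work
  // ... drain tensor_queue and perform the operations here ...
  return !s.shut_down;
}

Compared with the old busy-wait on the setting_topology flags, the loop now sleeps until it is notified or the timeout elapses, which is what the removed spin loops further down are replaced with.
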
13 changes: 10 additions & 3 deletions bluefog/common/global_state.h
@@ -18,6 +18,7 @@
#define BLUEFOG_COMMON_GLOBAL_STATE_H

#include <atomic>
#include <condition_variable>
#include <chrono>
#include <memory>
#include <queue>
@@ -54,6 +55,14 @@ struct BluefogGlobalState {
// Whether collective context has been completed on the background thread.
std::atomic_bool initialization_done{false};

// Condition variable and its mutex for main loop in communication thread.
std::condition_variable loop_cv;
std::mutex loop_mutex;

// During negotiation, entries are sent to the master first and wait until it
// returns OK to run. This variable keeps a count of those entries.
std::atomic_int unfinished_enqueued_entries{0};

// Timeline writer.
Timeline timeline;

@@ -84,10 +93,8 @@ struct BluefogGlobalState {
FusionBufferManager fusion_buffer;

// Because setting topology happens in the main thread instead of communication
// thread. Following three variables are to sync between them.
// thread. Not really used since the condition variable refactor.
std::atomic_bool setting_topology{false};
std::atomic_bool setting_topology_done{false};
std::atomic_bool ready_to_setting_topology{false};

// Only exists on the coordinator node (rank zero). Maintains a vector of
// requests to allreduce every tensor (keyed by tensor name).
2 changes: 1 addition & 1 deletion bluefog/common/mpi_context.cc
@@ -75,7 +75,7 @@ bool WindowManager::InitializeMutexWin(const MPI_Comm& mpi_comm) {
std::vector<int> WindowManager::GetVersionMemoryCopy() { return version_mem_; }

void WindowManager::resetVersionWinMem(int initialValue /*=0*/) {
for (int i = 0; i < version_mem_.size(); i++) {
for (size_t i = 0; i < version_mem_.size(); i++) {
version_mem_[i] = initialValue;
}
}
116 changes: 52 additions & 64 deletions bluefog/common/operations.cc
@@ -438,7 +438,7 @@ bool IsSameList (std::shared_ptr<std::vector<T>> n1, std::shared_ptr<std::vector
if (n1 == nullptr || n2 == nullptr) return false;
if (n1->size() != n2->size()) return false;
// The order matters as well.
for (int i = 0; i < n1->size(); i++) {
for (size_t i = 0; i < n1->size(); i++) {
if (n1->at(i) != n2->at(i)) {
return false;
}
@@ -853,7 +853,9 @@ void PerformOperationWithFusion(std::vector<TensorTableEntry>& entries) {
void NegotiateOfRequestOfMaster(BluefogGlobalState& state,
std::deque<Request>& message_queue_buffer,
bool& should_change_topo,
bool& should_shut_down) {
bool& should_shut_down) {
state.unfinished_enqueued_entries.fetch_add(message_queue_buffer.size());

std::vector<std::string> ready_to_reduce;
RequestList message_list;
message_list.set_shutdown(should_shut_down);
@@ -1035,6 +1037,7 @@ void NegotiateOfRequestOfMaster(BluefogGlobalState& state,
} else {
PerformOperation(nego_entries);
}
state.unfinished_enqueued_entries.fetch_sub(nego_entries.size());
}

// Check for stalled tensors.
@@ -1049,6 +1052,7 @@ void NegotiateOfRequestOfSlave(BluefogGlobalState& state,
std::deque<Request>& message_queue_buffer,
bool& should_change_topo,
bool& should_shut_down) {
state.unfinished_enqueued_entries.fetch_add(message_queue_buffer.size());
std::string encoded_message;
RequestList message_list;
message_list.set_shutdown(state.shut_down);
@@ -1057,6 +1061,7 @@ void NegotiateOfRequestOfSlave(BluefogGlobalState& state,
message_list.add_request(message_queue_buffer.front());
message_queue_buffer.pop_front();
}

RequestList::SerializeToString(message_list, encoded_message);
int encoded_message_length = (int)encoded_message.length() + 1;
MPI_Gather(&encoded_message_length, 1, MPI_INT, nullptr, 1, MPI_INT,
@@ -1084,6 +1089,7 @@ void NegotiateOfRequestOfSlave(BluefogGlobalState& state,
} else {
PerformOperation(nego_entries);
}
state.unfinished_enqueued_entries.fetch_sub(nego_entries.size());
}

if (response_list.shutdown()) {
@@ -1097,6 +1103,8 @@ void NegotiateOfRequestOfSlave(BluefogGlobalState& state,
void NegotiationOfRequest(BluefogGlobalState& state,
std::deque<Request>& message_queue_buffer,
bool& should_change_topo, bool& should_shut_down) {
// TODO(ybc) should_change_topo has no effect after the condition variable refactor.
// Keep it for a while; will remove later.
if (bluefog_rank() == COORDINATE_RANK) {
NegotiateOfRequestOfMaster(state, message_queue_buffer, should_change_topo,
should_shut_down);
@@ -1111,6 +1119,15 @@ bool RunLoopOnce(BluefogGlobalState& state) {
bool should_shut_down = state.shut_down;
bool should_change_topo = state.setting_topology;

std::unique_lock<std::mutex> lk(state.loop_mutex);
// The real mutex for the queue is the one under TensorQueue.
state.loop_cv.wait_for(lk, std::chrono::seconds(10), [&state] {
// When a shutdown is requested, or any unfinished entries are waiting in
// negotiation, we should not wait.
return state.shut_down || (state.unfinished_enqueued_entries > 0) ||
(state.tensor_queue.size() > 0);
});

// This delay determines thread frequency and MPI message latency
auto sleep_duration =
state.last_cycle_start +
@@ -1131,7 +1148,7 @@ bool RunLoopOnce(BluefogGlobalState& state) {
std::vector<TensorTableEntry> entries;
auto IsRequestConvertToEntryDirectly = [](const Request& request) -> bool {
return global_skip_negotiate_stage ||
(request.request_type() != Request::ALLREDUCE &&
(request.request_type() != Request::ALLREDUCE &&
request.request_type() != Request::ALLGATHER &&
request.request_type() != Request::BROADCAST &&
request.request_type() != Request::NEIGHBOR_ALLREDUCE &&
@@ -1150,7 +1167,8 @@ bool RunLoopOnce(BluefogGlobalState& state) {
std::remove_if(message_queue_buffer.begin(), message_queue_buffer.end(),
IsRequestConvertToEntryDirectly),
message_queue_buffer.end());


lk.unlock(); // Never hold the mutex while calling remote functions.
PerformOperation(entries);

// The remaining requests need to coordinate and negotiate.
@@ -1163,20 +1181,6 @@ bool RunLoopOnce(BluefogGlobalState& state) {
NegotiationOfRequest(state, message_queue_buffer, should_change_topo,
should_shut_down);
}
// Seperate the setting topology and negotiate communnication.
// TODO(ybc) Use conditional variable and mutex to re-implement this.
if (should_change_topo) {
bluefog_global.ready_to_setting_topology = true;
while (!bluefog_global.setting_topology_done) {
std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION);
}
bluefog_global.ready_to_setting_topology = false;
// Wait for main thread reset.
while (bluefog_global.setting_topology_done) {
std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION);
}
}

return !should_shut_down;
}

@@ -1215,6 +1219,7 @@ void bluefog_init() { InitializeBluefogOnce(); }
void bluefog_shutdown() {
if (bluefog_global.background_thread.joinable()) {
bluefog_global.shut_down = true;
bluefog_global.loop_cv.notify_all();
bluefog_global.background_thread.join();
// Reset the initialization flag to allow restarting with bluefog_init(...)
//bluefog_global.initialize_flag.clear();
@@ -1290,36 +1295,21 @@ int bluefog_set_topology(int indegree, const int* sources, int outdegree,
return -1;
}
#endif
bluefog_global.setting_topology = true;
while (!bluefog_global.ready_to_setting_topology.load()) {
std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION);
}
bluefog_global.tensor_queue.LockTensorQueue();
if (bluefog_global.tensor_queue.size() > 0) {
BFLOG(ERROR)
<< "Cannot set the topology because there are unfinished MPI ops.";
bluefog_global.tensor_queue.UnlockTensorQueue();
return -1;
}

bool mpi_result = bluefog_global.controller->SetTopology(
indegree, sources, outdegree, destinations);
bool mpi_result;
// When we change the topology, there should be no entries being processed at
// the same time.
{
std::lock_guard<std::mutex> lk(bluefog_global.loop_mutex);
mpi_result = bluefog_global.controller->SetTopology(
indegree, sources, outdegree, destinations);
#if HAVE_NCCL && NCCL_MINOR < 7
if (mpi_result && nccl_context.is_initialized) {
bluefog_global.nccl_controller->DestroyPeerCommunicators();
bluefog_global.nccl_controller->InitPeerCommunicators();
}
if (mpi_result && nccl_context.is_initialized) {
bluefog_global.nccl_controller->DestroyPeerCommunicators();
bluefog_global.nccl_controller->InitPeerCommunicators();
}
#endif
bluefog_global.tensor_queue.UnlockTensorQueue();

bluefog_global.setting_topology = false;
bluefog_global.setting_topology_done = true;
// Wait for the background thread receive the setting_topology_done and
// close the ready_to_setting_topology epoch.
while (bluefog_global.ready_to_setting_topology) {
std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION);
}
bluefog_global.setting_topology_done = false;
bluefog_global.loop_cv.notify_one();
return mpi_result;
}
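
A minimal, self-contained sketch (an assumed simplification, not the bluefog API) of the locking idea used in bluefog_set_topology above and in SetSkipNegotiateStageState later in this diff: holding loop_mutex keeps the background loop from starting a cycle while shared state changes, and notify_one wakes it right afterwards. The topology_version variable is a hypothetical stand-in.

#include <condition_variable>
#include <mutex>

std::mutex loop_mutex;
std::condition_variable loop_cv;
int topology_version = 0;  // hypothetical stand-in for the real topology state

void ChangeTopologySketch() {
  {
    // While this lock is held, the background loop cannot process entries.
    std::lock_guard<std::mutex> lk(loop_mutex);
    ++topology_version;  // apply the new topology here
  }
  loop_cv.notify_one();  // let the loop pick up the change without waiting 10 s
}
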

@@ -1447,6 +1437,7 @@ Status EnqueueTensorAllreduce(std::shared_ptr<Tensor> tensor,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1482,6 +1473,7 @@ Status EnqueueTensorBroadcast(std::shared_ptr<Tensor> tensor,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1516,6 +1508,7 @@ Status EnqueueTensorAllgather(std::shared_ptr<Tensor> tensor,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1560,6 +1553,7 @@ Status EnqueueTensorNeighborAllgather(std::shared_ptr<Tensor> tensor,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1611,6 +1605,7 @@ Status EnqueueTensorNeighborAllreduce(std::shared_ptr<Tensor> tensor,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1651,6 +1646,7 @@ Status EnqueueTensorPairGossip(std::shared_ptr<Tensor> tensor,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1683,6 +1679,7 @@ Status EnqueueTensorWindowCreate(
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1707,6 +1704,7 @@ Status EnqueueTensorWindowFree(const std::string& name, int device,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1740,6 +1738,7 @@ Status EnqueueTensorWindowPut(std::shared_ptr<Tensor> tensor,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1771,6 +1770,7 @@ Status EnqueueTensorWindowAccumulate(
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1798,6 +1798,7 @@ Status EnqueueTensorWindowGet(const std::string& name,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -2040,28 +2041,15 @@ void SetSkipNegotiateStageState(bool value) {
if (value == global_skip_negotiate_stage) {
return;
}
if (value) {
// From running negotiate to skipping negotiate, we need to properly turn
// off negotiate stage. Otherwise, it may hang the processes. Use setting
// topology flag to suspend the negotiate stage then skip it.
bluefog_global.setting_topology = true;
while (!bluefog_global.ready_to_setting_topology.load()) {
std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION);
}

global_skip_negotiate_stage = value;

bluefog_global.setting_topology = false;
bluefog_global.setting_topology_done = true;
// Wait for the background thread receive the setting_topology_done and
// close the ready_to_setting_topology epoch.
while (bluefog_global.ready_to_setting_topology) {
std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION);
}
bluefog_global.setting_topology_done = false;
} else {
// When switching between running and skipping the negotiate stage, we need to
// flip the flag safely; otherwise it may hang the processes. Hold the loop
// mutex so the background loop is not mid-negotiation, then notify it.
{
std::lock_guard<std::mutex> lk(bluefog_global.loop_mutex);
global_skip_negotiate_stage = value;
}
bluefog_global.loop_cv.notify_one();
}

bool GetSkipNegotiateStageState() {