Add condition variable to control the loop #88

Merged · 5 commits · Apr 11, 2021
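This PR replaces the background thread's fixed polling with a condition variable: the communication loop now sleeps on loop_cv (with a 10-second fallback timeout) until work is enqueued, negotiation entries are still outstanding, or shutdown is requested, and every enqueue and shutdown path notifies the variable. Below is a minimal, self-contained sketch of that pattern, not the Bluefog source itself; LoopState, BackgroundLoop, and Enqueue are hypothetical names, and the real code keeps the tensor queue behind TensorQueue's own mutex and performs MPI negotiation inside the loop.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

struct LoopState {
  std::condition_variable loop_cv;
  std::mutex loop_mutex;
  std::atomic_bool shut_down{false};
  std::atomic_int unfinished_enqueued_entries{0};
  std::deque<int> queue;  // stand-in for the TensorQueue
};

void BackgroundLoop(LoopState& state) {
  while (!state.shut_down) {
    std::unique_lock<std::mutex> lk(state.loop_mutex);
    // Sleep until there is work, a shutdown request, or the 10 s fallback fires.
    state.loop_cv.wait_for(lk, std::chrono::seconds(10), [&state] {
      return state.shut_down.load() ||
             state.unfinished_enqueued_entries.load() > 0 ||
             !state.queue.empty();
    });
    std::deque<int> work;
    work.swap(state.queue);  // drain under the lock
    lk.unlock();             // never hold the mutex across remote (MPI) calls
    // ... perform the drained operations here ...
  }
}

void Enqueue(LoopState& state, int item) {
  {
    std::lock_guard<std::mutex> lk(state.loop_mutex);
    state.queue.push_back(item);
  }
  state.loop_cv.notify_one();  // wake the background thread immediately
}

The fallback timeout keeps periodic work such as the stall check running even when no notification arrives.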
13 changes: 10 additions & 3 deletions bluefog/common/global_state.h
@@ -18,6 +18,7 @@
#define BLUEFOG_COMMON_GLOBAL_STATE_H

#include <atomic>
#include <condition_variable>
#include <chrono>
#include <memory>
#include <queue>
@@ -54,6 +55,14 @@ struct BluefogGlobalState {
// Whether collective context has been completed on the background thread.
std::atomic_bool initialization_done{false};

// Condition variable and its mutex for main loop in communication thread.
std::condition_variable loop_cv;
std::mutex loop_mutex;

// During negotiation, entries are sent to the master first and wait until it
// returns OK to run. This variable keeps a count of those outstanding entries.
std::atomic_int unfinished_enqueued_entries{0};

// Timeline writer.
Timeline timeline;

@@ -84,10 +93,8 @@ struct BluefogGlobalState {
FusionBufferManager fusion_buffer;

// Because setting topology happens in the main thread instead of communication
// thread. Following three variables are to sync between them.
// thread. Not really used since the condition variable refactor.
std::atomic_bool setting_topology{false};
std::atomic_bool setting_topology_done{false};
std::atomic_bool ready_to_setting_topology{false};

// Only exists on the coordinator node (rank zero). Maintains a vector of
// requests to allreduce every tensor (keyed by tensor name).
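The new unfinished_enqueued_entries counter is incremented when requests enter the negotiation stage and decremented once the negotiated entries have been performed; the loop's wake-up predicate treats a non-zero count as pending work, so the background thread keeps cycling until negotiation drains. A rough sketch of that contract, with a hypothetical NegotiateAndRun standing in for the negotiation functions in operations.cc:

#include <atomic>
#include <vector>

std::atomic_int unfinished_enqueued_entries{0};

void NegotiateAndRun(const std::vector<int>& requests) {
  // Count requests the moment they enter negotiation so that the loop
  // predicate (unfinished_enqueued_entries > 0) keeps the thread awake.
  unfinished_enqueued_entries.fetch_add(static_cast<int>(requests.size()));

  // ... exchange requests with the coordinator and build the ready entries ...
  std::vector<int> ready_entries = requests;  // placeholder for the real logic

  // Release the count once the ready entries have been performed, so the
  // loop may go back to sleep when nothing else is pending.
  unfinished_enqueued_entries.fetch_sub(static_cast<int>(ready_entries.size()));
}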
2 changes: 1 addition & 1 deletion bluefog/common/mpi_context.cc
@@ -75,7 +75,7 @@ bool WindowManager::InitializeMutexWin(const MPI_Comm& mpi_comm) {
std::vector<int> WindowManager::GetVersionMemoryCopy() { return version_mem_; }

void WindowManager::resetVersionWinMem(int initialValue /*=0*/) {
for (int i = 0; i < version_mem_.size(); i++) {
for (size_t i = 0; i < version_mem_.size(); i++) {
version_mem_[i] = initialValue;
}
}
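The mpi_context.cc change is a small warning fix rather than new behavior: std::vector::size() returns std::size_t, so the loop index is switched from int to size_t to avoid a signed/unsigned comparison. A minimal illustration, using a hypothetical helper name:

#include <cstddef>
#include <vector>

// Hypothetical helper mirroring WindowManager::resetVersionWinMem.
void ResetVersionMem(std::vector<int>& version_mem, int initial_value) {
  // size() returns std::size_t; a size_t index avoids -Wsign-compare.
  for (std::size_t i = 0; i < version_mem.size(); ++i) {
    version_mem[i] = initial_value;
  }
}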
116 changes: 52 additions & 64 deletions bluefog/common/operations.cc
@@ -438,7 +438,7 @@ bool IsSameList (std::shared_ptr<std::vector<T>> n1, std::shared_ptr<std::vector
if (n1 == nullptr || n2 == nullptr) return false;
if (n1->size() != n2->size()) return false;
// The order matters as well.
for (int i = 0; i < n1->size(); i++) {
for (size_t i = 0; i < n1->size(); i++) {
if (n1->at(i) != n2->at(i)) {
return false;
}
@@ -853,7 +853,9 @@ void PerformOperationWithFusion(std::vector<TensorTableEntry>& entries) {
void NegotiateOfRequestOfMaster(BluefogGlobalState& state,
std::deque<Request>& message_queue_buffer,
bool& should_change_topo,
bool& should_shut_down) {
bool& should_shut_down) {
state.unfinished_enqueued_entries.fetch_add(message_queue_buffer.size());

std::vector<std::string> ready_to_reduce;
RequestList message_list;
message_list.set_shutdown(should_shut_down);
@@ -1035,6 +1037,7 @@ void NegotiateOfRequestOfMaster(BluefogGlobalState& state,
} else {
PerformOperation(nego_entries);
}
state.unfinished_enqueued_entries.fetch_sub(nego_entries.size());
}

// Check for stalled tensors.
@@ -1049,6 +1052,7 @@ void NegotiateOfRequestOfSlave(BluefogGlobalState& state,
std::deque<Request>& message_queue_buffer,
bool& should_change_topo,
bool& should_shut_down) {
state.unfinished_enqueued_entries.fetch_add(message_queue_buffer.size());
std::string encoded_message;
RequestList message_list;
message_list.set_shutdown(state.shut_down);
@@ -1057,6 +1061,7 @@ void NegotiateOfRequestOfSlave(BluefogGlobalState& state,
message_list.add_request(message_queue_buffer.front());
message_queue_buffer.pop_front();
}

RequestList::SerializeToString(message_list, encoded_message);
int encoded_message_length = (int)encoded_message.length() + 1;
MPI_Gather(&encoded_message_length, 1, MPI_INT, nullptr, 1, MPI_INT,
@@ -1084,6 +1089,7 @@ void NegotiateOfRequestOfSlave(BluefogGlobalState& state,
} else {
PerformOperation(nego_entries);
}
state.unfinished_enqueued_entries.fetch_sub(nego_entries.size());
}

if (response_list.shutdown()) {
@@ -1097,6 +1103,8 @@ void NegotiateOfRequestOfSlave(BluefogGlobalState& state,
void NegotiationOfRequest(BluefogGlobalState& state,
std::deque<Request>& message_queue_buffer,
bool& should_change_topo, bool& should_shut_down) {
// TODO(ybc) should_change_topo has no effect after the condition variable
// refactor. Keep it for a while; it will be removed later.
if (bluefog_rank() == COORDINATE_RANK) {
NegotiateOfRequestOfMaster(state, message_queue_buffer, should_change_topo,
should_shut_down);
@@ -1111,6 +1119,15 @@ bool RunLoopOnce(BluefogGlobalState& state) {
bool should_shut_down = state.shut_down;
bool should_change_topo = state.setting_topology;

std::unique_lock<std::mutex> lk(state.loop_mutex);
// The real mutex for the queue is the one under TensorQueue.
state.loop_cv.wait_for(lk, std::chrono::seconds(10), [&state] {
// If a shutdown has been requested, or any unfinished entries are still in
// negotiation, we should not wait.
return state.shut_down || (state.unfinished_enqueued_entries > 0) ||
(state.tensor_queue.size() > 0);
});

// This delay determines thread frequency and MPI message latency
auto sleep_duration =
state.last_cycle_start +
@@ -1131,7 +1148,7 @@ bool RunLoopOnce(BluefogGlobalState& state) {
std::vector<TensorTableEntry> entries;
auto IsRequestConvertToEntryDirectly = [](const Request& request) -> bool {
return global_skip_negotiate_stage ||
(request.request_type() != Request::ALLREDUCE &&
(request.request_type() != Request::ALLREDUCE &&
request.request_type() != Request::ALLGATHER &&
request.request_type() != Request::BROADCAST &&
request.request_type() != Request::NEIGHBOR_ALLREDUCE &&
@@ -1150,7 +1167,8 @@ bool RunLoopOnce(BluefogGlobalState& state) {
std::remove_if(message_queue_buffer.begin(), message_queue_buffer.end(),
IsRequestConvertToEntryDirectly),
message_queue_buffer.end());


lk.unlock();  // Never hold the mutex while calling remote functions.
PerformOperation(entries);

// The remaining requests need to coordinate and negotiate.
@@ -1163,20 +1181,6 @@ bool RunLoopOnce(BluefogGlobalState& state) {
NegotiationOfRequest(state, message_queue_buffer, should_change_topo,
should_shut_down);
}
// Seperate the setting topology and negotiate communnication.
// TODO(ybc) Use conditional variable and mutex to re-implement this.
if (should_change_topo) {
bluefog_global.ready_to_setting_topology = true;
while (!bluefog_global.setting_topology_done) {
std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION);
}
bluefog_global.ready_to_setting_topology = false;
// Wait for main thread reset.
while (bluefog_global.setting_topology_done) {
std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION);
}
}

return !should_shut_down;
}

@@ -1215,6 +1219,7 @@ void bluefog_init() { InitializeBluefogOnce(); }
void bluefog_shutdown() {
if (bluefog_global.background_thread.joinable()) {
bluefog_global.shut_down = true;
bluefog_global.loop_cv.notify_all();
bluefog_global.background_thread.join();
// Reset the initialization flag to allow restarting with bluefog_init(...)
//bluefog_global.initialize_flag.clear();
@@ -1290,36 +1295,21 @@ int bluefog_set_topology(int indegree, const int* sources, int outdegree,
return -1;
}
#endif
bluefog_global.setting_topology = true;
while (!bluefog_global.ready_to_setting_topology.load()) {
std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION);
}
bluefog_global.tensor_queue.LockTensorQueue();
if (bluefog_global.tensor_queue.size() > 0) {
BFLOG(ERROR)
<< "Cannot set the topology because there are unfinished MPI ops.";
bluefog_global.tensor_queue.UnlockTensorQueue();
return -1;
}

bool mpi_result = bluefog_global.controller->SetTopology(
indegree, sources, outdegree, destinations);
bool mpi_result;
// When we change the topology, there should be no entries being processed at
// the same time.
{
std::lock_guard<std::mutex> lk(bluefog_global.loop_mutex);
mpi_result = bluefog_global.controller->SetTopology(
indegree, sources, outdegree, destinations);
#if HAVE_NCCL && NCCL_MINOR < 7
if (mpi_result && nccl_context.is_initialized) {
bluefog_global.nccl_controller->DestroyPeerCommunicators();
bluefog_global.nccl_controller->InitPeerCommunicators();
}
if (mpi_result && nccl_context.is_initialized) {
bluefog_global.nccl_controller->DestroyPeerCommunicators();
bluefog_global.nccl_controller->InitPeerCommunicators();
}
#endif
bluefog_global.tensor_queue.UnlockTensorQueue();

bluefog_global.setting_topology = false;
bluefog_global.setting_topology_done = true;
// Wait for the background thread receive the setting_topology_done and
// close the ready_to_setting_topology epoch.
while (bluefog_global.ready_to_setting_topology) {
std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION);
}
bluefog_global.setting_topology_done = false;
bluefog_global.loop_cv.notify_one();
return mpi_result;
}

@@ -1447,6 +1437,7 @@ Status EnqueueTensorAllreduce(std::shared_ptr<Tensor> tensor,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1482,6 +1473,7 @@ Status EnqueueTensorBroadcast(std::shared_ptr<Tensor> tensor,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1516,6 +1508,7 @@ Status EnqueueTensorAllgather(std::shared_ptr<Tensor> tensor,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1560,6 +1553,7 @@ Status EnqueueTensorNeighborAllgather(std::shared_ptr<Tensor> tensor,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1611,6 +1605,7 @@ Status EnqueueTensorNeighborAllreduce(std::shared_ptr<Tensor> tensor,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1651,6 +1646,7 @@ Status EnqueueTensorPairGossip(std::shared_ptr<Tensor> tensor,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1683,6 +1679,7 @@ Status EnqueueTensorWindowCreate(
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1707,6 +1704,7 @@ Status EnqueueTensorWindowFree(const std::string& name, int device,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1740,6 +1738,7 @@ Status EnqueueTensorWindowPut(std::shared_ptr<Tensor> tensor,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1771,6 +1770,7 @@ Status EnqueueTensorWindowAccumulate(
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -1798,6 +1798,7 @@ Status EnqueueTensorWindowGet(const std::string& name,
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
bluefog_global.loop_cv.notify_one();
return status;
}

@@ -2040,28 +2041,15 @@ void SetSkipNegotiateStageState(bool value) {
if (value == global_skip_negotiate_stage) {
return;
}
if (value) {
// From running negotiate to skipping negotiate, we need to properly turn
// off negotiate stage. Otherwise, it may hang the processes. Use setting
// topology flag to suspend the negotiate stage then skip it.
bluefog_global.setting_topology = true;
while (!bluefog_global.ready_to_setting_topology.load()) {
std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION);
}

global_skip_negotiate_stage = value;

bluefog_global.setting_topology = false;
bluefog_global.setting_topology_done = true;
// Wait for the background thread receive the setting_topology_done and
// close the ready_to_setting_topology epoch.
while (bluefog_global.ready_to_setting_topology) {
std::this_thread::sleep_for(SUSPEND_BACKGROUND_WAITTING_DURATION);
}
bluefog_global.setting_topology_done = false;
} else {
// From running negotiate to skipping negotiate, we need to properly turn
// off negotiate stage. Otherwise, it may hang the processes. Use setting
// topology flag to suspend the negotiate stage then skip it.
{
std::lock_guard<std::mutex> lk(bluefog_global.loop_mutex);
global_skip_negotiate_stage = value;
}
bluefog_global.loop_cv.notify_one();
}

bool GetSkipNegotiateStageState() {
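The bluefog_set_topology and SetSkipNegotiateStageState changes replace the old setting_topology / setting_topology_done / ready_to_setting_topology handshake with a short critical section on loop_mutex followed by a notify: holding the mutex keeps the background loop from starting a new cycle while shared state changes. A hedged sketch of that pattern, with simplified globals standing in for the members of bluefog_global:

#include <condition_variable>
#include <mutex>

std::condition_variable loop_cv;    // plays the role of BluefogGlobalState::loop_cv
std::mutex loop_mutex;              // ... and ::loop_mutex
bool skip_negotiate_stage = false;  // stand-in for global_skip_negotiate_stage

void SetSkipNegotiate(bool value) {
  {
    // While this lock is held, the background loop cannot begin a new cycle,
    // so the flag can flip without a multi-flag handshake.
    std::lock_guard<std::mutex> lk(loop_mutex);
    skip_negotiate_stage = value;
  }
  loop_cv.notify_one();  // wake the loop so it observes the new state promptly
}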