
Commit 96887b5: Address review comments
Hanbin Hu committed Mar 27, 2021 (1 parent: 3d90087)

Showing 9 changed files with 135 additions and 89 deletions.
2 changes: 1 addition & 1 deletion bluefog/common/global_state.h
@@ -80,7 +80,7 @@ struct BluefogGlobalState {
// Threshold for Tensor Fusion. All tensors that occupy memory beyond this
// threshold will be fused.
int64_t tensor_fusion_threshold = 8 * 1024 * 1024;
- int64_t tensor_fusion_threshold_for_dst_weight = 8 * 1024 * 1024;
+ int64_t tensor_fusion_threshold_for_dst_weight = tensor_fusion_threshold;
FusionBufferManager fusion_buffer;

// Because setting topology happens in the main thread instead of communication
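Note on the new default: in C++, a non-static data member initializer may read a member declared earlier in the same class, which is what lets the dst-weight threshold pick up the fusion threshold's default. A minimal standalone sketch of the pattern (struct name hypothetical):

```cpp
#include <cassert>
#include <cstdint>

// Sketch of the in-class initializer pattern used above; the struct name
// is hypothetical. The initializer for the second field reads the first,
// so both start out equal by default.
struct Thresholds {
  int64_t tensor_fusion_threshold = 8 * 1024 * 1024;
  int64_t tensor_fusion_threshold_for_dst_weight = tensor_fusion_threshold;
};

int main() {
  Thresholds t;
  assert(t.tensor_fusion_threshold_for_dst_weight == t.tensor_fusion_threshold);
  return 0;
}
```

The initializer runs only once, at construction: if tensor_fusion_threshold is reassigned later, the dst-weight field does not follow automatically, which is why the operations.cc hunk below re-syncs it after reading the environment override.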
92 changes: 51 additions & 41 deletions bluefog/common/mpi_controller.cc
@@ -449,13 +449,13 @@ void MPIController::NeighborAllreduce(TensorTableEntry& entry) {
// including itself is more intuitive.
std::string error_message = "";

- if (!entry.is_hierarchical) {
- if (!entry.dynamic_neighbors_enabled) {
+ if (!entry.is_hierarchical) { // neighbor allreduce without hierarchy
+ if (!entry.dynamic_neighbors_enabled) { // static topology
MPICHECK(MPI_Neighbor_allgather(
sendbuf, num_elements, mpi_ctx_.GetMPIDataType(entry.tensor),
buffer_data, num_elements, mpi_ctx_.GetMPIDataType(entry.output),
mpi_ctx_.GetMPICommunicator(Communicator::GRAPH)));
- } else {
+ } else { // dynamic topology
int nsend = entry.send_neighbors->size();
int nrecv = entry.recv_neighbors->size();

@@ -468,8 +468,9 @@ void MPIController::NeighborAllreduce(TensorTableEntry& entry) {
}
}
// TODO(ybc) #83 Better design pattern for data_weight synchronization
- // This ready event make sure the data_weight computation is done before communication, as
- // Pytorch CUDA stream is not synchronized with our CUDA stream.
+ // This ready event makes sure the data_weight computation is done before communication, as
+ // Pytorch CUDA stream is not synchronized with our CUDA stream, and it does nothing when
+ // it is running on CPU.
std::shared_ptr<common::ReadyEvent> ready_event =
entry.context->RecordReadyEvent(entry.device);

@@ -486,16 +487,14 @@ void MPIController::NeighborAllreduce(TensorTableEntry& entry) {
}

if (entry.dst_weighting_enabled) {
- if (ready_event != nullptr) {
- while (!ready_event->Ready()) {
- std::this_thread::sleep_for(std::chrono::nanoseconds(100));
- }
+ while ((ready_event != nullptr) && !ready_event->Ready()) {
+ std::this_thread::sleep_for(std::chrono::nanoseconds(100));
}
}
for (int i = 0; i < nsend; ++i) {
- const void* buffer_send = sendbuf;
- if (entry.dst_weighting_enabled)
- buffer_send = weighted_tensors[i].get()->data();
+ const void* buffer_send = (entry.dst_weighting_enabled)
+ ? weighted_tensors[i]->data()
+ : sendbuf;
MPICHECK(MPI_Isend(buffer_send, num_elements,
mpi_ctx_.GetMPIDataType(entry.tensor), entry.send_neighbors->at(i),
/*tag=*/mpi_ctx_.rank_ + entry.send_neighbors->at(i),
@@ -505,7 +504,7 @@ void MPIController::NeighborAllreduce(TensorTableEntry& entry) {
error_message =
GenerateNeighborExchangeErrorMessage(statuses, nsend, nrecv);
}
- } else {
+ } else { // hierarchical neighbor allreduce
if (entry.send_neighbors->empty()) {
throw std::runtime_error(
"Under hierarchical neighbor_allreduce, argument "
@@ -630,26 +629,8 @@ void MPIController::NeighborAllreduce(std::vector<TensorTableEntry>& entries) {
if (first_entry.dst_weighting_enabled) {
// Generate weighted data fusion for sending
timeline_ptr->ActivityStartAll(entries, "MEMCPY_IN_WEIGHT_FUSION_BUFFER");
- void* weight_buffer_data;
- MemcpyInWeightFusionBuffer(weight_buffer_data, first_entry.send_neighbors->size(),
- fused_input_data, num_elements, element_size,
- first_entry.context, first_entry.device);
- int64_t offset = 0;
- for (size_t i = 0; i < first_entry.send_neighbors->size(); ++i) {
- double dst_weight = first_entry.send_weights->at(i);
- void* weight_buffer_data_offset = (uint8_t*)weight_buffer_data + offset;
- if (first_entry.device == CPU_DEVICE_ID) {
- ScaleCPUBuffer(dst_weight, weight_buffer_data_offset, num_elements,
- first_entry.tensor->dtype());
- } else {
- #if HAVE_CUDA
- ScaleBufferCudaImpl(dst_weight, weight_buffer_data_offset, num_elements,
- first_entry.tensor->dtype(), mpi_ctx_.stream);
- #endif
- }
- offset += num_elements * element_size;
- }
- weighted_fused_input_data = weight_buffer_data;
+ weighted_fused_input_data = GenerateWeightedFusedInputData(fused_input_data, first_entry,
+ num_elements, element_size);
timeline_ptr->ActivityEndAll(entries);
}

@@ -667,13 +648,13 @@ void MPIController::NeighborAllreduce(std::vector<TensorTableEntry>& entries) {
// including itself is more intuitive.
std::string error_message = "";

- if (!first_entry.is_hierarchical) {
- if (!first_entry.dynamic_neighbors_enabled) {
+ if (!first_entry.is_hierarchical) { // neighbor allreduce without hierarchy
+ if (!first_entry.dynamic_neighbors_enabled) { // static topology
MPICHECK(MPI_Neighbor_allgather(
fused_input_data, num_elements, mpi_ctx_.GetMPIDataType(first_entry.tensor),
buffer_data, num_elements, mpi_ctx_.GetMPIDataType(first_entry.output),
mpi_ctx_.GetMPICommunicator(Communicator::GRAPH)));
- } else {
+ } else { // dynamic topology
int nsend = first_entry.send_neighbors->size();
int nrecv = first_entry.recv_neighbors->size();
std::vector<MPI_Request> requests(nsend + nrecv);
@@ -692,10 +673,10 @@ void MPIController::NeighborAllreduce(std::vector<TensorTableEntry>& entries) {
}
#endif
for (int i = 0; i < nsend; ++i) {
- const void* sendbuf = fused_input_data;
- if (first_entry.dst_weighting_enabled)
- sendbuf =
- (void*)((uint8_t*)weighted_fused_input_data + num_elements * i * element_size);
+ const void* sendbuf =
+ (first_entry.dst_weighting_enabled)
+ ? (void*)((uint8_t*)weighted_fused_input_data + num_elements * i * element_size)
+ : fused_input_data;
MPICHECK(MPI_Isend(sendbuf, num_elements,
mpi_ctx_.GetMPIDataType(first_entry.tensor), first_entry.send_neighbors->at(i),
/*tag=*/mpi_ctx_.rank_ + first_entry.send_neighbors->at(i),
@@ -705,7 +686,7 @@ void MPIController::NeighborAllreduce(std::vector<TensorTableEntry>& entries) {
error_message =
GenerateNeighborExchangeErrorMessage(statuses, nsend, nrecv);
}
- } else {
+ } else { // hierarchical neighbor allreduce
if (first_entry.send_neighbors->empty()) {
throw std::runtime_error(
"Under hierarchical neighbor_allreduce, argument "
@@ -1442,6 +1423,35 @@ void MPIController::MemcpyInWeightFusionBuffer(
}
}

+ const void* MPIController::GenerateWeightedFusedInputData(const void* fused_input_data,
+ const TensorTableEntry& entry,
+ int64_t num_elements, int element_size) {
+ // Given a fused_input_data like [t_1, t_2], the storage for neighbor_allreduce in
+ // weighted fused input data is like [t_1_w1, t_2_w1 | t_1_w2, t_2_w2 | t_1_w3, t_2_w3].
+ // Here t_1 and t_2 means self tensor 1 and 2 and _w1, _w2, and _w3 means the
+ // destination weights to destination 1, 2, and 3.
+ void* weight_buffer_data;
+ MemcpyInWeightFusionBuffer(weight_buffer_data, entry.send_neighbors->size(),
+ fused_input_data, num_elements, element_size,
+ entry.context, entry.device);
+ int64_t offset = 0;
+ for (size_t i = 0; i < entry.send_neighbors->size(); ++i) {
+ double dst_weight = entry.send_weights->at(i);
+ void* weight_buffer_data_offset = (uint8_t*)weight_buffer_data + offset;
+ if (entry.device == CPU_DEVICE_ID) {
+ ScaleCPUBuffer(dst_weight, weight_buffer_data_offset, num_elements,
+ entry.tensor->dtype());
+ } else {
+ #if HAVE_CUDA
+ ScaleBufferCudaImpl(dst_weight, weight_buffer_data_offset, num_elements,
+ entry.tensor->dtype(), mpi_ctx_.stream);
+ #endif
+ }
+ offset += num_elements * element_size;
+ }
+ return weight_buffer_data;
+ }

void MPIController::MemcpyInFusionBuffer(
const std::vector<TensorTableEntry>& entries, void*& buffer_data,
size_t& buffer_len) {
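The extracted GenerateWeightedFusedInputData above builds one scaled copy of the fused input per send neighbor, back to back, so slice i starts at offset i * num_elements * element_size. A CPU-only sketch of that layout logic (float-only, heap-allocated, helper name hypothetical; the real code reuses a cached weight fusion buffer and dispatches to CPU or CUDA scaling kernels):

```cpp
#include <cstddef>
#include <cstring>
#include <vector>

// One copy of the fused input per send neighbor, each scaled in place by
// that neighbor's destination weight: [t_w1 | t_w2 | ... ].
std::vector<float> WeightedFusedInput(const std::vector<float>& fused_input,
                                      const std::vector<double>& send_weights) {
  std::vector<float> out(fused_input.size() * send_weights.size());
  for (size_t i = 0; i < send_weights.size(); ++i) {
    float* slice = out.data() + i * fused_input.size();  // slice for neighbor i
    std::memcpy(slice, fused_input.data(), fused_input.size() * sizeof(float));
    for (size_t j = 0; j < fused_input.size(); ++j) {
      slice[j] *= static_cast<float>(send_weights[i]);   // apply dst weight
    }
  }
  return out;
}
```

Each MPI_Isend in the dynamic-topology hunk then addresses its slice as weighted_fused_input_data + num_elements * i * element_size.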
4 changes: 4 additions & 0 deletions bluefog/common/mpi_controller.h
@@ -127,6 +127,10 @@ class MPIController {
const void* buffer_data, int64_t num_elements, int element_size,
std::shared_ptr<OpContext> context, int device);

+ const void* GenerateWeightedFusedInputData(const void* fused_input_data,
+ const TensorTableEntry& entry,
+ int64_t num_elements, int element_size);

void MemcpyOutFusionBufferForInputs(const void* fused_input_data,
std::vector<TensorTableEntry>& entries);

82 changes: 48 additions & 34 deletions bluefog/common/nccl_controller.cc
@@ -739,9 +739,9 @@ void NCCLController::NeighborAllreduce(TensorTableEntry& entry) {
// Ensure the lifecycle of the weighted tensors are alive after communication.

#if NCCL_MINOR > 6
- if (!entry.is_hierarchical) {
+ if (!entry.is_hierarchical) { // neighbor allreduce without hierarchy
ncclGroupStart();
- if (!entry.dynamic_neighbors_enabled) {
+ if (!entry.dynamic_neighbors_enabled) { // static topology
for (int i = 0; i < mpi_ctx_.neighbor_indgree_; i++) {
int recv_rank = mpi_ctx_.neighbor_in_ranks_[i];
void* recvbuf = (void*)(static_cast<const char*>(entry.output->data()) +
@@ -753,13 +753,17 @@ void NCCLController::NeighborAllreduce(TensorTableEntry& entry) {
NCCLCHECK(ncclSend(sendbuf, num_elements, GetNCCLDataType(entry.tensor),
send_rank, nccl_ctx_.nccl_comm, nccl_ctx_.stream));
}
- } else {
+ } else { // dynamic topology
if(entry.dst_weighting_enabled) {
for (size_t i = 0; i < entry.send_neighbors->size(); ++i) {
auto weighted_tensor_ptr = entry.tensor->data_weight(entry.send_weights->at(i));
weighted_tensors.push_back(std::move(weighted_tensor_ptr));
}
}
+ // TODO(ybc) #83 Better design pattern for data_weight synchronization
+ // This ready event makes sure the data_weight computation is done before communication, as
+ // Pytorch CUDA stream is not synchronized with our CUDA stream, and it does nothing when
+ // it is running on CPU.
std::shared_ptr<common::ReadyEvent> ready_event =
entry.context->RecordReadyEvent(entry.device);
for (size_t i = 0; i < entry.recv_neighbors->size(); ++i) {
@@ -770,22 +774,20 @@ void NCCLController::NeighborAllreduce(TensorTableEntry& entry) {
recv_rank, nccl_ctx_.nccl_comm, nccl_ctx_.stream));
}
if(entry.dst_weighting_enabled) {
- if (ready_event != nullptr) {
- while (!ready_event->Ready()) {
- std::this_thread::sleep_for(std::chrono::nanoseconds(100));
- }
+ while ((ready_event != nullptr) && !ready_event->Ready()) {
+ std::this_thread::sleep_for(std::chrono::nanoseconds(100));
}
}
for (size_t i = 0; i < entry.send_neighbors->size(); ++i) {
- const void* buffer_send = sendbuf;
- if (entry.dst_weighting_enabled)
- buffer_send = weighted_tensors[i].get()->data();
+ const void* buffer_send = (entry.dst_weighting_enabled)
+ ? weighted_tensors[i]->data()
+ : sendbuf;
NCCLCHECK(ncclSend(buffer_send, num_elements, GetNCCLDataType(entry.tensor),
entry.send_neighbors->at(i), nccl_ctx_.nccl_comm, nccl_ctx_.stream));
}
}
ncclGroupEnd();
- } else {
+ } else { // hierarchical neighbor allreduce
if (entry.send_neighbors->empty()) {
throw std::runtime_error(
"Under hierarchical neighbor_allreduce, argument "
@@ -1016,22 +1018,11 @@ void NCCLController::NeighborAllreduce(std::vector<TensorTableEntry>& entries) {
RecordEvent(event_queue, "MEM_CPY_IN");
}

- const void* weighted_fused_input_data = nullptr;
- if (first_entry.dst_weighting_enabled) {
- void* weight_buffer_data = nullptr;
- MemcpyInWeightFusionBuffer(weight_buffer_data, first_entry.send_neighbors->size(),
- fused_input_data, num_elements, element_size,
- first_entry.context, first_entry.device);
- int64_t offset = 0;
- for (size_t i = 0; i < first_entry.send_neighbors->size(); ++i) {
- double dst_weight = first_entry.send_weights->at(i);
- void* weight_buffer_data_offset = (uint8_t*)weight_buffer_data + offset;
- ScaleBufferCudaImpl(dst_weight, weight_buffer_data_offset, num_elements,
- first_entry.tensor->dtype(), nccl_ctx_.stream);
- offset += num_elements * element_size;
- }
- weighted_fused_input_data = weight_buffer_data;
- }
+ const void* weighted_fused_input_data =
+ (first_entry.dst_weighting_enabled)
+ ? GenerateWeightedFusedInputData(fused_input_data, first_entry,
+ num_elements, element_size)
+ : nullptr;

// Unlike allreduce, the storage for neighbor_allreduce in fusion buffer
// is like [t_1, t_2 | t_1_n1, t_2_n1, t_1_n2, t_2_n2].
Expand All @@ -1040,9 +1031,9 @@ void NCCLController::NeighborAllreduce(std::vector<TensorTableEntry>& entries) {
// Hence, we need to offset the buffer data to location for neighbors.
buffer_data = (uint8_t*)buffer_data + num_elements * element_size;

- if (!first_entry.is_hierarchical) {
+ if (!first_entry.is_hierarchical) { // neighbor allreduce without hierarchy
ncclGroupStart();
- if (!first_entry.dynamic_neighbors_enabled) {
+ if (!first_entry.dynamic_neighbors_enabled) { // static topology
for (int i = 0; i < mpi_ctx_.neighbor_indgree_; i++) {
int recv_rank = mpi_ctx_.neighbor_in_ranks_[i];
void* recvbuf =
@@ -1056,7 +1047,7 @@ void NCCLController::NeighborAllreduce(std::vector<TensorTableEntry>& entries) {
GetNCCLDataType(first_entry.tensor), send_rank,
nccl_ctx_.nccl_comm, nccl_ctx_.stream));
}
- } else {
+ } else { // dynamic topology
for (size_t i = 0; i < first_entry.recv_neighbors->size(); ++i) {
int recv_rank = first_entry.recv_neighbors->at(i);
void* recvbuf =
@@ -1066,16 +1057,17 @@ void NCCLController::NeighborAllreduce(std::vector<TensorTableEntry>& entries) {
nccl_ctx_.nccl_comm, nccl_ctx_.stream));
}
for (size_t i = 0; i < first_entry.send_neighbors->size(); ++i) {
- const void* sendbuf = fused_input_data;
- if (first_entry.dst_weighting_enabled)
- sendbuf = (void*)((uint8_t*)weighted_fused_input_data + num_elements * i * element_size);
+ const void* sendbuf =
+ (first_entry.dst_weighting_enabled)
+ ? (void*)((uint8_t*)weighted_fused_input_data + num_elements * i * element_size)
+ : fused_input_data;
NCCLCHECK(ncclSend(sendbuf, num_elements, GetNCCLDataType(first_entry.tensor),
first_entry.send_neighbors->at(i),
nccl_ctx_.nccl_comm, nccl_ctx_.stream));
}
}
ncclGroupEnd();
- } else {
+ } else { // hierarchical neighbor allreduce
if (first_entry.send_neighbors->empty()) {
throw std::runtime_error(
"Under hierarchical neighbor_allreduce, argument "
@@ -1925,6 +1917,28 @@ void NCCLController::MemcpyInWeightFusionBuffer(
}
}

+ const void* NCCLController::GenerateWeightedFusedInputData(const void* fused_input_data,
+ const TensorTableEntry& entry,
+ int64_t num_elements, int element_size) {
+ // Given a fused_input_data like [t_1, t_2], the storage for neighbor_allreduce in
+ // weighted fused input data is like [t_1_w1, t_2_w1 | t_1_w2, t_2_w2 | t_1_w3, t_2_w3].
+ // Here t_1 and t_2 means self tensor 1 and 2 and _w1, _w2, and _w3 means the
+ // destination weights to destination 1, 2, and 3.
+ void* weight_buffer_data = nullptr;
+ MemcpyInWeightFusionBuffer(weight_buffer_data, entry.send_neighbors->size(),
+ fused_input_data, num_elements, element_size,
+ entry.context, entry.device);
+ int64_t offset = 0;
+ for (size_t i = 0; i < entry.send_neighbors->size(); ++i) {
+ double dst_weight = entry.send_weights->at(i);
+ void* weight_buffer_data_offset = (uint8_t*)weight_buffer_data + offset;
+ ScaleBufferCudaImpl(dst_weight, weight_buffer_data_offset, num_elements,
+ entry.tensor->dtype(), nccl_ctx_.stream);
+ offset += num_elements * element_size;
+ }
+ return weight_buffer_data;
+ }

void NCCLController::MemcpyOutFusionBuffer(
const void* buffer_data, std::vector<TensorTableEntry>& entries) {
int64_t offset = 0;
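Both NeighborAllreduce paths above wrap their point-to-point calls in ncclGroupStart()/ncclGroupEnd(). ncclSend/ncclRecv arrived in NCCL 2.7 (hence the #if NCCL_MINOR > 6 guard), and grouping lets each rank post all of its sends and receives without the posting order deadlocking against its peers. A skeletal sketch of the pattern, with setup, error checking, and dtype dispatch elided (float-only; buffer and neighbor arguments are placeholders):

```cpp
#include <nccl.h>
#include <vector>

// Grouped neighbor exchange (requires NCCL >= 2.7). Each recv lands in its
// own slice of recvbuf; all ops are issued together at ncclGroupEnd().
void NeighborExchange(const float* sendbuf, float* recvbuf, size_t num_elements,
                      const std::vector<int>& send_ranks,
                      const std::vector<int>& recv_ranks,
                      ncclComm_t comm, cudaStream_t stream) {
  ncclGroupStart();  // batch the p2p ops so call order cannot deadlock
  for (size_t i = 0; i < recv_ranks.size(); ++i) {
    ncclRecv(recvbuf + i * num_elements, num_elements, ncclFloat,
             recv_ranks[i], comm, stream);
  }
  for (int peer : send_ranks) {
    ncclSend(sendbuf, num_elements, ncclFloat, peer, comm, stream);
  }
  ncclGroupEnd();    // ops are enqueued on the stream here
}
```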
4 changes: 4 additions & 0 deletions bluefog/common/nccl_controller.h
@@ -199,6 +199,10 @@ class NCCLController {
const void* buffer_data, int64_t num_elements, int element_size,
std::shared_ptr<OpContext> context, int device);

+ const void* GenerateWeightedFusedInputData(const void* fused_input_data,
+ const TensorTableEntry& entry,
+ int64_t num_elements, int element_size);

void MemcpyOutFusionBufferForInputs(const void* fused_input_data,
std::vector<TensorTableEntry>& entries);

2 changes: 2 additions & 0 deletions bluefog/common/operations.cc
@@ -482,6 +482,8 @@ void BackgroundThreadLoop(BluefogGlobalState& state) {
if (bluefog_fusion_threshold != nullptr) {
state.tensor_fusion_threshold =
std::strtol(bluefog_fusion_threshold, nullptr, 10);
+ state.tensor_fusion_threshold_for_dst_weight =
+ state.tensor_fusion_threshold;
}

// Initialize the tensor count table. No tensors are available yet.
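Re-syncing the dst-weight threshold at the same point where the environment override is parsed keeps the two thresholds equal no matter what the user sets. A sketch of the pattern (the environment variable name shown is an assumption for illustration, not taken from this diff):

```cpp
#include <cstdint>
#include <cstdlib>

// Sketch of the override-and-resync logic above; the env var name
// "BLUEFOG_FUSION_THRESHOLD" is assumed here.
void ApplyFusionThresholdOverride(int64_t& fusion_threshold,
                                  int64_t& dst_weight_threshold) {
  const char* env = std::getenv("BLUEFOG_FUSION_THRESHOLD");
  if (env != nullptr) {
    fusion_threshold = std::strtol(env, /*str_end=*/nullptr, /*base=*/10);
    // The dst-weight threshold defaults to the fusion threshold at
    // construction, so it must be updated too once the default changes.
    dst_weight_threshold = fusion_threshold;
  }
}
```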
3 changes: 3 additions & 0 deletions bluefog/common/tensor_queue.h
@@ -83,6 +83,9 @@ class FusionBufferManager {
std::function<void()> on_end_init);

// Initializes a buffer of the given threshold size times MPI size if not already cached.
+ // There is one constraint to note here: the weight buffer must always be at
+ // least (size - 1) times the fusion buffer, since we don't want a tensor that
+ // fits into the fusion buffer to fail to fit into the weight buffer.
//
// Args:
// threshold: Size of the buffer in bytes.
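A quick worked check of the constraint documented above: with the default 8 MiB threshold and a world of size 4, a rank can have at most 3 send neighbors, so the weight buffer needs at least (4 - 1) * 8 MiB = 24 MiB; anything smaller means a tensor could fit in the fusion buffer yet overflow the weight buffer. A hedged sketch of such a guard (function name hypothetical):

```cpp
#include <cstdint>

// The weight buffer holds one scaled copy of the fused data per send
// neighbor, and a rank has at most (mpi_size - 1) neighbors, so anything
// that fits in the fusion buffer must also fit in the weight buffer.
bool WeightBufferLargeEnough(int64_t weight_buffer_bytes,
                             int64_t fusion_threshold_bytes, int mpi_size) {
  return weight_buffer_bytes >=
         (static_cast<int64_t>(mpi_size) - 1) * fusion_threshold_bytes;
}
```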