Merge pull request #28 from sony/feature/20171025-multinode
Data Parallel Distributed Training over Multiple Nodes
TakuyaNarihira authored Nov 15, 2017
2 parents 46fd7c5 + 824eb28 commit 43c38cb
Showing 6 changed files with 200 additions and 54 deletions.
23 changes: 23 additions & 0 deletions build-tools/cmake/Modules/FindNCCL.cmake
@@ -0,0 +1,23 @@
# Find NCCL, NVIDIA Collective Communications Library
# Returns:
# NCCL_INCLUDE_DIR
# NCCL_LIBRARIES

find_path(NCCL_INCLUDE_DIR NAMES nccl.h
PATHS
/usr/include
/usr/local/include
$ENV{NCCL_HOME}/include)
find_library(NCCL_LIBRARIES NAMES nccl
PATHS
/lib
/lib64
/usr/lib
/usr/lib64
/usr/local/lib
/usr/local/lib64
$ENV{NCCL_HOME}/lib)

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(NCCL DEFAULT_MSG NCCL_INCLUDE_DIR NCCL_LIBRARIES)
mark_as_advanced(NCCL_INCLUDE_DIR NCCL_LIBRARIES)
12 changes: 6 additions & 6 deletions include/nbla/cuda/communicator/data_parallel_communicator.hpp
@@ -84,15 +84,15 @@ class NBLA_API DataParallelCommunicatorNccl
*/
virtual void init();

virtual void reduce(bool division = true);
virtual void allreduce(bool division = true, bool inplace = false);
virtual void reducescatter(bool division = true);
virtual void reduce(bool division = false);
virtual void allreduce(bool division = false, bool inplace = false);
virtual void reducescatter(bool division = false);
virtual void bcast();
virtual void allgather();

virtual void reduce_async(bool division = true);
virtual void allreduce_async(bool division = true, bool inplace = false);
virtual void reducescatter_async(bool division = true);
virtual void reduce_async(bool division = false);
virtual void allreduce_async(bool division = false, bool inplace = false);
virtual void reducescatter_async(bool division = false);
virtual void bcast_async();
virtual void allgather_async();

@@ -90,15 +90,15 @@ class NBLA_API MultiProcessDataParallelCommunicatorNccl
*/
virtual void init();

virtual void reduce(bool division = true);
virtual void allreduce(bool division = true, bool inplace = false);
virtual void reducescatter(bool division = true);
virtual void reduce(bool division = false);
virtual void allreduce(bool division = false, bool inplace = false);
virtual void reducescatter(bool division = false);
virtual void bcast();
virtual void allgather();

virtual void reduce_async(bool division = true);
virtual void allreduce_async(bool division = true, bool inplace = false);
virtual void reducescatter_async(bool division = true);
virtual void reduce_async(bool division = false);
virtual void allreduce_async(bool division = false, bool inplace = false);
virtual void reducescatter_async(bool division = false);
virtual void bcast_async();
virtual void allgather_async();

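Both header hunks above flip the `division` default from `true` to `false`, so reduced gradients are summed unless the caller explicitly asks for averaging. As a rough illustration of what the flag means numerically, here is a toy, self-contained C++ sketch (not the nnabla API; the values are made up):

```cpp
#include <cstdio>
#include <vector>

// Toy illustration: with division=false the reduced gradient is the
// element-wise sum over devices; with division=true it is additionally
// divided by the device count, i.e. averaged.
int main() {
  const int n_devices = 4;
  // One gradient value per device for a single parameter element.
  std::vector<float> per_device_grad = {0.5f, 1.5f, 2.0f, 4.0f};
  float sum = 0.0f;
  for (float g : per_device_grad) sum += g;  // what allreduce(division=false) yields
  float averaged = sum / n_devices;          // what allreduce(division=true) yields
  std::printf("sum=%.2f averaged=%.2f\n", sum, averaged);
  return 0;
}
```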
22 changes: 10 additions & 12 deletions src/nbla/cuda/CMakeLists.txt
@@ -18,18 +18,16 @@ list(APPEND NBLA_CUDA_LINKER_LIBS
option(WITH_NCCL "Use nccl for distributed training" OFF)
if(WITH_NCCL)
add_definitions(-DFEATURE_DIST_TRAIN)

if(DEFINED ENV{NCCL_HOME})
find_package(MPI REQUIRED)
list(APPEND NBLA_CUDA_INCLUDE_DIRS
$ENV{NCCL_HOME}/build/include
${MPI_INCLUDE_PATH}
)
list(APPEND NBLA_CUDA_LINKER_LIBS
$ENV{NCCL_HOME}/build/lib/libnccl.so
${MPI_LIBRARIES}
)
endif()
find_package(NCCL REQUIRED)
find_package(MPI REQUIRED)
list(APPEND NBLA_CUDA_INCLUDE_DIRS
${NCCL_INCLUDE_DIR}
${MPI_INCLUDE_PATH}
)
list(APPEND NBLA_CUDA_LINKER_LIBS
${NCCL_LIBRARIES}
${MPI_LIBRARIES}
)
endif()

####################################################################################################
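The build change above gates everything on `WITH_NCCL` and defines `FEATURE_DIST_TRAIN`. As a hedged sketch only (how the rest of the code base consumes the macro is not shown in this diff), conditional compilation on that definition could look like:

```cpp
#include <cstdio>

// Hypothetical sketch: building with WITH_NCCL=ON adds -DFEATURE_DIST_TRAIN,
// so code paths that need NCCL/MPI can be compiled conditionally.
int main() {
#ifdef FEATURE_DIST_TRAIN
  std::printf("built with distributed training support (NCCL + MPI)\n");
#else
  std::printf("built without distributed training support\n");
#endif
  return 0;
}
```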
119 changes: 98 additions & 21 deletions src/nbla/cuda/communicator/data_parallel_communicator.cu
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <nbla/cuda/array/cuda_array.hpp>
#include <nbla/cuda/common.hpp>
#include <nbla/cuda/communicator/data_parallel_communicator.hpp>

@@ -83,18 +84,24 @@ void DataParallelCommunicatorNccl<T>::reduce(bool division) {

template <typename T>
void DataParallelCommunicatorNccl<T>::allreduce(bool division, bool inplace) {
if (inplace == false) {
// TODO: nnabla currently uses the default stream for computation. The logic
// below relies on that, so if nnabla ever computes on another stream, we have
// to issue a null kernel to the default stream at the beginning and at the end
// of this method, using the implicit synchronization technique so that the
// main thread does not wait for the result of a kernel call.

if (inplace == true) {
NBLA_ERROR(error_code::not_implemented,
"CUDA GPU allreduce with out-of-place is not implemented.")
"CUDA GPU allreduce with out-of-place is only implemented.")
}

// Sync all devices
wait_by_devices_synchronization();

// Sync once to prevent a hang when a memcpy overlaps the allreduce.
this->sync_all_params();

// Inplace allreduce
// 1. copy inside device
for (int i = 0; i < device_ids_.size(); ++i) { // device-loop
Context ctx = this->contexts_[i];
auto device_id = device_ids_[i];
@@ -103,28 +110,98 @@ void DataParallelCommunicatorNccl<T>::allreduce(bool division, bool inplace) {
auto func_named_param = this->device_func_named_param_[i];
auto comm = comms_[i];
auto stream = streams_[i];
auto size = func_named_param.size();

for (auto elm : func_named_param) { // function-loop
shared_ptr<CudaCachedArray> arr_buff = // TODO: address 16 bits also here?
make_shared<CudaCachedArray>(this->total_params_, get_dtype<T>(), ctx);

T *buff = arr_buff->pointer<T>();
Size_t type_size = sizeof(T);

for (auto elm : func_named_param) {
VariablePtr vp = elm.second;
const T *dw = vp->get_grad_pointer<T>(ctx);
auto n_param = vp->size();
cudaMemcpyAsync(buff, dw, type_size * n_param, cudaMemcpyDeviceToDevice,
stream);
buff += n_param;
}
}

const T *dw0 = vp->get_grad_pointer<T>(ctx);
T *dw1 = vp->cast_grad_and_get_pointer<T>(ctx);
ncclResult_t res = ncclAllReduce(dw0, dw1, n_param, ncclFloat,
ncclSum, // TODO: address ncclFloat
comm, stream);
if (res != 0) {
NBLA_ERROR(error_code::target_specific, "ncclAllReduce fails with %d.",
res);
}
// 2. allreduce
#ifdef NCCL_MAJOR
ncclGroupStart();
#endif
for (int i = 0; i < device_ids_.size(); ++i) { // device-loop
Context ctx = this->contexts_[i];
auto device_id = device_ids_[i];
// cuda_set_device(device_id);

auto comm = comms_[i];
auto stream = streams_[i];

shared_ptr<CudaCachedArray> arr_buff = // TODO: address 16 bits also here?
make_shared<CudaCachedArray>(this->total_params_, get_dtype<T>(), ctx);

T *buff = arr_buff->pointer<T>();
ncclResult_t ret = ncclAllReduce(buff, buff, this->total_params_,
ncclFloat, // TODO: address ncclFloat
ncclSum, comm, 0); // use default stream

if (ret != ncclSuccess) {
NBLA_ERROR(error_code::target_specific, "ncclAllReduce fails with %d.",
ret);
}
}
// Divide using the same streams
divide_by_num_divices(division);
#ifdef NCCL_MAJOR
ncclGroupEnd();
// wait_by_streams_synchronization();
#endif

// 3. divide
if (division) {
for (int i = 0; i < device_ids_.size(); ++i) { // device-loop
Context ctx = this->contexts_[i];
auto device_id = device_ids_[i];
cuda_set_device(device_id);

// Sync streams
wait_by_streams_synchronization();
auto comm = comms_[i];
auto stream = streams_[i];

shared_ptr<CudaCachedArray> arr_buff = // TODO: address 16 bits also here?
make_shared<CudaCachedArray>(this->total_params_, get_dtype<T>(),
ctx);

T *buff = arr_buff->pointer<T>();
NBLA_CUDA_LAUNCH_KERNEL_IN_STREAM(kernel_divide_inplace, stream,
this->total_params_, n_devices_, buff);
}
}

// 4. copy back inside device
for (int i = 0; i < device_ids_.size(); ++i) { // device-loop
Context ctx = this->contexts_[i];
auto device_id = device_ids_[i];
cuda_set_device(device_id);

auto func_named_param = this->device_func_named_param_[i];
auto comm = comms_[i];
auto stream = streams_[i];

shared_ptr<CudaCachedArray> arr_buff = // TODO: address 16 bits also here?
make_shared<CudaCachedArray>(this->total_params_, get_dtype<T>(), ctx);

T *buff = arr_buff->pointer<T>();
Size_t type_size = sizeof(T);

for (auto elm : func_named_param) {
VariablePtr vp = elm.second;
T *dw = vp->cast_grad_and_get_pointer<T>(ctx);
auto n_param = vp->size();
cudaMemcpyAsync(dw, buff, type_size * n_param, cudaMemcpyDeviceToDevice,
stream);
buff += n_param;
}
}
}

template <typename T>
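The rewritten `allreduce` stages all gradients through one flat device buffer instead of issuing a reduction per parameter: pack, one `ncclAllReduce`, optional divide, unpack. Below is a host-only C++ sketch of that pattern with made-up gradient values; the CUDA copies and the NCCL call are replaced by plain loops and a comment:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

// Host-only sketch of the buffering scheme above:
// 1. pack every parameter's gradient into one contiguous buffer of
//    total_params elements (the cudaMemcpyAsync loop),
// 2. reduce the whole buffer with a single allreduce,
// 3. optionally divide by the device count,
// 4. copy the result back in the same order.
int main() {
  std::vector<std::vector<float>> grads = {{1.f, 2.f}, {3.f}, {4.f, 5.f, 6.f}};
  std::size_t total_params = 0;
  for (const auto &g : grads) total_params += g.size();

  std::vector<float> buff(total_params);
  float *p = buff.data();
  for (const auto &g : grads) {  // 1. pack
    std::copy(g.begin(), g.end(), p);
    p += g.size();
  }

  // 2. a single ncclAllReduce over buff.data() with total_params elements
  //    would run here on the real device buffer.

  const int n_devices = 4;       // 3. divide (the kernel_divide_inplace step)
  for (float &v : buff) v /= n_devices;

  p = buff.data();
  for (auto &g : grads) {        // 4. unpack
    std::copy(p, p + g.size(), g.begin());
    p += g.size();
  }
  std::printf("grads[2][0] after divide: %.2f\n", grads[2][0]);  // prints 1.00
  return 0;
}
```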
@@ -20,6 +20,8 @@
#include <memory>

#include "mpi.h"
#include <stdint.h>
#include <unistd.h>

namespace nbla {

@@ -31,7 +33,28 @@ __global__ void kernel_divide_inplace(const int size, const int n_devices,
NBLA_CUDA_KERNEL_LOOP(i, size) { dw[i] /= n_devices; }
}

__global__ void kernel_null() {}
/*
* Referred from
* http://docs.nvidia.com/deeplearning/sdk/nccl-developer-guide/index.html#onedevprothrd
*/
static uint64_t get_host_hash(const char *string) {
// Based on DJB2, result = result * 33 + char
uint64_t result = 5381;
for (int c = 0; string[c] != '\0'; c++) {
result = ((result << 5) + result) + string[c];
}
return result;
}

static void get_host_name(char *hostname, int maxlen) {
gethostname(hostname, maxlen);
for (int i = 0; i < maxlen; i++) {
if (hostname[i] == '.') {
hostname[i] = '\0';
return;
}
}
}
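These helpers hash the domain-stripped hostname so that processes sharing a machine can recognize each other. A self-contained illustration, with made-up hash values, of how `init()` below turns those hashes into a local rank (which then selects the CUDA device):

```cpp
#include <cstdint>
#include <cstdio>

// A process's local rank is the number of lower global ranks whose host hash
// equals its own; this is the same counting loop used in init() below.
int main() {
  const int size = 4;  // world size
  const int rank = 3;  // this process's global rank
  uint64_t host_hashs[4] = {111, 222, 111, 111};  // ranks 0 and 2 share rank 3's host
  int local_rank = 0;
  for (int i = 0; i < size; ++i) {
    if (i == rank) break;
    if (host_hashs[i] == host_hashs[rank]) local_rank++;
  }
  std::printf("local_rank=%d\n", local_rank);  // prints local_rank=2
  return 0;
}
```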

template <typename T>
MultiProcessDataParallelCommunicatorNccl<
@@ -67,27 +90,53 @@ template <typename T> void MultiProcessDataParallelCommunicatorNccl<T>::init() {
int requiredThreadLevelSupport = MPI_THREAD_SERIALIZED;
int provided;
MPI_Init_thread(&argc, &argv, requiredThreadLevelSupport, &provided);
if (provided != requiredThreadLevelSupport)
NBLA_ERROR(error_code::target_specific, "MPI_Init_thread failed.");
if (provided != requiredThreadLevelSupport) {
NBLA_ERROR(error_code::target_specific,
"MPI_Init_thread failed since provided (%d) is not equal to "
"requiredThreadLevelSupport (%d)",
provided, requiredThreadLevelSupport);
}
mpi_initialized_ = true;
}
// Create comm, set size, and rank
MPI_Comm mpi_comm;
MPI_Comm_dup(MPI_COMM_WORLD, &mpi_comm);
MPI_Comm_size(mpi_comm, &this->size_);
MPI_Comm_rank(mpi_comm, &this->rank_);
device_id_ = this->rank_;

// We have to set our device before NCCL init
cuda_set_device(device_id_);
// Set local rank and device id
uint64_t host_hashs[this->size_];
char hostname[1024];
get_host_name(hostname, 1024);
host_hashs[this->rank_] = get_host_hash(hostname);

MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, host_hashs,
sizeof(uint64_t), MPI_BYTE, mpi_comm);
MPI_Barrier(mpi_comm);

int local_rank = 0;
for (int i = 0; i < this->size_; ++i) {
if (i == this->rank_) {
break;
}
if (host_hashs[i] == host_hashs[this->rank_]) {
local_rank++;
}
}
this->device_id_ = local_rank;
this->local_rank_ = local_rank;

// Exchange comm_id_ among processes
ncclGetUniqueId(&comm_id_);
MPI_Bcast(&comm_id_, NCCL_UNIQUE_ID_BYTES, MPI_CHAR, 0, mpi_comm);
if (this->rank_ == 0) {
ncclGetUniqueId(&this->comm_id_);
}

MPI_Bcast(&comm_id_, sizeof(this->comm_id_), MPI_BYTE, 0, mpi_comm);
MPI_Barrier(mpi_comm);
MPI_Comm_free(&mpi_comm);

// Nccl Init
cuda_set_device(device_id_);
ncclResult_t ret =
ncclCommInitRank(&comm_, this->size_, comm_id_, this->rank_);
if (ret != ncclSuccess) {
@@ -96,7 +145,6 @@ template <typename T> void MultiProcessDataParallelCommunicatorNccl<T>::init() {

// Create streams
for (int i = 0; i < streams_.size(); ++i) {
// Stream
cudaStream_t stream;
NBLA_CUDA_CHECK(cudaStreamCreate(&stream));
streams_[i] = stream;
