[BP] Fixes for large size clusters. (#10880) #10899

Merged 1 commit on Oct 17, 2024.
9 changes: 3 additions & 6 deletions include/xgboost/collective/socket.h
@@ -548,13 +548,10 @@ class TCPSocket {
[[nodiscard]] HandleT const &Handle() const { return handle_; }
/**
* @brief Listen to incoming requests. Should be called after bind.
*
* Both the default and minimum backlog is set to 256.
*/
[[nodiscard]] Result Listen(std::int32_t backlog = 16) {
if (listen(handle_, backlog) != 0) {
return system::FailWithCode("Failed to listen.");
}
return Success();
}
[[nodiscard]] Result Listen(std::int32_t backlog = 256);
/**
* @brief Bind socket to INADDR_ANY, return the port selected by the OS.
*/
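
For context on the change above (an aside, not part of the diff): the backlog argument to listen(2) bounds the queue of established connections the OS will hold before accept() drains them. A minimal POSIX sketch, assuming a Unix-like host; MakeListener and its error-handling-free happy path are illustrative only:

#include <cstdint>
#include <netinet/in.h>
#include <sys/socket.h>

// Create a TCP listener on an OS-chosen port with the given accept backlog.
int MakeListener(std::int32_t backlog) {
  int fd = socket(AF_INET, SOCK_STREAM, 0);
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = INADDR_ANY;
  addr.sin_port = 0;  // port 0: let the OS pick
  bind(fd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr));
  // When many peers connect at nearly the same time, a small backlog (such as
  // the old default of 16) lets the accept queue overflow and connection
  // attempts get dropped or refused, hence the larger default for big clusters.
  listen(fd, backlog);
  return fd;
}
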
9 changes: 9 additions & 0 deletions src/collective/socket.cc
@@ -3,6 +3,7 @@
*/
#include "xgboost/collective/socket.h"

#include <algorithm> // for max
#include <array> // for array
#include <cstddef> // std::size_t
#include <cstdint> // std::int32_t
@@ -58,6 +59,14 @@ SockAddrV4 SockAddrV4::InaddrAny() { return MakeSockAddress("0.0.0.0", 0).V4();
SockAddrV6 SockAddrV6::Loopback() { return MakeSockAddress("::1", 0).V6(); }
SockAddrV6 SockAddrV6::InaddrAny() { return MakeSockAddress("::", 0).V6(); }

[[nodiscard]] Result TCPSocket::Listen(std::int32_t backlog) {
backlog = std::max(backlog, 256);
if (listen(this->handle_, backlog) != 0) {
return system::FailWithCode("Failed to listen.");
}
return Success();
}

std::size_t TCPSocket::Send(StringView str) {
CHECK(!this->IsClosed());
CHECK_LT(str.size(), std::numeric_limits<std::int32_t>::max());
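
One note on the clamp above (a Linux detail, not something this diff changes): the kernel silently caps the value passed to listen() at net.core.somaxconn, so the 256 floor is only effective up to that limit. A small standalone check, assuming a Linux host:

#include <fstream>
#include <iostream>

int main() {
  // net.core.somaxconn is the kernel-side upper bound on the listen() backlog.
  std::ifstream somaxconn{"/proc/sys/net/core/somaxconn"};
  int cap = 0;
  if (somaxconn >> cap) {
    std::cout << "listen() backlog is capped at " << cap << " on this host\n";
  } else {
    std::cout << "could not read net.core.somaxconn (non-Linux system?)\n";
  }
  return 0;
}
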
3 changes: 2 additions & 1 deletion src/collective/tracker.cc
@@ -120,7 +120,8 @@ RabitTracker::RabitTracker(Json const& config) : Tracker{config} {
listener_ = TCPSocket::Create(addr.IsV4() ? SockDomain::kV4 : SockDomain::kV6);
return listener_.Bind(host_, &this->port_);
} << [&] {
return listener_.Listen();
CHECK_GT(this->n_workers_, 0);
return listener_.Listen(this->n_workers_);
};
SafeColl(rc);
}
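
Putting the two pieces together (an illustration, not code from this PR): the tracker now requests a backlog equal to its worker count, and TCPSocket::Listen raises anything below 256, so the value reaching listen() is max(n_workers, 256). A minimal sketch:

#include <algorithm>  // for std::max
#include <cstdint>
#include <iostream>

// Mirrors Listen(n_workers) combined with the 256 floor in TCPSocket::Listen.
std::int32_t EffectiveBacklog(std::int32_t n_workers) {
  return std::max(n_workers, 256);
}

int main() {
  std::cout << EffectiveBacklog(32) << "\n";    // small cluster: the 256 floor applies
  std::cout << EffectiveBacklog(4096) << "\n";  // large cluster: one slot per worker
  return 0;
}
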
7 changes: 0 additions & 7 deletions src/common/device_helpers.cuh
@@ -224,13 +224,6 @@ __global__ void LaunchNKernel(size_t begin, size_t end, L lambda) {
lambda(i);
}
}
template <typename L>
__global__ void LaunchNKernel(int device_idx, size_t begin, size_t end,
L lambda) {
for (auto i : GridStrideRange(begin, end)) {
lambda(i, device_idx);
}
}

/* \brief A wrapper around kernel launching syntax, used to guard against empty input.
*
15 changes: 9 additions & 6 deletions src/tree/gpu_hist/row_partitioner.cuh
@@ -146,10 +146,11 @@ void SortPositionBatch(common::Span<const PerNodeData<OpDataT>> d_batch_info,

// Value found by experimentation
const int kItemsThread = 12;
const int grid_size = xgboost::common::DivRoundUp(total_rows, kBlockSize * kItemsThread);

SortPositionCopyKernel<kBlockSize, RowIndexT, OpDataT>
<<<grid_size, kBlockSize, 0>>>(batch_info_itr, ridx, ridx_tmp, total_rows);
std::uint32_t const kGridSize =
xgboost::common::DivRoundUp(total_rows, kBlockSize * kItemsThread);
dh::LaunchKernel{kGridSize, kBlockSize, 0}(SortPositionCopyKernel<kBlockSize, RowIndexT, OpDataT>,
batch_info_itr, ridx, ridx_tmp, total_rows);
}

struct NodePositionInfo {
@@ -328,11 +329,13 @@ class RowPartitioner {
sizeof(NodePositionInfo) * ridx_segments_.size(),
cudaMemcpyDefault));

constexpr int kBlockSize = 512;
constexpr std::uint32_t kBlockSize = 512;
const int kItemsThread = 8;
const int grid_size = xgboost::common::DivRoundUp(ridx_.size(), kBlockSize * kItemsThread);
const std::uint32_t grid_size =
xgboost::common::DivRoundUp(ridx_.size(), kBlockSize * kItemsThread);
common::Span<const RowIndexT> d_ridx(ridx_.data().get(), ridx_.size());
FinalisePositionKernel<kBlockSize><<<grid_size, kBlockSize, 0>>>(
dh::LaunchKernel{grid_size, kBlockSize}(
FinalisePositionKernel<kBlockSize, RowIndexT, FinalisePositionOpT>,
dh::ToSpan(d_node_info_storage), d_ridx, d_out_position, op);
}
};
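
Both hunks above swap the raw <<<grid, block>>> launch syntax for the dh::LaunchKernel wrapper and compute the grid size as an explicit std::uint32_t; the comment retained in device_helpers.cuh notes that the wrapper also guards against empty input. A host-side sketch of the grid-size arithmetic (not the XGBoost implementation; DivRoundUp here is a stand-in for xgboost::common::DivRoundUp):

#include <cstddef>
#include <cstdint>
#include <iostream>

// Ceiling division, analogous to xgboost::common::DivRoundUp in the diff above.
std::uint32_t DivRoundUp(std::size_t n, std::size_t divisor) {
  return static_cast<std::uint32_t>((n + divisor - 1) / divisor);
}

int main() {
  constexpr std::uint32_t kBlockSize = 512;   // threads per block (second hunk)
  constexpr std::uint32_t kItemsThread = 8;   // rows handled per thread
  std::size_t n_rows = 1'000'000'000;         // the large-data case the PR targets
  std::uint32_t grid_size = DivRoundUp(n_rows, kBlockSize * kItemsThread);
  std::cout << "grid size: " << grid_size << "\n";  // 244141 blocks
  // With n_rows == 0 the grid size is 0; a launch wrapper can simply skip the
  // launch in that case instead of issuing an invalid kernel configuration.
  return 0;
}
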
16 changes: 8 additions & 8 deletions tests/cpp/tree/gpu_hist/test_row_partitioner.cu
@@ -6,15 +6,12 @@
#include <thrust/host_vector.h>
#include <thrust/sequence.h>

#include <algorithm>
#include <vector>

#include "../../../../src/tree/gpu_hist/row_partitioner.cuh"
#include "../../helpers.h"
#include "xgboost/base.h"
#include "xgboost/context.h"
#include "xgboost/task.h"
#include "xgboost/tree_model.h"
#include "../../helpers.h" // for RandomDataGenerator

namespace xgboost::tree {
void TestUpdatePositionBatch() {
@@ -55,7 +52,9 @@ void TestSortPositionBatch(const std::vector<int>& ridx_in, const std::vector<Se
thrust::device_vector<uint32_t> ridx_tmp(ridx_in.size());
thrust::device_vector<bst_uint> counts(segments.size());

auto op = [=] __device__(auto ridx, int split_index, int data) { return ridx % 2 == 0; };
auto op = [=] __device__(auto ridx, int split_index, int data) {
return ridx % 2 == 0;
};
std::vector<int> op_data(segments.size());
std::vector<PerNodeData<int>> h_batch_info(segments.size());
dh::TemporaryArray<PerNodeData<int>> d_batch_info(segments.size());
@@ -73,7 +72,9 @@ void TestSortPositionBatch(const std::vector<int>& ridx_in, const std::vector<Se
dh::ToSpan(ridx_tmp), dh::ToSpan(counts),
total_rows, op, &tmp);

auto op_without_data = [=] __device__(auto ridx) { return ridx % 2 == 0; };
auto op_without_data = [=] __device__(auto ridx) {
return ridx % 2 == 0;
};
for (size_t i = 0; i < segments.size(); i++) {
auto begin = ridx.begin() + segments[i].begin;
auto end = ridx.begin() + segments[i].end;
@@ -87,11 +88,10 @@ void TestSortPositionBatch(const std::vector<int>& ridx_in, const std::vector<Se
}
}

TEST(GpuHist, SortPositionBatch) {
TEST(RowPartitioner, SortPositionBatch) {
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{0, 3}, {3, 6}});
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{0, 1}, {3, 6}});
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{0, 6}});
TestSortPositionBatch({0, 1, 2, 3, 4, 5}, {{3, 6}, {0, 2}});
}

} // namespace xgboost::tree
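
To spell out what TestSortPositionBatch exercises (a host-side analogue, not the CUDA code above): within each segment, rows for which the predicate holds (here, even row indices) are expected to end up in the left part of the segment, with the per-segment count recorded. A sketch using std::stable_partition; the GPU kernel need not preserve this exact ordering inside each half:

#include <algorithm>  // for std::stable_partition
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

struct Segment {
  std::size_t begin;
  std::size_t end;
};

int main() {
  // First test case above: row indices 0..5 split into two node segments.
  std::vector<std::uint32_t> ridx{0, 1, 2, 3, 4, 5};
  std::vector<Segment> segments{{0, 3}, {3, 6}};
  for (auto const &seg : segments) {
    auto first = ridx.begin() + seg.begin;
    auto last = ridx.begin() + seg.end;
    // The test's predicate: rows with an even index go to the left partition.
    auto split = std::stable_partition(first, last, [](std::uint32_t r) { return r % 2 == 0; });
    std::cout << "segment [" << seg.begin << ", " << seg.end
              << "): " << (split - first) << " rows go left\n";
  }
  // ridx ends up as {0, 2, 1, 4, 3, 5} with this host-side analogue.
  return 0;
}
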