Skip to content

Commit

Permalink
Merge pull request #2100 from Tuvie/master
Browse files Browse the repository at this point in the history
Reduce cpu overhead when using rdma
  • Loading branch information
wwbmmm authored Jan 30, 2023
2 parents f44d190 + 1e5d06a commit 7ae916d
Showing 1 changed file with 35 additions and 12 deletions.
47 changes: 35 additions & 12 deletions src/brpc/rdma/rdma_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ static const uint8_t MAX_HOP_LIMIT = 16;
static const uint8_t TIMEOUT = 14;
static const uint8_t RETRY_CNT = 7;
static const uint16_t MIN_QP_SIZE = 16;
static const uint16_t MAX_QP_SIZE = 4096;
static const uint16_t MIN_BLOCK_SIZE = 1024;
static const uint32_t ACK_MSG_RDMA_OK = 0x1;

Expand Down Expand Up @@ -187,9 +188,15 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
if (_sq_size < MIN_QP_SIZE) {
_sq_size = MIN_QP_SIZE;
}
if (_sq_size > MAX_QP_SIZE) {
_sq_size = MAX_QP_SIZE;
}
if (_rq_size < MIN_QP_SIZE) {
_rq_size = MIN_QP_SIZE;
}
if (_rq_size > MAX_QP_SIZE) {
_rq_size = MAX_QP_SIZE;
}
_read_butex = bthread::butex_create_checked<butil::atomic<int> >();
}

Expand Down Expand Up @@ -1027,7 +1034,7 @@ int RdmaEndpoint::PostRecv(uint32_t num, bool zerocopy) {
return 0;
}

static RdmaResource* AllocateQpCq() {
static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
RdmaResource* res = new (std::nothrow) RdmaResource;
if (!res) {
return NULL;
Expand Down Expand Up @@ -1059,8 +1066,18 @@ static RdmaResource* AllocateQpCq() {
memset(&attr, 0, sizeof(attr));
attr.send_cq = res->cq;
attr.recv_cq = res->cq;
attr.cap.max_send_wr = FLAGS_rdma_prepared_qp_size;
attr.cap.max_recv_wr = FLAGS_rdma_prepared_qp_size;
// NOTE: Since we hope to reduce send completion events, we set signaled
// send_wr every 1/4 of the total wnd. The wnd will increase when the ack
// is received, which means the receive side has already received the data
// in the corresponding send_wr. However, the ack does not mean the send_wr
// has been removed from SQ if it is set unsignaled. The reason is that
// the unsignaled send_wr is removed from SQ only after the CQE of next
// signaled send_wr is polled. Thus in a rare case, a new send_wr cannot be
// posted to SQ even in the wnd is not empty. In order to solve this
// problem, we enlarge the size of SQ to contain redundant 1/4 of the wnd,
// which is the maximum number of unsignaled send_wrs.
attr.cap.max_send_wr = sq_size * 5 / 4; /*NOTE*/
attr.cap.max_recv_wr = rq_size;
attr.cap.max_send_sge = GetRdmaMaxSge();
attr.cap.max_recv_sge = 1;
attr.qp_type = IBV_QPT_RC;
Expand Down Expand Up @@ -1091,7 +1108,7 @@ int RdmaEndpoint::AllocateResources() {
}
}
if (!_resource) {
_resource = AllocateQpCq();
_resource = AllocateQpCq(_sq_size, _rq_size);
} else {
_resource->next = NULL;
}
Expand Down Expand Up @@ -1360,6 +1377,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
}
notified = false;

ssize_t bytes = 0;
for (int i = 0; i < cnt; ++i) {
if (s->Failed()) {
continue;
Expand All @@ -1380,15 +1398,19 @@ void RdmaEndpoint::PollCq(Socket* m) {
s->SetFailed(saved_errno, "Fail to handle rdma completion from %s: %s",
s->description().c_str(), berror(saved_errno));
} else if (nr > 0) {
const int64_t received_us = butil::cpuwide_time_us();
const int64_t base_realtime = butil::gettimeofday_us() - received_us;
InputMessenger* messenger = static_cast<InputMessenger*>(s->user());
if (messenger->ProcessNewMessage(
s.get(), nr, false, received_us, base_realtime, last_msg) < 0) {
return;
}
bytes += nr;
}
}

// Just call PrcessNewMessage once for all of these CQEs.
// Otherwise it may call too many bthread_flush to affect performance.
const int64_t received_us = butil::cpuwide_time_us();
const int64_t base_realtime = butil::gettimeofday_us() - received_us;
InputMessenger* messenger = static_cast<InputMessenger*>(s->user());
if (messenger->ProcessNewMessage(
s.get(), bytes, false, received_us, base_realtime, last_msg) < 0) {
return;
}
}
}

Expand Down Expand Up @@ -1442,7 +1464,8 @@ int RdmaEndpoint::GlobalInitialize() {

g_rdma_resource_mutex = new butil::Mutex;
for (int i = 0; i < FLAGS_rdma_prepared_qp_cnt; ++i) {
RdmaResource* res = AllocateQpCq();
RdmaResource* res = AllocateQpCq(FLAGS_rdma_prepared_qp_size,
FLAGS_rdma_prepared_qp_size);
if (!res) {
return -1;
}
Expand Down

0 comments on commit 7ae916d

Please sign in to comment.