From 39342d9c91acaa8f39a9f9ea3b2d002540059ae3 Mon Sep 17 00:00:00 2001
From: Tuvie
Date: Fri, 2 Dec 2022 17:56:58 +0800
Subject: [PATCH 1/3] return 0 for append_user_data_with_meta when size=0

---
 src/butil/iobuf.cpp | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp
index b2a43386c0..b585c92c8e 100644
--- a/src/butil/iobuf.cpp
+++ b/src/butil/iobuf.cpp
@@ -1217,10 +1217,6 @@ int IOBuf::append_user_data_with_meta(void* data,
                                       size_t size,
                                       void (*deleter)(void*),
                                       uint64_t meta) {
-    if (size == 0) {
-        LOG(WARNING) << "data_size should not be 0";
-        return -1;
-    }
     if (size > 0xFFFFFFFFULL - 100) {
         LOG(FATAL) << "data_size=" << size << " is too large";
         return -1;

From 77be720219ff5d2740ad4ae07e7b0251bb379202 Mon Sep 17 00:00:00 2001
From: Tuvie
Date: Sun, 4 Dec 2022 22:16:57 +0800
Subject: [PATCH 2/3] fix bug: QP cannot be put back to QP pool

---
 src/brpc/rdma/rdma_endpoint.cpp | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index 49daa8f8c9..fe282b640a 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp
@@ -1211,7 +1211,8 @@ void RdmaEndpoint::DeallocateResources() {
     }
     bool move_to_rdma_resource_list = false;
     if (_sq_size <= FLAGS_rdma_prepared_qp_size &&
-        _rq_size <= FLAGS_rdma_prepared_qp_size) {
+        _rq_size <= FLAGS_rdma_prepared_qp_size &&
+        FLAGS_rdma_prepared_qp_cnt > 0) {
         ibv_qp_attr attr;
         attr.qp_state = IBV_QPS_RESET;
         if (IbvModifyQp(_resource->qp, &attr, IBV_QP_STATE) == 0) {
@@ -1224,12 +1225,14 @@ void RdmaEndpoint::DeallocateResources() {
             if (IbvDestroyQp(_resource->qp) < 0) {
                 PLOG(WARNING) << "Fail to destroy QP";
             }
+            _resource->qp = NULL;
         }
         if (_resource->cq) {
             IbvAckCqEvents(_resource->cq, _cq_events);
             if (IbvDestroyCq(_resource->cq) < 0) {
                 PLOG(WARNING) << "Fail to destroy CQ";
             }
+            _resource->cq = NULL;
         }
         if (_resource->comp_channel) {
             // destroy comp_channel will destroy this fd
@@ -1239,8 +1242,10 @@ void RdmaEndpoint::DeallocateResources() {
             if (IbvDestroyCompChannel(_resource->comp_channel) < 0) {
                 PLOG(WARNING) << "Fail to destroy CQ channel";
             }
+            _resource->comp_channel = NULL;
         }
         delete _resource;
+        _resource = NULL;
     }
 
     SocketUniquePtr s;
@@ -1256,7 +1261,7 @@ void RdmaEndpoint::DeallocateResources() {
         _cq_sid = INVALID_SOCKET_ID;
     }
 
-    if (!move_to_rdma_resource_list) {
+    if (move_to_rdma_resource_list) {
         if (_resource->cq) {
             IbvAckCqEvents(_resource->cq, _cq_events);
         }
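Patch 2 fixes two things at once: the condition guarding the pooled path was inverted, so a reusable QP was never linked back into the pool, and destroyed verbs objects were left as dangling pointers. A minimal standalone sketch of the recycle-or-destroy idiom the patch enforces, assuming libibverbs; `Resource`, `g_pool`, and `ReleaseResource` are illustrative names (not brpc's actual types), and the locking the real code takes around the pool is omitted:

    #include <infiniband/verbs.h>
    #include <cstddef>

    struct Resource {
        ibv_qp* qp;
        ibv_cq* cq;
        Resource* next;
    };

    static Resource* g_pool = NULL;  // stand-in for g_rdma_resource_list

    // Either recycle the QP cheaply by moving it back to RESET, or tear
    // everything down.
    void ReleaseResource(Resource* res, bool pooled) {
        if (pooled) {
            ibv_qp_attr attr;
            attr.qp_state = IBV_QPS_RESET;
            if (ibv_modify_qp(res->qp, &attr, IBV_QP_STATE) == 0) {
                res->next = g_pool;  // the step the inverted condition skipped
                g_pool = res;
                return;
            }
        }
        if (res->qp) {
            ibv_destroy_qp(res->qp);
            res->qp = NULL;  // null after destroy, as the patch now does
        }
        if (res->cq) {
            ibv_destroy_cq(res->cq);
            res->cq = NULL;
        }
        delete res;
    }

Nulling each handle immediately after destroying it guards against a later destroy through a stale pointer if the teardown path is ever re-entered.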
From 1e5d06a6341c21f54213c528f512737f28c44aef Mon Sep 17 00:00:00 2001
From: Tuvie
Date: Sat, 28 Jan 2023 17:03:25 +0800
Subject: [PATCH 3/3] reduce bthread_flush when using rdma; avoid
 ibv_post_send error in a rare case

---
 src/brpc/rdma/rdma_endpoint.cpp | 47 ++++++++++++++++++++++++---------
 1 file changed, 35 insertions(+), 12 deletions(-)

diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index fe282b640a..d3a91560cd 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp
@@ -91,6 +91,7 @@
 static const uint8_t MAX_HOP_LIMIT = 16;
 static const uint8_t TIMEOUT = 14;
 static const uint8_t RETRY_CNT = 7;
 static const uint16_t MIN_QP_SIZE = 16;
+static const uint16_t MAX_QP_SIZE = 4096;
 static const uint16_t MIN_BLOCK_SIZE = 1024;
 static const uint32_t ACK_MSG_RDMA_OK = 0x1;
@@ -187,9 +188,15 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
     if (_sq_size < MIN_QP_SIZE) {
         _sq_size = MIN_QP_SIZE;
     }
+    if (_sq_size > MAX_QP_SIZE) {
+        _sq_size = MAX_QP_SIZE;
+    }
     if (_rq_size < MIN_QP_SIZE) {
         _rq_size = MIN_QP_SIZE;
     }
+    if (_rq_size > MAX_QP_SIZE) {
+        _rq_size = MAX_QP_SIZE;
+    }
     _read_butex = bthread::butex_create_checked<butil::atomic<int> >();
 }
 
@@ -1027,7 +1034,7 @@ int RdmaEndpoint::PostRecv(uint32_t num, bool zerocopy) {
     return 0;
 }
 
-static RdmaResource* AllocateQpCq() {
+static RdmaResource* AllocateQpCq(uint16_t sq_size, uint16_t rq_size) {
     RdmaResource* res = new (std::nothrow) RdmaResource;
     if (!res) {
         return NULL;
@@ -1059,8 +1066,18 @@ static RdmaResource* AllocateQpCq() {
     memset(&attr, 0, sizeof(attr));
     attr.send_cq = res->cq;
     attr.recv_cq = res->cq;
-    attr.cap.max_send_wr = FLAGS_rdma_prepared_qp_size;
-    attr.cap.max_recv_wr = FLAGS_rdma_prepared_qp_size;
+    // NOTE: Since we hope to reduce send completion events, we only set a
+    // signaled send_wr once every 1/4 of the total wnd. The wnd increases when
+    // an ack is received, which means the receive side has already received
+    // the data in the corresponding send_wr. However, the ack does not mean
+    // the send_wr has been removed from the SQ if it is unsignaled: an
+    // unsignaled send_wr is removed from the SQ only after the CQE of the next
+    // signaled send_wr is polled. Thus, in a rare case, a new send_wr cannot
+    // be posted to the SQ even if the wnd is not empty. To solve this problem,
+    // we enlarge the SQ by a redundant 1/4 of the wnd, which is the maximum
+    // number of unsignaled send_wrs.
+    attr.cap.max_send_wr = sq_size * 5 / 4;
+    attr.cap.max_recv_wr = rq_size;
     attr.cap.max_send_sge = GetRdmaMaxSge();
     attr.cap.max_recv_sge = 1;
     attr.qp_type = IBV_QPT_RC;
@@ -1091,7 +1108,7 @@ int RdmaEndpoint::AllocateResources() {
         }
     }
     if (!_resource) {
-        _resource = AllocateQpCq();
+        _resource = AllocateQpCq(_sq_size, _rq_size);
     } else {
         _resource->next = NULL;
     }
@@ -1360,6 +1377,7 @@ void RdmaEndpoint::PollCq(Socket* m) {
         }
         notified = false;
+        ssize_t bytes = 0;
         for (int i = 0; i < cnt; ++i) {
             if (s->Failed()) {
                 continue;
             }
@@ -1380,15 +1398,19 @@ void RdmaEndpoint::PollCq(Socket* m) {
                 s->SetFailed(saved_errno,
                              "Fail to handle rdma completion from %s: %s",
                              s->description().c_str(), berror(saved_errno));
             } else if (nr > 0) {
-                const int64_t received_us = butil::cpuwide_time_us();
-                const int64_t base_realtime = butil::gettimeofday_us() - received_us;
-                InputMessenger* messenger = static_cast<InputMessenger*>(s->user());
-                if (messenger->ProcessNewMessage(
-                        s.get(), nr, false, received_us, base_realtime, last_msg) < 0) {
-                    return;
-                }
+                bytes += nr;
             }
         }
+
+        // Just call ProcessNewMessage once for all of these CQEs. Otherwise
+        // it may call bthread_flush too many times and hurt performance.
+        const int64_t received_us = butil::cpuwide_time_us();
+        const int64_t base_realtime = butil::gettimeofday_us() - received_us;
+        InputMessenger* messenger = static_cast<InputMessenger*>(s->user());
+        if (messenger->ProcessNewMessage(
+                s.get(), bytes, false, received_us, base_realtime, last_msg) < 0) {
+            return;
+        }
     }
 }
@@ -1442,7 +1464,8 @@ int RdmaEndpoint::GlobalInitialize() {
 
     g_rdma_resource_mutex = new butil::Mutex;
     for (int i = 0; i < FLAGS_rdma_prepared_qp_cnt; ++i) {
-        RdmaResource* res = AllocateQpCq();
+        RdmaResource* res = AllocateQpCq(FLAGS_rdma_prepared_qp_size,
+                                         FLAGS_rdma_prepared_qp_size);
         if (!res) {
             return -1;
         }
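The 5/4 sizing in AllocateQpCq follows directly from the signaling scheme in the NOTE above: with one signaled send_wr per wnd/4 posts, up to wnd/4 acked-but-unsignaled WRs can still occupy SQ slots until the next signaled CQE is polled. A small self-contained check of that arithmetic (hypothetical numbers, not brpc code):

    #include <cassert>
    #include <cstdint>

    int main() {
        const uint32_t wnd = 128;                 // hypothetical send window
        const uint32_t max_unreclaimed = wnd / 4; // acked WRs not yet reclaimed
        const uint32_t sq_capacity = wnd * 5 / 4; // what AllocateQpCq now requests
        // The SQ must hold a full window plus the worst-case leftovers;
        // otherwise ibv_post_send can fail even though the wnd is not empty.
        assert(wnd + max_unreclaimed <= sq_capacity);
        return 0;
    }

The PollCq change is complementary: by summing `bytes` across all CQEs of one poll batch and calling ProcessNewMessage once, the endpoint should trigger far fewer bthread wakeups (and thus bthread_flush calls) than the old one-call-per-CQE loop.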