Skip to content

Commit

Permalink
Linux平台下适配recvmmsg接口
Browse files Browse the repository at this point in the history
  • Loading branch information
xia-chu committed Jun 29, 2024
1 parent 79c10fe commit fb695d2
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 78 deletions.
128 changes: 128 additions & 0 deletions src/Network/BufferSock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,4 +466,132 @@ BufferList::Ptr BufferList::create(List<std::pair<Buffer::Ptr, bool> > list, Sen
#endif
}

#if defined(__linux) || defined(__linux__)
class SocketRecvmmsgBuffer : public SocketRecvBuffer {
public:
SocketRecvmmsgBuffer(size_t count, size_t size)
: _size(size)
, _iovec(count)
, _mmsgs(count)
, _buffers(count)
, _address(count) {
for (auto i = 0u; i < count; ++i) {
auto buf = BufferRaw::create();
buf->setCapacity(size);

_buffers[i] = buf;
auto &mmsg = _mmsgs[i];
auto &addr = _address[i];
mmsg.msg_len = 0;
mmsg.msg_hdr.msg_name = &addr;
mmsg.msg_hdr.msg_namelen = sizeof(addr);
mmsg.msg_hdr.msg_iov = &_iovec[i];
mmsg.msg_hdr.msg_iov->iov_base = buf->data();
mmsg.msg_hdr.msg_iov->iov_len = buf->getCapacity() - 1;
mmsg.msg_hdr.msg_iovlen = 1;
mmsg.msg_hdr.msg_control = nullptr;
mmsg.msg_hdr.msg_controllen = 0;
mmsg.msg_hdr.msg_flags = 0;
}
}

ssize_t recvFromSocket(int fd, ssize_t &count) override {
for (auto i = 0; i < _last_count; ++i) {
auto &mmsg = _mmsgs[i];
mmsg.msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
auto &buf = _buffers[i];
if (!buf) {
auto raw = BufferRaw::create();
raw->setCapacity(_size);
buf = raw;
mmsg.msg_hdr.msg_iov->iov_base = buf->data();
}
}
do {
count = recvmmsg(fd, &_mmsgs[0], _mmsgs.size(), 0, nullptr);
} while (-1 == count && UV_EINTR == get_uv_error(true));

_last_count = count;
if (count <= 0) {
return count;
}

ssize_t nread = 0;
for (auto i = 0; i < count; ++i) {
auto &mmsg = _mmsgs[i];
nread += mmsg.msg_len;

auto buf = std::static_pointer_cast<BufferRaw>(_buffers[i]);
buf->setSize(mmsg.msg_len);
buf->data()[mmsg.msg_len] = '\0';
}
return nread;
}

Buffer::Ptr &getBuffer(size_t index) override { return _buffers[index]; }

struct sockaddr_storage &getAddress(size_t index) override { return _address[index]; }

private:
size_t _size;
ssize_t _last_count { 0 };
std::vector<struct iovec> _iovec;
std::vector<struct mmsghdr> _mmsgs;
std::vector<Buffer::Ptr> _buffers;
std::vector<struct sockaddr_storage> _address;
};
#endif

class SocketRecvFromBuffer : public SocketRecvBuffer {
public:
SocketRecvFromBuffer(size_t size): _size(size) {}

ssize_t recvFromSocket(int fd, ssize_t &count) override {
ssize_t nread;
socklen_t len = sizeof(_address);
if (!_buffer) {
allocBuffer();
}

do {
nread = recvfrom(fd, _buffer->data(), _buffer->getCapacity() - 1, 0, (struct sockaddr *)&_address, &len);
} while (-1 == nread && UV_EINTR == get_uv_error(true));

if (nread > 0) {
count = 1;
_buffer->data()[nread] = '\0';
std::static_pointer_cast<BufferRaw>(_buffer)->setSize(nread);
}
return nread;
}

Buffer::Ptr &getBuffer(size_t index) override { return _buffer; }

struct sockaddr_storage &getAddress(size_t index) override { return _address; }

private:
void allocBuffer() {
auto buf = BufferRaw::create();
buf->setCapacity(_size);
_buffer = std::move(buf);
}

private:
size_t _size;
Buffer::Ptr _buffer;
struct sockaddr_storage _address;
};

static constexpr auto kPacketCount = 32;
static constexpr auto kBufferCapacity = 4 * 1024u;

SocketRecvBuffer::Ptr SocketRecvBuffer::create(bool is_udp) {
#if defined(__linux) || defined(__linux__)
if (is_udp) {
return std::make_shared<SocketRecvmmsgBuffer>(kPacketCount, kBufferCapacity);
}
#endif
return std::make_shared<SocketRecvFromBuffer>(kPacketCount * kBufferCapacity);
}

} //toolkit
13 changes: 13 additions & 0 deletions src/Network/BufferSock.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,18 @@ class BufferList : public noncopyable {
ObjectStatistic<BufferList> _statistic;
};

class SocketRecvBuffer {
public:
using Ptr = std::shared_ptr<SocketRecvBuffer>;

virtual ~SocketRecvBuffer() = default;

virtual ssize_t recvFromSocket(int fd, ssize_t &count) = 0;
virtual Buffer::Ptr &getBuffer(size_t index) = 0;
virtual struct sockaddr_storage &getAddress(size_t index) = 0;

static Ptr create(bool is_udp);
};

}
#endif //ZLTOOLKIT_BUFFERSOCK_H
50 changes: 27 additions & 23 deletions src/Network/Socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,27 @@ Socket::~Socket() {
}

void Socket::setOnRead(onReadCB cb) {
onMultiReadCB cb2;
if (cb) {
cb2 = [cb](Buffer::Ptr *buf, struct sockaddr_storage *addr, size_t count) {
for (auto i = 0u; i < count; ++i) {
cb(buf[i], (struct sockaddr *)(addr + i), sizeof(struct sockaddr_storage));
}
};
}
setOnMultiRead(std::move(cb2));
}

void Socket::setOnMultiRead(onMultiReadCB cb) {
LOCK_GUARD(_mtx_event);
if (cb) {
_on_read = std::move(cb);
_on_multi_read = std::move(cb);
} else {
_on_read = [](const Buffer::Ptr &buf, struct sockaddr *, int) { WarnL << "Socket not set read callback, data ignored: " << buf->size(); };
_on_multi_read = [](Buffer::Ptr *buf, struct sockaddr_storage *addr, size_t count) {
for (auto i = 0u; i < count; ++i) {
WarnL << "Socket not set read callback, data ignored: " << buf[i]->size();
}
};
}
}

Expand Down Expand Up @@ -246,7 +262,7 @@ bool Socket::attachEvent(const SockNum::Ptr &sock) {
}

// tcp客户端或udp
auto read_buffer = _poller->getSharedBuffer();
auto read_buffer = _poller->getSharedBuffer(sock->type() == SockNum::Sock_UDP);
auto result = _poller->addEvent(sock->rawFd(), EventPoller::Event_Read | EventPoller::Event_Error | EventPoller::Event_Write, [weak_self, sock, read_buffer](int event) {
auto strong_self = weak_self.lock();
if (!strong_self) {
Expand All @@ -267,20 +283,11 @@ bool Socket::attachEvent(const SockNum::Ptr &sock) {
return -1 != result;
}

ssize_t Socket::onRead(const SockNum::Ptr &sock, const BufferRaw::Ptr &buffer) noexcept {
ssize_t ret = 0, nread = 0;
auto data = buffer->data();
// 最后一个字节设置为'\0'
auto capacity = buffer->getCapacity() - 1;

struct sockaddr_storage addr;
socklen_t len = sizeof(addr);
ssize_t Socket::onRead(const SockNum::Ptr &sock, const SocketRecvBuffer::Ptr &buffer) noexcept {
ssize_t ret = 0, nread = 0, count = 0;

while (_enable_recv) {
do {
nread = recvfrom(sock->rawFd(), data, capacity, 0, (struct sockaddr *)&addr, &len);
} while (-1 == nread && UV_EINTR == get_uv_error(true));

nread = buffer->recvFromSocket(sock->rawFd(), count);
if (nread == 0) {
if (sock->type() == SockNum::Sock_TCP) {
emitErr(SockException(Err_eof, "end of file"));
Expand All @@ -302,21 +309,18 @@ ssize_t Socket::onRead(const SockNum::Ptr &sock, const BufferRaw::Ptr &buffer) n
return ret;
}

ret += nread;
if (_enable_speed) {
// 更新接收速率
_recv_speed += nread;
}

ret += nread;
data[nread] = '\0';
// 设置buffer有效数据大小
buffer->setSize(nread);

// 触发回调
LOCK_GUARD(_mtx_event);
auto &buf = buffer->getBuffer(0);
auto &addr = buffer->getAddress(0);
try {
// 此处捕获异常,目的是防止数据未读尽,epoll边沿触发失效的问题
_on_read(buffer, (struct sockaddr *)&addr, len);
LOCK_GUARD(_mtx_event);
_on_multi_read(&buf, &addr, count);
} catch (std::exception &ex) {
ErrorL << "Exception occurred when emit on_read: " << ex.what();
}
Expand Down
69 changes: 36 additions & 33 deletions src/Network/Socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,9 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,
public:
using Ptr = std::shared_ptr<Socket>;
//接收数据回调
using onReadCB = std::function<void(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len)>;
using onReadCB = std::function<void(Buffer::Ptr &buf, struct sockaddr *addr, int addr_len)>;
using onMultiReadCB = std::function<void(Buffer::Ptr *buf, struct sockaddr_storage *addr, size_t count)>;

//发生错误回调
using onErrCB = std::function<void(const SockException &err)>;
//tcp监听接收到连接请求
Expand Down Expand Up @@ -352,6 +354,7 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,
* @param cb 回调对象
*/
void setOnRead(onReadCB cb);
void setOnMultiRead(onMultiReadCB cb);

/**
* 设置异常事件(包括eof等)回调
Expand Down Expand Up @@ -515,7 +518,7 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,

void setSock(SockNum::Ptr sock);
int onAccept(const SockNum::Ptr &sock, int event) noexcept;
ssize_t onRead(const SockNum::Ptr &sock, const BufferRaw::Ptr &buffer) noexcept;
ssize_t onRead(const SockNum::Ptr &sock, const SocketRecvBuffer::Ptr &buffer) noexcept;
void onWriteAble(const SockNum::Ptr &sock);
void onConnected(const SockNum::Ptr &sock, const onErrCB &cb);
void onFlushed();
Expand All @@ -528,67 +531,67 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,
bool fromSock_l(SockNum::Ptr sock);

private:
//send socket时的flag
// send socket时的flag
int _sock_flags = SOCKET_DEFAULE_FLAGS;
//最大发送缓存,单位毫秒,距上次发送缓存清空时间不能超过该参数
// 最大发送缓存,单位毫秒,距上次发送缓存清空时间不能超过该参数
uint32_t _max_send_buffer_ms = SEND_TIME_OUT_SEC * 1000;
//控制是否接收监听socket可读事件,关闭后可用于流量控制
std::atomic<bool> _enable_recv {true};
//标记该socket是否可写,socket写缓存满了就不可写
std::atomic<bool> _sendable {true};
//是否已经触发err回调了
// 控制是否接收监听socket可读事件,关闭后可用于流量控制
std::atomic<bool> _enable_recv { true };
// 标记该socket是否可写,socket写缓存满了就不可写
std::atomic<bool> _sendable { true };
// 是否已经触发err回调了
bool _err_emit = false;
//是否启用网速统计
// 是否启用网速统计
bool _enable_speed = false;
// udp发送目标地址
std::shared_ptr<struct sockaddr_storage> _udp_send_dst;

//接收速率统计
// 接收速率统计
BytesSpeed _recv_speed;
//发送速率统计
// 发送速率统计
BytesSpeed _send_speed;

//tcp连接超时定时器
// tcp连接超时定时器
Timer::Ptr _con_timer;
//tcp连接结果回调对象
// tcp连接结果回调对象
std::shared_ptr<void> _async_con_cb;

//记录上次发送缓存(包括socket写缓存、应用层缓存)清空的计时器
// 记录上次发送缓存(包括socket写缓存、应用层缓存)清空的计时器
Ticker _send_flush_ticker;
//socket fd的抽象类
// socket fd的抽象类
SockFD::Ptr _sock_fd;
//本socket绑定的poller线程,事件触发于此线程
// 本socket绑定的poller线程,事件触发于此线程
EventPoller::Ptr _poller;
//跨线程访问_sock_fd时需要上锁
// 跨线程访问_sock_fd时需要上锁
mutable MutexWrapper<std::recursive_mutex> _mtx_sock_fd;

//socket异常事件(比如说断开)
// socket异常事件(比如说断开)
onErrCB _on_err;
//收到数据事件
onReadCB _on_read;
//socket缓存清空事件(可用于发送流速控制)
// 收到数据事件
onMultiReadCB _on_multi_read;
// socket缓存清空事件(可用于发送流速控制)
onFlush _on_flush;
//tcp监听收到accept请求事件
// tcp监听收到accept请求事件
onAcceptCB _on_accept;
//tcp监听收到accept请求,自定义创建peer Socket事件(可以控制子Socket绑定到其他poller线程)
// tcp监听收到accept请求,自定义创建peer Socket事件(可以控制子Socket绑定到其他poller线程)
onCreateSocket _on_before_accept;
//设置上述回调函数的锁
// 设置上述回调函数的锁
MutexWrapper<std::recursive_mutex> _mtx_event;

//一级发送缓存, socket可写时,会把一级缓存批量送入到二级缓存
List<std::pair<Buffer::Ptr, bool> > _send_buf_waiting;
//一级发送缓存锁
// 一级发送缓存, socket可写时,会把一级缓存批量送入到二级缓存
List<std::pair<Buffer::Ptr, bool>> _send_buf_waiting;
// 一级发送缓存锁
MutexWrapper<std::recursive_mutex> _mtx_send_buf_waiting;
//二级发送缓存, socket可写时,会把二级缓存批量写入到socket
// 二级发送缓存, socket可写时,会把二级缓存批量写入到socket
List<BufferList::Ptr> _send_buf_sending;
//二级发送缓存锁
// 二级发送缓存锁
MutexWrapper<std::recursive_mutex> _mtx_send_buf_sending;
//发送buffer结果回调
// 发送buffer结果回调
BufferList::SendResult _send_result;
//对象个数统计
// 对象个数统计
ObjectStatistic<Socket> _statistic;

//链接缓存地址,防止tcp reset 导致无法获取对端的地址
// 链接缓存地址,防止tcp reset 导致无法获取对端的地址
struct sockaddr_storage _local_addr;
struct sockaddr_storage _peer_addr;
};
Expand Down
Loading

0 comments on commit fb695d2

Please sign in to comment.