Skip to content

Commit

Permalink
[fix] fix some bug in epoll poll
Browse files Browse the repository at this point in the history
  • Loading branch information
liuli-neko committed Sep 23, 2024
1 parent 8f9d750 commit aa58c09
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 46 deletions.
1 change: 1 addition & 0 deletions include/ilias/http/transfer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <ilias/http/headers.hpp>
#include <ilias/url.hpp>
#include <span>
#include <memory>

ILIAS_NS_BEGIN

Expand Down
4 changes: 2 additions & 2 deletions include/ilias/platform/detail/epoll_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ class EpollAwaiter {
auto ret = 0;
ret = ::epoll_ctl(mEpollEvent.epollfd, EPOLL_CTL_MOD, mEpollEvent.fd, &event);
if (ret == -1) {
ILIAS_ERROR("Epoll", "epoll_ctl error: {}", strerror(errno));
ILIAS_ERROR("Epoll", "epoll_ctl {} error: {}", mEpollEvent.fd, strerror(errno));
mEpollError = errno;
return true;
}
ILIAS_TRACE("Epoll", "epoll_ctl success: {}", (uint32_t)event.events);
ILIAS_TRACE("Epoll", "epoll_ctl : {}, fd: {}", (uint32_t)event.events, mEpollEvent.fd);
return false; //< Wating Epoll
}
auto await_suspend(TaskView<> caller) -> void {
Expand Down
94 changes: 50 additions & 44 deletions include/ilias/platform/epoll.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ ILIAS_NS_BEGIN
namespace detail {

/**
* @brief The IOCP descriptor, if alloc is too frequently, maybe we can use memory pool
* @brief The Epoll descriptor, if alloc is too frequently, maybe we can use memory pool
*
*/
class EpollDescriptor final : public IoDescriptor {
public:
int fd = -1;
IoDescriptor::Type type = Unknown;
bool isPollable = false;
bool isRemoved = false;

// < For Socket data
struct {
Expand Down Expand Up @@ -208,8 +209,8 @@ inline auto EpollContext::addDescriptor(fd_t fd, IoDescriptor::Type type) -> Res
if (type == IoDescriptor::Pipe || type == IoDescriptor::Tty) {
nfd->isPollable = true;
}

auto ret = fcntl(fd, F_SETFD, O_NONBLOCK);
int flags = fcntl(fd, F_GETFL, 0);
auto ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if (ret == -1) {
ILIAS_WARN("Epoll", "Failed to set descriptor to non-blocking. error: {}", SystemError::fromErrno());
}
Expand All @@ -236,7 +237,17 @@ inline auto EpollContext::removeDescriptor(IoDescriptor *fd) -> Result<void> {
}
}
mDescriptors.erase(descriptor->fd);
delete descriptor;
auto epollret = ::epoll_ctl(mEpollFd, EPOLL_CTL_DEL, descriptor->fd, nullptr); // remove from epoll control
if (epollret != 0 && errno != ENOENT) {
ILIAS_WARN("Epoll", "Failed to remove fd {} from epoll: {}", descriptor->fd, strerror(errno));
}
descriptor->isRemoved = true; // make removed flag, ask poll this descriptor is removed.
if (descriptor->events.size() == 0) {
delete descriptor;
}
else {
post(+[](void *descriptor) { delete static_cast<detail::EpollDescriptor *>(descriptor); }, descriptor);
}
if (ret != 0) {
return Unexpected<Error>(SystemError(EALREADY));
}
Expand All @@ -257,11 +268,9 @@ inline auto EpollContext::read(IoDescriptor *fd, ::std::span<::std::byte> buffer
}
while (true) {
int ret = 0;
if (descriptor->type == detail::EpollDescriptor::Socket) {
ret = ::recv(descriptor->fd, buffer.data(), buffer.size(), MSG_DONTWAIT);
}
else if (descriptor->type == detail::EpollDescriptor::File) {
ret = ::pread(descriptor->fd, buffer.data(), buffer.size(), offset.value_or(0));
if (offset.has_value()) {
ILIAS_ASSERT(descriptor->type == detail::EpollDescriptor::File);
ret = ::pread(descriptor->fd, buffer.data(), buffer.size(), offset.value());
}
else {
ret = ::read(descriptor->fd, buffer.data(), buffer.size());
Expand All @@ -283,6 +292,7 @@ inline auto EpollContext::read(IoDescriptor *fd, ::std::span<::std::byte> buffer
inline auto EpollContext::write(IoDescriptor *fd, ::std::span<const ::std::byte> buffer,
::std::optional<size_t> offset) -> Task<size_t> {
auto descriptor = static_cast<detail::EpollDescriptor *>(fd);
ILIAS_TRACE("Epoll", "fd {} write {} bytes", descriptor->fd, buffer.size());
ILIAS_ASSERT(descriptor != nullptr);
if (!descriptor->isPollable) {
// not supported operation
Expand All @@ -291,11 +301,9 @@ inline auto EpollContext::write(IoDescriptor *fd, ::std::span<const ::std::byte>
ILIAS_ASSERT(descriptor->type != detail::EpollDescriptor::Unknown);
while (true) {
int ret = 0;
if (descriptor->type == detail::EpollDescriptor::Socket) {
ret = ::send(descriptor->fd, buffer.data(), buffer.size(), MSG_DONTWAIT);
}
else if (descriptor->type == detail::EpollDescriptor::File) {
ret = ::pwrite(descriptor->fd, buffer.data(), buffer.size(), offset.value_or(0));
if (offset.has_value()) {
ILIAS_ASSERT(descriptor->type == detail::EpollDescriptor::File);
ret = ::pwrite(descriptor->fd, buffer.data(), buffer.size(), offset.value());
}
else {
ret = ::write(descriptor->fd, buffer.data(), buffer.size());
Expand Down Expand Up @@ -348,20 +356,20 @@ inline auto EpollContext::connect(IoDescriptor *fd, const IPEndpoint &endpoint)

inline auto EpollContext::accept(IoDescriptor *fd, IPEndpoint *remoteEndpoint) -> Task<socket_t> {
auto descriptor = static_cast<detail::EpollDescriptor *>(fd);
ILIAS_TRACE("Epoll", "fd {} accept", descriptor->fd);
ILIAS_ASSERT(descriptor != nullptr);
ILIAS_ASSERT(descriptor->type == IoDescriptor::Socket);
ILIAS_ASSERT(descriptor->fd != -1);
auto socket = SocketView(descriptor->fd);
while (true) {
sockaddr_storage storage;
socklen_t storageLen = sizeof(storage);
auto ret = ::accept(descriptor->fd, reinterpret_cast<sockaddr *>(&storage), &storageLen);
if (ret == 0) {
auto ret = socket.accept<socket_t>();
if (ret.has_value()) {
if (remoteEndpoint != nullptr) {
*remoteEndpoint->fromRaw(&storage, storageLen);
*remoteEndpoint = ret.value().second;
}
co_return ret;
co_return ret.value().first;
}
if (ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
if (ret.error() != SystemError(EAGAIN) && ret.error() != SystemError(EWOULDBLOCK)) {
co_return Unexpected(SystemError::fromErrno());
}
auto pollRet = co_await poll(fd, EPOLLIN);
Expand All @@ -375,23 +383,25 @@ inline auto EpollContext::accept(IoDescriptor *fd, IPEndpoint *remoteEndpoint) -
inline auto EpollContext::sendto(IoDescriptor *fd, ::std::span<const ::std::byte> buffer, int flags,
const IPEndpoint *endpoint) -> Task<size_t> {
auto descriptor = static_cast<detail::EpollDescriptor *>(fd);
ILIAS_TRACE("Epoll", "fd {} sendto", descriptor->fd);
ILIAS_ASSERT(descriptor != nullptr);
ILIAS_ASSERT(descriptor->type == IoDescriptor::Socket);
ILIAS_ASSERT(descriptor->fd != -1);
SocketView socket(descriptor->fd);
while (true) {
int ret = -1;
Result<size_t> ret;
if (endpoint != nullptr) {
ret = ::sendto(descriptor->fd, buffer.data(), buffer.size(), flags, endpoint->cast<sockaddr *>(),
endpoint->length());
ret = socket.sendto(buffer, flags | MSG_DONTWAIT, *endpoint);
}
else {
ret = ::send(descriptor->fd, buffer.data(), buffer.size(), MSG_DONTWAIT);
ret = socket.send(buffer, flags | MSG_DONTWAIT);
}
if (ret >= 0) {
if (ret.has_value()) {
co_return ret;
}
else if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
co_return Unexpected(SystemError::fromErrno());
else if (ret.error() != SystemError(EINTR) && ret.error() != SystemError(EAGAIN) &&
ret.error() != SystemError(EWOULDBLOCK)) {
co_return ret;
}
auto pollRet = co_await poll(descriptor, EPOLLOUT);
if (!pollRet && pollRet.error() != SystemError(EINTR) && pollRet.error() != SystemError(EAGAIN)) {
Expand All @@ -404,28 +414,20 @@ inline auto EpollContext::sendto(IoDescriptor *fd, ::std::span<const ::std::byte
inline auto EpollContext::recvfrom(IoDescriptor *fd, ::std::span<::std::byte> buffer, int flags,
IPEndpoint *endpoint) -> Task<size_t> {
auto descriptor = static_cast<detail::EpollDescriptor *>(fd);
ILIAS_TRACE("Epoll", "fd {} recvfrom", descriptor->fd);
ILIAS_ASSERT(descriptor != nullptr);
ILIAS_ASSERT(descriptor->type == IoDescriptor::Socket);
ILIAS_ASSERT(descriptor->fd != -1);
SocketView socket(descriptor->fd);
while (true) {
int ret = -1;
if (endpoint != nullptr) {
sockaddr_storage storage;
socklen_t len = sizeof(storage);
ret = ::recvfrom(descriptor->fd, buffer.data(), buffer.size(), flags,
reinterpret_cast<sockaddr *>(&storage), &len);
if (ret >= 0) {
endpoint->fromRaw(&storage, len);
}
}
else {
ret = ::recv(descriptor->fd, buffer.data(), buffer.size(), MSG_DONTWAIT);
}
if (ret >= 0) {
Result<size_t> ret;
ret = socket.recvfrom(buffer, flags | MSG_DONTWAIT, endpoint);
if (ret.has_value()) {
co_return ret;
}
else if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
co_return Unexpected(SystemError::fromErrno());
else if (ret.error() != SystemError(EINTR) && ret.error() != SystemError(EAGAIN) &&
ret.error() != SystemError(EWOULDBLOCK)) {
co_return ret;
}
auto pollRet = co_await poll(descriptor, EPOLLIN);
if (!pollRet && pollRet.error() != SystemError(EINTR) && pollRet.error() != SystemError(EAGAIN)) {
Expand Down Expand Up @@ -474,6 +476,9 @@ inline auto EpollContext::poll(IoDescriptor *fd, uint32_t event) -> Task<uint32_
auto ret = co_await detail::EpollAwaiter(epollevent, listenings_events);
ILIAS_TRACE("Epoll", "epoll_wait returned {}",
ret.has_value() ? std::to_string(ret.value()) : ret.error().message());
if (descriptor->isRemoved) {
co_return ret; // descriptor has been removed, this descriptor while deleted by next event.
}
// remove event from descriptor's event queue
eventsIt->second.erase(epolleventIt);
if (eventsIt->second.empty()) {
Expand All @@ -491,6 +496,7 @@ inline auto EpollContext::poll(IoDescriptor *fd, uint32_t event) -> Task<uint32_

inline auto EpollContext::post(void (*fn)(void *), void *args) -> void {
ILIAS_ASSERT(fn != nullptr);
ILIAS_TRACE("Epoll", "Posting task {}, args: {}", (void *)fn, args);
PostTask task {fn, args};
char buf[sizeof(PostTask)] = {0};
memcpy(buf, &task, sizeof(task));
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/net/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@
using namespace ILIAS_NAMESPACE;

TEST(Net, TcpTransfer) {
ILIAS_LOG_SET_LEVEL(ILIAS_TRACE_LEVEL);
auto ctxt = IoContext::currentThread();
ILIAS_TRACE("test", "create io context");
TcpListener listener(*ctxt, AF_INET);
ILIAS_TRACE("test", "create listener");

ASSERT_TRUE(listener.bind("127.0.0.1:0"));
ILIAS_TRACE("test", "listener to 127.0.0.1");
auto endpoint = listener.localEndpoint().value();
std::cout << endpoint.toString() << std::endl;

Expand Down

0 comments on commit aa58c09

Please sign in to comment.