Skip to content

Commit

Permalink
active_poller and poller_t support for file descriptors
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanschim committed Jul 19, 2023
1 parent c94c207 commit b5ce0ad
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 4 deletions.
16 changes: 16 additions & 0 deletions zmq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2681,6 +2681,13 @@ template<typename T = no_user_data> class poller_t
}
}

void remove(fd_t fd)
{
if (0 != zmq_poller_remove_fd(poller_ptr.get(), fd)) {
throw error_t();
}
}

void modify(zmq::socket_ref socket, event_flags events)
{
if (0
Expand All @@ -2690,6 +2697,15 @@ template<typename T = no_user_data> class poller_t
}
}

void modify(fd_t fd, event_flags events)
{
if (0
!= zmq_poller_modify_fd(poller_ptr.get(), fd,
static_cast<short>(events))) {
throw error_t();
}
}

size_t wait_all(std::vector<event_type> &poller_events,
const std::chrono::milliseconds timeout)
{
Expand Down
46 changes: 42 additions & 4 deletions zmq_addon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -683,10 +683,12 @@ class active_poller_t

void add(zmq::socket_ref socket, event_flags events, handler_type handler)
{
const Ref ref{socket};

if (!handler)
throw std::invalid_argument("null handler in active_poller_t::add");
throw std::invalid_argument("null handler in active_poller_t::add (socket)");
auto ret = handlers.emplace(
socket, std::make_shared<handler_type>(std::move(handler)));
ref, std::make_shared<handler_type>(std::move(handler)));
if (!ret.second)
throw error_t(EINVAL); // already added
try {
Expand All @@ -695,7 +697,28 @@ class active_poller_t
}
catch (...) {
// rollback
handlers.erase(socket);
handlers.erase(ref);
throw;
}
}

void add(fd_t fd, event_flags events, handler_type handler)
{
const Ref ref{fd};

if (!handler)
throw std::invalid_argument("null handler in active_poller_t::add (fd)");
auto ret = handlers.emplace(
ref, std::make_shared<handler_type>(std::move(handler)));
if (!ret.second)
throw error_t(EINVAL); // already added
try {
base_poller.add(fd, events, ret.first->second.get());
need_rebuild = true;
}
catch (...) {
// rollback
handlers.erase(ref);
throw;
}
}
Expand All @@ -707,11 +730,23 @@ class active_poller_t
need_rebuild = true;
}

void remove(fd_t fd)
{
base_poller.remove(fd);
handlers.erase(fd);
need_rebuild = true;
}

void modify(zmq::socket_ref socket, event_flags events)
{
base_poller.modify(socket, events);
}

void modify(fd_t fd, event_flags events)
{
base_poller.modify(fd, events);
}

size_t wait(std::chrono::milliseconds timeout)
{
if (need_rebuild) {
Expand Down Expand Up @@ -741,7 +776,10 @@ class active_poller_t
bool need_rebuild{false};

poller_t<handler_type> base_poller{};
std::unordered_map<socket_ref, std::shared_ptr<handler_type>> handlers{};

using Ref = std::variant<socket_ref, fd_t>;
std::unordered_map<Ref, std::shared_ptr<handler_type>> handlers{};

std::vector<decltype(base_poller)::event_type> poller_events{};
std::vector<std::shared_ptr<handler_type>> poller_handlers{};
}; // class active_poller_t
Expand Down

0 comments on commit b5ce0ad

Please sign in to comment.