diff --git a/srtcore/api.cpp b/srtcore/api.cpp index d9875a8f4..65760da61 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1433,6 +1433,20 @@ int CUDTUnited::epoll_wait( return m_EPoll.wait(eid, readfds, writefds, msTimeOut, lrfds, lwfds); } +int CUDTUnited::epoll_uwait( + const int eid, + SRT_EPOLL_EVENT* fdsSet, + int fdsSize, + int64_t msTimeOut) +{ + return m_EPoll.uwait(eid, fdsSet, fdsSize, msTimeOut); +} + +int32_t CUDTUnited::epoll_set(int eid, int32_t flags) +{ + return m_EPoll.setflags(eid, flags); +} + int CUDTUnited::epoll_release(const int eid) { return m_EPoll.release(eid); @@ -2663,6 +2677,52 @@ int CUDT::epoll_wait( } } +int CUDT::epoll_uwait( + const int eid, + SRT_EPOLL_EVENT* fdsSet, + int fdsSize, + int64_t msTimeOut) +{ + try + { + return s_UDTUnited.epoll_uwait(eid, fdsSet, fdsSize, msTimeOut); + } + catch (CUDTException e) + { + s_UDTUnited.setError(new CUDTException(e)); + return ERROR; + } + catch (std::exception& ee) + { + LOGC(mglog.Fatal, log << "epoll_uwait: UNEXPECTED EXCEPTION: " + << typeid(ee).name() << ": " << ee.what()); + s_UDTUnited.setError(new CUDTException(MJ_UNKNOWN, MN_NONE, 0)); + return ERROR; + } +} + +int32_t CUDT::epoll_set( + const int eid, + int32_t flags) +{ + try + { + return s_UDTUnited.epoll_set(eid, flags); + } + catch (CUDTException e) + { + s_UDTUnited.setError(new CUDTException(e)); + return ERROR; + } + catch (std::exception& ee) + { + LOGC(mglog.Fatal, log << "epoll_set: UNEXPECTED EXCEPTION: " + << typeid(ee).name() << ": " << ee.what()); + s_UDTUnited.setError(new CUDTException(MJ_UNKNOWN, MN_NONE, 0)); + return ERROR; + } +} + int CUDT::epoll_release(const int eid) { try @@ -3102,6 +3162,11 @@ int epoll_wait2( return ret; } +int epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut) +{ + return CUDT::epoll_uwait(eid, fdsSet, fdsSize, msTimeOut); +} + int epoll_release(int eid) { return CUDT::epoll_release(eid); diff --git a/srtcore/api.h b/srtcore/api.h index 2036d5d7c..54d1713ef 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -190,6 +190,8 @@ friend class CRendezvousQueue; int epoll_update_usock(const int eid, const SRTSOCKET u, const int* events = NULL); int epoll_update_ssock(const int eid, const SYSSOCKET s, const int* events = NULL); int epoll_wait(const int eid, std::set* readfds, std::set* writefds, int64_t msTimeOut, std::set* lrfds = NULL, std::set* lwfds = NULL); + int epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut); + int32_t epoll_set(const int eid, int32_t flags); int epoll_release(const int eid); /// record the UDT exception. diff --git a/srtcore/core.h b/srtcore/core.h index 183c0a253..043a438a0 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -200,6 +200,8 @@ class CUDT static int epoll_update_usock(const int eid, const SRTSOCKET u, const int* events = NULL); static int epoll_update_ssock(const int eid, const SYSSOCKET s, const int* events = NULL); static int epoll_wait(const int eid, std::set* readfds, std::set* writefds, int64_t msTimeOut, std::set* lrfds = NULL, std::set* wrfds = NULL); + static int epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut); + static int32_t epoll_set(const int eid, int32_t flags); static int epoll_release(const int eid); static CUDTException& getlasterror(); static int perfmon(SRTSOCKET u, CPerfMon* perf, bool clear = true); diff --git a/srtcore/epoll.cpp b/srtcore/epoll.cpp index a2b8d51d3..99f75fb44 100644 --- a/srtcore/epoll.cpp +++ b/srtcore/epoll.cpp @@ -326,6 +326,95 @@ int CEPoll::update_ssock(const int eid, const SYSSOCKET& s, const int* events) return 0; } +int CEPoll::setflags(const int eid, int32_t flags) +{ + CGuard pg(m_EPollLock); + map::iterator p = m_mPolls.find(eid); + if (p == m_mPolls.end()) + throw CUDTException(MJ_NOTSUP, MN_EIDINVAL); + CEPollDesc& ed = p->second; + + int32_t oflags = ed.flags(); + + if (flags == -1) + return oflags; + + if (flags == 0) + { + ed.clr_flags(~int32_t()); + } + else + { + ed.set_flags(flags); + } + + return oflags; +} + +int CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut) +{ + // It is allowed to call this function witn fdsSize == 0 + // and therefore also NULL fdsSet. This will then only report + // the number of ready sockets, just without information which. + if (fdsSize < 0 || (fdsSize > 0 && !fdsSet)) + throw CUDTException(MJ_NOTSUP, MN_INVAL); + + int64_t entertime = CTimer::getTime(); + + while (true) + { + { + CGuard pg(m_EPollLock); + map::iterator p = m_mPolls.find(eid); + if (p == m_mPolls.end()) + throw CUDTException(MJ_NOTSUP, MN_EIDINVAL); + CEPollDesc& ed = p->second; + + if (!ed.flags(SRT_EPOLL_ENABLE_EMPTY) && ed.watch_empty()) + { + // Empty EID is not allowed, report error. + throw CUDTException(MJ_NOTSUP, MN_INVAL); + } + + if (ed.flags(SRT_EPOLL_ENABLE_OUTPUTCHECK) && (fdsSet == NULL || fdsSize == 0)) + { + // Empty EID is not allowed, report error. + throw CUDTException(MJ_NOTSUP, MN_INVAL); + } + + if (!ed.m_sLocals.empty()) + { + // XXX Add error log + // uwait should not be used with EIDs subscribed to system sockets + throw CUDTException(MJ_NOTSUP, MN_INVAL); + } + + int total = 0; // This is a list, so count it during iteration + CEPollDesc::enotice_t::iterator i = ed.enotice_begin(); + while (i != ed.enotice_end()) + { + int pos = total; // previous past-the-end position + ++total; + + if (total > fdsSize) + break; + + fdsSet[pos] = *i; + + ed.checkEdge(i++); // NOTE: potentially deletes `i` + } + if (total) + return total; + } + + if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * int64_t(1000))) + break; // official wait does: throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0); + + CTimer::waitForEvent(); + } + + return 0; +} int CEPoll::wait(const int eid, set* readfds, set* writefds, int64_t msTimeOut, set* lrfds, set* lwfds) { diff --git a/srtcore/epoll.h b/srtcore/epoll.h old mode 100644 new mode 100755 index 527fa956a..014323df8 --- a/srtcore/epoll.h +++ b/srtcore/epoll.h @@ -341,6 +341,15 @@ friend class CRendezvousQueue; int wait(const int eid, std::set* readfds, std::set* writefds, int64_t msTimeOut, std::set* lrfds, std::set* lwfds); + /// wait for EPoll events or timeout optimized with explicit EPOLL_ERR event and the edge mode option. + /// @param [in] eid EPoll ID. + /// @param [out] fdsSet array of user socket events (SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR). + /// @param [int] fdsSize of fds array + /// @param [in] msTimeOut timeout threshold, in milliseconds. + /// @return total of available events in the epoll system (can be greater than fdsSize) + + int uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut); + /// close and release an EPoll. /// @param [in] eid EPoll ID. /// @return 0 if success, otherwise an error number. @@ -358,6 +367,8 @@ friend class CRendezvousQueue; int update_events(const SRTSOCKET& uid, std::set& eids, int events, bool enable); + int setflags(const int eid, int32_t flags); + private: int m_iIDSeed; // seed to generate a new ID pthread_mutex_t m_SeedLock; diff --git a/srtcore/srt.h b/srtcore/srt.h index 91ab48f66..65fd2c7de 100644 --- a/srtcore/srt.h +++ b/srtcore/srt.h @@ -685,6 +685,9 @@ typedef struct SRT_EPOLL_EVENT_ SRTSOCKET fd; int events; // SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR } SRT_EPOLL_EVENT; +SRT_API int srt_epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut); + +SRT_API int32_t srt_epoll_set(int eid, int32_t flags); SRT_API int srt_epoll_release(int eid); // Logging control diff --git a/srtcore/srt_c_api.cpp b/srtcore/srt_c_api.cpp index 67244ff08..56d875f7b 100644 --- a/srtcore/srt_c_api.cpp +++ b/srtcore/srt_c_api.cpp @@ -251,6 +251,20 @@ int srt_epoll_wait( lrfds, lrnum, lwfds, lwnum); } +int srt_epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut) +{ + return UDT::epoll_uwait( + eid, + fdsSet, + fdsSize, + msTimeOut); +} + +// use this function to set flags. Default flags are always "everything unset". +// Pass 0 here to clear everything, or nonzero to set a desired flag. +// Pass -1 to not change anything (but still get the current flag value). +int32_t srt_epoll_set(int eid, int32_t flags) { return CUDT::epoll_set(eid, flags); } + int srt_epoll_release(int eid) { return CUDT::epoll_release(eid); } void srt_setloglevel(int ll) diff --git a/srtcore/udt.h b/srtcore/udt.h index 1aab44623..77f903bd5 100644 --- a/srtcore/udt.h +++ b/srtcore/udt.h @@ -376,6 +376,7 @@ UDT_API int epoll_wait(int eid, std::set* readfds, std::set* lrfds = NULL, std::set* wrfds = NULL); UDT_API int epoll_wait2(int eid, UDTSOCKET* readfds, int* rnum, UDTSOCKET* writefds, int* wnum, int64_t msTimeOut, SYSSOCKET* lrfds = NULL, int* lrnum = NULL, SYSSOCKET* lwfds = NULL, int* lwnum = NULL); +UDT_API int epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut); UDT_API int epoll_release(int eid); UDT_API ERRORINFO& getlasterror(); UDT_API int getlasterror_code();