Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

{EPOLL} New API functions #872

Merged
merged 4 commits into from
Sep 12, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,20 @@ int CUDTUnited::epoll_wait(
return m_EPoll.wait(eid, readfds, writefds, msTimeOut, lrfds, lwfds);
}

int CUDTUnited::epoll_uwait(
const int eid,
SRT_EPOLL_EVENT* fdsSet,
int fdsSize,
int64_t msTimeOut)
{
return m_EPoll.uwait(eid, fdsSet, fdsSize, msTimeOut);
}

int32_t CUDTUnited::epoll_set(int eid, int32_t flags)
{
return m_EPoll.setflags(eid, flags);
}

int CUDTUnited::epoll_release(const int eid)
{
return m_EPoll.release(eid);
Expand Down Expand Up @@ -2663,6 +2677,52 @@ int CUDT::epoll_wait(
}
}

int CUDT::epoll_uwait(
const int eid,
SRT_EPOLL_EVENT* fdsSet,
int fdsSize,
int64_t msTimeOut)
{
try
{
return s_UDTUnited.epoll_uwait(eid, fdsSet, fdsSize, msTimeOut);
}
catch (CUDTException e)
{
s_UDTUnited.setError(new CUDTException(e));
return ERROR;
}
catch (std::exception& ee)
{
LOGC(mglog.Fatal, log << "epoll_uwait: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
s_UDTUnited.setError(new CUDTException(MJ_UNKNOWN, MN_NONE, 0));
return ERROR;
}
}

int32_t CUDT::epoll_set(
const int eid,
int32_t flags)
{
try
{
return s_UDTUnited.epoll_set(eid, flags);
}
catch (CUDTException e)
{
s_UDTUnited.setError(new CUDTException(e));
return ERROR;
}
catch (std::exception& ee)
{
LOGC(mglog.Fatal, log << "epoll_set: UNEXPECTED EXCEPTION: "
<< typeid(ee).name() << ": " << ee.what());
s_UDTUnited.setError(new CUDTException(MJ_UNKNOWN, MN_NONE, 0));
return ERROR;
}
}

int CUDT::epoll_release(const int eid)
{
try
Expand Down Expand Up @@ -3102,6 +3162,11 @@ int epoll_wait2(
return ret;
}

int epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
{
return CUDT::epoll_uwait(eid, fdsSet, fdsSize, msTimeOut);
}

int epoll_release(int eid)
{
return CUDT::epoll_release(eid);
Expand Down
2 changes: 2 additions & 0 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ friend class CRendezvousQueue;
int epoll_update_usock(const int eid, const SRTSOCKET u, const int* events = NULL);
int epoll_update_ssock(const int eid, const SYSSOCKET s, const int* events = NULL);
int epoll_wait(const int eid, std::set<SRTSOCKET>* readfds, std::set<SRTSOCKET>* writefds, int64_t msTimeOut, std::set<SYSSOCKET>* lrfds = NULL, std::set<SYSSOCKET>* lwfds = NULL);
int epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut);
int32_t epoll_set(const int eid, int32_t flags);
int epoll_release(const int eid);

/// record the UDT exception.
Expand Down
2 changes: 2 additions & 0 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ class CUDT
static int epoll_update_usock(const int eid, const SRTSOCKET u, const int* events = NULL);
static int epoll_update_ssock(const int eid, const SYSSOCKET s, const int* events = NULL);
static int epoll_wait(const int eid, std::set<SRTSOCKET>* readfds, std::set<SRTSOCKET>* writefds, int64_t msTimeOut, std::set<SYSSOCKET>* lrfds = NULL, std::set<SYSSOCKET>* wrfds = NULL);
static int epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut);
static int32_t epoll_set(const int eid, int32_t flags);
static int epoll_release(const int eid);
static CUDTException& getlasterror();
static int perfmon(SRTSOCKET u, CPerfMon* perf, bool clear = true);
Expand Down
89 changes: 89 additions & 0 deletions srtcore/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,95 @@ int CEPoll::update_ssock(const int eid, const SYSSOCKET& s, const int* events)
return 0;
}

int CEPoll::setflags(const int eid, int32_t flags)
{
CGuard pg(m_EPollLock);
map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
if (p == m_mPolls.end())
throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
CEPollDesc& ed = p->second;

int32_t oflags = ed.flags();

if (flags == -1)
return oflags;

if (flags == 0)
{
ed.clr_flags(~int32_t());
}
else
{
ed.set_flags(flags);
}

return oflags;
}

int CEPoll::uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
{
// It is allowed to call this function witn fdsSize == 0
// and therefore also NULL fdsSet. This will then only report
// the number of ready sockets, just without information which.
if (fdsSize < 0 || (fdsSize > 0 && !fdsSet))
throw CUDTException(MJ_NOTSUP, MN_INVAL);

int64_t entertime = CTimer::getTime();

while (true)
{
{
CGuard pg(m_EPollLock);
map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
if (p == m_mPolls.end())
throw CUDTException(MJ_NOTSUP, MN_EIDINVAL);
CEPollDesc& ed = p->second;

if (!ed.flags(SRT_EPOLL_ENABLE_EMPTY) && ed.watch_empty())
{
// Empty EID is not allowed, report error.
throw CUDTException(MJ_NOTSUP, MN_INVAL);
}

if (ed.flags(SRT_EPOLL_ENABLE_OUTPUTCHECK) && (fdsSet == NULL || fdsSize == 0))
{
// Empty EID is not allowed, report error.
throw CUDTException(MJ_NOTSUP, MN_INVAL);
}

if (!ed.m_sLocals.empty())
{
// XXX Add error log
// uwait should not be used with EIDs subscribed to system sockets
throw CUDTException(MJ_NOTSUP, MN_INVAL);
}

int total = 0; // This is a list, so count it during iteration
CEPollDesc::enotice_t::iterator i = ed.enotice_begin();
while (i != ed.enotice_end())
{
int pos = total; // previous past-the-end position
++total;

if (total > fdsSize)
break;

fdsSet[pos] = *i;

ed.checkEdge(i++); // NOTE: potentially deletes `i`
}
if (total)
return total;
}

if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * int64_t(1000)))
break; // official wait does: throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);

CTimer::waitForEvent();
}

return 0;
}

int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefds, int64_t msTimeOut, set<SYSSOCKET>* lrfds, set<SYSSOCKET>* lwfds)
{
Expand Down
12 changes: 12 additions & 0 deletions srtcore/epoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,16 @@ friend class CRendezvousQueue;

int wait(const int eid, std::set<SRTSOCKET>* readfds, std::set<SRTSOCKET>* writefds, int64_t msTimeOut, std::set<SYSSOCKET>* lrfds, std::set<SYSSOCKET>* lwfds);

/// wait for EPoll events or timeout optimized with explicit EPOLL_ERR event and the edge mode option.
/// @param [in] eid EPoll ID.
/// @param [out] fdsSet array of user socket events (SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR).
/// @param [int] fdsSize of fds array
/// @param [in] msTimeOut timeout threshold, in milliseconds.
/// @param [bool] edgeMode if true the events returned in fdsSet are then erased
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no 'edgeMode' parameter in the current version of the function

/// @return total of available events in the epoll system (can be greater than fdsSize)

int uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut);

/// close and release an EPoll.
/// @param [in] eid EPoll ID.
/// @return 0 if success, otherwise an error number.
Expand All @@ -358,6 +368,8 @@ friend class CRendezvousQueue;

int update_events(const SRTSOCKET& uid, std::set<int>& eids, int events, bool enable);

int setflags(const int eid, int32_t flags);

private:
int m_iIDSeed; // seed to generate a new ID
pthread_mutex_t m_SeedLock;
Expand Down
3 changes: 3 additions & 0 deletions srtcore/srt.h
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,9 @@ typedef struct SRT_EPOLL_EVENT_
SRTSOCKET fd;
int events; // SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR
} SRT_EPOLL_EVENT;
SRT_API int srt_epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut);

SRT_API int32_t srt_epoll_set(int eid, int32_t flags);
SRT_API int srt_epoll_release(int eid);

// Logging control
Expand Down
14 changes: 14 additions & 0 deletions srtcore/srt_c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,20 @@ int srt_epoll_wait(
lrfds, lrnum, lwfds, lwnum);
}

int srt_epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
{
return UDT::epoll_uwait(
eid,
fdsSet,
fdsSize,
msTimeOut);
}

// use this function to set flags. Default flags are always "everything unset".
// Pass 0 here to clear everything, or nonzero to set a desired flag.
// Pass -1 to not change anything (but still get the current flag value).
int32_t srt_epoll_set(int eid, int32_t flags) { return CUDT::epoll_set(eid, flags); }

int srt_epoll_release(int eid) { return CUDT::epoll_release(eid); }

void srt_setloglevel(int ll)
Expand Down
1 change: 1 addition & 0 deletions srtcore/udt.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ UDT_API int epoll_wait(int eid, std::set<UDTSOCKET>* readfds, std::set<UDTSOCKET
std::set<SYSSOCKET>* lrfds = NULL, std::set<SYSSOCKET>* wrfds = NULL);
UDT_API int epoll_wait2(int eid, UDTSOCKET* readfds, int* rnum, UDTSOCKET* writefds, int* wnum, int64_t msTimeOut,
SYSSOCKET* lrfds = NULL, int* lrnum = NULL, SYSSOCKET* lwfds = NULL, int* lwnum = NULL);
UDT_API int epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut);
UDT_API int epoll_release(int eid);
UDT_API ERRORINFO& getlasterror();
UDT_API int getlasterror_code();
Expand Down