Skip to content

Commit

Permalink
[core] Added UPDATE event raised when a member connection broken (#1509)
Browse files Browse the repository at this point in the history
* Also added to srt-test-live
  • Loading branch information
ethouris committed Sep 15, 2020
1 parent c576662 commit eeddc4c
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 12 deletions.
20 changes: 15 additions & 5 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void CUDTSocket::makeShutdown()
if (m_IncludedGroup)
{
HLOGC(smlog.Debug, log << "@" << m_SocketID << " IS MEMBER OF $" << m_IncludedGroup->id() << " - REMOVING FROM GROUP");
removeFromGroup();
removeFromGroup(true);
}

HLOGC(smlog.Debug, log << "@" << m_SocketID << " CLOSING AS SOCKET");
Expand Down Expand Up @@ -1452,7 +1452,7 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar
// We know it does belong to a group.
// Remove it first because this involves a mutex, and we want
// to avoid locking more than one mutex at a time.
ns->removeFromGroup();
ns->removeFromGroup(false);
erc_rloc = e.getErrorCode();
targets[tii].errorcode = e.getErrorCode();
targets[tii].id = CUDT::INVALID_SOCK;
Expand All @@ -1466,7 +1466,7 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar
catch (...)
{
LOGC(aclog.Fatal, log << "groupConnect: IPE: UNKNOWN EXCEPTION from connectIn");
ns->removeFromGroup();
ns->removeFromGroup(false);
targets[tii].errorcode = SRT_ESYSOBJ;
targets[tii].id = CUDT::INVALID_SOCK;
ScopedLock cl (m_GlobControlLock);
Expand Down Expand Up @@ -2993,13 +2993,23 @@ int CUDT::removeSocketFromGroup(SRTSOCKET socket)
return APIError(MJ_NOTSUP, MN_INVAL, 0);

ScopedLock grd (s->m_ControlLock);
s->removeFromGroup();
s->removeFromGroup(false);
return 0;
}

void CUDTSocket::removeFromGroup()
void CUDTSocket::removeFromGroup(bool broken)
{
m_IncludedGroup->remove(m_SocketID);
if (broken)
{
// Activate the SRT_EPOLL_UPDATE event on the group
// if it was because of a socket that was earlier connected
// and became broken. This is not to be sent in case when
// it is a failure during connection, or the socket was
// explicitly removed from the group.
m_IncludedGroup->activateUpdateEvent();
}

m_IncludedIter = CUDTGroup::gli_NULL();
m_IncludedGroup = NULL;
}
Expand Down
2 changes: 1 addition & 1 deletion srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class CUDTSocket
/// operation, but continues to be responsive in the connection in order
/// to finish sending the data that were scheduled for sending so far.
void makeShutdown();
void removeFromGroup();
void removeFromGroup(bool broken);

// Instrumentally used by select() and also required for non-blocking
// mode check in groups
Expand Down
5 changes: 5 additions & 0 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4230,6 +4230,11 @@ void CUDTGroup::updateLatestRcv(CUDTGroup::gli_t current)
}
}

void CUDTGroup::activateUpdateEvent()
{
m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_UPDATE, true);
}

void CUDTGroup::addEPoll(int eid)
{
enterCS(m_pGlobal->m_EPoll.m_EPollLock);
Expand Down
1 change: 1 addition & 0 deletions srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ class CUDTGroup
void updateReadState(SRTSOCKET sock, int32_t sequence);
void updateWriteState();
int updateFailedLink(SRTSOCKET sock);
void activateUpdateEvent();

/// Update the in-group array of packet providers per sequence number.
/// Also basing on the information already provided by possibly other sockets,
Expand Down
91 changes: 85 additions & 6 deletions testing/testmedia.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,32 @@ void SrtCommon::AcceptNewClient()
Error("ConfigurePost");
}

static string PrintEpollEvent(int events, int et_events)
{
static pair<int, const char*> const namemap [] = {
make_pair(SRT_EPOLL_IN, "R"),
make_pair(SRT_EPOLL_OUT, "W"),
make_pair(SRT_EPOLL_ERR, "E"),
make_pair(SRT_EPOLL_UPDATE, "U")
};

ostringstream os;
int N = Size(namemap);

for (int i = 0; i < N; ++i)
{
if (events & namemap[i].first)
{
os << "[";
if (et_events & namemap[i].first)
os << "^";
os << namemap[i].second << "]";
}
}

return os.str();
}

void SrtCommon::Init(string host, int port, string path, map<string,string> par, SRT_EPOLL_OPT dir)
{
m_direction = dir;
Expand Down Expand Up @@ -677,6 +703,11 @@ void SrtCommon::Init(string host, int port, string path, map<string,string> par,
{
// Don't add new epoll if already created as a part
// of group management: if (srt_epoll == -1)...

if (m_mode == "caller")
dir = (dir | SRT_EPOLL_UPDATE);
Verb() << "NON-BLOCKING MODE - SUB FOR " << PrintEpollEvent(dir, 0);

srt_epoll = AddPoller(m_sock, dir);
}
}
Expand Down Expand Up @@ -909,7 +940,7 @@ void SrtCommon::OpenGroupClient()
if ( stat == SRT_ERROR )
Error("ConfigurePre");

if ( !m_blocking_mode )
if (!m_blocking_mode)
{
// Note: here the GROUP is added to the poller.
srt_conn_epoll = AddPoller(m_sock, SRT_EPOLL_CONNECT | SRT_EPOLL_ERR);
Expand Down Expand Up @@ -2147,13 +2178,35 @@ MediaPacket SrtSource::Read(size_t chunk)
// EAGAIN for SRT READING
if (srt_getlasterror(NULL) == SRT_EASYNCRCV)
{
Epoll_again:
Verb() << "AGAIN: - waiting for data by epoll(" << srt_epoll << ")...";
// Poll on this descriptor until reading is available, indefinitely.
int len = 2;
SRTSOCKET sready[2];
if (srt_epoll_wait(srt_epoll, sready, &len, 0, 0, -1, 0, 0, 0, 0) != -1)
SRT_EPOLL_EVENT sready[2];
len = srt_epoll_uwait(srt_epoll, sready, len, -1);
if (len != -1)
{
Verb() << "... epoll reported ready " << len << " sockets";
// If the event was SRT_EPOLL_UPDATE, report it, and still wait.

bool any_read_ready = false;
for (int i = 0; i < len; ++i)
{
if (sready[i].events & SRT_EPOLL_UPDATE)
{
Verb() << "... [BROKEN CONNECTION reported on @" << sready[i].fd << "]";
}

if (sready[i].events & SRT_EPOLL_IN)
any_read_ready = true;
}

if (!any_read_ready)
{
Verb() << " ... [NOT READ READY - AGAIN]";
goto Epoll_again;
}

continue;
}
// If was -1, then passthru.
Expand Down Expand Up @@ -2240,10 +2293,36 @@ void SrtTarget::Write(const MediaPacket& data)
// If not, wait indefinitely.
if (!m_blocking_mode)
{
int ready[2];
Epoll_again:
int len = 2;
if (srt_epoll_wait(srt_epoll, 0, 0, ready, &len, -1, 0, 0, 0, 0) == SRT_ERROR)
Error("srt_epoll_wait");
SRT_EPOLL_EVENT sready[2];
len = srt_epoll_uwait(srt_epoll, sready, len, -1);
if (len != -1)
{
bool any_write_ready = false;
for (int i = 0; i < len; ++i)
{
if (sready[i].events & SRT_EPOLL_UPDATE)
{
Verb() << "... [BROKEN CONNECTION reported on @" << sready[i].fd << "]";
}

if (sready[i].events & SRT_EPOLL_OUT)
any_write_ready = true;
}

if (!any_write_ready)
{
Verb() << " ... [NOT WRITE READY - AGAIN]";
goto Epoll_again;
}

// Pass on, write ready.
}
else
{
Error("srt_epoll_uwait");
}
}

SRT_MSGCTRL mctrl = srt_msgctrl_default;
Expand Down

0 comments on commit eeddc4c

Please sign in to comment.