Skip to content

Commit

Permalink
[core] Fixed bug: if all group connection failed, E epoll event shoul…
Browse files Browse the repository at this point in the history
…d be set on the group (#1527).

Also fixed the group-connection procedure in srt-test-live application in order to:
- Wait for async connection, if non-blocking mode was set
- Get the success or failure statement from the waiting function
- Delete the configuration only when the link table is destroyed (immediate destruction would crash async connection).
  • Loading branch information
ethouris authored Sep 3, 2020
1 parent 633da74 commit 5469cd3
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 50 deletions.
6 changes: 5 additions & 1 deletion srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10631,7 +10631,11 @@ bool CUDT::checkExpTimer(const steady_clock::time_point& currtime, int check_rea

// app can call any UDT API to learn the connection_broken error
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true);

if (m_parent->m_IncludedGroup)
{
// Bound to one call because this requires locking
m_parent->m_IncludedGroup->updateFailedLink(m_SocketID);
}
CGlobEvent::triggerEvent();

return true;
Expand Down
37 changes: 37 additions & 0 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4242,6 +4242,43 @@ void CUDTGroup::removeEPollID(const int eid)
leaveCS(m_pGlobal->m_EPoll.m_EPollLock);
}

int CUDTGroup::updateFailedLink(SRTSOCKET sock)
{
ScopedLock lg(m_GroupLock);

// Check all members if they are in the pending
// or connected state.

int nhealthy = 0;

for (gli_t i = m_Group.begin(); i != m_Group.end(); ++i)
{
if (i->id == sock)
{
// This socket.
i->sndstate = SRT_GST_BROKEN;
i->rcvstate = SRT_GST_BROKEN;
continue;
}

if (i->sndstate < SRT_GST_BROKEN)
nhealthy++;
}

if (!nhealthy)
{
// No healthy links, set ERR on epoll.
HLOGC(gmlog.Debug, log << "group/updateFailedLink: All sockets broken");
m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true);
}
else
{
HLOGC(gmlog.Debug, log << "group/updateFailedLink: Still " << nhealthy << " links in the group");
}

return 0;
}

#if ENABLE_HEAVY_LOGGING
void CUDTGroup::debugGroup()
{
Expand Down
1 change: 1 addition & 0 deletions srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ class CUDTGroup
void removeEPollID(const int eid);
void updateReadState(SRTSOCKET sock, int32_t sequence);
void updateWriteState();
int updateFailedLink(SRTSOCKET sock);

/// Update the in-group array of packet providers per sequence number.
/// Also basing on the information already provided by possibly other sockets,
Expand Down
7 changes: 7 additions & 0 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,13 @@ void CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst, con
i->m_pUDT->m_RejectReason = SRT_REJ_PEER;
}
CUDT::s_UDTUnited.m_EPoll.update_events(i->m_iID, i->m_pUDT->m_sPollID, SRT_EPOLL_ERR, true);
if (i->m_pUDT->m_parent->m_IncludedGroup)
{
// Bound to one call because this requires locking
i->m_pUDT->m_parent->m_IncludedGroup->updateFailedLink(i->m_iID);
}
CGlobEvent::triggerEvent();

/*
* Setting m_bConnecting to false but keeping socket in rendezvous queue is not a good idea.
* Next CUDT::close will not remove it from rendezvous queue (because !m_bConnecting)
Expand Down
98 changes: 49 additions & 49 deletions testing/testmedia.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -920,18 +920,23 @@ void SrtCommon::OpenGroupClient()
gd.config = c.options;
targets.push_back(gd);
}

Verb() << "Waiting for group connection... " << VerbNoEOL;
int fisock = srt_connect_group(m_sock, targets.data(), targets.size());

// Delete config objects before prospective exception
for (auto& gd: targets)
srt_delete_config(gd.config);

if (fisock == SRT_ERROR)
{
Error("srt_connect_group");
}

if (m_blocking_mode)
{
Verb() << "SUCCESSFUL";
}
else
{
Verb() << "INITIATED [ASYNC]";
}

// Configuration change applied on a group should
// spread the setting on all sockets.
ConfigurePost(m_sock);
Expand Down Expand Up @@ -974,74 +979,69 @@ void SrtCommon::OpenGroupClient()
continue;
}

/*
if (!m_blocking_mode)
{
// EXPERIMENTAL version. Add all sockets to epoll
// in the direction used for this medium.
int modes = m_direction;
srt_epoll_add_usock(srt_epoll, insock, &modes);
Verb() << "Added @" << insock << " to epoll (" << srt_epoll << ") in modes: " << modes;
}
*/

// Have socket, store it into the group socket array.
any_node = true;
}

stat = ConfigurePost(m_sock);
if (stat == -1)
{
// This kind of error must reject the whole operation.
// Usually you'll get this error on the first socket,
// and doing this on the others would result in the same.
Error("ConfigurePost");
}


Verb() << "Group connection report:";
for (auto& d: m_group_data)
{
// id, status, result, peeraddr
Verb() << "@" << d.id << " <" << SockStatusStr(d.sockstate) << "> (=" << d.result << ") PEER:"
<< sockaddr_any((sockaddr*)&d.peeraddr, sizeof d.peeraddr).str();
}

/*
XXX Temporarily disabled, until the nonblocking mode
is added to groups.
if (!any_node)
Error("All connections failed");

// Wait for REAL connected state if nonblocking mode, for AT LEAST one node.
if ( !m_blocking_mode )
if (!m_blocking_mode)
{
Verb() << "[ASYNC] " << VerbNoEOL;

// SPIN-WAITING version. Don't use it unless you know what you're doing.
// SpinWaitAsync();

// Socket readiness for connection is checked by polling on WRITE allowed sockets.
int len = 2;
SRTSOCKET ready[2];
if ( srt_epoll_wait(srt_conn_epoll,
NULL, NULL,
ready, &len,
int len1 = 2, len2 = 2;
SRTSOCKET ready_conn[2], ready_err[2];
if (srt_epoll_wait(srt_conn_epoll,
ready_err, &len2,
ready_conn, &len1,
-1, // Wait infinitely
NULL, NULL,
NULL, NULL) != -1 )
NULL, NULL) != -1)
{
Verb() << "[EPOLL: " << len << " sockets] " << VerbNoEOL;
// We are waiting for one entity to be ready so it's either
// in one or the other
if (find(ready_err, ready_err+len2, m_sock) != ready_err+len2)
{
Verb() << "[EPOLL: " << len2 << " entities FAILED]";
Error("All group connections failed", SRT_REJ_UNKNOWN, SRT_ENOCONN);
}
else if (find(ready_conn, ready_conn+len1, m_sock) != ready_conn+len1)
{
Verb() << "[EPOLL: " << len1 << " entities] " << VerbNoEOL;
}
else
{
Error("Group: SPURIOUS epoll readiness");
}
}
else
{
Error("srt_epoll_wait");
}
}
*/

if (!any_node)
stat = ConfigurePost(m_sock);
if (stat == -1)
{
// This kind of error must reject the whole operation.
// Usually you'll get this error on the first socket,
// and doing this on the others would result in the same.
Error("ConfigurePost");
}


Verb() << "Group connection report:";
for (auto& d: m_group_data)
{
Error("REDUNDANCY: all redundant connections failed");
// id, status, result, peeraddr
Verb() << "@" << d.id << " <" << SockStatusStr(d.sockstate) << "> (=" << d.result << ") PEER:"
<< sockaddr_any((sockaddr*)&d.peeraddr, sizeof d.peeraddr).str();
}

// Prepare group data for monitoring the group status.
Expand Down
4 changes: 4 additions & 0 deletions testing/testmedia.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ class SrtCommon
SRT_SOCKOPT_CONFIG* options = nullptr;

Connection(string h, int p): host(h), port(p), source(AF_INET) {}
~Connection()
{
srt_delete_config(options);
}
};

int srt_epoll = -1;
Expand Down

0 comments on commit 5469cd3

Please sign in to comment.