diff --git a/srtcore/api.cpp b/srtcore/api.cpp index f3f0c2282..7b3cf7865 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -129,7 +129,7 @@ void CUDTSocket::makeShutdown() if (m_IncludedGroup) { HLOGC(smlog.Debug, log << "@" << m_SocketID << " IS MEMBER OF $" << m_IncludedGroup->id() << " - REMOVING FROM GROUP"); - removeFromGroup(); + removeFromGroup(true); } HLOGC(smlog.Debug, log << "@" << m_SocketID << " CLOSING AS SOCKET"); @@ -1452,7 +1452,7 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar // We know it does belong to a group. // Remove it first because this involves a mutex, and we want // to avoid locking more than one mutex at a time. - ns->removeFromGroup(); + ns->removeFromGroup(false); erc_rloc = e.getErrorCode(); targets[tii].errorcode = e.getErrorCode(); targets[tii].id = CUDT::INVALID_SOCK; @@ -1466,7 +1466,7 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar catch (...) { LOGC(aclog.Fatal, log << "groupConnect: IPE: UNKNOWN EXCEPTION from connectIn"); - ns->removeFromGroup(); + ns->removeFromGroup(false); targets[tii].errorcode = SRT_ESYSOBJ; targets[tii].id = CUDT::INVALID_SOCK; ScopedLock cl (m_GlobControlLock); @@ -2993,13 +2993,23 @@ int CUDT::removeSocketFromGroup(SRTSOCKET socket) return APIError(MJ_NOTSUP, MN_INVAL, 0); ScopedLock grd (s->m_ControlLock); - s->removeFromGroup(); + s->removeFromGroup(false); return 0; } -void CUDTSocket::removeFromGroup() +void CUDTSocket::removeFromGroup(bool broken) { m_IncludedGroup->remove(m_SocketID); + if (broken) + { + // Activate the SRT_EPOLL_UPDATE event on the group + // if it was because of a socket that was earlier connected + // and became broken. This is not to be sent in case when + // it is a failure during connection, or the socket was + // explicitly removed from the group. + m_IncludedGroup->activateUpdateEvent(); + } + m_IncludedIter = CUDTGroup::gli_NULL(); m_IncludedGroup = NULL; } diff --git a/srtcore/api.h b/srtcore/api.h index a2dc33f0b..cc8fdb8d4 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -165,7 +165,7 @@ class CUDTSocket /// operation, but continues to be responsive in the connection in order /// to finish sending the data that were scheduled for sending so far. void makeShutdown(); - void removeFromGroup(); + void removeFromGroup(bool broken); // Instrumentally used by select() and also required for non-blocking // mode check in groups diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 6a9381c23..7d2b6be0b 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -4230,6 +4230,11 @@ void CUDTGroup::updateLatestRcv(CUDTGroup::gli_t current) } } +void CUDTGroup::activateUpdateEvent() +{ + m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_UPDATE, true); +} + void CUDTGroup::addEPoll(int eid) { enterCS(m_pGlobal->m_EPoll.m_EPollLock); diff --git a/srtcore/group.h b/srtcore/group.h index f8be4025e..6c7aa7b93 100644 --- a/srtcore/group.h +++ b/srtcore/group.h @@ -283,6 +283,7 @@ class CUDTGroup void updateReadState(SRTSOCKET sock, int32_t sequence); void updateWriteState(); int updateFailedLink(SRTSOCKET sock); + void activateUpdateEvent(); /// Update the in-group array of packet providers per sequence number. /// Also basing on the information already provided by possibly other sockets, diff --git a/testing/testmedia.cpp b/testing/testmedia.cpp index 4cc20440c..c72a1a6b2 100755 --- a/testing/testmedia.cpp +++ b/testing/testmedia.cpp @@ -584,6 +584,32 @@ void SrtCommon::AcceptNewClient() Error("ConfigurePost"); } +static string PrintEpollEvent(int events, int et_events) +{ + static pair const namemap [] = { + make_pair(SRT_EPOLL_IN, "R"), + make_pair(SRT_EPOLL_OUT, "W"), + make_pair(SRT_EPOLL_ERR, "E"), + make_pair(SRT_EPOLL_UPDATE, "U") + }; + + ostringstream os; + int N = Size(namemap); + + for (int i = 0; i < N; ++i) + { + if (events & namemap[i].first) + { + os << "["; + if (et_events & namemap[i].first) + os << "^"; + os << namemap[i].second << "]"; + } + } + + return os.str(); +} + void SrtCommon::Init(string host, int port, string path, map par, SRT_EPOLL_OPT dir) { m_direction = dir; @@ -677,6 +703,11 @@ void SrtCommon::Init(string host, int port, string path, map par, { // Don't add new epoll if already created as a part // of group management: if (srt_epoll == -1)... + + if (m_mode == "caller") + dir = (dir | SRT_EPOLL_UPDATE); + Verb() << "NON-BLOCKING MODE - SUB FOR " << PrintEpollEvent(dir, 0); + srt_epoll = AddPoller(m_sock, dir); } } @@ -909,7 +940,7 @@ void SrtCommon::OpenGroupClient() if ( stat == SRT_ERROR ) Error("ConfigurePre"); - if ( !m_blocking_mode ) + if (!m_blocking_mode) { // Note: here the GROUP is added to the poller. srt_conn_epoll = AddPoller(m_sock, SRT_EPOLL_CONNECT | SRT_EPOLL_ERR); @@ -2147,13 +2178,35 @@ MediaPacket SrtSource::Read(size_t chunk) // EAGAIN for SRT READING if (srt_getlasterror(NULL) == SRT_EASYNCRCV) { +Epoll_again: Verb() << "AGAIN: - waiting for data by epoll(" << srt_epoll << ")..."; // Poll on this descriptor until reading is available, indefinitely. int len = 2; - SRTSOCKET sready[2]; - if (srt_epoll_wait(srt_epoll, sready, &len, 0, 0, -1, 0, 0, 0, 0) != -1) + SRT_EPOLL_EVENT sready[2]; + len = srt_epoll_uwait(srt_epoll, sready, len, -1); + if (len != -1) { Verb() << "... epoll reported ready " << len << " sockets"; + // If the event was SRT_EPOLL_UPDATE, report it, and still wait. + + bool any_read_ready = false; + for (int i = 0; i < len; ++i) + { + if (sready[i].events & SRT_EPOLL_UPDATE) + { + Verb() << "... [BROKEN CONNECTION reported on @" << sready[i].fd << "]"; + } + + if (sready[i].events & SRT_EPOLL_IN) + any_read_ready = true; + } + + if (!any_read_ready) + { + Verb() << " ... [NOT READ READY - AGAIN]"; + goto Epoll_again; + } + continue; } // If was -1, then passthru. @@ -2240,10 +2293,36 @@ void SrtTarget::Write(const MediaPacket& data) // If not, wait indefinitely. if (!m_blocking_mode) { - int ready[2]; +Epoll_again: int len = 2; - if (srt_epoll_wait(srt_epoll, 0, 0, ready, &len, -1, 0, 0, 0, 0) == SRT_ERROR) - Error("srt_epoll_wait"); + SRT_EPOLL_EVENT sready[2]; + len = srt_epoll_uwait(srt_epoll, sready, len, -1); + if (len != -1) + { + bool any_write_ready = false; + for (int i = 0; i < len; ++i) + { + if (sready[i].events & SRT_EPOLL_UPDATE) + { + Verb() << "... [BROKEN CONNECTION reported on @" << sready[i].fd << "]"; + } + + if (sready[i].events & SRT_EPOLL_OUT) + any_write_ready = true; + } + + if (!any_write_ready) + { + Verb() << " ... [NOT WRITE READY - AGAIN]"; + goto Epoll_again; + } + + // Pass on, write ready. + } + else + { + Error("srt_epoll_uwait"); + } } SRT_MSGCTRL mctrl = srt_msgctrl_default;