From 8f68f613c5546aca16b301ab0dceae6db28bb6c3 Mon Sep 17 00:00:00 2001 From: Guangqing Chen Date: Wed, 8 Dec 2021 21:08:39 +0800 Subject: [PATCH] [core] refactor Group::recv() base on new rcv buffer to support message mode --- srtcore/buffer_rcv.cpp | 47 +++++++++-- srtcore/buffer_rcv.h | 2 + srtcore/group.cpp | 188 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 232 insertions(+), 5 deletions(-) diff --git a/srtcore/buffer_rcv.cpp b/srtcore/buffer_rcv.cpp index dd32f518c..8ac7e7a81 100644 --- a/srtcore/buffer_rcv.cpp +++ b/srtcore/buffer_rcv.cpp @@ -292,8 +292,6 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::readMessage. m_iStartSeqNo " << m_iStartSeqNo); const int readPos = canReadInOrder ? m_iStartPos : m_iFirstReadableOutOfOrder; - // Remember if we actually read out of order packet. - const bool readingOutOfOrderPacket = !canReadInOrder || m_iStartPos == m_iFirstReadableOutOfOrder; size_t remain = len; char* dst = data; @@ -331,13 +329,14 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) const bool pbLast = packet.getMsgBoundary() & PB_LAST; if (msgctrl && (packet.getMsgBoundary() & PB_FIRST)) { - msgctrl->pktseq = pktseqno; msgctrl->msgno = packet.getMsgSeq(m_bPeerRexmitFlag); } if (msgctrl && pbLast) { msgctrl->srctime = count_microseconds(getPktTsbPdTime(packet.getMsgTimeStamp()).time_since_epoch()); } + if (msgctrl) + msgctrl->pktseq = pktseqno; releaseUnitInPos(i); if (updateStartPos) @@ -362,8 +361,6 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) } countBytes(-pkts_read, -bytes_extracted); - if (!m_tsbpd.isEnabled() && readingOutOfOrderPacket) - updateFirstReadableOutOfOrder(); releaseNextFillerEntries(); @@ -373,6 +370,11 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) //updateNonreadPos(); } + if (!m_tsbpd.isEnabled()) + // We need updateFirstReadableOutOfOrder() here even if we are reading inorder, + // incase readable inorder packets are all read out. + updateFirstReadableOutOfOrder(); + const int bytes_read = dst - data; if (bytes_read < bytes_extracted) { @@ -606,6 +608,41 @@ bool CRcvBufferNew::isRcvDataReady(time_point time_now) const return info.tsbpd_time <= time_now; } +CRcvBufferNew::PacketInfo CRcvBufferNew::getFirstReadablePacketInfo(time_point time_now) const +{ + const PacketInfo unreadableInfo = {SRT_SEQNO_NONE, false, time_point()}; + const bool hasInorderPackets = hasReadableInorderPkts(); + + if (!m_tsbpd.isEnabled()) + { + if (hasInorderPackets) + { + const CPacket& packet = m_entries[m_iStartPos].pUnit->m_Packet; + const PacketInfo info = {packet.getSeqNo(), false, time_point()}; + return info; + } + SRT_ASSERT((!m_bMessageAPI && m_numOutOfOrderPackets == 0) || m_bMessageAPI); + if (m_iFirstReadableOutOfOrder >= 0) + { + SRT_ASSERT(m_numOutOfOrderPackets > 0); + const CPacket& packet = m_entries[m_iFirstReadableOutOfOrder].pUnit->m_Packet; + const PacketInfo info = {packet.getSeqNo(), true, time_point()}; + return info; + } + return unreadableInfo; + } + + if (!hasInorderPackets) + return unreadableInfo; + + const PacketInfo info = getFirstValidPacketInfo(); + + if (info.tsbpd_time <= time_now) + return info; + else + return unreadableInfo; +} + void CRcvBufferNew::countBytes(int pkts, int bytes) { ScopedLock lock(m_BytesCountLock); diff --git a/srtcore/buffer_rcv.h b/srtcore/buffer_rcv.h index 0ad8d4245..c21b037c3 100644 --- a/srtcore/buffer_rcv.h +++ b/srtcore/buffer_rcv.h @@ -164,6 +164,8 @@ class CRcvBufferNew /// IF skipseqno == -1, no missing packet but 1st not ready to play. PacketInfo getFirstValidPacketInfo() const; + PacketInfo getFirstReadablePacketInfo(time_point time_now) const; + /// Get information on packets available to be read. /// @returns a pair of sequence numbers (first available; first unavailable). /// diff --git a/srtcore/group.cpp b/srtcore/group.cpp index b9c18090a..c1ba15d75 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -2195,6 +2195,193 @@ static bool isValidSeqno(int32_t iBaseSeqno, int32_t iPktSeqno) return false; } +#ifdef ENABLE_NEW_RCVBUFFER +int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) +{ + // First, acquire GlobControlLock to make sure all member sockets still exist + enterCS(m_Global.m_GlobControlLock); + ScopedLock guard(m_GroupLock); + + if (m_bClosing) + { + // The group could be set closing in the meantime, but if + // this is only about to be set by another thread, this thread + // must fist wait for being able to acquire this lock. + // The group will not be deleted now because it is added usage counter + // by this call, but will be released once it exits. + leaveCS(m_Global.m_GlobControlLock); + throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); + } + + // Now, still under lock, check if all sockets still can be dispatched + send_CheckValidSockets(); + leaveCS(m_Global.m_GlobControlLock); + + if (m_bClosing) + throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); + + // Later iteration over it might be less efficient than + // by vector, but we'll also often try to check a single id + // if it was ever seen broken, so that it's skipped. + set broken; + + for (;;) + { + if (!m_bOpened || !m_bConnected) + { + LOGC(grlog.Error, + log << boolalpha << "grp/recv: $" << id() << ": ABANDONING: opened=" << m_bOpened + << " connected=" << m_bConnected); + throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); + } + + vector aliveMembers; + recv_CollectAliveAndBroken(aliveMembers, broken); + if (aliveMembers.empty()) + { + LOGC(grlog.Error, log << "grp/recv: ALL LINKS BROKEN, ABANDONING."); + m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false); + throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0); + } + + vector readySockets; + if (m_bSynRecving) + readySockets = recv_WaitForReadReady(aliveMembers, broken); + else + readySockets = aliveMembers; + + if (m_bClosing) + { + HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": GROUP CLOSED, ABANDONING."); + throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); + } + + // Find the first readable packet among all member sockets. + CUDTSocket* socketToRead = NULL; + CRcvBufferNew::PacketInfo infoToRead = {-1, false, time_point()}; + for (vector::const_iterator si = readySockets.begin(); si != readySockets.end(); ++si) + { + CUDTSocket* ps = *si; + + ScopedLock lg(ps->core().m_RcvBufferLock); + if (m_RcvBaseSeqNo != SRT_SEQNO_NONE) + { + // Drop here to make sure the getFirstReadablePacketInfo() below return fresher packet. + int cnt = ps->core().dropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo)); + if (cnt > 0) + { + HLOGC(grlog.Debug, + log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": dropped " << cnt + << " packets before reading: m_RcvBaseSeqNo=" << m_RcvBaseSeqNo); + } + } + + const CRcvBufferNew::PacketInfo info = + ps->core().m_pRcvBuffer->getFirstReadablePacketInfo(steady_clock::now()); + if (info.seqno == SRT_SEQNO_NONE) + { + HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": Nothing to read."); + continue; + } + // We need to qualify the sequence, just for a case. + if (m_RcvBaseSeqNo != SRT_SEQNO_NONE && !isValidSeqno(m_RcvBaseSeqNo, info.seqno)) + { + LOGC(grlog.Error, + log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": SEQUENCE DISCREPANCY: base=%" + << m_RcvBaseSeqNo << " vs pkt=%" << info.seqno << ", setting ESECFAIL"); + ps->core().m_bBroken = true; + broken.insert(ps); + continue; + } + if (socketToRead == NULL || CSeqNo::seqcmp(info.seqno, infoToRead.seqno) < 0) + { + socketToRead = ps; + infoToRead = info; + } + } + + if (socketToRead == NULL) + { + if (m_bSynRecving) + { + HLOGC(grlog.Debug, + log << "grp/recv: $" << id() << ": No links reported any fresher packet, re-polling."); + continue; + } + else + { + HLOGC(grlog.Debug, + log << "grp/recv: $" << id() << ": No links reported any fresher packet, clearing readiness."); + m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false); + throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0); + } + } + else + { + HLOGC(grlog.Debug, + log << "grp/recv: $" << id() << ": Found first readable packet from @" << socketToRead->m_SocketID + << ": seq=" << infoToRead.seqno << " gap=" << infoToRead.seq_gap + << " time=" << FormatTime(infoToRead.tsbpd_time)); + } + + const int res = socketToRead->core().receiveMessage((buf), len, (w_mc), CUDTUnited::ERH_RETURN); + HLOGC(grlog.Debug, + log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": Extracted data with %" + << w_mc.pktseq << " #" << w_mc.msgno << ": " << (res <= 0 ? "(NOTHING)" : BufferStamp(buf, res))); + if (res == 0) + { + LOGC(grlog.Warn, + log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": Retrying next socket..."); + // This socket will not be socketToRead in the next turn because receiveMessage() return 0 here. + continue; + } + if (res == SRT_ERROR) + { + LOGC(grlog.Warn, + log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": " << srt_getlasterror_str() + << ". Retrying next socket..."); + broken.insert(socketToRead); + continue; + } + fillGroupData((w_mc), w_mc); + + HLOGC(grlog.Debug, + log << "grp/recv: $" << id() << ": Update m_RcvBaseSeqNo: %" << m_RcvBaseSeqNo << " -> %" << w_mc.pktseq); + m_RcvBaseSeqNo = w_mc.pktseq; + + // Update stats as per delivery + m_stats.recv.count(res); + updateAvgPayloadSize(res); + + for (vector::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si) + { + CUDTSocket* ps = *si; + ScopedLock lg(ps->core().m_RcvBufferLock); + if (m_RcvBaseSeqNo != SRT_SEQNO_NONE) + { + int cnt = ps->core().dropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo)); + if (cnt > 0) + { + HLOGC(grlog.Debug, + log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": dropped " << cnt + << " packets after reading: m_RcvBaseSeqNo=" << m_RcvBaseSeqNo); + } + } + } + for (vector::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si) + { + CUDTSocket* ps = *si; + if (!ps->core().isRcvBufferReady()) + m_Global.m_EPoll.update_events(ps->m_SocketID, ps->core().m_sPollID, SRT_EPOLL_IN, false); + } + + return res; + } + LOGC(grlog.Error, log << "grp/recv: UNEXPECTED RUN PATH, ABANDONING."); + m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false); + throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0); +} +#else // The "app reader" version of the reading function. // This reads the packets from every socket treating them as independent // and prepared to work with the application. Then packets are sorted out @@ -2731,6 +2918,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) } } } +#endif // [[using locked(m_GroupLock)]] CUDTGroup::ReadPos* CUDTGroup::checkPacketAhead()