diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 6f5e17ff1..b179332a5 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -5305,6 +5305,10 @@ void * srt::CUDT::tsbpd(void* param) int srt::CUDT::dropTooLateUpTo(int seqno) { + // Make sure that it would not drop over m_iRcvCurrSeqNo, which may break senders. + if (CSeqNo::seqcmp(seqno, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0) + seqno = CSeqNo::incseq(m_iRcvCurrSeqNo); + const int seq_gap_len = CSeqNo::seqoff(m_iRcvLastSkipAck, seqno); // seq_gap_len can be <= 0 if a packet has been dropped by the sender. @@ -7706,6 +7710,38 @@ int32_t srt::CUDT::ackDataUpTo(int32_t ack) #endif } +#if ENABLE_EXPERIMENTAL_BONDING && ENABLE_NEW_RCVBUFFER +void srt::CUDT::dropToGroupRecvBase() { + int32_t group_recv_base = SRT_SEQNO_NONE; + if (m_parent->m_GroupOf) + { + // Check is first done before locking to avoid unnecessary + // mutex locking. The condition for this field is that it + // can be either never set, already reset, or ever set + // and possibly dangling. The re-check after lock eliminates + // the dangling case. + ScopedLock glock (uglobal().m_GlobControlLock); + + // Note that getRcvBaseSeqNo() will lock m_GroupOf->m_GroupLock, + // but this is an intended order. + if (m_parent->m_GroupOf) + group_recv_base = m_parent->m_GroupOf->getRcvBaseSeqNo(); + } + if (group_recv_base == SRT_SEQNO_NONE) + return; + + ScopedLock lck(m_RcvBufferLock); + int cnt = dropTooLateUpTo(CSeqNo::incseq(group_recv_base)); + if (cnt > 0) + { + HLOGC(grlog.Debug, + log << "dropToGroupRecvBase: " << CONID() << " dropped " << cnt << " packets before ACK: group_recv_base=" + << group_recv_base << " m_iRcvLastSkipAck=" << m_iRcvLastSkipAck + << " m_iRcvCurrSeqNo=" << m_iRcvCurrSeqNo << " m_bTsbPd=" << m_bTsbPd); + } +} +#endif + namespace srt { #if ENABLE_HEAVY_LOGGING static void DebugAck(string hdr, int prev, int ack) @@ -7901,6 +7937,10 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) string reason = "first lost"; // just for "a reason" of giving particular % for ACK #endif +#if ENABLE_EXPERIMENTAL_BONDING && ENABLE_NEW_RCVBUFFER + dropToGroupRecvBase(); +#endif + { // If there is no loss, the ACK is the current largest sequence number plus 1; // Otherwise it is the smallest sequence number in the receiver loss list. diff --git a/srtcore/core.h b/srtcore/core.h index adf35b35b..8ebe9bad1 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -1060,6 +1060,12 @@ class CUDT /// @return int32_t ackDataUpTo(int32_t seq); +#if ENABLE_EXPERIMENTAL_BONDING && ENABLE_NEW_RCVBUFFER + /// @brief Drop packets in the recv buffer behind group_recv_base. + /// Updates m_iRcvLastSkipAck if it's behind group_recv_base. + void dropToGroupRecvBase(); +#endif + void handleKeepalive(const char* data, size_t lenghth); /// Locks m_RcvBufferLock and retrieves the available size of the receiver buffer.