diff --git a/srtcore/congctl.cpp b/srtcore/congctl.cpp index b394f8183..33b5389e0 100644 --- a/srtcore/congctl.cpp +++ b/srtcore/congctl.cpp @@ -77,7 +77,7 @@ class LiveCC: public SrtCongestionControlBase { m_llSndMaxBW = BW_INFINITE; // 1 Gbbps in Bytes/sec BW_INFINITE m_zMaxPayloadSize = parent->OPT_PayloadSize(); - if ( m_zMaxPayloadSize == 0 ) + if (m_zMaxPayloadSize == 0) m_zMaxPayloadSize = parent->maxPayloadSize(); m_zSndAvgPayloadSize = m_zMaxPayloadSize; diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 2aece2d3d..62da98d45 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -5174,8 +5174,7 @@ void * srt::CUDT::tsbpd(void* param) rxready = true; if (info.seq_gap) { - const int iDropCnt SRT_ATR_UNUSED = self->dropTooLateUpTo(info.seqno); - + const int iDropCnt SRT_ATR_UNUSED = self->rcvDropTooLateUpTo(info.seqno); #if ENABLE_EXPERIMENTAL_BONDING shall_update_group = true; #endif @@ -5303,7 +5302,7 @@ void * srt::CUDT::tsbpd(void* param) return NULL; } -int srt::CUDT::dropTooLateUpTo(int seqno) +int srt::CUDT::rcvDropTooLateUpTo(int seqno) { const int seq_gap_len = CSeqNo::seqoff(m_iRcvLastSkipAck, seqno); @@ -6327,10 +6326,10 @@ int srt::CUDT::receiveBuffer(char *data, int len) // [[using maybe_locked(CUDTGroup::m_GroupLock, m_parent->m_GroupOf != NULL)]]; // [[using locked(m_SendLock)]]; -bool srt::CUDT::checkNeedDrop() +int srt::CUDT::sndDropTooLate() { if (!m_bPeerTLPktDrop) - return false; + return 0; if (!m_config.bMessageAPI) { @@ -6352,72 +6351,64 @@ bool srt::CUDT::checkNeedDrop() + (2 * COMM_SYN_INTERVAL_US / 1000) : 0; - bool bCongestion = false; - if (threshold_ms && buffdelay_ms > threshold_ms) - { - // protect packet retransmission - enterCS(m_RecvAckLock); - int dbytes; - int32_t first_msgno; - int dpkts = m_pSndBuffer->dropLateData((dbytes), (first_msgno), tnow - milliseconds_from(threshold_ms)); - if (dpkts > 0) - { - enterCS(m_StatsLock); - m_stats.sndr.dropped.count(stats::BytesPackets(dbytes, dpkts)); - leaveCS(m_StatsLock); + if (threshold_ms == 0 || buffdelay_ms <= threshold_ms) + return 0; - IF_HEAVY_LOGGING(const int32_t realack = m_iSndLastDataAck); - const int32_t fakeack = CSeqNo::incseq(m_iSndLastDataAck, dpkts); + // protect packet retransmission + ScopedLock rcvlck(m_RecvAckLock); + int dbytes; + int32_t first_msgno; + const int dpkts = m_pSndBuffer->dropLateData((dbytes), (first_msgno), tnow - milliseconds_from(threshold_ms)); + if (dpkts <= 0) + return 0; - m_iSndLastAck = fakeack; - m_iSndLastDataAck = fakeack; + // If some packets were dropped update stats, socket state, loss list and the parent group if any. + enterCS(m_StatsLock); + m_stats.sndr.dropped.count(dbytes);; + leaveCS(m_StatsLock); - int32_t minlastack = CSeqNo::decseq(m_iSndLastDataAck); - m_pSndLossList->removeUpTo(minlastack); - /* If we dropped packets not yet sent, advance current position */ - // THIS MEANS: m_iSndCurrSeqNo = MAX(m_iSndCurrSeqNo, m_iSndLastDataAck-1) - if (CSeqNo::seqcmp(m_iSndCurrSeqNo, minlastack) < 0) - { - m_iSndCurrSeqNo = minlastack; - } + IF_HEAVY_LOGGING(const int32_t realack = m_iSndLastDataAck); + const int32_t fakeack = CSeqNo::incseq(m_iSndLastDataAck, dpkts); - HLOGC(aslog.Debug, log << "SND-DROP: %(" << realack << "-" << m_iSndCurrSeqNo << ") n=" - << dpkts << "pkt " << dbytes << "B, span=" << buffdelay_ms << " ms, FIRST #" << first_msgno); + m_iSndLastAck = fakeack; + m_iSndLastDataAck = fakeack; -#if ENABLE_EXPERIMENTAL_BONDING - // This is done with a presumption that the group - // exists and if this is not NULL, it means that this - // function was called with locked m_GroupLock, as sendmsg2 - // function was called from inside CUDTGroup::send, which - // locks the whole function. - // - // XXX This is true only because all existing groups are managed - // groups, that is, sockets cannot be added or removed from group - // manually, nor can send/recv operation be done on a single socket - // from the API call directly. This should be extra verified, if that - // changes in the future. - // - if (m_parent->m_GroupOf) - { - // What's important is that the lock on GroupLock cannot be applied - // here, both because it might be applied already, that is, according - // to the condition defined at this function's header, it is applied - // under this condition. Hence ackMessage can be defined as 100% locked. - m_parent->m_GroupOf->ackMessage(first_msgno); - } -#endif - } - bCongestion = true; - leaveCS(m_RecvAckLock); - } - else if (buffdelay_ms > (m_iPeerTsbPdDelay_ms / 2)) + const int32_t minlastack = CSeqNo::decseq(m_iSndLastDataAck); + m_pSndLossList->removeUpTo(minlastack); + /* If we dropped packets not yet sent, advance current position */ + // THIS MEANS: m_iSndCurrSeqNo = MAX(m_iSndCurrSeqNo, m_iSndLastDataAck-1) + if (CSeqNo::seqcmp(m_iSndCurrSeqNo, minlastack) < 0) { - HLOGC(aslog.Debug, - log << "cong TIMESPAN " << buffdelay_ms << "ms"); + m_iSndCurrSeqNo = minlastack; + } - bCongestion = true; + HLOGC(aslog.Debug, log << "SND-DROP: %(" << realack << "-" << m_iSndCurrSeqNo << ") n=" + << dpkts << "pkt " << dbytes << "B, span=" << buffdelay_ms << " ms, FIRST #" << first_msgno); + +#if ENABLE_EXPERIMENTAL_BONDING + // This is done with a presumption that the group + // exists and if this is not NULL, it means that this + // function was called with locked m_GroupLock, as sendmsg2 + // function was called from inside CUDTGroup::send, which + // locks the whole function. + // + // XXX This is true only because all existing groups are managed + // groups, that is, sockets cannot be added or removed from group + // manually, nor can send/recv operation be done on a single socket + // from the API call directly. This should be extra verified, if that + // changes in the future. + // + if (m_parent->m_GroupOf) + { + // What's important is that the lock on GroupLock cannot be applied + // here, both because it might be applied already, that is, according + // to the condition defined at this function's header, it is applied + // under this condition. Hence ackMessage can be defined as 100% locked. + m_parent->m_GroupOf->ackMessage(first_msgno); } - return bCongestion; +#endif + + return dpkts; } int srt::CUDT::sendmsg(const char *data, int len, int msttl, bool inorder, int64_t srctime) @@ -6520,9 +6511,9 @@ int srt::CUDT::sendmsg2(const char *data, int len, SRT_MSGCTRL& w_mctrl) m_iReXmitCount = 1; } - // checkNeedDrop(...) may lock m_RecvAckLock + // sndDropTooLate(...) may lock m_RecvAckLock // to modify m_pSndBuffer and m_pSndLossList - const bool bCongestion = checkNeedDrop(); + const int iPktsTLDropped SRT_ATR_UNUSED = sndDropTooLate(); int minlen = 1; // Minimum sender buffer space required for STREAM API if (m_config.bMessageAPI) @@ -6701,12 +6692,13 @@ int srt::CUDT::sendmsg2(const char *data, int len, SRT_MSGCTRL& w_mctrl) } } - // insert this socket to the snd list if it is not on the list yet + // Insert this socket to the snd list if it is not on the list already. // m_pSndUList->pop may lock CSndUList::m_ListLock and then m_RecvAckLock - m_pSndQueue->m_pSndUList->update(this, CSndUList::rescheduleIf(bCongestion)); + m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); #ifdef SRT_ENABLE_ECN - if (bCongestion) + // IF there was a packet drop on the sender side, report congestion to the app. + if (iPktsTLDropped > 0) { LOGC(aslog.Error, log << "sendmsg2: CONGESTION; reporting error"); throw CUDTException(MJ_AGAIN, MN_CONGESTION, 0); @@ -8192,7 +8184,7 @@ void srt::CUDT::updateSndLossListOnACK(int32_t ackdata_seqno) // Guard access to m_iSndAckedMsgNo field // Note: This can't be done inside CUDTGroup::ackMessage - // because this function is also called from CUDT::checkNeedDrop + // because this function is also called from CUDT::sndDropTooLate // called from CUDT::sendmsg2 called from CUDTGroup::send, which // applies the lock on m_GroupLock already. ScopedLock glk (*m_parent->m_GroupOf->exp_groupLock()); @@ -8696,7 +8688,7 @@ void srt::CUDT::processCtrlLossReport(const CPacket& ctrlpkt) } // the lost packet (retransmission) should be sent out immediately - m_pSndQueue->m_pSndUList->update(this, CSndUList::DO_RESCHEDULE); + m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); enterCS(m_StatsLock); m_stats.sndr.recvdNak.count(1); @@ -11166,8 +11158,8 @@ void srt::CUDT::checkRexmitTimer(const steady_clock::time_point& currtime) const ECheckTimerStage stage = is_fastrexmit ? TEV_CHT_FASTREXMIT : TEV_CHT_REXMIT; updateCC(TEV_CHECKTIMER, EventVariant(stage)); - // immediately restart transmission - m_pSndQueue->m_pSndUList->update(this, CSndUList::DO_RESCHEDULE); + // schedule sending if not scheduled already + m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); } void srt::CUDT::checkTimers() diff --git a/srtcore/core.h b/srtcore/core.h index c25e9c8c1..caf1f9837 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -543,7 +543,10 @@ class CUDT void updateIdleLinkFrom(CUDT* source); - bool checkNeedDrop(); + /// @brief Drop packets too late to be delivered if any. + /// @returns the number of packets actually dropped. + SRT_ATTR_REQUIRES(m_RecvAckLock, m_StatsLock) + int sndDropTooLate(); /// Connect to a UDT entity as per hs request. This will update /// required data in the entity, then update them also in the hs structure, @@ -706,11 +709,11 @@ class CUDT static void* tsbpd(void* param); #if ENABLE_NEW_RCVBUFFER - /// Drop too late packets. Updaet loss lists and ACK positions. + /// Drop too late packets (receiver side). Updaet loss lists and ACK positions. /// The @a seqno packet itself is not dropped. /// @param seqno [in] The sequence number of the first packets following those to be dropped. /// @return The number of packets dropped. - int dropTooLateUpTo(int seqno); + int rcvDropTooLateUpTo(int seqno); #endif void updateForgotten(int seqlen, int32_t lastack, int32_t skiptoseqno); diff --git a/srtcore/stats.h b/srtcore/stats.h index 6d934b0a7..10523f851 100644 --- a/srtcore/stats.h +++ b/srtcore/stats.h @@ -91,8 +91,7 @@ class BytesPackets uint64_t bytesWithHdr() const { - static const int PKT_HDR_SIZE = CPacket::HDR_SIZE + CPacket::UDP_HDR_SIZE; - return m_bytes + m_packets * PKT_HDR_SIZE; + return m_bytes + m_packets * CPacket::SRT_DATA_HDR_SIZE; } private: