diff --git a/srtcore/buffer.cpp b/srtcore/buffer.cpp index c43cc4c59..7102d09a8 100644 --- a/srtcore/buffer.cpp +++ b/srtcore/buffer.cpp @@ -193,21 +193,28 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl) HLOGC(bslog.Debug, log << "addBuffer: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size << " buffers for " << len << " bytes"); + // Retrieve current time before locking the mutex to be closer to packet submission event. + const steady_clock::time_point tnow = steady_clock::now(); - // dynamically increase sender buffer + ScopedLock bufferguard(m_BufLock); + // Dynamically increase sender buffer if there is not enough room. while (size + m_iCount >= m_iSize) { HLOGC(bslog.Debug, log << "addBuffer: ... still lacking " << (size + m_iCount - m_iSize) << " buffers..."); increase(); } - const steady_clock::time_point time = steady_clock::now(); const int32_t inorder = w_mctrl.inorder ? MSGNO_PACKET_INORDER::mask : 0; - HLOGC(bslog.Debug, log << CONID() << "addBuffer: adding " << size << " packets (" << len << " bytes) to send, msgno=" << (w_msgno > 0 ? w_msgno : m_iNextMsgNo) << (inorder ? "" : " NOT") << " in order"); + // Calculate origin time (same for all blocks of the message). + m_tsLastOriginTime = w_srctime ? time_point() + microseconds_from(w_srctime) : tnow; + // Rewrite back the actual value, even if it stays the same, so that the calling facilities can reuse it. + // May also be a subject to conversion error, thus the actual value is signalled back. + w_srctime = count_microseconds(m_tsLastOriginTime.time_since_epoch()); + // The sequence number passed to this function is the sequence number // that the very first packet from the packet series should get here. // If there's more than one packet, this function must increase it by itself @@ -253,33 +260,21 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl) // [PB_FIRST] [PB_LAST] - 2 packets per message // [PB_SOLO] - 1 packet per message - s->m_llSourceTime_us = w_srctime; - s->m_tsOriginTime = time; - s->m_tsRexmitTime = time_point(); s->m_iTTL = ttl; - // Rewrite the actual sending time back into w_srctime - // so that the calling facilities can reuse it - if (!w_srctime) - w_srctime = count_microseconds(s->m_tsOriginTime.time_since_epoch()); - - // XXX unchecked condition: s->m_pNext == NULL. + s->m_tsRexmitTime = time_point(); + s->m_tsOriginTime = m_tsLastOriginTime; + // Should never happen, as the call to increase() should ensure enough buffers. SRT_ASSERT(s->m_pNext); s = s->m_pNext; } m_pLastBlock = s; - enterCS(m_BufLock); m_iCount += size; - m_iBytesCount += len; - m_tsLastOriginTime = time; - updateInputRate(time, size, len); - - updAvgBufSize(time); - - leaveCS(m_BufLock); + updateInputRate(m_tsLastOriginTime, size, len); + updAvgBufSize(m_tsLastOriginTime); // MSGNO_SEQ::mask has a form: 00000011111111... // At least it's known that it's from some index inside til the end (to bit 0). @@ -402,16 +397,6 @@ int CSndBuffer::addBufferFromFile(fstream& ifs, int len) return total; } -steady_clock::time_point CSndBuffer::getSourceTime(const CSndBuffer::Block& block) -{ - if (block.m_llSourceTime_us) - { - return steady_clock::time_point() + microseconds_from(block.m_llSourceTime_us); - } - - return block.m_tsOriginTime; -} - int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime, int kflgs) { // No data to read @@ -459,7 +444,7 @@ int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime, } w_packet.m_iMsgNo = m_pCurrBlock->m_iMsgNoBitset; - w_srctime = getSourceTime(*m_pCurrBlock); + w_srctime = m_pCurrBlock->m_tsOriginTime; m_pCurrBlock = m_pCurrBlock->m_pNext; HLOGC(bslog.Debug, log << CONID() << "CSndBuffer: extracting packet size=" << readlen << " to send"); @@ -593,7 +578,7 @@ int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time // the packet originally (the other overload of this function) must set these // flags. w_packet.m_iMsgNo = p->m_iMsgNoBitset; - w_srctime = getSourceTime(*p); + w_srctime = p->m_tsOriginTime; // This function is called when packet retransmission is triggered. // Therefore we are setting the rexmit time. @@ -683,11 +668,17 @@ int CSndBuffer::getCurrBufSize(int& w_bytes, int& w_timespan) * Also, if there is only one pkt in buffer, the time difference will be 0. * Therefore, always add 1 ms if not empty. */ - w_timespan = 0 < m_iCount ? count_milliseconds(m_tsLastOriginTime - m_pFirstBlock->m_tsOriginTime) + 1 : 0; + w_timespan = 0 < m_iCount ? (int) count_milliseconds(m_tsLastOriginTime - m_pFirstBlock->m_tsOriginTime) + 1 : 0; return m_iCount; } +CSndBuffer::time_point CSndBuffer::getOldestTime() const +{ + SRT_ASSERT(m_pFirstBlock); + return m_pFirstBlock->m_tsOriginTime; +} + int CSndBuffer::dropLateData(int& w_bytes, int32_t& w_first_msgno, const steady_clock::time_point& too_late_time) { int dpkts = 0; diff --git a/srtcore/buffer.h b/srtcore/buffer.h index 428db748d..9b6259226 100644 --- a/srtcore/buffer.h +++ b/srtcore/buffer.h @@ -178,6 +178,8 @@ class CSndBuffer int getAvgBufSize(int& bytes, int& timespan); int getCurrBufSize(int& bytes, int& timespan); + time_point getOldestTime() const; + uint64_t getInRatePeriod() const { return m_InRatePeriod; } /// Retrieve input bitrate in bytes per second @@ -197,9 +199,6 @@ class CSndBuffer void increase(); void setInputRateSmpPeriod(int period); - struct Block; // Defined below - static time_point getSourceTime(const CSndBuffer::Block& block); - private: // Constants static const uint64_t INPUTRATE_FAST_START_US = 500000; // 500 ms static const uint64_t INPUTRATE_RUNNING_US = 1000000; // 1000 ms @@ -216,9 +215,8 @@ class CSndBuffer int32_t m_iMsgNoBitset; // message number int32_t m_iSeqNo; // sequence number for scheduling - time_point m_tsOriginTime; // original request time + time_point m_tsOriginTime; // block origin time (either provided from above or equials the time a message was submitted for sending. time_point m_tsRexmitTime; // packet retransmission time - uint64_t m_llSourceTime_us; int m_iTTL; // time to live (milliseconds) Block* m_pNext; // next block diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 9aa2c5eb2..1833b25d8 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -6362,10 +6362,10 @@ int srt::CUDT::receiveBuffer(char *data, int len) // [[using maybe_locked(CUDTGroup::m_GroupLock, m_parent->m_GroupOf != NULL)]]; // [[using locked(m_SendLock)]]; -void srt::CUDT::checkNeedDrop(bool& w_bCongestion) +bool srt::CUDT::checkNeedDrop() { if (!m_bPeerTLPktDrop) - return; + return false; if (!m_config.bMessageAPI) { @@ -6390,6 +6390,7 @@ void srt::CUDT::checkNeedDrop(bool& w_bCongestion) (2 * COMM_SYN_INTERVAL_US / 1000); } + bool bCongestion = false; if (threshold_ms && timespan_ms > threshold_ms) { // protect packet retransmission @@ -6447,7 +6448,7 @@ void srt::CUDT::checkNeedDrop(bool& w_bCongestion) } #endif } - w_bCongestion = true; + bCongestion = true; leaveCS(m_RecvAckLock); } else if (timespan_ms > (m_iPeerTsbPdDelay_ms / 2)) @@ -6455,8 +6456,9 @@ void srt::CUDT::checkNeedDrop(bool& w_bCongestion) HLOGC(aslog.Debug, log << "cong, BYTES " << bytes << ", TMSPAN " << timespan_ms << "ms"); - w_bCongestion = true; + bCongestion = true; } + return bCongestion; } int srt::CUDT::sendmsg(const char *data, int len, int msttl, bool inorder, int64_t srctime) @@ -6473,8 +6475,6 @@ int srt::CUDT::sendmsg(const char *data, int len, int msttl, bool inorder, int64 // which is the only case when the m_parent->m_GroupOf is not NULL. int srt::CUDT::sendmsg2(const char *data, int len, SRT_MSGCTRL& w_mctrl) { - bool bCongestion = false; - // throw an exception if not connected if (m_bBroken || m_bClosing) throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); @@ -6563,7 +6563,7 @@ int srt::CUDT::sendmsg2(const char *data, int len, SRT_MSGCTRL& w_mctrl) // checkNeedDrop(...) may lock m_RecvAckLock // to modify m_pSndBuffer and m_pSndLossList - checkNeedDrop((bCongestion)); + const bool bCongestion = checkNeedDrop(); int minlen = 1; // Minimum sender buffer space required for STREAM API if (m_config.bMessageAPI) diff --git a/srtcore/core.h b/srtcore/core.h index 9f957d040..54c3927e2 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -541,7 +541,7 @@ class CUDT void updateIdleLinkFrom(CUDT* source); - void checkNeedDrop(bool& bCongestion); + bool checkNeedDrop(); /// Connect to a UDT entity as per hs request. This will update /// required data in the entity, then update them also in the hs structure,