From 04e9f3a928f12df022585d66779f6866d6e00773 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Fri, 12 Aug 2022 15:46:19 +0200 Subject: [PATCH 1/3] [core] Minor CSndBuffer edits. Renamed m_iMSS to m_iBlockLen. --- srtcore/buffer.cpp | 93 ++++++++++++++++++++++------------------------ srtcore/buffer.h | 9 +++-- 2 files changed, 50 insertions(+), 52 deletions(-) diff --git a/srtcore/buffer.cpp b/srtcore/buffer.cpp index 635d21e90..67f77e972 100644 --- a/srtcore/buffer.cpp +++ b/srtcore/buffer.cpp @@ -162,7 +162,7 @@ void CRateEstimator::updateInputRate(const time_point& time, int pkts, int bytes } } -CSndBuffer::CSndBuffer(int size, int mss) +CSndBuffer::CSndBuffer(int size, int maxpld) : m_BufLock() , m_pBlock(NULL) , m_pFirstBlock(NULL) @@ -171,36 +171,30 @@ CSndBuffer::CSndBuffer(int size, int mss) , m_pBuffer(NULL) , m_iNextMsgNo(1) , m_iSize(size) - , m_iMSS(mss) + , m_iBlockLen(maxpld) , m_iCount(0) , m_iBytesCount(0) { // initial physical buffer of "size" m_pBuffer = new Buffer; - m_pBuffer->m_pcData = new char[m_iSize * m_iMSS]; + m_pBuffer->m_pcData = new char[m_iSize * m_iBlockLen]; m_pBuffer->m_iSize = m_iSize; m_pBuffer->m_pNext = NULL; // circular linked list for out bound packets m_pBlock = new Block; Block* pb = m_pBlock; + char* pc = m_pBuffer->m_pcData; for (int i = 1; i < m_iSize; ++i) { pb->m_pNext = new Block; pb->m_iMsgNoBitset = 0; + pb->m_pcData = pc; + pc += m_iBlockLen; pb = pb->m_pNext; } pb->m_pNext = m_pBlock; - pb = m_pBlock; - char* pc = m_pBuffer->m_pcData; - for (int i = 0; i < m_iSize; ++i) - { - pb->m_pcData = pc; - pb = pb->m_pNext; - pc += m_iMSS; - } - m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock; setupMutex(m_BufLock, "Buf"); @@ -230,31 +224,31 @@ CSndBuffer::~CSndBuffer() void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl) { - int32_t& w_msgno = w_mctrl.msgno; - int32_t& w_seqno = w_mctrl.pktseq; + int32_t& w_msgno = w_mctrl.msgno; + int32_t& w_seqno = w_mctrl.pktseq; int64_t& w_srctime = w_mctrl.srctime; const int& ttl = w_mctrl.msgttl; - int size = len / m_iMSS; - if ((len % m_iMSS) != 0) - size++; + const int iPktLen = m_iBlockLen; // Payload length per packet. + int iNumBlocks = len / iPktLen; + if ((len % m_iBlockLen) != 0) + ++iNumBlocks; HLOGC(bslog.Debug, - log << "addBuffer: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size << " buffers for " - << len << " bytes"); + log << "addBuffer: needs=" << iNumBlocks << " buffers for " << len << " bytes. Taken=" << m_iCount << "/" << m_iSize); // Retrieve current time before locking the mutex to be closer to packet submission event. const steady_clock::time_point tnow = steady_clock::now(); ScopedLock bufferguard(m_BufLock); // Dynamically increase sender buffer if there is not enough room. - while (size + m_iCount >= m_iSize) + while (iNumBlocks + m_iCount >= m_iSize) { - HLOGC(bslog.Debug, log << "addBuffer: ... still lacking " << (size + m_iCount - m_iSize) << " buffers..."); + HLOGC(bslog.Debug, log << "addBuffer: ... still lacking " << (iNumBlocks + m_iCount - m_iSize) << " buffers..."); increase(); } const int32_t inorder = w_mctrl.inorder ? MSGNO_PACKET_INORDER::mask : 0; HLOGC(bslog.Debug, - log << CONID() << "addBuffer: adding " << size << " packets (" << len << " bytes) to send, msgno=" + log << CONID() << "addBuffer: adding " << iNumBlocks << " packets (" << len << " bytes) to send, msgno=" << (w_msgno > 0 ? w_msgno : m_iNextMsgNo) << (inorder ? "" : " NOT") << " in order"); // Calculate origin time (same for all blocks of the message). @@ -281,16 +275,16 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl) m_iNextMsgNo = w_msgno; } - for (int i = 0; i < size; ++i) + for (int i = 0; i < iNumBlocks; ++i) { - int pktlen = len - i * m_iMSS; - if (pktlen > m_iMSS) - pktlen = m_iMSS; + int pktlen = len - i * iPktLen; + if (pktlen > iPktLen) + pktlen = iPktLen; HLOGC(bslog.Debug, - log << "addBuffer: %" << w_seqno << " #" << w_msgno << " spreading from=" << (i * m_iMSS) + log << "addBuffer: %" << w_seqno << " #" << w_msgno << " offset=" << (i * iPktLen) << " size=" << pktlen << " TO BUFFER:" << (void*)s->m_pcData); - memcpy((s->m_pcData), data + i * m_iMSS, pktlen); + memcpy((s->m_pcData), data + i * iPktLen, pktlen); s->m_iLength = pktlen; s->m_iSeqNo = w_seqno; @@ -299,7 +293,7 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl) s->m_iMsgNoBitset = m_iNextMsgNo | inorder; if (i == 0) s->m_iMsgNoBitset |= PacketBoundaryBits(PB_FIRST); - if (i == size - 1) + if (i == iNumBlocks - 1) s->m_iMsgNoBitset |= PacketBoundaryBits(PB_LAST); // NOTE: if i is neither 0 nor size-1, it resuls with PB_SUBSEQUENT. // if i == 0 == size-1, it results with PB_SOLO. @@ -318,10 +312,10 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl) } m_pLastBlock = s; - m_iCount += size; + m_iCount += iNumBlocks; m_iBytesCount += len; - m_rateEstimator.updateInputRate(m_tsLastOriginTime, size, len); + m_rateEstimator.updateInputRate(m_tsLastOriginTime, iNumBlocks, len); updAvgBufSize(m_tsLastOriginTime); // MSGNO_SEQ::mask has a form: 00000011111111... @@ -337,39 +331,40 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl) int CSndBuffer::addBufferFromFile(fstream& ifs, int len) { - int size = len / m_iMSS; - if ((len % m_iMSS) != 0) - size++; + const int iPktLen = m_iBlockLen; // Payload length per packet. + int iNumBlocks = len / iPktLen; + if ((len % m_iBlockLen) != 0) + ++iNumBlocks; HLOGC(bslog.Debug, - log << "addBufferFromFile: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size + log << "addBufferFromFile: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << iPktLen << " buffers for " << len << " bytes"); // dynamically increase sender buffer - while (size + m_iCount >= m_iSize) + while (iPktLen + m_iCount >= m_iSize) { HLOGC(bslog.Debug, - log << "addBufferFromFile: ... still lacking " << (size + m_iCount - m_iSize) << " buffers..."); + log << "addBufferFromFile: ... still lacking " << (siPktLenize + m_iCount - m_iSize) << " buffers..."); increase(); } HLOGC(bslog.Debug, - log << CONID() << "addBufferFromFile: adding " << size << " packets (" << len + log << CONID() << "addBufferFromFile: adding " << iPktLen << " packets (" << len << " bytes) to send, msgno=" << m_iNextMsgNo); Block* s = m_pLastBlock; int total = 0; - for (int i = 0; i < size; ++i) + for (int i = 0; i < iPktLen; ++i) { if (ifs.bad() || ifs.fail() || ifs.eof()) break; - int pktlen = len - i * m_iMSS; - if (pktlen > m_iMSS) - pktlen = m_iMSS; + int pktlen = len - i * iPktLen; + if (pktlen > iPktLen) + pktlen = iPktLen; HLOGC(bslog.Debug, - log << "addBufferFromFile: reading from=" << (i * m_iMSS) << " size=" << pktlen + log << "addBufferFromFile: reading from=" << (i * iPktLen) << " size=" << pktlen << " TO BUFFER:" << (void*)s->m_pcData); ifs.read(s->m_pcData, pktlen); if ((pktlen = int(ifs.gcount())) <= 0) @@ -379,7 +374,7 @@ int CSndBuffer::addBufferFromFile(fstream& ifs, int len) s->m_iMsgNoBitset = m_iNextMsgNo | MSGNO_PACKET_INORDER::mask; if (i == 0) s->m_iMsgNoBitset |= PacketBoundaryBits(PB_FIRST); - if (i == size - 1) + if (i == iPktLen - 1) s->m_iMsgNoBitset |= PacketBoundaryBits(PB_LAST); // NOTE: PB_FIRST | PB_LAST == PB_SOLO. // none of PB_FIRST & PB_LAST == PB_SUBSEQUENT. @@ -393,7 +388,7 @@ int CSndBuffer::addBufferFromFile(fstream& ifs, int len) m_pLastBlock = s; enterCS(m_BufLock); - m_iCount += size; + m_iCount += iPktLen; m_iBytesCount += total; leaveCS(m_BufLock); @@ -588,7 +583,7 @@ int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time } w_packet.m_pcData = p->m_pcData; - int readlen = p->m_iLength; + const int readlen = p->m_iLength; w_packet.setLength(readlen); // XXX Here the value predicted to be applied to PH_MSGNO field is extracted. @@ -750,7 +745,7 @@ void CSndBuffer::increase() try { nbuf = new Buffer; - nbuf->m_pcData = new char[unitsize * m_iMSS]; + nbuf->m_pcData = new char[unitsize * m_iBlockLen]; } catch (...) { @@ -794,13 +789,13 @@ void CSndBuffer::increase() { pb->m_pcData = pc; pb = pb->m_pNext; - pc += m_iMSS; + pc += m_iBlockLen; } m_iSize += unitsize; HLOGC(bslog.Debug, - log << "CSndBuffer: BUFFER FULL - adding " << (unitsize * m_iMSS) << " bytes spread to " << unitsize + log << "CSndBuffer: BUFFER FULL - adding " << (unitsize * m_iBlockLen) << " bytes spread to " << unitsize << " blocks" << " (total size: " << m_iSize << " bytes)"); } diff --git a/srtcore/buffer.h b/srtcore/buffer.h index 3ac7b3e95..2dfa40880 100644 --- a/srtcore/buffer.h +++ b/srtcore/buffer.h @@ -155,7 +155,10 @@ class CSndBuffer // Currently just "unimplemented". std::string CONID() const { return ""; } - CSndBuffer(int size = 32, int mss = 1500); + /// @brief CSndBuffer constructor. + /// @param size initial number of blocks (each block to store one packet payload). + /// @param maxpld maximum packet payload. + CSndBuffer(int size = 32, int maxpld = 1500); ~CSndBuffer(); public: @@ -256,7 +259,7 @@ class CSndBuffer struct Block { char* m_pcData; // pointer to the data block - int m_iLength; // length of the block + int m_iLength; // payload length of the block. int32_t m_iMsgNoBitset; // message number int32_t m_iSeqNo; // sequence number for scheduling @@ -292,7 +295,7 @@ class CSndBuffer int32_t m_iNextMsgNo; // next message number int m_iSize; // buffer size (number of packets) - int m_iMSS; // maximum seqment/packet size + const int m_iBlockLen; // maximum length of a block holding packet payload (excluding packet header). int m_iCount; // number of used blocks int m_iBytesCount; // number of payload bytes in queue From 63f4ea1f8f98b4688d3c055519fe2d915aef7d4f Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Fri, 12 Aug 2022 17:02:59 +0200 Subject: [PATCH 2/3] Fixing CSndBuffer construction --- srtcore/buffer.cpp | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/srtcore/buffer.cpp b/srtcore/buffer.cpp index 67f77e972..54960061e 100644 --- a/srtcore/buffer.cpp +++ b/srtcore/buffer.cpp @@ -184,14 +184,19 @@ CSndBuffer::CSndBuffer(int size, int maxpld) // circular linked list for out bound packets m_pBlock = new Block; Block* pb = m_pBlock; - char* pc = m_pBuffer->m_pcData; - for (int i = 1; i < m_iSize; ++i) + char* pc = m_pBuffer->m_pcData; + + for (int i = 0; i < m_iSize; ++i) { - pb->m_pNext = new Block; pb->m_iMsgNoBitset = 0; pb->m_pcData = pc; pc += m_iBlockLen; - pb = pb->m_pNext; + + if (i < m_iSize - 1) + { + pb->m_pNext = new Block; + pb = pb->m_pNext; + } } pb->m_pNext = m_pBlock; From c9316f588d4f0c1cc209e6bfe1a26c15f7fabb22 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Fri, 12 Aug 2022 17:26:44 +0200 Subject: [PATCH 3/3] Fixed HLOG --- srtcore/buffer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srtcore/buffer.cpp b/srtcore/buffer.cpp index 54960061e..9a01a7e0d 100644 --- a/srtcore/buffer.cpp +++ b/srtcore/buffer.cpp @@ -349,7 +349,7 @@ int CSndBuffer::addBufferFromFile(fstream& ifs, int len) while (iPktLen + m_iCount >= m_iSize) { HLOGC(bslog.Debug, - log << "addBufferFromFile: ... still lacking " << (siPktLenize + m_iCount - m_iSize) << " buffers..."); + log << "addBufferFromFile: ... still lacking " << (iPktLen + m_iCount - m_iSize) << " buffers..."); increase(); }