From 841534f502d4b644759200c783c39fabc184fa0e Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Thu, 22 Jul 2021 17:18:57 +0200 Subject: [PATCH] Calling updateNonreadPos in readMessage is not needed and affects performance --- srtcore/buffer_rcv.cpp | 59 ++++++++++++++++++++++++++++++++++++++---- test/test_buffer2.cpp | 2 -- 2 files changed, 54 insertions(+), 7 deletions(-) diff --git a/srtcore/buffer_rcv.cpp b/srtcore/buffer_rcv.cpp index af8af12788..1316226718 100644 --- a/srtcore/buffer_rcv.cpp +++ b/srtcore/buffer_rcv.cpp @@ -16,6 +16,22 @@ using namespace std; namespace srt { +namespace { + struct ScopedLog + { + ScopedLog() {}; + + ~ScopedLog() + { + LOGC(rbuflog.Warn, log << ss.str()); + } + + stringstream ss; + }; + +#define IF_RCVBUF_DEBUG(instr) (void)0 +} + /* * RcvBufferNew (circular buffer): @@ -77,11 +93,20 @@ int CRcvBufferNew::insert(CUnit* unit) const int32_t seqno = unit->m_Packet.getSeqNo(); const int offset = CSeqNo::seqoff(m_iStartSeqNo, seqno); + IF_RCVBUF_DEBUG(ScopedLog scoped_log); + IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::insert: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo << " offset " << offset); + if (offset < 0) + { + IF_RCVBUF_DEBUG(scoped_log.ss << " returns -2"); return -2; + } - if (offset >= (int) capacity()) + if (offset >= (int)capacity()) + { + IF_RCVBUF_DEBUG(scoped_log.ss << " returns -3"); return -3; + } // If >= 2, then probably there is a long gap, and buffer needs to be reset. SRT_ASSERT((m_iStartPos + offset) / m_szSize < 2); @@ -93,7 +118,10 @@ int CRcvBufferNew::insert(CUnit* unit) // Packet already exists SRT_ASSERT(pos >= 0 && pos < m_szSize); if (m_entries[pos].status != EntryState_Empty) + { + IF_RCVBUF_DEBUG(scoped_log.ss << " returns -1"); return -1; + } SRT_ASSERT(m_entries[pos].pUnit == NULL); m_pUnitQueue->makeUnitGood(unit); @@ -110,6 +138,7 @@ int CRcvBufferNew::insert(CUnit* unit) } updateNonreadPos(); + IF_RCVBUF_DEBUG(scoped_log.ss << " returns 0 (OK)"); return 0; } @@ -119,10 +148,16 @@ void CRcvBufferNew::dropUpTo(int32_t seqno) // first unacknowledged packet is missing. SRT_ASSERT(m_iStartPos == m_iFirstNonreadPos); + IF_RCVBUF_DEBUG(ScopedLog scoped_log); + IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::dropUpTo: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo); + int len = CSeqNo::seqoff(m_iStartSeqNo, seqno); SRT_ASSERT(len > 0); if (len <= 0) + { + IF_RCVBUF_DEBUG(scoped_log.ss << ". Nothing to drop."); return; + } /*LOGC(rbuflog.Warn, log << "CRcvBufferNew.dropUpTo(): seqno=" << seqno << ", pkts=" << len << ". Buffer start " << m_iStartSeqNo << ".");*/ @@ -162,6 +197,8 @@ void CRcvBufferNew::dropUpTo(int32_t seqno) void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno) { + IF_RCVBUF_DEBUG(ScopedLog scoped_log); + IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::dropMessage: seqnolo " << seqnolo << " seqnohi " << seqnohi << " m_iStartSeqNo " << m_iStartSeqNo); // TODO: count bytes as removed? const int end_pos = incPos(m_iStartPos, m_iMaxPosInc); if (msgno != 0) @@ -217,6 +254,9 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) return -1; } + IF_RCVBUF_DEBUG(ScopedLog scoped_log); + IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::readMessage. m_iStartSeqNo " << m_iStartSeqNo); + const int readPos = canReadInOrder ? m_iStartPos : m_iFirstReadableOutOfOrder; size_t remain = len; @@ -289,8 +329,13 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl) updateFirstReadableOutOfOrder(); releaseNextFillerEntries(); - m_iFirstNonreadPos = m_iStartPos; - updateNonreadPos(); + + if (!canReadInOrder) + { + // This heavily impacts performance! + m_iFirstNonreadPos = m_iStartPos; + updateNonreadPos(); + } return (dst - data); } @@ -327,7 +372,11 @@ int CRcvBufferNew::readBufferTo(int len, int iFirstUnackSeqNo, copy_to_dst_f fun return 0; int p = m_iStartPos; - const int iNumAvail = CSeqNo::seqlen(m_iStartSeqNo, iFirstUnackSeqNo); + const int iNumAvail = CSeqNo::seqlen(m_iStartSeqNo, iFirstUnackSeqNo) - 1; + if (iNumAvail > m_iMaxPosInc) + { + LOGC(rbuflog.Error, log << "readBufferTo: IPE: iNumAvail " << iNumAvail << " iMaxPosInc " << m_iMaxPosInc); + } SRT_ASSERT(iNumAvail <= m_iMaxPosInc); const int end_pos = incPos(m_iStartPos, iNumAvail); @@ -341,7 +390,7 @@ int CRcvBufferNew::readBufferTo(int len, int iFirstUnackSeqNo, copy_to_dst_f fun if (!m_entries[p].pUnit) { p = incPos(p); - LOGC(rbuflog.Error, log << "readBufferToFile: IPE: NULL unit found in file transmission"); + LOGC(rbuflog.Error, log << "readBufferTo: IPE: NULL unit found in file transmission"); continue; } diff --git a/test/test_buffer2.cpp b/test/test_buffer2.cpp index 325112b24b..13fa21ae64 100644 --- a/test/test_buffer2.cpp +++ b/test/test_buffer2.cpp @@ -120,7 +120,6 @@ TEST(CRcvBufferNew, OneMessageInSeveralPackets) { CUnit* unit = unit_queue.getNextAvailUnit(); EXPECT_NE(unit, nullptr); - unit->m_iFlag = CUnit::GOOD; CPacket& packet = unit->m_Packet; packet.setLength(payload_size); packet.m_iSeqNo = initial_seqno + i; @@ -169,7 +168,6 @@ TEST(CRcvBufferNew, MessageOutOfOrder) { CUnit* unit = unit_queue.getNextAvailUnit(); EXPECT_NE(unit, nullptr); - unit->m_iFlag = CUnit::GOOD; CPacket& packet = unit->m_Packet; packet.setLength(payload_size); packet.m_iSeqNo = initial_seqno + message_len_in_pkts + i;