Skip to content

Commit

Permalink
Calling updateNonreadPos in readMessage is not needed and affects per…
Browse files Browse the repository at this point in the history
…formance
  • Loading branch information
maxsharabayko committed Jul 22, 2021
1 parent 4447d9c commit 841534f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 7 deletions.
59 changes: 54 additions & 5 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,22 @@ using namespace std;

namespace srt {

namespace {
struct ScopedLog
{
ScopedLog() {};

~ScopedLog()
{
LOGC(rbuflog.Warn, log << ss.str());
}

stringstream ss;
};

#define IF_RCVBUF_DEBUG(instr) (void)0
}


/*
* RcvBufferNew (circular buffer):
Expand Down Expand Up @@ -77,11 +93,20 @@ int CRcvBufferNew::insert(CUnit* unit)
const int32_t seqno = unit->m_Packet.getSeqNo();
const int offset = CSeqNo::seqoff(m_iStartSeqNo, seqno);

IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::insert: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo << " offset " << offset);

if (offset < 0)
{
IF_RCVBUF_DEBUG(scoped_log.ss << " returns -2");
return -2;
}

if (offset >= (int) capacity())
if (offset >= (int)capacity())
{
IF_RCVBUF_DEBUG(scoped_log.ss << " returns -3");
return -3;
}

// If >= 2, then probably there is a long gap, and buffer needs to be reset.
SRT_ASSERT((m_iStartPos + offset) / m_szSize < 2);
Expand All @@ -93,7 +118,10 @@ int CRcvBufferNew::insert(CUnit* unit)
// Packet already exists
SRT_ASSERT(pos >= 0 && pos < m_szSize);
if (m_entries[pos].status != EntryState_Empty)
{
IF_RCVBUF_DEBUG(scoped_log.ss << " returns -1");
return -1;
}
SRT_ASSERT(m_entries[pos].pUnit == NULL);

m_pUnitQueue->makeUnitGood(unit);
Expand All @@ -110,6 +138,7 @@ int CRcvBufferNew::insert(CUnit* unit)
}

updateNonreadPos();
IF_RCVBUF_DEBUG(scoped_log.ss << " returns 0 (OK)");
return 0;
}

Expand All @@ -119,10 +148,16 @@ void CRcvBufferNew::dropUpTo(int32_t seqno)
// first unacknowledged packet is missing.
SRT_ASSERT(m_iStartPos == m_iFirstNonreadPos);

IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::dropUpTo: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo);

int len = CSeqNo::seqoff(m_iStartSeqNo, seqno);
SRT_ASSERT(len > 0);
if (len <= 0)
{
IF_RCVBUF_DEBUG(scoped_log.ss << ". Nothing to drop.");
return;
}

/*LOGC(rbuflog.Warn, log << "CRcvBufferNew.dropUpTo(): seqno=" << seqno << ", pkts=" << len
<< ". Buffer start " << m_iStartSeqNo << ".");*/
Expand Down Expand Up @@ -162,6 +197,8 @@ void CRcvBufferNew::dropUpTo(int32_t seqno)

void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)
{
IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::dropMessage: seqnolo " << seqnolo << " seqnohi " << seqnohi << " m_iStartSeqNo " << m_iStartSeqNo);
// TODO: count bytes as removed?
const int end_pos = incPos(m_iStartPos, m_iMaxPosInc);
if (msgno != 0)
Expand Down Expand Up @@ -217,6 +254,9 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
return -1;
}

IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::readMessage. m_iStartSeqNo " << m_iStartSeqNo);

const int readPos = canReadInOrder ? m_iStartPos : m_iFirstReadableOutOfOrder;

size_t remain = len;
Expand Down Expand Up @@ -289,8 +329,13 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
updateFirstReadableOutOfOrder();

releaseNextFillerEntries();
m_iFirstNonreadPos = m_iStartPos;
updateNonreadPos();

if (!canReadInOrder)
{
// This heavily impacts performance!
m_iFirstNonreadPos = m_iStartPos;
updateNonreadPos();
}

return (dst - data);
}
Expand Down Expand Up @@ -327,7 +372,11 @@ int CRcvBufferNew::readBufferTo(int len, int iFirstUnackSeqNo, copy_to_dst_f fun
return 0;

int p = m_iStartPos;
const int iNumAvail = CSeqNo::seqlen(m_iStartSeqNo, iFirstUnackSeqNo);
const int iNumAvail = CSeqNo::seqlen(m_iStartSeqNo, iFirstUnackSeqNo) - 1;
if (iNumAvail > m_iMaxPosInc)
{
LOGC(rbuflog.Error, log << "readBufferTo: IPE: iNumAvail " << iNumAvail << " iMaxPosInc " << m_iMaxPosInc);
}

SRT_ASSERT(iNumAvail <= m_iMaxPosInc);
const int end_pos = incPos(m_iStartPos, iNumAvail);
Expand All @@ -341,7 +390,7 @@ int CRcvBufferNew::readBufferTo(int len, int iFirstUnackSeqNo, copy_to_dst_f fun
if (!m_entries[p].pUnit)
{
p = incPos(p);
LOGC(rbuflog.Error, log << "readBufferToFile: IPE: NULL unit found in file transmission");
LOGC(rbuflog.Error, log << "readBufferTo: IPE: NULL unit found in file transmission");
continue;
}

Expand Down
2 changes: 0 additions & 2 deletions test/test_buffer2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ TEST(CRcvBufferNew, OneMessageInSeveralPackets)
{
CUnit* unit = unit_queue.getNextAvailUnit();
EXPECT_NE(unit, nullptr);
unit->m_iFlag = CUnit::GOOD;
CPacket& packet = unit->m_Packet;
packet.setLength(payload_size);
packet.m_iSeqNo = initial_seqno + i;
Expand Down Expand Up @@ -169,7 +168,6 @@ TEST(CRcvBufferNew, MessageOutOfOrder)
{
CUnit* unit = unit_queue.getNextAvailUnit();
EXPECT_NE(unit, nullptr);
unit->m_iFlag = CUnit::GOOD;
CPacket& packet = unit->m_Packet;
packet.setLength(payload_size);
packet.m_iSeqNo = initial_seqno + message_len_in_pkts + i;
Expand Down

0 comments on commit 841534f

Please sign in to comment.