Skip to content

Commit

Permalink
[core] Fix CRcvBufferNew::dropMessage() (#2222)
Browse files Browse the repository at this point in the history
(revise buffer state after drop)
  • Loading branch information
gou4shi1 committed Jan 18, 2022
1 parent 24bf666 commit 8c05c70
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 6 deletions.
71 changes: 65 additions & 6 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)
const int end_pos = incPos(m_iStartPos, m_iMaxPosInc);
if (msgno != 0)
{
int minDroppedOffset = -1;
for (int i = m_iStartPos; i != end_pos; i = incPos(i))
{
// TODO: Maybe check status?
Expand All @@ -215,11 +216,26 @@ void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)
const int32_t msgseq = m_entries[i].pUnit->m_Packet.getMsgSeq(m_bPeerRexmitFlag);
if (msgseq == msgno)
{
releaseUnitInPos(i);
dropUnitInPos(i);
m_entries[i].status = EntryState_Drop;
if (minDroppedOffset == -1)
minDroppedOffset = offPos(m_iStartPos, i);
}
}

// Check if units before m_iFirstNonreadPos are dropped.
bool needUpdateNonreadPos = (minDroppedOffset != -1 && minDroppedOffset <= getRcvDataSize());
releaseNextFillerEntries();
if (needUpdateNonreadPos)
{
m_iFirstNonreadPos = m_iStartPos;
updateNonreadPos();
}
if (!m_tsbpd.isEnabled() && m_bMessageAPI)
{
if (!checkFirstReadableOutOfOrder())
m_iFirstReadableOutOfOrder = -1;
updateFirstReadableOutOfOrder();
}
return;
}

Expand All @@ -235,17 +251,32 @@ void CRcvBufferNew::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)

const int start_off = max(0, offset_a);
const int last_pos = incPos(m_iStartPos, offset_b);
int minDroppedOffset = -1;
for (int i = incPos(m_iStartPos, start_off); i != end_pos && i != last_pos; i = incPos(i))
{
if (m_entries[i].pUnit)
{
releaseUnitInPos(i);
}
dropUnitInPos(i);
m_entries[i].status = EntryState_Drop;
if (minDroppedOffset == -1)
minDroppedOffset = offPos(m_iStartPos, i);
}

LOGC(rbuflog.Debug, log << "CRcvBufferNew.dropMessage(): [" << seqnolo << "; "
<< seqnohi << "].");

// Check if units before m_iFirstNonreadPos are dropped.
bool needUpdateNonreadPos = (minDroppedOffset != -1 && minDroppedOffset <= getRcvDataSize());
releaseNextFillerEntries();
if (needUpdateNonreadPos)
{
m_iFirstNonreadPos = m_iStartPos;
updateNonreadPos();
}
if (!m_tsbpd.isEnabled() && m_bMessageAPI)
{
if (!checkFirstReadableOutOfOrder())
m_iFirstReadableOutOfOrder = -1;
updateFirstReadableOutOfOrder();
}
}

int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
Expand Down Expand Up @@ -720,6 +751,34 @@ void CRcvBufferNew::onInsertNotInOrderPacket(int insertPos)
return;
}

bool CRcvBufferNew::checkFirstReadableOutOfOrder()
{
if (m_numOutOfOrderPackets <= 0 || m_iFirstReadableOutOfOrder < 0 || m_iMaxPosInc == 0)
return false;

const int endPos = incPos(m_iStartPos, m_iMaxPosInc);
int msgno = -1;
for (int pos = m_iFirstReadableOutOfOrder; pos != endPos; pos = incPos(pos))
{
if (!m_entries[pos].pUnit)
return false;

const CPacket& pkt = m_entries[pos].pUnit->m_Packet;
if (pkt.getMsgOrderFlag())
return false;

if (msgno == -1)
msgno = pkt.getMsgSeq(m_bPeerRexmitFlag);
else if (msgno != pkt.getMsgSeq(m_bPeerRexmitFlag))
return false;

if (pkt.getMsgBoundary() & PB_LAST)
return true;
}

return false;
}

void CRcvBufferNew::updateFirstReadableOutOfOrder()
{
if (hasReadableInorderPkts() || m_numOutOfOrderPackets <= 0 || m_iFirstReadableOutOfOrder >= 0)
Expand Down
3 changes: 3 additions & 0 deletions srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ class CRcvBufferNew
private:
inline int incPos(int pos, int inc = 1) const { return (pos + inc) % m_szSize; }
inline int decPos(int pos) const { return (pos - 1) >= 0 ? (pos - 1) : int(m_szSize - 1); }
inline int offPos(int pos1, int pos2) const { return (pos2 >= pos1) ? (pos2 - pos1) : (m_szSize + pos2 - pos1); }

private:
void countBytes(int pkts, int bytes);
Expand All @@ -237,6 +238,8 @@ class CRcvBufferNew

/// Scan for availability of out of order packets.
void onInsertNotInOrderPacket(int insertpos);
// Check if m_iFirstReadableOutOfOrder is still readable.
bool checkFirstReadableOutOfOrder();
void updateFirstReadableOutOfOrder();
int scanNotInOrderMessageRight(int startPos, int msgNo) const;
int scanNotInOrderMessageLeft(int startPos, int msgNo) const;
Expand Down

0 comments on commit 8c05c70

Please sign in to comment.