Skip to content

Commit

Permalink
[core] drop packets in the new recv buffer by group recv base
Browse files Browse the repository at this point in the history
  • Loading branch information
gou4shi1 committed Dec 30, 2021
1 parent 3d26644 commit 922f8fc
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 15 deletions.
23 changes: 8 additions & 15 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,10 @@ int CRcvBufferNew::insert(CUnit* unit)

int CRcvBufferNew::dropUpTo(int32_t seqno)
{
// Can drop only when nothing to read, and
// first unacknowledged packet is missing.
SRT_ASSERT(m_iStartPos == m_iFirstNonreadPos);

IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::dropUpTo: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo);

int len = CSeqNo::seqoff(m_iStartSeqNo, seqno);
SRT_ASSERT(len > 0);
if (len <= 0)
{
IF_RCVBUF_DEBUG(scoped_log.ss << ". Nothing to drop.");
Expand All @@ -180,34 +175,32 @@ int CRcvBufferNew::dropUpTo(int32_t seqno)
if (m_iMaxPosInc < 0)
m_iMaxPosInc = 0;

// Check that all packets being dropped are missing.
const int iDropCnt = len;
while (len > 0)
{
if (m_entries[m_iStartPos].pUnit != NULL)
CUnit* pUnit = m_entries[m_iStartPos].pUnit;
if (pUnit != NULL)
{
if (!m_tsbpd.isEnabled() && m_bMessageAPI && !pUnit->m_Packet.getMsgOrderFlag())
--m_numOutOfOrderPackets;
releaseUnitInPos(m_iStartPos);
}

if (m_entries[m_iStartPos].status != EntryState_Empty)
{
SRT_ASSERT(m_entries[m_iStartPos].status == EntryState_Drop || m_entries[m_iStartPos].status == EntryState_Read);
m_entries[m_iStartPos].status = EntryState_Empty;
}

m_entries[m_iStartPos].status = EntryState_Empty;
SRT_ASSERT(m_entries[m_iStartPos].pUnit == NULL && m_entries[m_iStartPos].status == EntryState_Empty);
m_iStartPos = incPos(m_iStartPos);
--len;
}

// Update positions
m_iStartSeqNo = seqno;
// Move forward if there are "read" entries.
// Move forward if there are "read/drop" entries.
releaseNextFillerEntries();
// Set nonread position to the starting position before updating,
// because start position was increased, and preceeding packets are invalid.
m_iFirstNonreadPos = m_iStartPos;
updateNonreadPos();
if (!m_tsbpd.isEnabled() && m_bMessageAPI)
updateFirstReadableOutOfOrder();
return iDropCnt;
}

Expand Down
34 changes: 34 additions & 0 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7706,6 +7706,36 @@ int32_t srt::CUDT::ackDataUpTo(int32_t ack)
#endif
}

#if ENABLE_EXPERIMENTAL_BONDING && ENABLE_NEW_RCVBUFFER
void srt::CUDT::dropToGroupRecvBase() {
int32_t group_recv_base = SRT_SEQNO_NONE;
if (m_parent->m_GroupOf)
{
// Check is first done before locking to avoid unnecessary
// mutex locking. The condition for this field is that it
// can be either never set, already reset, or ever set
// and possibly dangling. The re-check after lock eliminates
// the dangling case.
ScopedLock glock (uglobal().m_GlobControlLock);

// Note that getRcvBaseSeqNo() will lock m_GroupOf->m_GroupLock,
// but this is an intended order.
if (m_parent->m_GroupOf)
group_recv_base = m_parent->m_GroupOf->getRcvBaseSeqNo();
}
if (group_recv_base == SRT_SEQNO_NONE)
return;

ScopedLock lck(m_RcvBufferLock);
dropTooLateUpTo(CSeqNo::incseq(group_recv_base));
if (CSeqNo::seqcmp(group_recv_base, m_iRcvCurrSeqNo) > 0)
{
LOGC(xtlog.Note, log << "dropToGroupRecvBase: m_iRcvCurrSeqNo: %" << m_iRcvCurrSeqNo << " -> %" << group_recv_base );
m_iRcvCurrSeqNo = group_recv_base;
}
}
#endif

namespace srt {
#if ENABLE_HEAVY_LOGGING
static void DebugAck(string hdr, int prev, int ack)
Expand Down Expand Up @@ -7901,6 +7931,10 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
string reason = "first lost"; // just for "a reason" of giving particular % for ACK
#endif

#if ENABLE_EXPERIMENTAL_BONDING && ENABLE_NEW_RCVBUFFER
dropToGroupRecvBase();
#endif

{
// If there is no loss, the ACK is the current largest sequence number plus 1;
// Otherwise it is the smallest sequence number in the receiver loss list.
Expand Down
7 changes: 7 additions & 0 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,13 @@ class CUDT
/// @return
int32_t ackDataUpTo(int32_t seq);

#if ENABLE_EXPERIMENTAL_BONDING && ENABLE_NEW_RCVBUFFER
/// @brief Drop packets in the recv buffer behind group_recv_base.
/// Updates m_iRcvLastSkipAck if it's behind group_recv_base.
/// Updates m_iRcvCurrSeqNo if it's behind group_recv_base.
void dropToGroupRecvBase();
#endif

void handleKeepalive(const char* data, size_t lenghth);

/// Locks m_RcvBufferLock and retrieves the available size of the receiver buffer.
Expand Down

0 comments on commit 922f8fc

Please sign in to comment.