Skip to content

Commit

Permalink
[core] Fixed CRcvBuffer::dropMessage (#2657).
Browse files Browse the repository at this point in the history
Tell what to do with existing packets.
Fixed pkt seqno dropping loop range.
  • Loading branch information
maxsharabayko authored Feb 10, 2023
1 parent e9a3955 commit 3ffc93f
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 72 deletions.
65 changes: 35 additions & 30 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,53 +254,52 @@ int CRcvBuffer::dropAll()
return dropUpTo(end_seqno);
}

int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)
int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, DropActionIfExists actionOnExisting)
{
IF_RCVBUF_DEBUG(ScopedLog scoped_log);
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::dropMessage: seqnolo " << seqnolo << " seqnohi " << seqnohi << " m_iStartSeqNo " << m_iStartSeqNo);
const bool bKeepExisting = (actionOnExisting == KEEP_EXISTING);
// TODO: count bytes as removed?
const int end_pos = incPos(m_iStartPos, m_iMaxPosOff);
if (msgno > 0) // including SRT_MSGNO_NONE and SRT_MSGNO_CONTROL
int minDroppedOffset = -1;
int iDropCnt = 0;
if (msgno > 0) // excluding SRT_MSGNO_NONE and SRT_MSGNO_CONTROL
{
// First try to drop by message number in case the message starts earlier thtan @a seqnolo.
IF_RCVBUF_DEBUG(scoped_log.ss << " msgno " << msgno);
int minDroppedOffset = -1;
int iDropCnt = 0;
const int end_pos = incPos(m_iStartPos, m_iMaxPosOff);
for (int i = m_iStartPos; i != end_pos; i = incPos(i))
{
// TODO: Maybe check status?
// Can't drop is message number is not known.
// TODO: Maybe check entry status?
if (!m_entries[i].pUnit)
continue;

// TODO: Break the loop if a massege has been found. No need to search further.
const PacketBoundary bnd = packetAt(i).getMsgBoundary();
const int32_t msgseq = packetAt(i).getMsgSeq(m_bPeerRexmitFlag);
if (msgseq == msgno)
{
if (bKeepExisting && bnd == PB_SOLO)
{
LOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): Skipped dropping an exising SOLO message packet %"
<< packetAt(i).getSeqNo() << ".");
break;
}

++iDropCnt;
dropUnitInPos(i);
m_entries[i].status = EntryState_Drop;
if (minDroppedOffset == -1)
minDroppedOffset = offPos(m_iStartPos, i);

// Break the loop if the end of message has been found. No need to search further.
if (bnd == PB_LAST)
break;
}
}
IF_RCVBUF_DEBUG(scoped_log.ss << " iDropCnt " << iDropCnt);
// Check if units before m_iFirstNonreadPos are dropped.
bool needUpdateNonreadPos = (minDroppedOffset != -1 && minDroppedOffset <= getRcvDataSize());
releaseNextFillerEntries();
if (needUpdateNonreadPos)
{
m_iFirstNonreadPos = m_iStartPos;
updateNonreadPos();
}
if (!m_tsbpd.isEnabled() && m_bMessageAPI)
{
if (!checkFirstReadableOutOfOrder())
m_iFirstReadableOutOfOrder = -1;
updateFirstReadableOutOfOrder();
}
return iDropCnt;
}

// Drop by packet seqno range.
// Drop by packet seqno range to also wipe those packets that do not exist in the buffer.
const int offset_a = CSeqNo::seqoff(m_iStartSeqNo, seqnolo);
const int offset_b = CSeqNo::seqoff(m_iStartSeqNo, seqnohi);
if (offset_b < 0)
Expand All @@ -310,15 +309,21 @@ int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)
return 0;
}

const int end_pos = decPos(m_iStartPos); // Can potentially drop the whole buffer, even if it is empty.
const int start_off = max(0, offset_a);
const int last_pos = incPos(m_iStartPos, offset_b);
int minDroppedOffset = -1;
int iDropCnt = 0;
for (int i = incPos(m_iStartPos, start_off); i != end_pos && i != last_pos; i = incPos(i))
const int break_pos = incPos(m_iStartPos, offset_b + 1); // The position right after the last packet to drop.
for (int i = incPos(m_iStartPos, start_off); i != end_pos && i != break_pos; i = incPos(i))
{
// Don't drop messages, if all its packets are already in the buffer.
// TODO: Don't drop a several-packet message if all packets are in the buffer.
if (m_entries[i].pUnit && packetAt(i).getMsgBoundary() == PB_SOLO)
if (bKeepExisting && m_entries[i].pUnit && packetAt(i).getMsgBoundary() == PB_SOLO)
{
LOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): Skipped dropping an exising SOLO packet %" << packetAt(i).getSeqNo() << ".");
continue;
}

// Check if the unit was already dropped earlier.
if (m_entries[i].status == EntryState_Drop)
continue;

dropUnitInPos(i);
Expand All @@ -328,11 +333,11 @@ int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno)
minDroppedOffset = offPos(m_iStartPos, i);
}

LOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): [" << seqnolo << "; "
HLOGC(rbuflog.Debug, log << "CRcvBuffer.dropMessage(): [" << seqnolo << "; "
<< seqnohi << "].");

// Check if units before m_iFirstNonreadPos are dropped.
bool needUpdateNonreadPos = (minDroppedOffset != -1 && minDroppedOffset <= getRcvDataSize());
const bool needUpdateNonreadPos = (minDroppedOffset != -1 && minDroppedOffset <= getRcvDataSize());
releaseNextFillerEntries();
if (needUpdateNonreadPos)
{
Expand Down
22 changes: 19 additions & 3 deletions srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,29 @@ class CRcvBuffer
/// @return the number of dropped packets.
int dropAll();

/// @brief Drop the whole message from the buffer.
enum DropActionIfExists {
DROP_EXISTING = 0,
KEEP_EXISTING = 1
};

/// @brief Drop a sequence of packets from the buffer.
/// If @a msgno is valid, sender has requested to drop the whole message by TTL. In this case it has to also provide a pkt seqno range.
/// However, if a message has been partially acknowledged and already removed from the SND buffer,
/// the @a seqnolo might specify some position in the middle of the message, not the very first packet.
/// If those packets have been acknowledged, they must exist in the receiver buffer unless already read.
/// In this case the @a msgno should be used to determine starting packets of the message.
/// Some packets of the message can be missing on the receiver, therefore the actual drop should still be performed by pkt seqno range.
/// If message number is 0 or SRT_MSGNO_NONE, then use sequence numbers to locate sequence range to drop [seqnolo, seqnohi].
/// When one packet of the message is in the range of dropping, the whole message is to be dropped.
/// A SOLO message packet can be kept depending on @a actionOnExisting value.
/// TODO: A message in general can be kept if all of its packets are in the buffer, depending on @a actionOnExisting value.
/// This is done to avoid dropping existing packet when the sender was asked to re-transmit a packet from an outdated loss report,
/// which is already not available in the SND buffer.
/// @param seqnolo sequence number of the first packet in the dropping range.
/// @param seqnohi sequence number of the last packet in the dropping range.
/// @param msgno message number to drop (0 if unknown)
/// @param actionOnExisting Should an exising SOLO packet be dropped from the buffer or preserved?
/// @return the number of packets actually dropped.
int dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno);
int dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, DropActionIfExists actionOnExisting);

/// Read the whole message from one or several packets.
///
Expand Down Expand Up @@ -136,6 +151,7 @@ class CRcvBuffer
}

/// @brief Checks if the buffer has packets available for reading regardless of the TSBPD.
/// A message is available for reading only if all of its packets are present in the buffer.
/// @return true if there are packets available for reading, false otherwise.
bool hasAvailablePackets() const;

Expand Down
4 changes: 2 additions & 2 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8793,7 +8793,7 @@ void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt)
{
const bool using_rexmit_flag = m_bPeerRexmitFlag;
ScopedLock rblock(m_RcvBufferLock);
const int iDropCnt = m_pRcvBuffer->dropMessage(dropdata[0], dropdata[1], ctrlpkt.getMsgSeq(using_rexmit_flag));
const int iDropCnt = m_pRcvBuffer->dropMessage(dropdata[0], dropdata[1], ctrlpkt.getMsgSeq(using_rexmit_flag), CRcvBuffer::KEEP_EXISTING);

if (iDropCnt > 0)
{
Expand Down Expand Up @@ -9895,7 +9895,7 @@ int srt::CUDT::handleSocketPacketReception(const vector<CUnit*>& incoming, bool&
// Drop the packet from the receiver buffer.
// The packet was added to the buffer based on the sequence number, therefore sequence number should be used to drop it from the buffer.
// A drawback is that it would prevent a valid packet with the same sequence number, if it happens to arrive later, to end up in the buffer.
const int iDropCnt = m_pRcvBuffer->dropMessage(u->m_Packet.getSeqNo(), u->m_Packet.getSeqNo(), SRT_MSGNO_NONE);
const int iDropCnt = m_pRcvBuffer->dropMessage(u->m_Packet.getSeqNo(), u->m_Packet.getSeqNo(), SRT_MSGNO_NONE, CRcvBuffer::DROP_EXISTING);

const steady_clock::time_point tnow = steady_clock::now();
ScopedLock lg(m_StatsLock);
Expand Down
Loading

0 comments on commit 3ffc93f

Please sign in to comment.