Skip to content

Commit

Permalink
[core] refactor Group::recv() base on new rcv buffer to support messa…
Browse files Browse the repository at this point in the history
…ge mode
  • Loading branch information
gou4shi1 authored and maxsharabayko committed Feb 7, 2022
1 parent ac854f2 commit 8f68f61
Show file tree
Hide file tree
Showing 3 changed files with 232 additions and 5 deletions.
47 changes: 42 additions & 5 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,6 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBufferNew::readMessage. m_iStartSeqNo " << m_iStartSeqNo);

const int readPos = canReadInOrder ? m_iStartPos : m_iFirstReadableOutOfOrder;
// Remember if we actually read out of order packet.
const bool readingOutOfOrderPacket = !canReadInOrder || m_iStartPos == m_iFirstReadableOutOfOrder;

size_t remain = len;
char* dst = data;
Expand Down Expand Up @@ -331,13 +329,14 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
const bool pbLast = packet.getMsgBoundary() & PB_LAST;
if (msgctrl && (packet.getMsgBoundary() & PB_FIRST))
{
msgctrl->pktseq = pktseqno;
msgctrl->msgno = packet.getMsgSeq(m_bPeerRexmitFlag);
}
if (msgctrl && pbLast)
{
msgctrl->srctime = count_microseconds(getPktTsbPdTime(packet.getMsgTimeStamp()).time_since_epoch());
}
if (msgctrl)
msgctrl->pktseq = pktseqno;

releaseUnitInPos(i);
if (updateStartPos)
Expand All @@ -362,8 +361,6 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
}

countBytes(-pkts_read, -bytes_extracted);
if (!m_tsbpd.isEnabled() && readingOutOfOrderPacket)
updateFirstReadableOutOfOrder();

releaseNextFillerEntries();

Expand All @@ -373,6 +370,11 @@ int CRcvBufferNew::readMessage(char* data, size_t len, SRT_MSGCTRL* msgctrl)
//updateNonreadPos();
}

if (!m_tsbpd.isEnabled())
// We need updateFirstReadableOutOfOrder() here even if we are reading inorder,
// incase readable inorder packets are all read out.
updateFirstReadableOutOfOrder();

const int bytes_read = dst - data;
if (bytes_read < bytes_extracted)
{
Expand Down Expand Up @@ -606,6 +608,41 @@ bool CRcvBufferNew::isRcvDataReady(time_point time_now) const
return info.tsbpd_time <= time_now;
}

CRcvBufferNew::PacketInfo CRcvBufferNew::getFirstReadablePacketInfo(time_point time_now) const
{
const PacketInfo unreadableInfo = {SRT_SEQNO_NONE, false, time_point()};
const bool hasInorderPackets = hasReadableInorderPkts();

if (!m_tsbpd.isEnabled())
{
if (hasInorderPackets)
{
const CPacket& packet = m_entries[m_iStartPos].pUnit->m_Packet;
const PacketInfo info = {packet.getSeqNo(), false, time_point()};
return info;
}
SRT_ASSERT((!m_bMessageAPI && m_numOutOfOrderPackets == 0) || m_bMessageAPI);
if (m_iFirstReadableOutOfOrder >= 0)
{
SRT_ASSERT(m_numOutOfOrderPackets > 0);
const CPacket& packet = m_entries[m_iFirstReadableOutOfOrder].pUnit->m_Packet;
const PacketInfo info = {packet.getSeqNo(), true, time_point()};
return info;
}
return unreadableInfo;
}

if (!hasInorderPackets)
return unreadableInfo;

const PacketInfo info = getFirstValidPacketInfo();

if (info.tsbpd_time <= time_now)
return info;
else
return unreadableInfo;
}

void CRcvBufferNew::countBytes(int pkts, int bytes)
{
ScopedLock lock(m_BytesCountLock);
Expand Down
2 changes: 2 additions & 0 deletions srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ class CRcvBufferNew
/// IF skipseqno == -1, no missing packet but 1st not ready to play.
PacketInfo getFirstValidPacketInfo() const;

PacketInfo getFirstReadablePacketInfo(time_point time_now) const;

/// Get information on packets available to be read.
/// @returns a pair of sequence numbers (first available; first unavailable).
///
Expand Down
188 changes: 188 additions & 0 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2195,6 +2195,193 @@ static bool isValidSeqno(int32_t iBaseSeqno, int32_t iPktSeqno)
return false;
}

#ifdef ENABLE_NEW_RCVBUFFER
int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
{
// First, acquire GlobControlLock to make sure all member sockets still exist
enterCS(m_Global.m_GlobControlLock);
ScopedLock guard(m_GroupLock);

if (m_bClosing)
{
// The group could be set closing in the meantime, but if
// this is only about to be set by another thread, this thread
// must fist wait for being able to acquire this lock.
// The group will not be deleted now because it is added usage counter
// by this call, but will be released once it exits.
leaveCS(m_Global.m_GlobControlLock);
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}

// Now, still under lock, check if all sockets still can be dispatched
send_CheckValidSockets();
leaveCS(m_Global.m_GlobControlLock);

if (m_bClosing)
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);

// Later iteration over it might be less efficient than
// by vector, but we'll also often try to check a single id
// if it was ever seen broken, so that it's skipped.
set<CUDTSocket*> broken;

for (;;)
{
if (!m_bOpened || !m_bConnected)
{
LOGC(grlog.Error,
log << boolalpha << "grp/recv: $" << id() << ": ABANDONING: opened=" << m_bOpened
<< " connected=" << m_bConnected);
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
}

vector<CUDTSocket*> aliveMembers;
recv_CollectAliveAndBroken(aliveMembers, broken);
if (aliveMembers.empty())
{
LOGC(grlog.Error, log << "grp/recv: ALL LINKS BROKEN, ABANDONING.");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
}

vector<CUDTSocket*> readySockets;
if (m_bSynRecving)
readySockets = recv_WaitForReadReady(aliveMembers, broken);
else
readySockets = aliveMembers;

if (m_bClosing)
{
HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": GROUP CLOSED, ABANDONING.");
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}

// Find the first readable packet among all member sockets.
CUDTSocket* socketToRead = NULL;
CRcvBufferNew::PacketInfo infoToRead = {-1, false, time_point()};
for (vector<CUDTSocket*>::const_iterator si = readySockets.begin(); si != readySockets.end(); ++si)
{
CUDTSocket* ps = *si;

ScopedLock lg(ps->core().m_RcvBufferLock);
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
{
// Drop here to make sure the getFirstReadablePacketInfo() below return fresher packet.
int cnt = ps->core().dropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));
if (cnt > 0)
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": dropped " << cnt
<< " packets before reading: m_RcvBaseSeqNo=" << m_RcvBaseSeqNo);
}
}

const CRcvBufferNew::PacketInfo info =
ps->core().m_pRcvBuffer->getFirstReadablePacketInfo(steady_clock::now());
if (info.seqno == SRT_SEQNO_NONE)
{
HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": Nothing to read.");
continue;
}
// We need to qualify the sequence, just for a case.
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE && !isValidSeqno(m_RcvBaseSeqNo, info.seqno))
{
LOGC(grlog.Error,
log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": SEQUENCE DISCREPANCY: base=%"
<< m_RcvBaseSeqNo << " vs pkt=%" << info.seqno << ", setting ESECFAIL");
ps->core().m_bBroken = true;
broken.insert(ps);
continue;
}
if (socketToRead == NULL || CSeqNo::seqcmp(info.seqno, infoToRead.seqno) < 0)
{
socketToRead = ps;
infoToRead = info;
}
}

if (socketToRead == NULL)
{
if (m_bSynRecving)
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": No links reported any fresher packet, re-polling.");
continue;
}
else
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": No links reported any fresher packet, clearing readiness.");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
}
}
else
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": Found first readable packet from @" << socketToRead->m_SocketID
<< ": seq=" << infoToRead.seqno << " gap=" << infoToRead.seq_gap
<< " time=" << FormatTime(infoToRead.tsbpd_time));
}

const int res = socketToRead->core().receiveMessage((buf), len, (w_mc), CUDTUnited::ERH_RETURN);
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": Extracted data with %"
<< w_mc.pktseq << " #" << w_mc.msgno << ": " << (res <= 0 ? "(NOTHING)" : BufferStamp(buf, res)));
if (res == 0)
{
LOGC(grlog.Warn,
log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": Retrying next socket...");
// This socket will not be socketToRead in the next turn because receiveMessage() return 0 here.
continue;
}
if (res == SRT_ERROR)
{
LOGC(grlog.Warn,
log << "grp/recv: $" << id() << ": @" << socketToRead->m_SocketID << ": " << srt_getlasterror_str()
<< ". Retrying next socket...");
broken.insert(socketToRead);
continue;
}
fillGroupData((w_mc), w_mc);

HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": Update m_RcvBaseSeqNo: %" << m_RcvBaseSeqNo << " -> %" << w_mc.pktseq);
m_RcvBaseSeqNo = w_mc.pktseq;

// Update stats as per delivery
m_stats.recv.count(res);
updateAvgPayloadSize(res);

for (vector<CUDTSocket*>::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si)
{
CUDTSocket* ps = *si;
ScopedLock lg(ps->core().m_RcvBufferLock);
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
{
int cnt = ps->core().dropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));
if (cnt > 0)
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": dropped " << cnt
<< " packets after reading: m_RcvBaseSeqNo=" << m_RcvBaseSeqNo);
}
}
}
for (vector<CUDTSocket*>::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si)
{
CUDTSocket* ps = *si;
if (!ps->core().isRcvBufferReady())
m_Global.m_EPoll.update_events(ps->m_SocketID, ps->core().m_sPollID, SRT_EPOLL_IN, false);
}

return res;
}
LOGC(grlog.Error, log << "grp/recv: UNEXPECTED RUN PATH, ABANDONING.");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
}
#else
// The "app reader" version of the reading function.
// This reads the packets from every socket treating them as independent
// and prepared to work with the application. Then packets are sorted out
Expand Down Expand Up @@ -2731,6 +2918,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
}
}
}
#endif

// [[using locked(m_GroupLock)]]
CUDTGroup::ReadPos* CUDTGroup::checkPacketAhead()
Expand Down

0 comments on commit 8f68f61

Please sign in to comment.