Skip to content

Commit

Permalink
Merge branch 'master' into dev-fix-connectionlock-inversion
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikołaj Małecki committed Aug 7, 2023
2 parents 8a154d0 + 88ca9cc commit 2483164
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 66 deletions.
12 changes: 8 additions & 4 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,14 @@ int CRcvBuffer::dropUpTo(int32_t seqno)
m_iStartSeqNo = seqno;
// Move forward if there are "read/drop" entries.
releaseNextFillerEntries();
// Set nonread position to the starting position before updating,
// because start position was increased, and preceding packets are invalid.
m_iFirstNonreadPos = m_iStartPos;
updateNonreadPos();

// If the nonread position is now behind the starting position, set it to the starting position and update.
// Preceding packets were likely missing, and the non read position can probably be moved further now.
if (CSeqNo::seqcmp(m_iFirstNonreadPos, m_iStartPos) < 0)
{
m_iFirstNonreadPos = m_iStartPos;
updateNonreadPos();
}
if (!m_tsbpd.isEnabled() && m_bMessageAPI)
updateFirstReadableOutOfOrder();
return iDropCnt;
Expand Down
6 changes: 5 additions & 1 deletion srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6813,6 +6813,11 @@ bool srt::CUDT::isRcvBufferReady() const
return m_pRcvBuffer->isRcvDataReady(steady_clock::now());
}

bool srt::CUDT::isRcvBufferReadyNoLock() const
{
return m_pRcvBuffer->isRcvDataReady(steady_clock::now());
}

// int by_exception: accepts values of CUDTUnited::ErrorHandling:
// - 0 - by return value
// - 1 - by exception
Expand Down Expand Up @@ -6857,7 +6862,6 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
? m_pRcvBuffer->readMessage(data, len, &w_mctrl)
: 0;
leaveCS(m_RcvBufferLock);
w_mctrl.srctime = 0;

// Kick TsbPd thread to schedule next wakeup (if running)
if (m_bTsbPd)
Expand Down
3 changes: 3 additions & 0 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,9 @@ class CUDT
SRT_ATTR_EXCLUDES(m_RcvBufferLock)
bool isRcvBufferReady() const;

SRT_ATTR_REQUIRES2(m_RcvBufferLock)
bool isRcvBufferReadyNoLock() const;

// TSBPD thread main function.
static void* tsbpd(void* param);

Expand Down
53 changes: 9 additions & 44 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2295,73 +2295,38 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc)
m_stats.recv.count(res);
updateAvgPayloadSize(res);

bool canReadFurther = false;
for (vector<CUDTSocket*>::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si)
{
CUDTSocket* ps = *si;
ScopedLock lg(ps->core().m_RcvBufferLock);
if (m_RcvBaseSeqNo != SRT_SEQNO_NONE)
{
int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));
const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo));
if (cnt > 0)
{
HLOGC(grlog.Debug,
log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": dropped " << cnt
<< " packets after reading: m_RcvBaseSeqNo=" << m_RcvBaseSeqNo);
}
}
}
for (vector<CUDTSocket*>::const_iterator si = aliveMembers.begin(); si != aliveMembers.end(); ++si)
{
CUDTSocket* ps = *si;
if (!ps->core().isRcvBufferReady())

if (!ps->core().isRcvBufferReadyNoLock())
m_Global.m_EPoll.update_events(ps->m_SocketID, ps->core().m_sPollID, SRT_EPOLL_IN, false);
else
canReadFurther = true;
}

if (!canReadFurther)
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);

return res;
}
LOGC(grlog.Error, log << "grp/recv: UNEXPECTED RUN PATH, ABANDONING.");
m_Global.m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_IN, false);
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
}

// [[using locked(m_GroupLock)]]
CUDTGroup::ReadPos* CUDTGroup::checkPacketAhead()
{
typedef map<SRTSOCKET, ReadPos>::iterator pit_t;
ReadPos* out = 0;

// This map no longer maps only ahead links.
// Here are all links, and whether ahead, it's defined by the sequence.
for (pit_t i = m_Positions.begin(); i != m_Positions.end(); ++i)
{
// i->first: socket ID
// i->second: ReadPos { sequence, packet }
// We are not interested with the socket ID because we
// aren't going to read from it - we have the packet already.
ReadPos& a = i->second;

const int seqdiff = CSeqNo::seqcmp(a.mctrl.pktseq, m_RcvBaseSeqNo);
if (seqdiff == 1)
{
// The very next packet. Return it.
HLOGC(grlog.Debug,
log << "group/recv: Base %" << m_RcvBaseSeqNo << " ahead delivery POSSIBLE %" << a.mctrl.pktseq
<< " #" << a.mctrl.msgno << " from @" << i->first << ")");
out = &a;
}
else if (seqdiff < 1 && !a.packet.empty())
{
HLOGC(grlog.Debug,
log << "group/recv: @" << i->first << " dropping collected ahead %" << a.mctrl.pktseq << "#"
<< a.mctrl.msgno << " with base %" << m_RcvBaseSeqNo);
a.packet.clear();
}
// In case when it's >1, keep it in ahead
}

return out;
}

const char* CUDTGroup::StateStr(CUDTGroup::GroupState st)
{
static const char* const states[] = {"PENDING", "IDLE", "RUNNING", "BROKEN"};
Expand Down
17 changes: 0 additions & 17 deletions srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,6 @@ class CUDTGroup
m_bConnected = false;
}

// XXX BUGFIX
m_Positions.erase(id);

return !empty;
}

Expand Down Expand Up @@ -646,20 +643,6 @@ class CUDTGroup
time_point m_tsStartTime;
time_point m_tsRcvPeerStartTime;

struct ReadPos
{
std::vector<char> packet;
SRT_MSGCTRL mctrl;
ReadPos(int32_t s)
: mctrl(srt_msgctrl_default)
{
mctrl.pktseq = s;
}
};
std::map<SRTSOCKET, ReadPos> m_Positions;

ReadPos* checkPacketAhead();

void recv_CollectAliveAndBroken(std::vector<srt::CUDTSocket*>& w_alive, std::set<srt::CUDTSocket*>& w_broken);

/// The function polls alive member sockets and retrieves a list of read-ready.
Expand Down

0 comments on commit 2483164

Please sign in to comment.