Skip to content

Commit

Permalink
[core] Fixing some lock-order-inversion and data race problems (TSAN …
Browse files Browse the repository at this point in the history
…reports) (#1824).
  • Loading branch information
ethouris committed Jan 23, 2024
1 parent 2eb47e3 commit 1bfab3d
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 33 deletions.
42 changes: 28 additions & 14 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ void srt::CUDTSocket::setBrokenClosed()

bool srt::CUDTSocket::readReady()
{
// TODO: Use m_RcvBufferLock here (CUDT::isRcvReadReady())?
if (m_UDT.m_bConnected && m_UDT.m_pRcvBuffer->isRcvDataReady())
if (m_UDT.m_bConnected && m_UDT.isRcvBufferReady())
return true;

if (m_UDT.m_bListening)
Expand Down Expand Up @@ -2611,20 +2610,23 @@ void srt::CUDTUnited::checkBrokenSockets()
if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS))
continue;
}
else if ((s->core().m_pRcvBuffer != NULL)
// FIXED: calling isRcvDataAvailable() just to get the information
// whether there are any data waiting in the buffer,
// NOT WHETHER THEY ARE ALSO READY TO PLAY at the time when
// this function is called (isRcvDataReady also checks if the
// available data is "ready to play").
&& s->core().m_pRcvBuffer->hasAvailablePackets())
else
{
const int bc = s->core().m_iBrokenCounter.load();
if (bc > 0)
CUDT& u = s->core();

enterCS(u.m_RcvBufferLock);
bool has_avail_packets = u.m_pRcvBuffer && u.m_pRcvBuffer->hasAvailablePackets();
leaveCS(u.m_RcvBufferLock);

if (has_avail_packets)
{
// if there is still data in the receiver buffer, wait longer
s->core().m_iBrokenCounter.store(bc - 1);
continue;
const int bc = u.m_iBrokenCounter.load();
if (bc > 0)
{
// if there is still data in the receiver buffer, wait longer
s->core().m_iBrokenCounter.store(bc - 1);
continue;
}
}
}

Expand Down Expand Up @@ -2782,8 +2784,20 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
// delete this one
m_ClosedSockets.erase(i);

// XXX This below section can unlock m_GlobControlLock
// just for calling CUDT::closeInternal(), which is needed
// to avoid locking m_ConnectionLock after m_GlobControlLock,
// while m_ConnectionLock orders BEFORE m_GlobControlLock.
// This should be perfectly safe thing to do after the socket
// ID has been erased from m_ClosedSockets. No container access
// is done in this case.
//
// Report: P04-1.28, P04-2.27, P04-2.50, P04-2.55

HLOGC(smlog.Debug, log << "GC/removeSocket: closing associated UDT @" << u);
leaveCS(m_GlobControlLock);
s->core().closeInternal();
enterCS(m_GlobControlLock);
HLOGC(smlog.Debug, log << "GC/removeSocket: DELETING SOCKET @" << u);
delete s;
HLOGC(smlog.Debug, log << "GC/removeSocket: socket @" << u << " DELETED. Checking muxer.");
Expand Down
8 changes: 4 additions & 4 deletions srtcore/buffer_snd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
}
m_pLastBlock = s;

m_iCount += iNumBlocks;
m_iCount = m_iCount + iNumBlocks;
m_iBytesCount += len;

m_rateEstimator.updateInputRate(m_tsLastOriginTime, iNumBlocks, len);
Expand Down Expand Up @@ -293,7 +293,7 @@ int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
m_pLastBlock = s;

enterCS(m_BufLock);
m_iCount += iNumBlocks;
m_iCount = m_iCount + iNumBlocks;
m_iBytesCount += total;

leaveCS(m_BufLock);
Expand Down Expand Up @@ -547,7 +547,7 @@ void CSndBuffer::ackData(int offset)
if (move)
m_pCurrBlock = m_pFirstBlock;

m_iCount -= offset;
m_iCount = m_iCount - offset;

updAvgBufSize(steady_clock::now());
}
Expand Down Expand Up @@ -653,7 +653,7 @@ int CSndBuffer::dropLateData(int& w_bytes, int32_t& w_first_msgno, const steady_
{
m_pCurrBlock = m_pFirstBlock;
}
m_iCount -= dpkts;
m_iCount = m_iCount - dpkts;

m_iBytesCount -= dbytes;
w_bytes = dbytes;
Expand Down
6 changes: 5 additions & 1 deletion srtcore/buffer_snd.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,11 @@ class CSndBuffer
int m_iSize; // buffer size (number of packets)
const int m_iBlockLen; // maximum length of a block holding packet payload and AUTH tag (excluding packet header).
const int m_iAuthTagSize; // Authentication tag size (if GCM is enabled).
int m_iCount; // number of used blocks

// NOTE: This is atomic AND under lock because the function getCurrBufSize()
// is returning it WITHOUT locking. Modification, however, must stay under
// a lock.
sync::atomic<int> m_iCount; // number of used blocks

int m_iBytesCount; // number of payload bytes in queue
time_point m_tsLastOriginTime;
Expand Down
33 changes: 21 additions & 12 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5708,6 +5708,14 @@ void srt::CUDT::rewriteHandshakeData(const sockaddr_any& peer, CHandShake& w_hs)
void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& peer, const CPacket& hspkt, CHandShake& w_hs)
{
HLOGC(cnlog.Debug, log << CONID() << "acceptAndRespond: setting up data according to handshake");
#if ENABLE_BONDING
// Keep the group alive for the lifetime of this function,
// and do it BEFORE acquiring m_ConnectionLock to avoid
// lock inversion.
// This will check if a socket belongs to a group and if so
// it will remember this group and keep it alive here.
CUDTUnited::GroupKeeper group_keeper(uglobal(), m_parent);
#endif

ScopedLock cg(m_ConnectionLock);

Expand Down Expand Up @@ -5814,8 +5822,7 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any&

{
#if ENABLE_BONDING
ScopedLock cl (uglobal().m_GlobControlLock);
CUDTGroup* g = m_parent->m_GroupOf;
CUDTGroup* g = group_keeper.group;
if (g)
{
// This is the last moment when this can be done.
Expand Down Expand Up @@ -5981,7 +5988,7 @@ SRT_REJECT_REASON srt::CUDT::setupCC()
// Create the CCC object and configure it.

// UDT also sets back the congestion window: ???
// m_dCongestionWindow = m_pCC->m_dCWndSize;
// m_iCongestionWindow = m_pCC->m_dCWndSize;

// XXX Not sure about that. May happen that AGENT wants
// tsbpd mode, but PEER doesn't, even in bidirectional mode.
Expand Down Expand Up @@ -6155,8 +6162,9 @@ void srt::CUDT::addressAndSend(CPacket& w_pkt)
m_pSndQueue->sendto(m_PeerAddr, w_pkt, m_SourceAddr);
}

// [[using maybe_locked(m_GlobControlLock, if called from GC)]]
bool srt::CUDT::closeInternal()
// [[using maybe_locked(m_GlobControlLock, if called from breakSocket_LOCKED, usually from GC)]]
// [[using maybe_locked(m_parent->m_ControlLock, if called from srt_close())]]
bool srt::CUDT::closeInternal() ATR_NOEXCEPT
{
// NOTE: this function is called from within the garbage collector thread.

Expand Down Expand Up @@ -7509,7 +7517,7 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)
perf->mbpsRecvRate = double(perf->byteRecv) * 8.0 / interval;
perf->usPktSndPeriod = (double) count_microseconds(m_tdSendInterval.load());
perf->pktFlowWindow = m_iFlowWindowSize.load();
perf->pktCongestionWindow = (int)m_dCongestionWindow;
perf->pktCongestionWindow = m_iCongestionWindow;
perf->pktFlightSize = flight_span;
perf->msRTT = (double)m_iSRTT / 1000.0;
perf->msSndTsbPdDelay = m_bPeerTsbPd ? m_iPeerTsbPdDelay_ms : 0;
Expand Down Expand Up @@ -7698,12 +7706,13 @@ bool srt::CUDT::updateCC(ETransmissionEvent evt, const EventVariant arg)
// - m_dPktSndPeriod
// - m_dCWndSize
m_tdSendInterval = microseconds_from((int64_t)m_CongCtl->pktSndPeriod_us());
m_dCongestionWindow = m_CongCtl->cgWindowSize();
const double cgwindow = m_CongCtl->cgWindowSize();
m_iCongestionWindow = cgwindow;
#if ENABLE_HEAVY_LOGGING
HLOGC(rslog.Debug,
log << CONID() << "updateCC: updated values from congctl: interval=" << count_microseconds(m_tdSendInterval) << " us ("
<< "tk (" << m_CongCtl->pktSndPeriod_us() << "us) cgwindow="
<< std::setprecision(3) << m_dCongestionWindow);
<< std::setprecision(3) << cgwindow);
#endif
}

Expand Down Expand Up @@ -8446,15 +8455,15 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_

if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0)
{
const int cwnd1 = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow));
const int cwnd1 = std::min<int>(m_iFlowWindowSize, m_iCongestionWindow);
const bool bWasStuck = cwnd1<= getFlightSpan();
// Update Flow Window Size, must update before and together with m_iSndLastAck
m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT];
m_iSndLastAck = ackdata_seqno;
m_tsLastRspAckTime = currtime;
m_iReXmitCount = 1; // Reset re-transmit count since last ACK

const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow));
const int cwnd = std::min<int>(m_iFlowWindowSize, m_iCongestionWindow);
if (bWasStuck && cwnd > getFlightSpan())
{
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);
Expand Down Expand Up @@ -9709,12 +9718,12 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet)
{
ScopedLock lkrack (m_RecvAckLock);
// Check the congestion/flow window limit
const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow));
const int cwnd = std::min<int>(m_iFlowWindowSize, m_iCongestionWindow);
const int flightspan = getFlightSpan();
if (cwnd <= flightspan)
{
HLOGC(qslog.Debug,
log << CONID() << "packUniqueData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_dCongestionWindow
log << CONID() << "packUniqueData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_iCongestionWindow
<< ")=" << cwnd << " seqlen=(" << m_iSndLastAck << "-" << m_iSndCurrSeqNo << ")=" << flightspan);
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ class CUDT

/// Close the opened UDT entity.

bool closeInternal();
bool closeInternal() ATR_NOEXCEPT;
void updateBrokenConnection();
void completeBrokenConnectionDependencies(int errorcode);

Expand Down Expand Up @@ -848,7 +848,7 @@ class CUDT

SRT_ATTR_GUARDED_BY(m_RecvAckLock)
sync::atomic<int> m_iFlowWindowSize; // Flow control window size
double m_dCongestionWindow; // Congestion window size
sync::atomic<int> m_iCongestionWindow; // Congestion window size

private: // Timers
atomic_time_point m_tsNextACKTime; // Next ACK time, in CPU clock cycles, same below
Expand Down
1 change: 1 addition & 0 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1747,6 +1747,7 @@ void srt::CRcvQueue::setNewEntry(CUDT* u)

bool srt::CRcvQueue::ifNewEntry()
{
ScopedLock listguard(m_IDLock);
return !(m_vNewEntry.empty());
}

Expand Down

0 comments on commit 1bfab3d

Please sign in to comment.