Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fixing some lock-order-inversion and data race problems (TSAN reports) #1824

Merged
merged 9 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 28 additions & 14 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ void srt::CUDTSocket::setBrokenClosed()

bool srt::CUDTSocket::readReady()
{
// TODO: Use m_RcvBufferLock here (CUDT::isRcvReadReady())?
if (m_UDT.m_bConnected && m_UDT.m_pRcvBuffer->isRcvDataReady())
if (m_UDT.m_bConnected && m_UDT.isRcvBufferReady())
return true;

if (m_UDT.m_bListening)
Expand Down Expand Up @@ -2611,20 +2610,23 @@ void srt::CUDTUnited::checkBrokenSockets()
if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS))
continue;
}
else if ((s->core().m_pRcvBuffer != NULL)
// FIXED: calling isRcvDataAvailable() just to get the information
// whether there are any data waiting in the buffer,
// NOT WHETHER THEY ARE ALSO READY TO PLAY at the time when
// this function is called (isRcvDataReady also checks if the
// available data is "ready to play").
&& s->core().m_pRcvBuffer->hasAvailablePackets())
else
{
const int bc = s->core().m_iBrokenCounter.load();
if (bc > 0)
CUDT& u = s->core();

enterCS(u.m_RcvBufferLock);
bool has_avail_packets = u.m_pRcvBuffer && u.m_pRcvBuffer->hasAvailablePackets();
leaveCS(u.m_RcvBufferLock);

if (has_avail_packets)
{
// if there is still data in the receiver buffer, wait longer
s->core().m_iBrokenCounter.store(bc - 1);
continue;
const int bc = u.m_iBrokenCounter.load();
if (bc > 0)
{
// if there is still data in the receiver buffer, wait longer
s->core().m_iBrokenCounter.store(bc - 1);
continue;
}
}
}

Expand Down Expand Up @@ -2782,8 +2784,20 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
// delete this one
m_ClosedSockets.erase(i);

// XXX The section below temporarily unlocks m_GlobControlLock
// just for the call to CUDT::closeInternal(). This is needed
// to avoid locking m_ConnectionLock after m_GlobControlLock,
// because m_ConnectionLock orders BEFORE m_GlobControlLock.
// This should be perfectly safe to do once the socket ID has
// been erased from m_ClosedSockets, as no container access
// is done in this case.
//
// Report: P04-1.28, P04-2.27, P04-2.50, P04-2.55

HLOGC(smlog.Debug, log << "GC/removeSocket: closing associated UDT @" << u);
leaveCS(m_GlobControlLock);
s->core().closeInternal();
enterCS(m_GlobControlLock);
HLOGC(smlog.Debug, log << "GC/removeSocket: DELETING SOCKET @" << u);
delete s;
HLOGC(smlog.Debug, log << "GC/removeSocket: socket @" << u << " DELETED. Checking muxer.");
Expand Down
8 changes: 4 additions & 4 deletions srtcore/buffer_snd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
}
m_pLastBlock = s;

m_iCount += iNumBlocks;
m_iCount = m_iCount + iNumBlocks;
m_iBytesCount += len;

m_rateEstimator.updateInputRate(m_tsLastOriginTime, iNumBlocks, len);
Expand Down Expand Up @@ -293,7 +293,7 @@ int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
m_pLastBlock = s;

enterCS(m_BufLock);
m_iCount += iNumBlocks;
m_iCount = m_iCount + iNumBlocks;
m_iBytesCount += total;

leaveCS(m_BufLock);
Expand Down Expand Up @@ -547,7 +547,7 @@ void CSndBuffer::ackData(int offset)
if (move)
m_pCurrBlock = m_pFirstBlock;

m_iCount -= offset;
m_iCount = m_iCount - offset;

updAvgBufSize(steady_clock::now());
}
Expand Down Expand Up @@ -653,7 +653,7 @@ int CSndBuffer::dropLateData(int& w_bytes, int32_t& w_first_msgno, const steady_
{
m_pCurrBlock = m_pFirstBlock;
}
m_iCount -= dpkts;
m_iCount = m_iCount - dpkts;

m_iBytesCount -= dbytes;
w_bytes = dbytes;
Expand Down
6 changes: 5 additions & 1 deletion srtcore/buffer_snd.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,11 @@ class CSndBuffer
int m_iSize; // buffer size (number of packets)
const int m_iBlockLen; // maximum length of a block holding packet payload and AUTH tag (excluding packet header).
const int m_iAuthTagSize; // Authentication tag size (if GCM is enabled).
int m_iCount; // number of used blocks

// NOTE: This is atomic AND modified under a lock, because the function
// getCurrBufSize() returns it WITHOUT locking. Every modification,
// however, must still be done under a lock.
sync::atomic<int> m_iCount; // number of used blocks

int m_iBytesCount; // number of payload bytes in queue
time_point m_tsLastOriginTime;
Expand Down
33 changes: 21 additions & 12 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5708,6 +5708,14 @@ void srt::CUDT::rewriteHandshakeData(const sockaddr_any& peer, CHandShake& w_hs)
void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& peer, const CPacket& hspkt, CHandShake& w_hs)
{
HLOGC(cnlog.Debug, log << CONID() << "acceptAndRespond: setting up data according to handshake");
#if ENABLE_BONDING
// Keep the group alive for the lifetime of this function,
// and do it BEFORE acquiring m_ConnectionLock to avoid
// lock inversion.
// This will check if a socket belongs to a group and if so
// it will remember this group and keep it alive here.
CUDTUnited::GroupKeeper group_keeper(uglobal(), m_parent);
#endif

ScopedLock cg(m_ConnectionLock);

Expand Down Expand Up @@ -5814,8 +5822,7 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any&

{
#if ENABLE_BONDING
ScopedLock cl (uglobal().m_GlobControlLock);
CUDTGroup* g = m_parent->m_GroupOf;
CUDTGroup* g = group_keeper.group;
if (g)
{
// This is the last moment when this can be done.
Expand Down Expand Up @@ -5981,7 +5988,7 @@ SRT_REJECT_REASON srt::CUDT::setupCC()
// Create the CCC object and configure it.

// UDT also sets back the congestion window: ???
// m_dCongestionWindow = m_pCC->m_dCWndSize;
// m_iCongestionWindow = m_pCC->m_dCWndSize;

// XXX Not sure about that. May happen that AGENT wants
// tsbpd mode, but PEER doesn't, even in bidirectional mode.
Expand Down Expand Up @@ -6155,8 +6162,9 @@ void srt::CUDT::addressAndSend(CPacket& w_pkt)
m_pSndQueue->sendto(m_PeerAddr, w_pkt, m_SourceAddr);
}

// [[using maybe_locked(m_GlobControlLock, if called from GC)]]
bool srt::CUDT::closeInternal()
// [[using maybe_locked(m_GlobControlLock, if called from breakSocket_LOCKED, usually from GC)]]
// [[using maybe_locked(m_parent->m_ControlLock, if called from srt_close())]]
bool srt::CUDT::closeInternal() ATR_NOEXCEPT
{
// NOTE: this function is called from within the garbage collector thread.

Expand Down Expand Up @@ -7509,7 +7517,7 @@ void srt::CUDT::bstats(CBytePerfMon *perf, bool clear, bool instantaneous)
perf->mbpsRecvRate = double(perf->byteRecv) * 8.0 / interval;
perf->usPktSndPeriod = (double) count_microseconds(m_tdSendInterval.load());
perf->pktFlowWindow = m_iFlowWindowSize.load();
perf->pktCongestionWindow = (int)m_dCongestionWindow;
perf->pktCongestionWindow = m_iCongestionWindow;
perf->pktFlightSize = flight_span;
perf->msRTT = (double)m_iSRTT / 1000.0;
perf->msSndTsbPdDelay = m_bPeerTsbPd ? m_iPeerTsbPdDelay_ms : 0;
Expand Down Expand Up @@ -7698,12 +7706,13 @@ bool srt::CUDT::updateCC(ETransmissionEvent evt, const EventVariant arg)
// - m_dPktSndPeriod
// - m_dCWndSize
m_tdSendInterval = microseconds_from((int64_t)m_CongCtl->pktSndPeriod_us());
m_dCongestionWindow = m_CongCtl->cgWindowSize();
double cgwindow = m_CongCtl->cgWindowSize();
ethouris marked this conversation as resolved.
Show resolved Hide resolved
m_iCongestionWindow = cgwindow;
#if ENABLE_HEAVY_LOGGING
HLOGC(rslog.Debug,
log << CONID() << "updateCC: updated values from congctl: interval=" << count_microseconds(m_tdSendInterval) << " us ("
<< "tk (" << m_CongCtl->pktSndPeriod_us() << "us) cgwindow="
<< std::setprecision(3) << m_dCongestionWindow);
<< std::setprecision(3) << cgwindow);
#endif
}

Expand Down Expand Up @@ -8446,15 +8455,15 @@ void srt::CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_

if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0)
{
const int cwnd1 = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow));
const int cwnd1 = std::min<int>(m_iFlowWindowSize, m_iCongestionWindow);
const bool bWasStuck = cwnd1<= getFlightSpan();
// Update Flow Window Size, must update before and together with m_iSndLastAck
m_iFlowWindowSize = ackdata[ACKD_BUFFERLEFT];
m_iSndLastAck = ackdata_seqno;
m_tsLastRspAckTime = currtime;
m_iReXmitCount = 1; // Reset re-transmit count since last ACK

const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow));
const int cwnd = std::min<int>(m_iFlowWindowSize, m_iCongestionWindow);
if (bWasStuck && cwnd > getFlightSpan())
{
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);
Expand Down Expand Up @@ -9709,12 +9718,12 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet)
{
ScopedLock lkrack (m_RecvAckLock);
// Check the congestion/flow window limit
const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow));
const int cwnd = std::min<int>(m_iFlowWindowSize, m_iCongestionWindow);
const int flightspan = getFlightSpan();
if (cwnd <= flightspan)
{
HLOGC(qslog.Debug,
log << CONID() << "packUniqueData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_dCongestionWindow
log << CONID() << "packUniqueData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_iCongestionWindow
<< ")=" << cwnd << " seqlen=(" << m_iSndLastAck << "-" << m_iSndCurrSeqNo << ")=" << flightspan);
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ class CUDT

/// Close the opened UDT entity.

bool closeInternal();
bool closeInternal() ATR_NOEXCEPT;
void updateBrokenConnection();
void completeBrokenConnectionDependencies(int errorcode);

Expand Down Expand Up @@ -848,7 +848,7 @@ class CUDT

SRT_ATTR_GUARDED_BY(m_RecvAckLock)
sync::atomic<int> m_iFlowWindowSize; // Flow control window size
double m_dCongestionWindow; // Congestion window size
sync::atomic<int> m_iCongestionWindow; // Congestion window size

private: // Timers
atomic_time_point m_tsNextACKTime; // Next ACK time, in CPU clock cycles, same below
Expand Down
1 change: 1 addition & 0 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1747,6 +1747,7 @@ void srt::CRcvQueue::setNewEntry(CUDT* u)

/// Tell whether the new-entry container currently holds any element.
///
/// The container is inspected while holding m_IDLock; the lock is released
/// automatically when the guard goes out of scope on return.
///
/// @return true if m_vNewEntry is not empty, false otherwise.
bool srt::CRcvQueue::ifNewEntry()
{
    ScopedLock id_guard(m_IDLock);

    // Take the snapshot under the lock, then hand it back to the caller.
    const bool has_pending = !m_vNewEntry.empty();
    return has_pending;
}

Expand Down