diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 274446e90..51b1704be 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -7597,7 +7597,6 @@ void CUDT::initSynch() { setupMutex(m_SendBlockLock, "SendBlock"); setupCond(m_SendBlockCond, "SendBlock"); - setupMutex(m_RecvDataLock, "RecvData"); setupCond(m_RecvDataCond, "RecvData"); setupMutex(m_SendLock, "Send"); setupMutex(m_RecvLock, "Recv"); @@ -7613,7 +7612,6 @@ void CUDT::destroySynch() { releaseMutex(m_SendBlockLock); releaseCond(m_SendBlockCond); - releaseMutex(m_RecvDataLock); releaseCond(m_RecvDataCond); releaseMutex(m_SendLock); releaseMutex(m_RecvLock); @@ -7627,6 +7625,7 @@ void CUDT::destroySynch() void CUDT::releaseSynch() { + SRT_ASSERT(m_bClosing); // wake up user calls CSync::lock_signal(m_SendBlockCond, m_SendBlockLock); @@ -7637,13 +7636,18 @@ void CUDT::releaseSynch() CSync::lock_signal(m_RecvDataCond, m_RecvLock); CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock); - enterCS(m_RecvDataLock); + // Azquiring m_RcvTsbPdStartupLock protects race in starting + // the tsbpd() thread in CUDT::processData(). + // Wait for tsbpd() thread to finish. + enterCS(m_RcvTsbPdStartupLock); if (m_RcvTsbPdThread.joinable()) { m_RcvTsbPdThread.join(); } - leaveCS(m_RecvDataLock); + leaveCS(m_RcvTsbPdStartupLock); + // Acquiring the m_RecvLock it is assumed that both tsbpd() + // and srt_recv*(..) threads will be aware about the state of m_bClosing. enterCS(m_RecvLock); leaveCS(m_RecvLock); } @@ -9343,8 +9347,6 @@ int CUDT::processData(CUnit* in_unit) CPacket &packet = in_unit->m_Packet; - // XXX This should be called (exclusively) here: - // m_pRcvBuffer->addLocalTsbPdDriftSample(packet.getMsgTimeStamp()); // Just heard from the peer, reset the expiration count. m_iEXPCount = 1; m_tsLastRspTime = steady_clock::now(); @@ -9354,6 +9356,11 @@ int CUDT::processData(CUnit* in_unit) // We are receiving data, start tsbpd thread if TsbPd is enabled if (need_tsbpd && !m_RcvTsbPdThread.joinable()) { + ScopedLock lock(m_RcvTsbPdStartupLock); + + if (m_bClosing) // Check again to protect join() in CUDT::releaseSync() + return -1; + HLOGP(qrlog.Debug, "Spawning Socket TSBPD thread"); #if ENABLE_HEAVY_LOGGING std::ostringstream tns1, tns2; @@ -9370,8 +9377,6 @@ int CUDT::processData(CUnit* in_unit) if (!StartThread(m_RcvTsbPdThread, CUDT::tsbpd, this, thname)) return -1; } - // NOTE: In case of group TSBPD, this facility will be started - // in different place. Group TSBPD is a concept implementation - not done here. const int pktrexmitflag = m_bPeerRexmitFlag ? (packet.getRexmitFlag() ? 1 : 0) : 2; #if ENABLE_HEAVY_LOGGING diff --git a/srtcore/core.h b/srtcore/core.h index 2b280b5bb..a24fc0bb6 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -987,6 +987,7 @@ class CUDT srt::sync::CThread m_RcvTsbPdThread; // Rcv TsbPD Thread handle srt::sync::Condition m_RcvTsbPdCond; // TSBPD signals if reading is ready. Use together with m_RecvLock. bool m_bTsbPdAckWakeup; // Signal TsbPd thread on Ack sent + srt::sync::Mutex m_RcvTsbPdStartupLock; // Protects TSBPD thread creating and joining CallbackHolder m_cbAcceptHook; CallbackHolder m_cbConnectHook; @@ -1018,7 +1019,6 @@ class CUDT srt::sync::Mutex m_RecvAckLock; // Protects the state changes while processing incomming ACK (SRT_EPOLL_OUT) srt::sync::Condition m_RecvDataCond; // used to block "srt_recv*" when there is no data. Use together with m_RecvLock - srt::sync::Mutex m_RecvDataLock; // lock associated to m_RecvDataCond srt::sync::Mutex m_RecvLock; // used to synchronize "srt_recv*" call, protects TSBPD drift updates (CRcvBuffer::isRcvDataReady()) srt::sync::Mutex m_SendLock; // used to synchronize "send" call