Skip to content

Commit

Permalink
[core] Added m_RcvTsbPdStartupLock to protect TSBPD thread start and …
Browse files Browse the repository at this point in the history
…join.

Deleted m_RecvDataLock.
Fixes #1624
  • Loading branch information
maxsharabayko committed Oct 29, 2020
1 parent 7997515 commit a6f6663
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 9 deletions.
21 changes: 13 additions & 8 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7597,7 +7597,6 @@ void CUDT::initSynch()
{
setupMutex(m_SendBlockLock, "SendBlock");
setupCond(m_SendBlockCond, "SendBlock");
setupMutex(m_RecvDataLock, "RecvData");
setupCond(m_RecvDataCond, "RecvData");
setupMutex(m_SendLock, "Send");
setupMutex(m_RecvLock, "Recv");
Expand All @@ -7613,7 +7612,6 @@ void CUDT::destroySynch()
{
releaseMutex(m_SendBlockLock);
releaseCond(m_SendBlockCond);
releaseMutex(m_RecvDataLock);
releaseCond(m_RecvDataCond);
releaseMutex(m_SendLock);
releaseMutex(m_RecvLock);
Expand All @@ -7627,6 +7625,7 @@ void CUDT::destroySynch()

void CUDT::releaseSynch()
{
SRT_ASSERT(m_bClosing);
// wake up user calls
CSync::lock_signal(m_SendBlockCond, m_SendBlockLock);

Expand All @@ -7637,13 +7636,18 @@ void CUDT::releaseSynch()
CSync::lock_signal(m_RecvDataCond, m_RecvLock);
CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);

enterCS(m_RecvDataLock);
// Azquiring m_RcvTsbPdStartupLock protects race in starting
// the tsbpd() thread in CUDT::processData().
// Wait for tsbpd() thread to finish.
enterCS(m_RcvTsbPdStartupLock);
if (m_RcvTsbPdThread.joinable())
{
m_RcvTsbPdThread.join();
}
leaveCS(m_RecvDataLock);
leaveCS(m_RcvTsbPdStartupLock);

// Acquiring the m_RecvLock it is assumed that both tsbpd()
// and srt_recv*(..) threads will be aware about the state of m_bClosing.
enterCS(m_RecvLock);
leaveCS(m_RecvLock);
}
Expand Down Expand Up @@ -9343,8 +9347,6 @@ int CUDT::processData(CUnit* in_unit)

CPacket &packet = in_unit->m_Packet;

// XXX This should be called (exclusively) here:
// m_pRcvBuffer->addLocalTsbPdDriftSample(packet.getMsgTimeStamp());
// Just heard from the peer, reset the expiration count.
m_iEXPCount = 1;
m_tsLastRspTime = steady_clock::now();
Expand All @@ -9354,6 +9356,11 @@ int CUDT::processData(CUnit* in_unit)
// We are receiving data, start tsbpd thread if TsbPd is enabled
if (need_tsbpd && !m_RcvTsbPdThread.joinable())
{
ScopedLock lock(m_RcvTsbPdStartupLock);

if (m_bClosing) // Check again to protect join() in CUDT::releaseSync()
return -1;

HLOGP(qrlog.Debug, "Spawning Socket TSBPD thread");
#if ENABLE_HEAVY_LOGGING
std::ostringstream tns1, tns2;
Expand All @@ -9370,8 +9377,6 @@ int CUDT::processData(CUnit* in_unit)
if (!StartThread(m_RcvTsbPdThread, CUDT::tsbpd, this, thname))
return -1;
}
// NOTE: In case of group TSBPD, this facility will be started
// in different place. Group TSBPD is a concept implementation - not done here.

const int pktrexmitflag = m_bPeerRexmitFlag ? (packet.getRexmitFlag() ? 1 : 0) : 2;
#if ENABLE_HEAVY_LOGGING
Expand Down
2 changes: 1 addition & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,7 @@ class CUDT
srt::sync::CThread m_RcvTsbPdThread; // Rcv TsbPD Thread handle
srt::sync::Condition m_RcvTsbPdCond; // TSBPD signals if reading is ready. Use together with m_RecvLock.
bool m_bTsbPdAckWakeup; // Signal TsbPd thread on Ack sent
srt::sync::Mutex m_RcvTsbPdStartupLock; // Protects TSBPD thread creating and joining

CallbackHolder<srt_listen_callback_fn> m_cbAcceptHook;
CallbackHolder<srt_connect_callback_fn> m_cbConnectHook;
Expand Down Expand Up @@ -1018,7 +1019,6 @@ class CUDT
srt::sync::Mutex m_RecvAckLock; // Protects the state changes while processing incomming ACK (SRT_EPOLL_OUT)

srt::sync::Condition m_RecvDataCond; // used to block "srt_recv*" when there is no data. Use together with m_RecvLock
srt::sync::Mutex m_RecvDataLock; // lock associated to m_RecvDataCond
srt::sync::Mutex m_RecvLock; // used to synchronize "srt_recv*" call, protects TSBPD drift updates (CRcvBuffer::isRcvDataReady())

srt::sync::Mutex m_SendLock; // used to synchronize "send" call
Expand Down

0 comments on commit a6f6663

Please sign in to comment.