Skip to content

Commit

Permalink
[core] Changed cond notification naming. Added and used new sync util…
Browse files Browse the repository at this point in the history
…ities. (#2429)

* Changed signal/broadcast naming to notify_one/all to match C++ standard naming.
* Added CUniqueSync class that integrates CSync+UniqueLock for simplicity.
  • Loading branch information
ethouris authored Aug 18, 2022
1 parent eae2749 commit 8941831
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 57 deletions.
16 changes: 11 additions & 5 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ int srt::CUDTUnited::cleanup()
// after which the m_bClosing flag is cheched, which
// is set here above. Worst case secenario, this
// pthread_join() call will block for 1 second.
CSync::signal_relaxed(m_GCStopCond);
CSync::notify_one_relaxed(m_GCStopCond);
m_GCThread.join();

m_bGCStatus = false;
Expand Down Expand Up @@ -778,7 +778,7 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
}

// wake up a waiting accept() call
CSync::lock_signal(ls->m_AcceptCond, ls->m_AcceptLock);
CSync::lock_notify_one(ls->m_AcceptCond, ls->m_AcceptLock);
}
else
{
Expand Down Expand Up @@ -1975,7 +1975,7 @@ int srt::CUDTUnited::close(CUDTSocket* s)
s->core().notListening();

// broadcast all "accept" waiting
CSync::lock_broadcast(s->m_AcceptCond, s->m_AcceptLock);
CSync::lock_notify_all(s->m_AcceptCond, s->m_AcceptLock);
}
else
{
Expand Down Expand Up @@ -2684,6 +2684,8 @@ void srt::CUDTUnited::checkBrokenSockets()
// remove those timeout sockets
for (vector<SRTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++l)
removeSocket(*l);

HLOGC(smlog.Debug, log << "checkBrokenSockets: after removal: m_ClosedSockets.size()=" << m_ClosedSockets.size());
}

// [[using locked(m_GlobControlLock)]]
Expand Down Expand Up @@ -2768,9 +2770,13 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
s->core().closeInternal();
HLOGC(smlog.Debug, log << "GC/removeSocket: DELETING SOCKET @" << u);
delete s;
HLOGC(smlog.Debug, log << "GC/removeSocket: socket @" << u << " DELETED. Checking muxer.");

if (mid == -1)
{
HLOGC(smlog.Debug, log << "GC/removeSocket: no muxer found, finishing.");
return;
}

map<int, CMultiplexer>::iterator m;
m = m_mMultiplexer.find(mid);
Expand All @@ -2783,8 +2789,7 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
CMultiplexer& mx = m->second;

mx.m_iRefCount--;
// HLOGF(smlog.Debug, "unrefing underlying socket for %u: %u\n",
// u, mx.m_iRefCount);
HLOGC(smlog.Debug, log << "unrefing underlying muxer " << mid << " for @" << u << ", ref=" << mx.m_iRefCount);
if (0 == mx.m_iRefCount)
{
HLOGC(smlog.Debug,
Expand Down Expand Up @@ -3163,6 +3168,7 @@ void* srt::CUDTUnited::garbageCollect(void* p)
if (empty)
break;

HLOGC(inlog.Debug, log << "GC: checkBrokenSockets didn't wipe all sockets, repeating after 1s sleep");
srt::sync::this_thread::sleep_for(milliseconds_from(1));
}

Expand Down
50 changes: 23 additions & 27 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5170,9 +5170,8 @@ void * srt::CUDT::tsbpd(void* param)
CUDTUnited::GroupKeeper gkeeper(self->uglobal(), self->m_parent);
#endif

UniqueLock recv_lock(self->m_RecvLock);
CSync recvdata_cc(self->m_RecvDataCond, recv_lock);
CSync tsbpd_cc(self->m_RcvTsbPdCond, recv_lock);
CUniqueSync recvdata_lcc (self->m_RecvLock, self->m_RecvDataCond);
CSync tsbpd_cc(self->m_RcvTsbPdCond, recvdata_lcc.locker());

self->m_bTsbPdAckWakeup = true;
while (!self->m_bClosing)
Expand Down Expand Up @@ -5235,7 +5234,7 @@ void * srt::CUDT::tsbpd(void* param)
*/
if (self->m_config.bSynRecving)
{
recvdata_cc.signal_locked(recv_lock);
recvdata_lcc.notify_one();
}
/*
* Set EPOLL_IN to wakeup any thread waiting on epoll
Expand Down Expand Up @@ -5481,7 +5480,7 @@ void * srt::CUDT::tsbpd(void *param)
*/
if (self->m_config.bSynRecving)
{
recvdata_cc.signal_locked(recv_lock);
recvdata_cc.notify_one_locked(recv_lock);
}
/*
* Set EPOLL_IN to wakeup any thread waiting on epoll
Expand Down Expand Up @@ -6361,7 +6360,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)
if (m_bTsbPd)
{
HLOGP(tslog.Debug, "Ping TSBPD thread to schedule wakeup");
tscond.signal_locked(recvguard);
tscond.notify_one_locked(recvguard);
}
else
{
Expand Down Expand Up @@ -6889,7 +6888,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
if (m_bTsbPd)
{
HLOGP(tslog.Debug, "Ping TSBPD thread to schedule wakeup");
tscond.signal_locked(recvguard);
tscond.notify_one_locked(recvguard);
}
else
{
Expand Down Expand Up @@ -6940,7 +6939,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
if (m_bTsbPd)
{
HLOGP(arlog.Debug, "receiveMessage: nothing to read, kicking TSBPD, return AGAIN");
tscond.signal_locked(recvguard);
tscond.notify_one_locked(recvguard);
}
else
{
Expand All @@ -6961,7 +6960,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
if (m_bTsbPd)
{
HLOGP(arlog.Debug, "receiveMessage: DATA READ, but nothing more - kicking TSBPD.");
tscond.signal_locked(recvguard);
tscond.notify_one_locked(recvguard);
}
else
{
Expand Down Expand Up @@ -7011,7 +7010,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
// bool spurious = (tstime != 0);

HLOGC(tslog.Debug, log << CONID() << "receiveMessage: KICK tsbpd");
tscond.signal_locked(recvguard);
tscond.notify_one_locked(recvguard);
}

THREAD_PAUSED();
Expand Down Expand Up @@ -7092,7 +7091,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
if (m_bTsbPd)
{
HLOGP(tslog.Debug, "recvmsg: KICK tsbpd() (buffer empty)");
tscond.signal_locked(recvguard);
tscond.notify_one_locked(recvguard);
}

// Shut up EPoll if no more messages in non-blocking mode
Expand Down Expand Up @@ -7695,14 +7694,14 @@ void srt::CUDT::releaseSynch()
{
SRT_ASSERT(m_bClosing);
// wake up user calls
CSync::lock_signal(m_SendBlockCond, m_SendBlockLock);
CSync::lock_notify_one(m_SendBlockCond, m_SendBlockLock);

enterCS(m_SendLock);
leaveCS(m_SendLock);

// Awake tsbpd() and srt_recv*(..) threads for them to check m_bClosing.
CSync::lock_signal(m_RecvDataCond, m_RecvLock);
CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);
CSync::lock_notify_one(m_RecvDataCond, m_RecvLock);
CSync::lock_notify_one(m_RcvTsbPdCond, m_RecvLock);

// Azquiring m_RcvTsbPdStartupLock protects race in starting
// the tsbpd() thread in CUDT::processData().
Expand Down Expand Up @@ -8070,17 +8069,15 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
if (m_bTsbPd)
{
/* Newly acknowledged data, signal TsbPD thread */
UniqueLock rcvlock(m_RecvLock);
CSync tscond(m_RcvTsbPdCond, rcvlock);
CUniqueSync tslcc (m_RecvLock, m_RcvTsbPdCond);
// m_bTsbPdAckWakeup is protected by m_RecvLock in the tsbpd() thread
if (m_bTsbPdAckWakeup)
tscond.signal_locked(rcvlock);
tslcc.notify_one();
}
else
{
{
UniqueLock rdlock (m_RecvLock);
CSync rdcond (m_RecvDataCond, rdlock);
CUniqueSync rdcc (m_RecvLock, m_RecvDataCond);

#if ENABLE_NEW_RCVBUFFER
// Locks m_RcvBufferLock, which is unlocked above by InvertedLock un_bufflock.
Expand All @@ -8091,7 +8088,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
if (m_config.bSynRecving)
{
// signal a waiting "recv" call if there is any data available
rdcond.signal_locked(rdlock);
rdcc.notify_one();
}
// acknowledge any waiting epolls to read
// fix SRT_EPOLL_IN event loss but rcvbuffer still have data:
Expand Down Expand Up @@ -8294,7 +8291,7 @@ void srt::CUDT::updateSndLossListOnACK(int32_t ackdata_seqno)

if (m_config.bSynSending)
{
CSync::lock_signal(m_SendBlockCond, m_SendBlockLock);
CSync::lock_notify_one(m_SendBlockCond, m_SendBlockLock);
}

// record total time used for sending
Expand Down Expand Up @@ -8903,7 +8900,7 @@ void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt)
const int32_t* dropdata = (const int32_t*) ctrlpkt.m_pcData;

{
UniqueLock rlock(m_RecvLock);
CUniqueSync rcvtscc (m_RecvLock, m_RcvTsbPdCond);
// With both TLPktDrop and TsbPd enabled, a message always consists only of one packet.
// It will be dropped as too late anyway. Not dropping it from the receiver buffer
// in advance reduces false drops if the packet somehow manages to arrive.
Expand Down Expand Up @@ -8937,8 +8934,7 @@ void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt)
if (m_bTsbPd)
{
HLOGP(inlog.Debug, "DROPREQ: signal TSBPD");
CSync cc(m_RcvTsbPdCond, rlock);
cc.signal_locked(rlock);
rcvtscc.notify_one();
}
}

Expand Down Expand Up @@ -9738,7 +9734,7 @@ void srt::CUDT::processClose()
if (m_bTsbPd)
{
HLOGP(smlog.Debug, "processClose: lock-and-signal TSBPD");
CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);
CSync::lock_notify_one(m_RcvTsbPdCond, m_RecvLock);
}

// Signal the sender and recver if they are waiting for data.
Expand Down Expand Up @@ -10325,7 +10321,7 @@ int srt::CUDT::processData(CUnit* in_unit)
if (m_bTsbPd)
{
HLOGC(qrlog.Debug, log << "loss: signaling TSBPD cond");
CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);
CSync::lock_notify_one(m_RcvTsbPdCond, m_RecvLock);
}
else
{
Expand All @@ -10346,7 +10342,7 @@ int srt::CUDT::processData(CUnit* in_unit)
if (m_bTsbPd)
{
HLOGC(qrlog.Debug, log << "loss: signaling TSBPD cond");
CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);
CSync::lock_notify_one(m_RcvTsbPdCond, m_RecvLock);
}
}

Expand Down
2 changes: 1 addition & 1 deletion srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ void CUDTGroup::close()
// XXX This looks like a dead code. Group receiver functions
// do not use any lock on m_RcvDataLock, it is likely a remainder
// of the old, internal impementation.
// CSync::lock_signal(m_RcvDataCond, m_RcvDataLock);
// CSync::lock_notify_one(m_RcvDataCond, m_RcvDataLock);
}

// [[using locked(m_Global->m_GlobControlLock)]]
Expand Down
8 changes: 3 additions & 5 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1599,8 +1599,7 @@ void srt::CRcvQueue::stopWorker()

int srt::CRcvQueue::recvfrom(int32_t id, CPacket& w_packet)
{
UniqueLock bufferlock(m_BufferLock);
CSync buffercond(m_BufferCond, bufferlock);
CUniqueSync buffercond(m_BufferLock, m_BufferCond);

map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);

Expand Down Expand Up @@ -1729,15 +1728,14 @@ srt::CUDT* srt::CRcvQueue::getNewEntry()

void srt::CRcvQueue::storePkt(int32_t id, CPacket* pkt)
{
UniqueLock bufferlock(m_BufferLock);
CSync passcond(m_BufferCond, bufferlock);
CUniqueSync passcond(m_BufferLock, m_BufferCond);

map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);

if (i == m_mBuffer.end())
{
m_mBuffer[id].push(pkt);
passcond.signal_locked(bufferlock);
passcond.notify_one();
}
else
{
Expand Down
Loading

0 comments on commit 8941831

Please sign in to comment.