From 5da6c6011fed01577e5309d69b86c6328150fd4a Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Mon, 21 Jun 2021 16:18:53 +0200 Subject: [PATCH 1/3] [core] Moved mutex from CSndQueue to CSndUList Co-authored-by: Sektor van Skijlen --- CMakeLists.txt | 2 +- srtcore/queue.cpp | 51 +++++++++++++++++++++++++++-------------------- srtcore/queue.h | 35 +++++++++++++++----------------- 3 files changed, 46 insertions(+), 42 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 98d360671..c14c81c55 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -129,7 +129,7 @@ option(ENABLE_CXX_DEPS "Extra library dependencies in srt.pc for the CXX librari option(USE_STATIC_LIBSTDCXX "Should use static rather than shared libstdc++" OFF) option(ENABLE_INET_PTON "Set to OFF to prevent usage of inet_pton when building against modern SDKs while still requiring compatibility with older Windows versions, such as Windows XP, Windows Server 2003 etc." ON) option(ENABLE_CODE_COVERAGE "Enable code coverage reporting" OFF) -option(ENABLE_MONOTONIC_CLOCK "Enforced clock_gettime with monotonic clock on GC CV /temporary fix for #729/" OFF) +option(ENABLE_MONOTONIC_CLOCK "Enforced clock_gettime with monotonic clock on GC CV" OFF) option(ENABLE_STDCXX_SYNC "Use C++11 chrono and threads for timing instead of pthreads" OFF) option(USE_OPENSSL_PC "Use pkg-config to find OpenSSL libraries" ON) option(USE_BUSY_WAITING "Enable more accurate sending times at a cost of potentially higher CPU load" OFF) diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index b9d7ed02e..f17ad5745 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -255,20 +255,20 @@ void srt::CUnitQueue::makeUnitGood(CUnit* unit) unit->m_iFlag = CUnit::GOOD; } -srt::CSndUList::CSndUList() +srt::CSndUList::CSndUList(sync::CTimer* pTimer) : m_pHeap(NULL) , m_iArrayLength(512) , m_iLastEntry(-1) , m_ListLock() - , m_pWindowLock(NULL) - , m_pWindowCond(NULL) - , m_pTimer(NULL) + , m_pTimer(pTimer) { + setupCond(m_ListCond, "CSndUListCond"); m_pHeap = new CSNode*[m_iArrayLength]; } srt::CSndUList::~CSndUList() { + releaseCond(m_ListCond); delete[] m_pHeap; } @@ -305,7 +305,7 @@ int srt::CSndUList::pop(sockaddr_any& w_addr, CPacket& w_pkt) if (-1 == m_iLastEntry) return -1; - // no pop until the next schedulled time + // no pop until the next scheduled time if (m_pHeap[0]->m_tsTimeStamp > steady_clock::now()) return -1; @@ -342,7 +342,6 @@ int srt::CSndUList::pop(sockaddr_any& w_addr, CPacket& w_pkt) void srt::CSndUList::remove(const CUDT* u) { ScopedLock listguard(m_ListLock); - remove_(u); } @@ -356,6 +355,21 @@ steady_clock::time_point srt::CSndUList::getNextProcTime() return m_pHeap[0]->m_tsTimeStamp; } +void srt::CSndUList::waitNonEmpty() const +{ + UniqueLock listguard(m_ListLock); + if (m_iLastEntry >= 0) + return; + + m_ListCond.wait(listguard); +} + +void srt::CSndUList::signalInterrupt() const +{ + ScopedLock listguard(m_ListLock); + m_ListCond.notify_all(); +} + void srt::CSndUList::realloc_() { CSNode** temp = NULL; @@ -420,7 +434,8 @@ void srt::CSndUList::insert_norealloc_(const steady_clock::time_point& ts, const // first entry, activate the sending queue if (0 == m_iLastEntry) { - CSync::lock_signal(*m_pWindowCond, *m_pWindowLock); + // m_ListLock is assumed to be locked. + m_ListCond.notify_all(); } } @@ -468,10 +483,8 @@ srt::CSndQueue::CSndQueue() : m_pSndUList(NULL) , m_pChannel(NULL) , m_pTimer(NULL) - , m_WindowCond() , m_bClosing(false) { - setupCond(m_WindowCond, "Window"); } srt::CSndQueue::~CSndQueue() @@ -483,14 +496,14 @@ srt::CSndQueue::~CSndQueue() m_pTimer->interrupt(); } - CSync::lock_signal(m_WindowCond, m_WindowLock); + // Unblock CSndQueue worker thread if it is waiting. + m_pSndUList->signalInterrupt(); if (m_WorkerThread.joinable()) { HLOGC(rslog.Debug, log << "SndQueue: EXIT"); m_WorkerThread.join(); } - releaseCond(m_WindowCond); delete m_pSndUList; } @@ -510,12 +523,9 @@ int srt::CSndQueue::m_counter = 0; void srt::CSndQueue::init(CChannel* c, CTimer* t) { - m_pChannel = c; - m_pTimer = t; - m_pSndUList = new CSndUList; - m_pSndUList->m_pWindowLock = &m_WindowLock; - m_pSndUList->m_pWindowCond = &m_WindowCond; - m_pSndUList->m_pTimer = m_pTimer; + m_pChannel = c; + m_pTimer = t; + m_pSndUList = new CSndUList(t); #if ENABLE_LOGGING ++m_counter; @@ -575,14 +585,11 @@ void* srt::CSndQueue::worker(void* param) self->m_WorkerStats.lNotReadyTs++; #endif /* SRT_DEBUG_SNDQ_HIGHRATE */ - UniqueLock windlock(self->m_WindowLock); - CSync windsync(self->m_WindowCond, windlock); - // wait here if there is no sockets with data to be sent THREAD_PAUSED(); - if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0)) + if (!self->m_bClosing) { - windsync.wait(); + self->m_pSndUList->waitNonEmpty(); #if defined(SRT_DEBUG_SNDQ_HIGHRATE) self->m_WorkerStats.lCondWait++; diff --git a/srtcore/queue.h b/srtcore/queue.h index ee05440c8..77256c3fc 100644 --- a/srtcore/queue.h +++ b/srtcore/queue.h @@ -157,10 +157,8 @@ struct CSNode class CSndUList { - friend class CSndQueue; - public: - CSndUList(); + CSndUList(sync::CTimer* pTimer); ~CSndUList(); public: @@ -175,30 +173,32 @@ class CSndUList /// Update the timestamp of the UDT instance on the list. /// @param [in] u pointer to the UDT instance /// @param [in] reschedule if the timestamp should be rescheduled - void update(const CUDT* u, EReschedule reschedule); /// Retrieve the next packet and peer address from the first entry, and reschedule it in the queue. /// @param [out] addr destination address of the next packet /// @param [out] pkt the next packet to be sent /// @return 1 if successfully retrieved, -1 if no packet found. - int pop(sockaddr_any& addr, CPacket& pkt); /// Remove UDT instance from the list. /// @param [in] u pointer to the UDT instance - - void remove(const CUDT* u); + void remove(const CUDT* u);// EXCLUDES(m_ListLock); /// Retrieve the next scheduled processing time. /// @return Scheduled processing time of the first UDT socket in the list. - sync::steady_clock::time_point getNextProcTime(); + /// Wait for the list to become non empty. + void waitNonEmpty() const; + + /// Signal to stop waiting in waitNonEmpty(). + void signalInterrupt() const; + private: /// Doubles the size of the list. /// - void realloc_(); + void realloc_();// REQUIRES(m_ListLock); /// Insert a new UDT instance into the list with realloc if required. /// @@ -211,21 +211,21 @@ class CSndUList /// /// @param [in] ts time stamp: next processing time /// @param [in] u pointer to the UDT instance - void insert_norealloc_(const sync::steady_clock::time_point& ts, const CUDT* u); + void insert_norealloc_(const sync::steady_clock::time_point& ts, const CUDT* u);// REQUIRES(m_ListLock); + /// Removes CUDT entry from the list. + /// If the last entry is removed, calls sync::CTimer::interrupt(). void remove_(const CUDT* u); private: CSNode** m_pHeap; // The heap array int m_iArrayLength; // physical length of the array - int m_iLastEntry; // position of last entry on the heap array + int m_iLastEntry; // position of last entry on the heap array or -1 if empty. - sync::Mutex m_ListLock; + mutable sync::Mutex m_ListLock; // Protects the list (m_pHeap, m_iArrayLength, m_iLastEntry). + mutable sync::Condition m_ListCond; - sync::Mutex* m_pWindowLock; - sync::Condition* m_pWindowCond; - - sync::CTimer* m_pTimer; + sync::CTimer* const m_pTimer; private: CSndUList(const CSndUList&); @@ -462,9 +462,6 @@ class CSndQueue CChannel* m_pChannel; // The UDP channel for data sending srt::sync::CTimer* m_pTimer; // Timing facility - srt::sync::Mutex m_WindowLock; - srt::sync::Condition m_WindowCond; - srt::sync::atomic m_bClosing; // closing the worker #if defined(SRT_DEBUG_SNDQ_HIGHRATE) //>>debug high freq worker From e4dbba41d639438707a6c6cade389bd9ffbef5a7 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Mon, 21 Jun 2021 17:09:27 +0200 Subject: [PATCH 2/3] [core] Refactor CSndUList: pop CUDT, not a packet --- srtcore/core.cpp | 4 +-- srtcore/queue.cpp | 82 ++++++++++++++++++++++++++--------------------- srtcore/queue.h | 11 +++---- 3 files changed, 52 insertions(+), 45 deletions(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 7a337cf41..521bc6125 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -7974,14 +7974,14 @@ void srt::CUDT::updateSndLossListOnACK(int32_t ackdata_seqno) #endif // insert this socket to snd list if it is not on the list yet - m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); + const steady_clock::time_point currtime = steady_clock::now(); + m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE, currtime); if (m_config.bSynSending) { CSync::lock_signal(m_SendBlockCond, m_SendBlockLock); } - const steady_clock::time_point currtime = steady_clock::now(); // record total time used for sending enterCS(m_StatsLock); m_stats.sndDuration += count_microseconds(currtime - m_stats.sndDurationCounter); diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index f17ad5745..8cefd276d 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -272,7 +272,7 @@ srt::CSndUList::~CSndUList() delete[] m_pHeap; } -void srt::CSndUList::update(const CUDT* u, EReschedule reschedule) +void srt::CSndUList::update(const CUDT* u, EReschedule reschedule, sync::steady_clock::time_point ts) { ScopedLock listguard(m_ListLock); @@ -285,58 +285,33 @@ void srt::CSndUList::update(const CUDT* u, EReschedule reschedule) if (n->m_iHeapLoc == 0) { - n->m_tsTimeStamp = steady_clock::now(); + n->m_tsTimeStamp = ts; m_pTimer->interrupt(); return; } remove_(u); - insert_norealloc_(steady_clock::now(), u); + insert_norealloc_(ts, u); return; } - insert_(steady_clock::now(), u); + insert_(ts, u); } -int srt::CSndUList::pop(sockaddr_any& w_addr, CPacket& w_pkt) +srt::CUDT* srt::CSndUList::pop() { ScopedLock listguard(m_ListLock); if (-1 == m_iLastEntry) - return -1; + return NULL; // no pop until the next scheduled time if (m_pHeap[0]->m_tsTimeStamp > steady_clock::now()) - return -1; + return NULL; CUDT* u = m_pHeap[0]->m_pUDT; remove_(u); - -#define UST(field) ((u->m_b##field) ? "+" : "-") << #field << " " - - HLOGC(qslog.Debug, - log << "SND:pop: requesting packet from @" << u->socketID() << " STATUS: " << UST(Listening) - << UST(Connecting) << UST(Connected) << UST(Closing) << UST(Shutdown) << UST(Broken) << UST(PeerHealth) - << UST(Opened)); -#undef UST - - if (!u->m_bConnected || u->m_bBroken) - return -1; - - // pack a packet from the socket - const std::pair res_time = u->packData((w_pkt)); - - if (res_time.first <= 0) - return -1; - - w_addr = u->m_PeerAddr; - - // insert a new entry, ts is the next processing time - const steady_clock::time_point send_time = res_time.second; - if (!is_zero(send_time)) - insert_norealloc_(send_time, u); - - return 1; + return u; } void srt::CSndUList::remove(const CUDT* u) @@ -630,18 +605,51 @@ void* srt::CSndQueue::worker(void* param) } THREAD_RESUMED(); - // it is time to send the next pkt - sockaddr_any addr; - CPacket pkt; - if (self->m_pSndUList->pop((addr), (pkt)) < 0) + // Get a socket with a send request if any. + CUDT* u = self->m_pSndUList->pop(); + if (u == NULL) { +#if defined(SRT_DEBUG_SNDQ_HIGHRATE) + self->m_WorkerStats.lNotReadyPop++; +#endif /* SRT_DEBUG_SNDQ_HIGHRATE */ continue; + } + +#define UST(field) ((u->m_b##field) ? "+" : "-") << #field << " " + HLOGC(qslog.Debug, + log << "CSndQueue: requesting packet from @" << u->socketID() << " STATUS: " << UST(Listening) + << UST(Connecting) << UST(Connected) << UST(Closing) << UST(Shutdown) << UST(Broken) << UST(PeerHealth) + << UST(Opened)); +#undef UST + if (!u->m_bConnected || u->m_bBroken) + { #if defined(SRT_DEBUG_SNDQ_HIGHRATE) self->m_WorkerStats.lNotReadyPop++; #endif /* SRT_DEBUG_SNDQ_HIGHRATE */ + continue; } + // pack a packet from the socket + CPacket pkt; + const std::pair res_time = u->packData((pkt)); + + // Check if payload size is invalid. + if (res_time.first <= 0) + { +#if defined(SRT_DEBUG_SNDQ_HIGHRATE) + self->m_WorkerStats.lNotReadyPop++; +#endif /* SRT_DEBUG_SNDQ_HIGHRATE */ + continue; + } + + const sockaddr_any addr = u->m_PeerAddr; + // Insert a new entry, send_time is the next processing time. + // TODO: maybe reschedule by taking the smaller time? + const steady_clock::time_point send_time = res_time.second; + if (!is_zero(send_time)) + self->m_pSndUList->update(u, CSndUList::DONT_RESCHEDULE, send_time); + HLOGC(qslog.Debug, log << self->CONID() << "chn:SENDING: " << pkt.Info()); self->m_pChannel->sendto(addr, pkt); diff --git a/srtcore/queue.h b/srtcore/queue.h index 77256c3fc..2b7408747 100644 --- a/srtcore/queue.h +++ b/srtcore/queue.h @@ -173,13 +173,12 @@ class CSndUList /// Update the timestamp of the UDT instance on the list. /// @param [in] u pointer to the UDT instance /// @param [in] reschedule if the timestamp should be rescheduled - void update(const CUDT* u, EReschedule reschedule); + /// @param [in] ts the next time to trigger sending logic on the CUDT + void update(const CUDT* u, EReschedule reschedule, sync::steady_clock::time_point ts = sync::steady_clock::now()); - /// Retrieve the next packet and peer address from the first entry, and reschedule it in the queue. - /// @param [out] addr destination address of the next packet - /// @param [out] pkt the next packet to be sent - /// @return 1 if successfully retrieved, -1 if no packet found. - int pop(sockaddr_any& addr, CPacket& pkt); + /// Retrieve the next (in time) socket from the heap to process its sending request. + /// @return a pointer to CUDT instance to process next. + CUDT* pop(); /// Remove UDT instance from the list. /// @param [in] u pointer to the UDT instance From 2e6feb2019c68ee696b223b5edf814382e871b2a Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Mon, 12 Jul 2021 16:47:07 +0200 Subject: [PATCH 3/3] [core] CSndUList::update: reschedule if earlier time --- srtcore/queue.cpp | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 8cefd276d..2b0fd2c74 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -280,7 +280,10 @@ void srt::CSndUList::update(const CUDT* u, EReschedule reschedule, sync::steady_ if (n->m_iHeapLoc >= 0) { - if (!reschedule) // EReschedule to bool conversion, predicted. + if (reschedule == DONT_RESCHEDULE) + return; + + if (n->m_tsTimeStamp <= ts) return; if (n->m_iHeapLoc == 0) @@ -644,11 +647,9 @@ void* srt::CSndQueue::worker(void* param) } const sockaddr_any addr = u->m_PeerAddr; - // Insert a new entry, send_time is the next processing time. - // TODO: maybe reschedule by taking the smaller time? - const steady_clock::time_point send_time = res_time.second; - if (!is_zero(send_time)) - self->m_pSndUList->update(u, CSndUList::DONT_RESCHEDULE, send_time); + const steady_clock::time_point next_send_time = res_time.second; + if (!is_zero(next_send_time)) + self->m_pSndUList->update(u, CSndUList::DO_RESCHEDULE, next_send_time); HLOGC(qslog.Debug, log << self->CONID() << "chn:SENDING: " << pkt.Info()); self->m_pChannel->sendto(addr, pkt);