From e4dbba41d639438707a6c6cade389bd9ffbef5a7 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Mon, 21 Jun 2021 17:09:27 +0200 Subject: [PATCH] [core] Refactor CSndUList: pop CUDT, not a packet --- srtcore/core.cpp | 4 +-- srtcore/queue.cpp | 82 ++++++++++++++++++++++++++--------------------- srtcore/queue.h | 11 +++---- 3 files changed, 52 insertions(+), 45 deletions(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 7a337cf41..521bc6125 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -7974,14 +7974,14 @@ void srt::CUDT::updateSndLossListOnACK(int32_t ackdata_seqno) #endif // insert this socket to snd list if it is not on the list yet - m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); + const steady_clock::time_point currtime = steady_clock::now(); + m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE, currtime); if (m_config.bSynSending) { CSync::lock_signal(m_SendBlockCond, m_SendBlockLock); } - const steady_clock::time_point currtime = steady_clock::now(); // record total time used for sending enterCS(m_StatsLock); m_stats.sndDuration += count_microseconds(currtime - m_stats.sndDurationCounter); diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index f17ad5745..8cefd276d 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -272,7 +272,7 @@ srt::CSndUList::~CSndUList() delete[] m_pHeap; } -void srt::CSndUList::update(const CUDT* u, EReschedule reschedule) +void srt::CSndUList::update(const CUDT* u, EReschedule reschedule, sync::steady_clock::time_point ts) { ScopedLock listguard(m_ListLock); @@ -285,58 +285,33 @@ void srt::CSndUList::update(const CUDT* u, EReschedule reschedule) if (n->m_iHeapLoc == 0) { - n->m_tsTimeStamp = steady_clock::now(); + n->m_tsTimeStamp = ts; m_pTimer->interrupt(); return; } remove_(u); - insert_norealloc_(steady_clock::now(), u); + insert_norealloc_(ts, u); return; } - insert_(steady_clock::now(), u); + insert_(ts, u); } -int srt::CSndUList::pop(sockaddr_any& w_addr, CPacket& w_pkt) +srt::CUDT* srt::CSndUList::pop() { ScopedLock listguard(m_ListLock); if (-1 == m_iLastEntry) - return -1; + return NULL; // no pop until the next scheduled time if (m_pHeap[0]->m_tsTimeStamp > steady_clock::now()) - return -1; + return NULL; CUDT* u = m_pHeap[0]->m_pUDT; remove_(u); - -#define UST(field) ((u->m_b##field) ? "+" : "-") << #field << " " - - HLOGC(qslog.Debug, - log << "SND:pop: requesting packet from @" << u->socketID() << " STATUS: " << UST(Listening) - << UST(Connecting) << UST(Connected) << UST(Closing) << UST(Shutdown) << UST(Broken) << UST(PeerHealth) - << UST(Opened)); -#undef UST - - if (!u->m_bConnected || u->m_bBroken) - return -1; - - // pack a packet from the socket - const std::pair res_time = u->packData((w_pkt)); - - if (res_time.first <= 0) - return -1; - - w_addr = u->m_PeerAddr; - - // insert a new entry, ts is the next processing time - const steady_clock::time_point send_time = res_time.second; - if (!is_zero(send_time)) - insert_norealloc_(send_time, u); - - return 1; + return u; } void srt::CSndUList::remove(const CUDT* u) @@ -630,18 +605,51 @@ void* srt::CSndQueue::worker(void* param) } THREAD_RESUMED(); - // it is time to send the next pkt - sockaddr_any addr; - CPacket pkt; - if (self->m_pSndUList->pop((addr), (pkt)) < 0) + // Get a socket with a send request if any. + CUDT* u = self->m_pSndUList->pop(); + if (u == NULL) { +#if defined(SRT_DEBUG_SNDQ_HIGHRATE) + self->m_WorkerStats.lNotReadyPop++; +#endif /* SRT_DEBUG_SNDQ_HIGHRATE */ continue; + } + +#define UST(field) ((u->m_b##field) ? "+" : "-") << #field << " " + HLOGC(qslog.Debug, + log << "CSndQueue: requesting packet from @" << u->socketID() << " STATUS: " << UST(Listening) + << UST(Connecting) << UST(Connected) << UST(Closing) << UST(Shutdown) << UST(Broken) << UST(PeerHealth) + << UST(Opened)); +#undef UST + if (!u->m_bConnected || u->m_bBroken) + { #if defined(SRT_DEBUG_SNDQ_HIGHRATE) self->m_WorkerStats.lNotReadyPop++; #endif /* SRT_DEBUG_SNDQ_HIGHRATE */ + continue; } + // pack a packet from the socket + CPacket pkt; + const std::pair res_time = u->packData((pkt)); + + // Check if payload size is invalid. + if (res_time.first <= 0) + { +#if defined(SRT_DEBUG_SNDQ_HIGHRATE) + self->m_WorkerStats.lNotReadyPop++; +#endif /* SRT_DEBUG_SNDQ_HIGHRATE */ + continue; + } + + const sockaddr_any addr = u->m_PeerAddr; + // Insert a new entry, send_time is the next processing time. + // TODO: maybe reschedule by taking the smaller time? + const steady_clock::time_point send_time = res_time.second; + if (!is_zero(send_time)) + self->m_pSndUList->update(u, CSndUList::DONT_RESCHEDULE, send_time); + HLOGC(qslog.Debug, log << self->CONID() << "chn:SENDING: " << pkt.Info()); self->m_pChannel->sendto(addr, pkt); diff --git a/srtcore/queue.h b/srtcore/queue.h index 77256c3fc..2b7408747 100644 --- a/srtcore/queue.h +++ b/srtcore/queue.h @@ -173,13 +173,12 @@ class CSndUList /// Update the timestamp of the UDT instance on the list. /// @param [in] u pointer to the UDT instance /// @param [in] reschedule if the timestamp should be rescheduled - void update(const CUDT* u, EReschedule reschedule); + /// @param [in] ts the next time to trigger sending logic on the CUDT + void update(const CUDT* u, EReschedule reschedule, sync::steady_clock::time_point ts = sync::steady_clock::now()); - /// Retrieve the next packet and peer address from the first entry, and reschedule it in the queue. - /// @param [out] addr destination address of the next packet - /// @param [out] pkt the next packet to be sent - /// @return 1 if successfully retrieved, -1 if no packet found. - int pop(sockaddr_any& addr, CPacket& pkt); + /// Retrieve the next (in time) socket from the heap to process its sending request. + /// @return a pointer to CUDT instance to process next. + CUDT* pop(); /// Remove UDT instance from the list. /// @param [in] u pointer to the UDT instance