diff --git a/srtcore/buffer.cpp b/srtcore/buffer.cpp index bbe41b7f6..c035ad936 100644 --- a/srtcore/buffer.cpp +++ b/srtcore/buffer.cpp @@ -1549,8 +1549,8 @@ CPacket* CRcvBuffer::getRcvReadyPacket(int32_t seqdistance) HLOGC(brlog.Debug, log << "getRcvReadyPacket: Sequence offset=" << seqdistance << " IS NOT RECEIVED."); return 0; } - IF_HEAVY_LOGGING(int nskipped = 0); + IF_HEAVY_LOGGING(int nskipped = 0); for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = shiftFwd(i)) { /* @@ -1559,8 +1559,8 @@ CPacket* CRcvBuffer::getRcvReadyPacket(int32_t seqdistance) if (m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD) { HLOGC(brlog.Debug, - log << "getRcvReadyPacket: Found next packet seq=%" << m_pUnit[i]->m_Packet.getSeqNo() << " (" - << nskipped << " empty cells skipped)"); + log << "getRcvReadyPacket: Found next packet seq=%" << m_pUnit[i]->m_Packet.getSeqNo() << " (" + << nskipped << " empty cells skipped)"); return &m_pUnit[i]->m_Packet; } IF_HEAVY_LOGGING(++nskipped); @@ -1881,12 +1881,9 @@ void CRcvBuffer::setRcvTsbPdMode(const steady_clock::time_point& timebase, const m_tsbpd.setTsbPdMode(timebase, no_wrap_check, delay); } -bool CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us, - int rtt, - steady_clock::duration& w_udrift, - steady_clock::time_point& w_newtimebase) +bool CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us, int rtt) { - return m_tsbpd.addDriftSample(timestamp_us, rtt, w_udrift, w_newtimebase); + return m_tsbpd.addDriftSample(timestamp_us, rtt); } int CRcvBuffer::readMsg(char* data, int len) diff --git a/srtcore/buffer.h b/srtcore/buffer.h index 9bf02f216..2dc63d972 100644 --- a/srtcore/buffer.h +++ b/srtcore/buffer.h @@ -411,12 +411,8 @@ class CRcvBuffer /// Add packet timestamp for drift caclculation and compensation /// @param [in] timestamp packet time stamp - /// @param [out] w_udrift current drift value - /// @param [out] w_newtimebase current TSBPD base time - bool addRcvTsbPdDriftSample(uint32_t timestamp, - int rtt, - duration& w_udrift, - time_point& w_newtimebase); + /// @param [in] rtt RTT sample + bool addRcvTsbPdDriftSample(uint32_t timestamp, int rtt); #ifdef SRT_DEBUG_TSBPD_DRIFT void printDriftHistogram(int64_t iDrift); diff --git a/srtcore/core.cpp b/srtcore/core.cpp index a5fe649d8..ab1e7da9d 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -8324,17 +8324,14 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr // srt_recvfile (which doesn't make any sense), you'll have a deadlock. if (m_config.bDriftTracer) { - steady_clock::duration udrift(0); - steady_clock::time_point newtimebase; - const bool drift_updated SRT_ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), - rtt, (udrift), (newtimebase)); + const bool drift_updated SRT_ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), rtt); #if ENABLE_EXPERIMENTAL_BONDING if (drift_updated && m_parent->m_GroupOf) { ScopedLock glock(s_UDTUnited.m_GlobControlLock); if (m_parent->m_GroupOf) { - m_parent->m_GroupOf->synchronizeDrift(this, udrift, newtimebase); + m_parent->m_GroupOf->synchronizeDrift(this); } } #endif diff --git a/srtcore/group.cpp b/srtcore/group.cpp index e835453ea..bee5c2beb 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -2725,48 +2725,24 @@ const char* CUDTGroup::StateStr(CUDTGroup::GroupState st) return unknown; } -void CUDTGroup::synchronizeDrift(CUDT* cu, steady_clock::duration udrift, steady_clock::time_point newtimebase) +void CUDTGroup::synchronizeDrift(const srt::CUDT* srcMember) { + SRT_ASSERT(srcMember != NULL); ScopedLock glock(m_GroupLock); - - bool wrap_period = false; - - bool anycheck = false; - - for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi) - { - // Skip non-connected; these will be synchronized when ready - if (gi->laststatus != SRTS_CONNECTED) - continue; - - // Skip the entity that has reported this - if (cu == &gi->ps->core()) - continue; - - steady_clock::time_point this_timebase; - steady_clock::duration this_udrift(0); - bool wrp = false; - gi->ps->core().m_pRcvBuffer->getInternalTimeBase((this_timebase), (wrp), (this_udrift)); - - udrift = std::min(udrift, this_udrift); - steady_clock::time_point new_newtimebase = std::min(newtimebase, this_timebase); - if (new_newtimebase != newtimebase) - { - wrap_period = wrp; - } - newtimebase = new_newtimebase; - anycheck = true; - } - - if (!anycheck) + if (m_Group.size() <= 1) { HLOGC(grlog.Debug, log << "GROUP: synch uDRIFT NOT DONE, no other links"); return; } + steady_clock::time_point timebase; + steady_clock::duration udrift(0); + bool wrap_period = false; + srcMember->m_pRcvBuffer->getInternalTimeBase((timebase), (wrap_period), (udrift)); + HLOGC(grlog.Debug, - log << "GROUP: synch uDRIFT=" << FormatDuration(udrift) << " TB=" << FormatTime(newtimebase) << "(" - << (wrap_period ? "" : "NO ") << "wrap period)"); + log << "GROUP: synch uDRIFT=" << FormatDuration(udrift) << " TB=" << FormatTime(timebase) << "(" + << (wrap_period ? "" : "NO ") << "wrap period)"); // Now that we have the minimum timebase and drift calculated, apply this to every link, // INCLUDING THE REPORTER. @@ -2776,8 +2752,11 @@ void CUDTGroup::synchronizeDrift(CUDT* cu, steady_clock::duration udrift, steady // Skip non-connected; these will be synchronized when ready if (gi->laststatus != SRTS_CONNECTED) continue; + CUDT& member = gi->ps->core(); + if (srcMember == &member) + continue; - gi->ps->core().m_pRcvBuffer->applyGroupDrift(newtimebase, wrap_period, udrift); + member.m_pRcvBuffer->applyGroupDrift(timebase, wrap_period, udrift); } } diff --git a/srtcore/group.h b/srtcore/group.h index e4b2fb6ed..04c8e7e01 100644 --- a/srtcore/group.h +++ b/srtcore/group.h @@ -798,7 +798,10 @@ class CUDTGroup // Live state synchronization bool getBufferTimeBase(srt::CUDT* forthesakeof, time_point& w_tb, bool& w_wp, duration& w_dr); bool applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn); - void synchronizeDrift(srt::CUDT* cu, duration udrift, time_point newtimebase); + + /// @brief Synchronize TSBPD base time and clock drift among members using the @a srcMember as a reference. + /// @param srcMember a reference for synchronization. + void synchronizeDrift(const srt::CUDT* srcMember); void updateLatestRcv(srt::CUDTSocket*); diff --git a/srtcore/tsbpd_time.cpp b/srtcore/tsbpd_time.cpp index 54a8c5df5..9b75d595e 100644 --- a/srtcore/tsbpd_time.cpp +++ b/srtcore/tsbpd_time.cpp @@ -103,10 +103,7 @@ drift_logger g_drift_logger; #endif // SRT_DEBUG_TRACE_DRIFT -bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp, - int usRTTSample, - steady_clock::duration& w_udrift, - steady_clock::time_point& w_newtimebase) +bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp, int usRTTSample) { if (!m_bTsbPdMode) return false; @@ -149,9 +146,6 @@ bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp, log << "DRIFT=" << FormatDuration(tdDrift) << " TB REMAINS: " << FormatTime(m_tsTsbPdTimeBase)); } - w_udrift = tdDrift; - w_newtimebase = m_tsTsbPdTimeBase; - #if SRT_DEBUG_TRACE_DRIFT g_drift_logger.trace(usPktTimestamp, usRTTSample, diff --git a/srtcore/tsbpd_time.h b/srtcore/tsbpd_time.h index b6cb770f5..dcaf05718 100644 --- a/srtcore/tsbpd_time.h +++ b/srtcore/tsbpd_time.h @@ -67,14 +67,9 @@ class CTsbpdTime /// /// @param [in] pktTimestamp Timestamp of the arrived ACKACK packet. /// @param [in] usRTTSample RTT sample from an ACK-ACKACK pair. - /// @param [out] w_udrift Current clock drift value. - /// @param [out] w_newtimebase Current TSBPD base time. /// /// @return true if TSBPD base time has changed, false otherwise. - bool addDriftSample(uint32_t pktTimestamp, - int usRTTSample, - steady_clock::duration& w_udrift, - steady_clock::time_point& w_newtimebase); + bool addDriftSample(uint32_t pktTimestamp, int usRTTSample); /// @brief Handle timestamp of data packet when 32-bit integer carryover is about to happen. /// When packet timestamp approaches CPacket::MAX_TIMESTAMP, the TSBPD base time should be