Skip to content

Commit

Permalink
[core] Fixed group drift synchronization (#2139)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko authored Sep 29, 2021
1 parent cdec114 commit 167b8e5
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 68 deletions.
13 changes: 5 additions & 8 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1549,8 +1549,8 @@ CPacket* CRcvBuffer::getRcvReadyPacket(int32_t seqdistance)
HLOGC(brlog.Debug, log << "getRcvReadyPacket: Sequence offset=" << seqdistance << " IS NOT RECEIVED.");
return 0;
}
IF_HEAVY_LOGGING(int nskipped = 0);

IF_HEAVY_LOGGING(int nskipped = 0);
for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = shiftFwd(i))
{
/*
Expand All @@ -1559,8 +1559,8 @@ CPacket* CRcvBuffer::getRcvReadyPacket(int32_t seqdistance)
if (m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD)
{
HLOGC(brlog.Debug,
log << "getRcvReadyPacket: Found next packet seq=%" << m_pUnit[i]->m_Packet.getSeqNo() << " ("
<< nskipped << " empty cells skipped)");
log << "getRcvReadyPacket: Found next packet seq=%" << m_pUnit[i]->m_Packet.getSeqNo() << " ("
<< nskipped << " empty cells skipped)");
return &m_pUnit[i]->m_Packet;
}
IF_HEAVY_LOGGING(++nskipped);
Expand Down Expand Up @@ -1881,12 +1881,9 @@ void CRcvBuffer::setRcvTsbPdMode(const steady_clock::time_point& timebase, const
m_tsbpd.setTsbPdMode(timebase, no_wrap_check, delay);
}

bool CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us,
int rtt,
steady_clock::duration& w_udrift,
steady_clock::time_point& w_newtimebase)
bool CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us, int rtt)
{
return m_tsbpd.addDriftSample(timestamp_us, rtt, w_udrift, w_newtimebase);
return m_tsbpd.addDriftSample(timestamp_us, rtt);
}

int CRcvBuffer::readMsg(char* data, int len)
Expand Down
8 changes: 2 additions & 6 deletions srtcore/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,12 +411,8 @@ class CRcvBuffer

/// Add packet timestamp for drift caclculation and compensation
/// @param [in] timestamp packet time stamp
/// @param [out] w_udrift current drift value
/// @param [out] w_newtimebase current TSBPD base time
bool addRcvTsbPdDriftSample(uint32_t timestamp,
int rtt,
duration& w_udrift,
time_point& w_newtimebase);
/// @param [in] rtt RTT sample
bool addRcvTsbPdDriftSample(uint32_t timestamp, int rtt);

#ifdef SRT_DEBUG_TSBPD_DRIFT
void printDriftHistogram(int64_t iDrift);
Expand Down
7 changes: 2 additions & 5 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8324,17 +8324,14 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr
// srt_recvfile (which doesn't make any sense), you'll have a deadlock.
if (m_config.bDriftTracer)
{
steady_clock::duration udrift(0);
steady_clock::time_point newtimebase;
const bool drift_updated SRT_ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(),
rtt, (udrift), (newtimebase));
const bool drift_updated SRT_ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), rtt);
#if ENABLE_EXPERIMENTAL_BONDING
if (drift_updated && m_parent->m_GroupOf)
{
ScopedLock glock(s_UDTUnited.m_GlobControlLock);
if (m_parent->m_GroupOf)
{
m_parent->m_GroupOf->synchronizeDrift(this, udrift, newtimebase);
m_parent->m_GroupOf->synchronizeDrift(this);
}
}
#endif
Expand Down
49 changes: 14 additions & 35 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2725,48 +2725,24 @@ const char* CUDTGroup::StateStr(CUDTGroup::GroupState st)
return unknown;
}

void CUDTGroup::synchronizeDrift(CUDT* cu, steady_clock::duration udrift, steady_clock::time_point newtimebase)
void CUDTGroup::synchronizeDrift(const srt::CUDT* srcMember)
{
SRT_ASSERT(srcMember != NULL);
ScopedLock glock(m_GroupLock);

bool wrap_period = false;

bool anycheck = false;

for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
// Skip non-connected; these will be synchronized when ready
if (gi->laststatus != SRTS_CONNECTED)
continue;

// Skip the entity that has reported this
if (cu == &gi->ps->core())
continue;

steady_clock::time_point this_timebase;
steady_clock::duration this_udrift(0);
bool wrp = false;
gi->ps->core().m_pRcvBuffer->getInternalTimeBase((this_timebase), (wrp), (this_udrift));

udrift = std::min(udrift, this_udrift);
steady_clock::time_point new_newtimebase = std::min(newtimebase, this_timebase);
if (new_newtimebase != newtimebase)
{
wrap_period = wrp;
}
newtimebase = new_newtimebase;
anycheck = true;
}

if (!anycheck)
if (m_Group.size() <= 1)
{
HLOGC(grlog.Debug, log << "GROUP: synch uDRIFT NOT DONE, no other links");
return;
}

steady_clock::time_point timebase;
steady_clock::duration udrift(0);
bool wrap_period = false;
srcMember->m_pRcvBuffer->getInternalTimeBase((timebase), (wrap_period), (udrift));

HLOGC(grlog.Debug,
log << "GROUP: synch uDRIFT=" << FormatDuration(udrift) << " TB=" << FormatTime(newtimebase) << "("
<< (wrap_period ? "" : "NO ") << "wrap period)");
log << "GROUP: synch uDRIFT=" << FormatDuration(udrift) << " TB=" << FormatTime(timebase) << "("
<< (wrap_period ? "" : "NO ") << "wrap period)");

// Now that we have the minimum timebase and drift calculated, apply this to every link,
// INCLUDING THE REPORTER.
Expand All @@ -2776,8 +2752,11 @@ void CUDTGroup::synchronizeDrift(CUDT* cu, steady_clock::duration udrift, steady
// Skip non-connected; these will be synchronized when ready
if (gi->laststatus != SRTS_CONNECTED)
continue;
CUDT& member = gi->ps->core();
if (srcMember == &member)
continue;

gi->ps->core().m_pRcvBuffer->applyGroupDrift(newtimebase, wrap_period, udrift);
member.m_pRcvBuffer->applyGroupDrift(timebase, wrap_period, udrift);
}
}

Expand Down
5 changes: 4 additions & 1 deletion srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,10 @@ class CUDTGroup
// Live state synchronization
bool getBufferTimeBase(srt::CUDT* forthesakeof, time_point& w_tb, bool& w_wp, duration& w_dr);
bool applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn);
void synchronizeDrift(srt::CUDT* cu, duration udrift, time_point newtimebase);

/// @brief Synchronize TSBPD base time and clock drift among members using the @a srcMember as a reference.
/// @param srcMember a reference for synchronization.
void synchronizeDrift(const srt::CUDT* srcMember);

void updateLatestRcv(srt::CUDTSocket*);

Expand Down
8 changes: 1 addition & 7 deletions srtcore/tsbpd_time.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,7 @@ drift_logger g_drift_logger;

#endif // SRT_DEBUG_TRACE_DRIFT

bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp,
int usRTTSample,
steady_clock::duration& w_udrift,
steady_clock::time_point& w_newtimebase)
bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp, int usRTTSample)
{
if (!m_bTsbPdMode)
return false;
Expand Down Expand Up @@ -149,9 +146,6 @@ bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp,
log << "DRIFT=" << FormatDuration(tdDrift) << " TB REMAINS: " << FormatTime(m_tsTsbPdTimeBase));
}

w_udrift = tdDrift;
w_newtimebase = m_tsTsbPdTimeBase;

#if SRT_DEBUG_TRACE_DRIFT
g_drift_logger.trace(usPktTimestamp,
usRTTSample,
Expand Down
7 changes: 1 addition & 6 deletions srtcore/tsbpd_time.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,9 @@ class CTsbpdTime
///
/// @param [in] pktTimestamp Timestamp of the arrived ACKACK packet.
/// @param [in] usRTTSample RTT sample from an ACK-ACKACK pair.
/// @param [out] w_udrift Current clock drift value.
/// @param [out] w_newtimebase Current TSBPD base time.
///
/// @return true if TSBPD base time has changed, false otherwise.
bool addDriftSample(uint32_t pktTimestamp,
int usRTTSample,
steady_clock::duration& w_udrift,
steady_clock::time_point& w_newtimebase);
bool addDriftSample(uint32_t pktTimestamp, int usRTTSample);

/// @brief Handle timestamp of data packet when 32-bit integer carryover is about to happen.
/// When packet timestamp approaches CPacket::MAX_TIMESTAMP, the TSBPD base time should be
Expand Down

0 comments on commit 167b8e5

Please sign in to comment.