Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed group drift synchronization #2139

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1549,8 +1549,8 @@ CPacket* CRcvBuffer::getRcvReadyPacket(int32_t seqdistance)
HLOGC(brlog.Debug, log << "getRcvReadyPacket: Sequence offset=" << seqdistance << " IS NOT RECEIVED.");
return 0;
}
IF_HEAVY_LOGGING(int nskipped = 0);

IF_HEAVY_LOGGING(int nskipped = 0);
for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = shiftFwd(i))
{
/*
Expand All @@ -1559,8 +1559,8 @@ CPacket* CRcvBuffer::getRcvReadyPacket(int32_t seqdistance)
if (m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD)
{
HLOGC(brlog.Debug,
log << "getRcvReadyPacket: Found next packet seq=%" << m_pUnit[i]->m_Packet.getSeqNo() << " ("
<< nskipped << " empty cells skipped)");
log << "getRcvReadyPacket: Found next packet seq=%" << m_pUnit[i]->m_Packet.getSeqNo() << " ("
<< nskipped << " empty cells skipped)");
return &m_pUnit[i]->m_Packet;
}
IF_HEAVY_LOGGING(++nskipped);
Expand Down Expand Up @@ -1881,12 +1881,9 @@ void CRcvBuffer::setRcvTsbPdMode(const steady_clock::time_point& timebase, const
m_tsbpd.setTsbPdMode(timebase, no_wrap_check, delay);
}

bool CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us,
int rtt,
steady_clock::duration& w_udrift,
steady_clock::time_point& w_newtimebase)
bool CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp_us, int rtt)
{
return m_tsbpd.addDriftSample(timestamp_us, rtt, w_udrift, w_newtimebase);
return m_tsbpd.addDriftSample(timestamp_us, rtt);
}

int CRcvBuffer::readMsg(char* data, int len)
Expand Down
8 changes: 2 additions & 6 deletions srtcore/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,12 +411,8 @@ class CRcvBuffer

/// Add packet timestamp for drift caclculation and compensation
/// @param [in] timestamp packet time stamp
/// @param [out] w_udrift current drift value
/// @param [out] w_newtimebase current TSBPD base time
bool addRcvTsbPdDriftSample(uint32_t timestamp,
int rtt,
duration& w_udrift,
time_point& w_newtimebase);
/// @param [in] rtt RTT sample
bool addRcvTsbPdDriftSample(uint32_t timestamp, int rtt);

#ifdef SRT_DEBUG_TSBPD_DRIFT
void printDriftHistogram(int64_t iDrift);
Expand Down
7 changes: 2 additions & 5 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8324,17 +8324,14 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr
// srt_recvfile (which doesn't make any sense), you'll have a deadlock.
if (m_config.bDriftTracer)
{
steady_clock::duration udrift(0);
steady_clock::time_point newtimebase;
const bool drift_updated SRT_ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(),
rtt, (udrift), (newtimebase));
const bool drift_updated SRT_ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), rtt);
#if ENABLE_EXPERIMENTAL_BONDING
if (drift_updated && m_parent->m_GroupOf)
{
ScopedLock glock(s_UDTUnited.m_GlobControlLock);
if (m_parent->m_GroupOf)
{
m_parent->m_GroupOf->synchronizeDrift(this, udrift, newtimebase);
m_parent->m_GroupOf->synchronizeDrift(this);
}
}
#endif
Expand Down
49 changes: 14 additions & 35 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2725,48 +2725,24 @@ const char* CUDTGroup::StateStr(CUDTGroup::GroupState st)
return unknown;
}

void CUDTGroup::synchronizeDrift(CUDT* cu, steady_clock::duration udrift, steady_clock::time_point newtimebase)
void CUDTGroup::synchronizeDrift(const srt::CUDT* srcMember)
{
SRT_ASSERT(srcMember != NULL);
ScopedLock glock(m_GroupLock);

bool wrap_period = false;

bool anycheck = false;

for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
// Skip non-connected; these will be synchronized when ready
if (gi->laststatus != SRTS_CONNECTED)
continue;

// Skip the entity that has reported this
if (cu == &gi->ps->core())
continue;

steady_clock::time_point this_timebase;
steady_clock::duration this_udrift(0);
bool wrp = false;
gi->ps->core().m_pRcvBuffer->getInternalTimeBase((this_timebase), (wrp), (this_udrift));

udrift = std::min(udrift, this_udrift);
steady_clock::time_point new_newtimebase = std::min(newtimebase, this_timebase);
if (new_newtimebase != newtimebase)
{
wrap_period = wrp;
}
newtimebase = new_newtimebase;
anycheck = true;
}

if (!anycheck)
if (m_Group.size() <= 1)
{
HLOGC(grlog.Debug, log << "GROUP: synch uDRIFT NOT DONE, no other links");
return;
}

steady_clock::time_point timebase;
steady_clock::duration udrift(0);
bool wrap_period = false;
srcMember->m_pRcvBuffer->getInternalTimeBase((timebase), (wrap_period), (udrift));

HLOGC(grlog.Debug,
log << "GROUP: synch uDRIFT=" << FormatDuration(udrift) << " TB=" << FormatTime(newtimebase) << "("
<< (wrap_period ? "" : "NO ") << "wrap period)");
log << "GROUP: synch uDRIFT=" << FormatDuration(udrift) << " TB=" << FormatTime(timebase) << "("
<< (wrap_period ? "" : "NO ") << "wrap period)");

// Now that we have the minimum timebase and drift calculated, apply this to every link,
// INCLUDING THE REPORTER.
Expand All @@ -2776,8 +2752,11 @@ void CUDTGroup::synchronizeDrift(CUDT* cu, steady_clock::duration udrift, steady
// Skip non-connected; these will be synchronized when ready
if (gi->laststatus != SRTS_CONNECTED)
continue;
CUDT& member = gi->ps->core();
if (srcMember == &member)
continue;

gi->ps->core().m_pRcvBuffer->applyGroupDrift(newtimebase, wrap_period, udrift);
member.m_pRcvBuffer->applyGroupDrift(timebase, wrap_period, udrift);
}
}

Expand Down
5 changes: 4 additions & 1 deletion srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,10 @@ class CUDTGroup
// Live state synchronization
bool getBufferTimeBase(srt::CUDT* forthesakeof, time_point& w_tb, bool& w_wp, duration& w_dr);
bool applyGroupSequences(SRTSOCKET, int32_t& w_snd_isn, int32_t& w_rcv_isn);
void synchronizeDrift(srt::CUDT* cu, duration udrift, time_point newtimebase);

/// @brief Synchronize TSBPD base time and clock drift among members using the @a srcMember as a reference.
/// @param srcMember a reference for synchronization.
void synchronizeDrift(const srt::CUDT* srcMember);

void updateLatestRcv(srt::CUDTSocket*);

Expand Down
8 changes: 1 addition & 7 deletions srtcore/tsbpd_time.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,7 @@ drift_logger g_drift_logger;

#endif // SRT_DEBUG_TRACE_DRIFT

bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp,
int usRTTSample,
steady_clock::duration& w_udrift,
steady_clock::time_point& w_newtimebase)
bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp, int usRTTSample)
{
if (!m_bTsbPdMode)
return false;
Expand Down Expand Up @@ -149,9 +146,6 @@ bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp,
log << "DRIFT=" << FormatDuration(tdDrift) << " TB REMAINS: " << FormatTime(m_tsTsbPdTimeBase));
}

w_udrift = tdDrift;
w_newtimebase = m_tsTsbPdTimeBase;

#if SRT_DEBUG_TRACE_DRIFT
g_drift_logger.trace(usPktTimestamp,
usRTTSample,
Expand Down
7 changes: 1 addition & 6 deletions srtcore/tsbpd_time.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,9 @@ class CTsbpdTime
///
/// @param [in] pktTimestamp Timestamp of the arrived ACKACK packet.
/// @param [in] usRTTSample RTT sample from an ACK-ACKACK pair.
/// @param [out] w_udrift Current clock drift value.
/// @param [out] w_newtimebase Current TSBPD base time.
///
/// @return true if TSBPD base time has changed, false otherwise.
bool addDriftSample(uint32_t pktTimestamp,
int usRTTSample,
steady_clock::duration& w_udrift,
steady_clock::time_point& w_newtimebase);
bool addDriftSample(uint32_t pktTimestamp, int usRTTSample);

/// @brief Handle timestamp of data packet when 32-bit integer carryover is about to happen.
/// When packet timestamp approaches CPacket::MAX_TIMESTAMP, the TSBPD base time should be
Expand Down