From 76567593fc1e10d694957fab5e7ee56dbdc916d3 Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Mon, 8 Feb 2021 10:07:45 +0100 Subject: [PATCH] [core] Runtime link stability timeout for main/backup (#1775) --- CMakeLists.txt | 1 + docs/APISocketOptions.md | 2 + srtcore/core.cpp | 2 +- srtcore/core.h | 6 +- srtcore/group.cpp | 191 ++++++++++++++++++++++++--------------- 5 files changed, 127 insertions(+), 75 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index c5f06d3f7..626bbc7e8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -96,6 +96,7 @@ endforeach() # SRT_DEBUG_TSBPD_WRAP 1 /* Debug packet timestamp wraparound */ # SRT_DEBUG_TLPKTDROP_DROPSEQ 1 # SRT_DEBUG_SNDQ_HIGHRATE 1 +# SRT_DEBUG_BONDING_STATES 1 # SRT_MAVG_SAMPLING_RATE 40 /* Max sampling rate */ # option defaults diff --git a/docs/APISocketOptions.md b/docs/APISocketOptions.md index 878a0ae17..aa34f4fb4 100644 --- a/docs/APISocketOptions.md +++ b/docs/APISocketOptions.md @@ -437,6 +437,8 @@ function will return the group, not this socket ID. | --------------------- | ----- | -------- | ---------- | ------ | -------- | ------ | --- | ------ | | `SRTO_GROUPSTABTIMEO` | 1.5.0 | pre | `int32_t` | ms | 80 | 10-... | W | GSD+ | +**Not in use at the moment. Is to be repurposed in SRT v1.4.3!** + This setting is used for groups of type `SRT_GTYPE_BACKUP`. It defines the stability timeout, which is the maximum interval between two consecutive packets retrieved from the peer on the currently active link. These two packets can be of any type, diff --git a/srtcore/core.cpp b/srtcore/core.cpp index ee631a818..85bd2f369 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -11175,7 +11175,7 @@ bool CUDT::checkExpTimer(const steady_clock::time_point& currtime, int check_rea * (keepalive fix) * duB: * It seems there is confusion of the direction of the Response here. - * LastRspTime is supposed to be when receiving (data/ctrl) from peer + * lastRspTime is supposed to be when receiving (data/ctrl) from peer * as shown in processCtrl and processData, * Here we set because we sent something? * diff --git a/srtcore/core.h b/srtcore/core.h index b5cd59d08..8c0a1a85e 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -374,9 +374,12 @@ class CUDT bool isOPT_TsbPd() const { return m_bOPT_TsbPd; } int RTT() const { return m_iRTT; } + int RTTVar() const { return m_iRTTVar; } int32_t sndSeqNo() const { return m_iSndCurrSeqNo; } int32_t schedSeqNo() const { return m_iSndNextSeqNo; } bool overrideSndSeqNo(int32_t seq); + srt::sync::steady_clock::time_point lastRspTime() const { return m_tsLastRspTime; } + srt::sync::steady_clock::time_point freshActivationStart() const { return m_tsFreshActivation; } int32_t rcvSeqNo() const { return m_iRcvCurrSeqNo; } int flowWindowSize() const { return m_iFlowWindowSize; } @@ -385,7 +388,8 @@ class CUDT int64_t maxBandwidth() const { return m_llMaxBW; } int MSS() const { return m_iMSS; } - uint32_t latency_us() const {return m_iTsbPdDelay_ms*1000; } + uint32_t peerLatency_us() const {return m_iPeerTsbPdDelay_ms * 1000; } + int peerIdleTimeout_ms() const { return m_iOPT_PeerIdleTimeout; } size_t maxPayloadSize() const { return m_iMaxSRTPayloadSize; } size_t OPT_PayloadSize() const { return m_zOPT_ExpPayloadSize; } int sndLossLength() { return m_pSndLossList->getLossLength(); } diff --git a/srtcore/group.cpp b/srtcore/group.cpp index e48207287..15e7e7ace 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -2978,6 +2978,111 @@ void CUDTGroup::sendBackup_CheckIdleTime(gli_t w_d) } } +#if SRT_DEBUG_BONDING_STATES +class StabilityTracer +{ +public: + StabilityTracer() + { + } + + ~StabilityTracer() + { + srt::sync::ScopedLock lck(m_mtx); + m_fout.close(); + } + + void trace(const CUDT& u, const srt::sync::steady_clock::time_point& currtime, uint32_t activation_period_us, + int64_t stability_tmo_us, const std::string& state, uint16_t weight) + { + srt::sync::ScopedLock lck(m_mtx); + create_file(); + + m_fout << srt::sync::FormatTime(currtime) << ","; + m_fout << u.id() << ","; + m_fout << weight << ","; + m_fout << u.peerLatency_us() << ","; + m_fout << u.RTT() << ","; + m_fout << u.RTTVar() << ","; + m_fout << stability_tmo_us << ","; + m_fout << count_microseconds(currtime - u.lastRspTime()) << ","; + m_fout << state << ","; + m_fout << (srt::sync::is_zero(u.freshActivationStart()) ? -1 : (count_microseconds(currtime - u.freshActivationStart()))) << ","; + m_fout << activation_period_us << "\n"; + m_fout.flush(); + } + +private: + void print_header() + { + //srt::sync::ScopedLock lck(m_mtx); + m_fout << "Timepoint,SocketID,weight,usLatency,usRTT,usRTTVar,usStabilityTimeout,usSinceLastResp,State,usSinceActivation,usActivationPeriod\n"; + } + + void create_file() + { + if (m_fout) + return; + + std::string str_tnow = srt::sync::FormatTimeSys(srt::sync::steady_clock::now()); + str_tnow.resize(str_tnow.size() - 6); // remove trailing ' [SYS]' part + while (str_tnow.find(':') != std::string::npos) { + str_tnow.replace(str_tnow.find(':'), 1, 1, '_'); + } + const std::string fname = "stability_trace_" + str_tnow + ".csv"; + m_fout.open(fname, std::ofstream::out); + if (!m_fout) + std::cerr << "IPE: Failed to open " << fname << "!!!\n"; + + print_header(); + } + +private: + srt::sync::Mutex m_mtx; + std::ofstream m_fout; +}; + +StabilityTracer s_stab_trace; +#endif + +/// TODO: Remove 'weight' parameter? Only needed for logging. +/// @retval 1 - link is identified as stable +/// @retval 0 - link state remains unchanged (too early to identify, still in activation phase) +/// @retval -1 - link is identified as unstable +static int sendBackup_CheckRunningLinkStable(const CUDT& u, const srt::sync::steady_clock::time_point& currtime, uint16_t weight) +{ + const uint32_t latency_us = u.peerLatency_us(); + const int32_t min_stability_us = 60000; // Minimum Link Stability Timeout: 60ms. + const int64_t initial_stabtout_us = max(min_stability_us, latency_us); + const int64_t activation_period_us = initial_stabtout_us + 5 * CUDT::COMM_SYN_INTERVAL_US; + + // RTT and RTTVar values are still being refined during activation period, + // therefore the dymanic timeout should not be used in activation phase. + const bool is_activation_phase = !is_zero(u.freshActivationStart()) + && (count_microseconds(currtime - u.freshActivationStart()) <= activation_period_us); + + const int64_t stability_tout_us = is_activation_phase + ? initial_stabtout_us // activation phase + : min(max(min_stability_us, 2 * u.RTT() + 4 * u.RTTVar()), latency_us); + + const steady_clock::time_point last_rsp = max(u.freshActivationStart(), u.lastRspTime()); + const steady_clock::duration td_response = currtime - last_rsp; + if (count_microseconds(td_response) > stability_tout_us) + { +#if SRT_DEBUG_BONDING_STATES + s_stab_trace.trace(u, currtime, activation_period_us, stability_tout_us, is_activation_phase ? "ACTIVATION-UNSTABLE" : "UNSTABLE", weight); +#endif + return -1; + } + + // u.lastRspTime() > currtime is alwats true due to the very first check above in this function +#if SRT_DEBUG_BONDING_STATES + s_stab_trace.trace(u, currtime, activation_period_us, stability_tout_us, is_activation_phase ? "ACTIVATION" : "STABLE", weight); +#endif + return is_activation_phase ? 0 : 1; +} + + // [[using locked(this->m_GroupLock)]] bool CUDTGroup::sendBackup_CheckRunningStability(const gli_t d, const time_point currtime) { @@ -2994,8 +3099,6 @@ bool CUDTGroup::sendBackup_CheckRunningStability(const gli_t d, const time_point // negative value is relatively easy, while introducing a mutex would only add a // deadlock risk and performance degradation. - bool is_stable = true; - HLOGC(gslog.Debug, log << "grp/sendBackup: CHECK STABLE: @" << d->id << ": TIMEDIFF {response= " << FormatDuration(currtime - u.m_tsLastRspTime) @@ -3005,89 +3108,33 @@ bool CUDTGroup::sendBackup_CheckRunningStability(const gli_t d, const time_point << (!is_zero(u.m_tsUnstableSince) ? FormatDuration(currtime - u.m_tsUnstableSince) : "NEVER") << "}"); - if (currtime > u.m_tsLastRspTime) - { - // The last response predates the start of this function, look at the difference - const steady_clock::duration td_responsive = currtime - u.m_tsLastRspTime; - bool check_stability = true; - - if (!is_zero(u.m_tsFreshActivation) && u.m_tsFreshActivation < currtime) - { - // The link is temporary-activated. Calculate then since the activation time. - - // Check the last received ACK time first. This time is initialized with 'now' - // at the CUDT::open call, so you can't count on the trap zero time here, but - // it's still possible to check if activation time predates the ACK time. Things - // are here in the following possible order: - // - // - ACK time (old because defined at open) - // - Response time (old because the time of received handshake or keepalive counts) - // ... long time nothing ... - // - Activation time. - // - // If we have this situation, we have to wait for at least one ACK that is - // newer than activation time. However, if in this situation we have a fresh - // response, that is: - // - // - ACK time - // ... - // - Activation time - // - Response time (because a Keepalive had a caprice to come accidentally after sending) - // - // We still wait for a situation that there's at least one ACK that is newer than activation. + const int is_stable = sendBackup_CheckRunningLinkStable(u, currtime, d->weight); - // As we DO have activation time, we need to check if there's at least - // one ACK newer than activation, that is, td_acked < td_active - if (u.m_tsLastRspAckTime < u.m_tsFreshActivation) - { - check_stability = false; - HLOGC(gslog.Debug, - log << "grp/sendBackup: link @" << d->id - << " activated after ACK, " - "not checking for stability"); - } - else - { - u.m_tsFreshActivation = steady_clock::time_point(); - } - } - - if (check_stability && count_microseconds(td_responsive) > m_uOPT_StabilityTimeout) - { - if (is_zero(u.m_tsUnstableSince)) - { - HLOGC(gslog.Debug, - log << "grp/sendBackup: socket NEW UNSTABLE: @" << d->id << " last heard " - << FormatDuration(td_responsive) << " > " << m_uOPT_StabilityTimeout - << " (stability timeout)"); - // The link seems to have missed two ACKs already. - // Qualify this link as unstable - // Notify that it has been seen so since now - u.m_tsUnstableSince = currtime; - } - - is_stable = false; - } - } - - if (is_stable) + if (is_stable >= 0) { - // If stability is ok, but unstable-since was set before, reset it. HLOGC(gslog.Debug, log << "grp/sendBackup: link STABLE: @" << d->id << (!is_zero(u.m_tsUnstableSince) ? " - RESTORED" : " - CONTINUED") << ", state RUNNING - will send a payload"); u.m_tsUnstableSince = steady_clock::time_point(); + + // For some cases + if (is_stable > 0) + u.m_tsFreshActivation = steady_clock::time_point(); } else { HLOGC(gslog.Debug, log << "grp/sendBackup: link UNSTABLE for " << FormatDuration(currtime - u.m_tsUnstableSince) << " : @" << d->id << " - will send a payload"); + if (is_zero(u.m_tsUnstableSince)) + { + u.m_tsUnstableSince = currtime; + } } - return is_stable; + return is_stable >= 0; } // [[using locked(this->m_GroupLock)]] @@ -3810,12 +3857,10 @@ void CUDTGroup::sendBackup_SilenceRedundantLinks(vector& w_parallel) } CUDT& ce = d->ps->core(); steady_clock::duration td(0); - if (!is_zero(ce.m_tsFreshActivation) && - count_microseconds(td = currtime - ce.m_tsFreshActivation) < ce.m_uOPT_StabilityTimeout) + if (!is_zero(ce.m_tsFreshActivation) && sendBackup_CheckRunningLinkStable(ce, currtime, d->weight) != 1) { HLOGC(gslog.Debug, - log << "... not silencing @" << d->id << ": too early: " << FormatDuration(td) << " < " - << ce.m_uOPT_StabilityTimeout << "(stability timeout)"); + log << "... not silencing @" << d->id << ": too early: " << FormatDuration(td)); continue; }