Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Improved RTT estimation #1957

1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ endforeach()
# SRT_DEBUG_TLPKTDROP_DROPSEQ 1
# SRT_DEBUG_SNDQ_HIGHRATE 1
# SRT_DEBUG_BONDING_STATES 1
# SRT_DEBUG_RTT 1 /* RTT trace */
# SRT_MAVG_SAMPLING_RATE 40 /* Max sampling rate */

# option defaults
Expand Down
190 changes: 171 additions & 19 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ void CUDT::construct()
// Will be reset to 0 for HSv5, this value is important for HSv4.
m_iSndHsRetryCnt = SRT_MAX_HSRETRY + 1;

// Initial status
m_bOpened = false;
m_bListening = false;
m_bConnecting = false;
Expand Down Expand Up @@ -932,8 +931,10 @@ void CUDT::open()
m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL;
m_pRNode->m_bOnList = false;

m_iRTT = 10 * COMM_SYN_INTERVAL_US;
m_iRTTVar = m_iRTT >> 1;
// Set initial values of smoothed RTT and RTT variance.
m_iRTT = INITIAL_RTT;
m_iRTTVar = INITIAL_RTTVAR;
m_bIsFirstRTTReceived = false;

// set minimum NAK and EXP timeout to 300ms
m_tdMinNakInterval = milliseconds_from(300);
Expand Down Expand Up @@ -1867,6 +1868,74 @@ void SrtExtractHandshakeExtensions(const char* bufbegin, size_t buflength,
}
}

#if SRT_DEBUG_RTT
class RttTracer
{
public:
RttTracer()
{
}

~RttTracer()
{
srt::sync::ScopedLock lck(m_mtx);
m_fout.close();
}

void trace(const srt::sync::steady_clock::time_point& currtime,
const std::string& event, int rtt_sample, int rttvar_sample,
bool is_smoothed_rtt_reset, int64_t recvTotal,
int smoothed_rtt, int rttvar)
{
srt::sync::ScopedLock lck(m_mtx);
create_file();

m_fout << srt::sync::FormatTimeSys(currtime) << ",";
m_fout << srt::sync::FormatTime(currtime) << ",";
m_fout << event << ",";
m_fout << rtt_sample << ",";
m_fout << rttvar_sample << ",";
m_fout << is_smoothed_rtt_reset << ",";
m_fout << recvTotal << ",";
m_fout << smoothed_rtt << ",";
m_fout << rttvar << "\n";
m_fout.flush();
}

private:
void print_header()
{
m_fout << "Timepoint_SYST,Timepoint_STDY,Event,usRTTSample,"
"usRTTVarSample,IsSmoothedRTTReset,pktsRecvTotal,"
"usSmoothedRTT,usRTTVar\n";
}

void create_file()
{
if (m_fout.is_open())
return;

std::string str_tnow = srt::sync::FormatTimeSys(srt::sync::steady_clock::now());
str_tnow.resize(str_tnow.size() - 7); // remove trailing ' [SYST]' part
while (str_tnow.find(':') != std::string::npos) {
str_tnow.replace(str_tnow.find(':'), 1, 1, '_');
}
const std::string fname = "rtt_trace_" + str_tnow + "_" + SRT_SYNC_CLOCK_STR + ".csv";
m_fout.open(fname, std::ofstream::out);
if (!m_fout)
std::cerr << "IPE: Failed to open " << fname << "!!!\n";

print_header();
}

private:
srt::sync::Mutex m_mtx;
std::ofstream m_fout;
};

RttTracer s_rtt_trace;
#endif


bool CUDT::processSrtMsg(const CPacket *ctrlpkt)
{
Expand Down Expand Up @@ -3261,8 +3330,8 @@ void CUDT::synchronizeWithGroup(CUDTGroup* gp)
}
}
}

#endif

void CUDT::startConnect(const sockaddr_any& serv_addr, int32_t forced_isn)
{
ScopedLock cg (m_ConnectionLock);
Expand Down Expand Up @@ -4485,10 +4554,15 @@ EConnectStatus CUDT::postConnect(const CPacket &response, bool rendezvous, CUDTE
if (m_pCache->lookup(&ib) >= 0)
{
m_iRTT = ib.m_iRTT;
m_iRTTVar = m_iRTT >> 1;
m_iRTTVar = ib.m_iRTT / 2;
m_iBandwidth = ib.m_iBandwidth;
}

#if SRT_DEBUG_RTT
s_rtt_trace.trace(steady_clock::now(), "Connect", -1, -1,
m_bIsFirstRTTReceived, -1, m_iRTT, m_iRTTVar);
#endif

SRT_REJECT_REASON rr = setupCC();
if (rr != SRT_REJ_UNKNOWN)
{
Expand Down Expand Up @@ -5384,10 +5458,15 @@ void CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& peer,
if (m_pCache->lookup(&ib) >= 0)
{
m_iRTT = ib.m_iRTT;
m_iRTTVar = m_iRTT >> 1;
m_iRTTVar = ib.m_iRTT / 2;
m_iBandwidth = ib.m_iBandwidth;
}

#if SRT_DEBUG_RTT
s_rtt_trace.trace(steady_clock::now(), "Accept", -1, -1,
m_bIsFirstRTTReceived, -1, m_iRTT, m_iRTTVar);
#endif

m_PeerAddr = peer;

// This should extract the HSREQ and KMREQ portion in the handshake packet.
Expand Down Expand Up @@ -5851,6 +5930,11 @@ bool CUDT::closeInternal()
ib.m_iBandwidth = m_iBandwidth;
m_pCache->update(&ib);

#if SRT_DEBUG_RTT
s_rtt_trace.trace(steady_clock::now(), "Cache", -1, -1,
m_bIsFirstRTTReceived, -1, m_iRTT, -1);
#endif

m_bConnected = false;
}

Expand Down Expand Up @@ -7998,15 +8082,63 @@ void CUDT::processCtrlAck(const CPacket &ctrlpkt, const steady_clock::time_point
}
// This check covers fields up to ACKD_BUFFERLEFT.

// Update RTT
// m_iRTT = ackdata[ACKD_RTT];
// m_iRTTVar = ackdata[ACKD_RTTVAR];
// XXX These ^^^ commented-out were blocked in UDT;
// the current RTT calculations are exactly the same as in UDT4.
const int rtt = ackdata[ACKD_RTT];
// Extract RTT estimate and RTTVar from the ACK packet.
const int rtt = ackdata[ACKD_RTT];
const int rttvar = ackdata[ACKD_RTTVAR];

// Update the values of smoothed RTT and the variation in RTT samples
// on subsequent RTT estimates extracted from the ACK packets
// (during transmission).
if (m_bIsFirstRTTReceived)
{
// Suppose transmission is bidirectional if sender is also receiving
// data packets.
enterCS(m_StatsLock);
const bool bPktsReceived = m_stats.recvTotal != 0;
leaveCS(m_StatsLock);

m_iRTTVar = avg_iir<4>(m_iRTTVar, abs(rtt - m_iRTT));
m_iRTT = avg_iir<8>(m_iRTT, rtt);
if (bPktsReceived) // Transmission is bidirectional.
{
// RTT value extracted from the ACK packet (rtt) is already smoothed
// RTT obtained at the receiver side. Apply EWMA anyway for the second
// time on the sender side. Ignore initial values which might arrive
// after the smoothed RTT on the sender side has been
// reset to the very first RTT sample received from the receiver.
// TODO: The case of bidirectional transmission requires further
// improvements and testing. Double smoothing is applied here to be
// consistent with the previous behavior.

if (rtt != INITIAL_RTT && rttvar != INITIAL_RTTVAR)
{
m_iRTTVar = avg_iir<4>(m_iRTTVar, abs(rtt - m_iRTT));
m_iRTT = avg_iir<8>(m_iRTT, rtt);
}
}
else // Transmission is unidirectional.
{
// Simply take the values of smoothed RTT and RTT variance from
// the ACK packet.
m_iRTT = rtt;
m_iRTTVar = rttvar;
}
}
// Reset the value of smoothed RTT to the first real RTT estimate extracted
// from an ACK after initialization (at the beginning of a transmission).
// In case of resumed connection over the same network, the very first RTT
// value sent within an ACK will be taken from cache and equal to previous
// connection's final smoothed RTT value. The reception of such a value
// will also trigger the smoothed RTT reset at the sender side.
else if (rtt != INITIAL_RTT && rttvar != INITIAL_RTTVAR)
{
m_iRTT = rtt;
m_iRTTVar = rttvar;
m_bIsFirstRTTReceived = true;
}

#if SRT_DEBUG_RTT
s_rtt_trace.trace(currtime, "ACK", rtt, rttvar, m_bIsFirstRTTReceived,
m_stats.recvTotal, m_iRTT, m_iRTTVar);
#endif

/* Version-dependent fields:
* Original UDT (total size: ACKD_TOTAL_SIZE_SMALL):
Expand Down Expand Up @@ -8062,7 +8194,7 @@ void CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsArrival
{
int32_t ack = 0;

// Calculate RTT estimate on the receiver side based on ACK/ACKACK pair
// Calculate RTT estimate on the receiver side based on ACK/ACKACK pair.
const int rtt = m_ACKWindow.acknowledge(ctrlpkt.getAckSeqNo(), ack, tsArrival);

if (rtt == -1)
Expand Down Expand Up @@ -8091,12 +8223,32 @@ void CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsArrival
return;
}

// If increasing delay is detected
// If increasing delay is detected.
// sendCtrl(UMSG_CGWARNING);

// Calculate RTT (EWMA) on the receiver side
m_iRTTVar = avg_iir<4>(m_iRTTVar, abs(rtt - m_iRTT));
m_iRTT = avg_iir<8>(m_iRTT, rtt);
// Update the values of smoothed RTT and the variation in RTT samples
// on subsequent RTT samples (during transmission).
if (m_bIsFirstRTTReceived)
{
m_iRTTVar = avg_iir<4>(m_iRTTVar, abs(rtt - m_iRTT));
m_iRTT = avg_iir<8>(m_iRTT, rtt);
}
// Reset the value of smoothed RTT on the first RTT sample after initialization
// (at the beginning of a transmission).
// In case of resumed connection over the same network, the initial RTT
// value will be taken from cache and equal to previous connection's
// final smoothed RTT value.
else
{
m_iRTT = rtt;
m_iRTTVar = rtt / 2;
m_bIsFirstRTTReceived = true;
}

#if SRT_DEBUG_RTT
s_rtt_trace.trace(tsArrival, "ACKACK", rtt, -1, m_bIsFirstRTTReceived,
-1, m_iRTT, m_iRTTVar);
#endif

updateCC(TEV_ACKACK, EventVariant(ack));

Expand Down
25 changes: 16 additions & 9 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,15 @@ class CUDT
//
// NOTE: Use notation with X*1000*1000*... instead of
// million zeros in a row.
static const int COMM_RESPONSE_MAX_EXP = 16;
static const int SRT_TLPKTDROP_MINTHRESHOLD_MS = 1000;
static const uint64_t COMM_KEEPALIVE_PERIOD_US = 1*1000*1000;
static const int32_t COMM_SYN_INTERVAL_US = 10*1000;
static const int COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS = 3000;
static const uint16_t MAX_WEIGHT = 32767;
static const size_t ACK_WND_SIZE = 1024;
static const int COMM_RESPONSE_MAX_EXP = 16;
static const int SRT_TLPKTDROP_MINTHRESHOLD_MS = 1000;
static const uint64_t COMM_KEEPALIVE_PERIOD_US = 1*1000*1000;
static const int32_t COMM_SYN_INTERVAL_US = 10*1000;
static const int COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS = 3000;
static const uint16_t MAX_WEIGHT = 32767;
static const size_t ACK_WND_SIZE = 1024;
static const int INITIAL_RTT = 10 * COMM_SYN_INTERVAL_US;
static const int INITIAL_RTTVAR = INITIAL_RTT / 2;

int handshakeVersion()
{
Expand Down Expand Up @@ -739,8 +741,13 @@ class CUDT

int m_iEXPCount; // Expiration counter
int m_iBandwidth; // Estimated bandwidth, number of packets per second
int m_iRTT; // RTT, in microseconds
int m_iRTTVar; // RTT variance
int m_iRTT; // Smoothed RTT (an exponentially-weighted moving average (EWMA)
// of an endpoint's RTT samples), in microseconds
int m_iRTTVar; // The variation in the RTT samples (RTT variance), in microseconds
bool m_bIsFirstRTTReceived; // This variable is used to determine whether the first RTT sample were obtained
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
// from the ACK/ACKACK pair at the receiver side or received by the sender from
// an ACK packet. It's used to reset the value of smoothed RTT (m_iRTT) after
// initialization at the beginning of a transmission. False by default.
maxsharabayko marked this conversation as resolved.
Show resolved Hide resolved
int m_iDeliveryRate; // Packet arrival rate at the receiver side
int m_iByteDeliveryRate; // Byte arrival rate at the receiver side

Expand Down