Skip to content

Commit

Permalink
[core] Extended logs for negative or zero RTT estimate on the receive…
Browse files Browse the repository at this point in the history
…r side (#1876)

* Minor refactoring to processCtrl function, UMSG_ACKACK
* Minor formatting in CACKWindow::acknowledge function
* The time of ACKACK reception is now passed as an argument to the CACKWindow::acknowledge function
  • Loading branch information
mbakholdina authored Mar 29, 2021
1 parent 1d4338a commit 5a46839
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 134 deletions.
33 changes: 27 additions & 6 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8198,19 +8198,40 @@ void CUDT::processCtrl(const CPacket &ctrlpkt)
case UMSG_ACKACK: // 110 - Acknowledgement of Acknowledgement
{
int32_t ack = 0;
const int rtt = m_ACKWindow.acknowledge(ctrlpkt.getAckSeqNo(), ack);

// Calculate RTT estimate on the receiver side based on ACK/ACKACK pair
const int rtt = m_ACKWindow.acknowledge(ctrlpkt.getAckSeqNo(), ack, currtime);

if (rtt == -1)
{
if (ctrlpkt.getAckSeqNo() > (m_iAckSeqNo - static_cast<int>(ACK_WND_SIZE)) && ctrlpkt.getAckSeqNo() <= m_iAckSeqNo)
{
LOGC(inlog.Warn,
log << CONID() << "ACKACK out of order, skipping RTT calculation "
<< "(ACK number: " << ctrlpkt.getAckSeqNo() << ", last ACK sent: " << m_iAckSeqNo
<< ", RTT (EWMA): " << m_iRTT << ")");
break;
}

LOGC(inlog.Error,
log << CONID() << "IPE: ACK record not found, can't estimate RTT "
<< "(ACK number: " << ctrlpkt.getAckSeqNo() << ", last ACK sent: " << m_iAckSeqNo
<< ", RTT (EWMA): " << m_iRTT << ")");
break;
}

if (rtt <= 0)
{
LOGC(inlog.Error,
log << CONID() << "IPE: ACK node overwritten when acknowledging " << ctrlpkt.getAckSeqNo()
<< " (ack extracted: " << ack << ")");
log << CONID() << "IPE: invalid RTT estimate " << rtt
<< ", possible time shift. Clock: " << SRT_SYNC_CLOCK_STR);
break;
}

// if increasing delay detected...
// If increasing delay is detected
// sendCtrl(UMSG_CGWARNING);

// RTT EWMA
// Calculate RTT (EWMA) on the receiver side
m_iRTTVar = avg_iir<4>(m_iRTTVar, abs(rtt - m_iRTT));
m_iRTT = avg_iir<8>(m_iRTT, rtt);

Expand Down Expand Up @@ -8240,7 +8261,7 @@ void CUDT::processCtrl(const CPacket &ctrlpkt)
#endif
}

// update last ACK that has been received by the sender
// Update last ACK that has been received by the sender
if (CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0)
m_iRcvLastAckAck = ack;

Expand Down
170 changes: 85 additions & 85 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,23 +259,22 @@ class CUDT
}
};

static const SRTSOCKET INVALID_SOCK = -1; // invalid socket descriptor
static const int ERROR = -1; // socket api error returned value
static const SRTSOCKET INVALID_SOCK = -1; // Invalid socket descriptor
static const int ERROR = -1; // Socket api error returned value

static const int HS_VERSION_UDT4 = 4;
static const int HS_VERSION_SRT1 = 5;

// Parameters
//
// Note: use notation with X*1000*1000* ... instead of million zeros in a row.
// In C++17 there is a possible notation of 5'000'000 for convenience, but that's
// something only for a far future.
static const int COMM_RESPONSE_MAX_EXP = 16;
static const int SRT_TLPKTDROP_MINTHRESHOLD_MS = 1000;
static const uint64_t COMM_KEEPALIVE_PERIOD_US = 1*1000*1000;
static const int32_t COMM_SYN_INTERVAL_US = 10*1000;
static const int COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS = 3000;
static const uint16_t MAX_WEIGHT = 32767;
// Note: use notation with X*1000*1000*... instead of million zeros in a row
static const int COMM_RESPONSE_MAX_EXP = 16;
static const int SRT_TLPKTDROP_MINTHRESHOLD_MS = 1000;
static const uint64_t COMM_KEEPALIVE_PERIOD_US = 1*1000*1000;
static const int32_t COMM_SYN_INTERVAL_US = 10*1000;
static const int COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS = 3000;
static const uint16_t MAX_WEIGHT = 32767;
static const size_t ACK_WND_SIZE = 1024;

int handshakeVersion()
{
Expand All @@ -295,37 +294,38 @@ class CUDT

SRTSOCKET socketID() const { return m_SocketID; }

static CUDT* getUDTHandle(SRTSOCKET u);
static std::vector<SRTSOCKET> existingSockets();
static CUDT* getUDTHandle(SRTSOCKET u);
static std::vector<SRTSOCKET> existingSockets();

void addressAndSend(CPacket& pkt);
void sendSrtMsg(int cmd, uint32_t *srtdata_in = NULL, size_t srtlen_in = 0);

bool isOPT_TsbPd() const { return m_config.bTSBPD; }
int RTT() const { return m_iRTT; }
int RTTVar() const { return m_iRTTVar; }
int32_t sndSeqNo() const { return m_iSndCurrSeqNo; }
int32_t schedSeqNo() const { return m_iSndNextSeqNo; }
bool overrideSndSeqNo(int32_t seq);
srt::sync::steady_clock::time_point lastRspTime() const { return m_tsLastRspTime; }
srt::sync::steady_clock::time_point freshActivationStart() const { return m_tsFreshActivation; }

int32_t rcvSeqNo() const { return m_iRcvCurrSeqNo; }
int flowWindowSize() const { return m_iFlowWindowSize; }
int32_t deliveryRate() const { return m_iDeliveryRate; }
int bandwidth() const { return m_iBandwidth; }
int64_t maxBandwidth() const { return m_config.llMaxBW; }
int MSS() const { return m_config.iMSS; }

uint32_t peerLatency_us() const {return m_iPeerTsbPdDelay_ms * 1000; }
int peerIdleTimeout_ms() const { return m_config.iPeerIdleTimeout; }
size_t maxPayloadSize() const { return m_iMaxSRTPayloadSize; }
size_t OPT_PayloadSize() const { return m_config.zExpPayloadSize; }
int sndLossLength() { return m_pSndLossList->getLossLength(); }
int32_t ISN() const { return m_iISN; }
int32_t peerISN() const { return m_iPeerISN; }
duration minNAKInterval() const { return m_tdMinNakInterval; }
sockaddr_any peerAddr() const { return m_PeerAddr; }
bool isOPT_TsbPd() const { return m_config.bTSBPD; }
int RTT() const { return m_iRTT; }
int RTTVar() const { return m_iRTTVar; }
int32_t sndSeqNo() const { return m_iSndCurrSeqNo; }
int32_t schedSeqNo() const { return m_iSndNextSeqNo; }
bool overrideSndSeqNo(int32_t seq);

srt::sync::steady_clock::time_point lastRspTime() const { return m_tsLastRspTime; }
srt::sync::steady_clock::time_point freshActivationStart() const { return m_tsFreshActivation; }

int32_t rcvSeqNo() const { return m_iRcvCurrSeqNo; }
int flowWindowSize() const { return m_iFlowWindowSize; }
int32_t deliveryRate() const { return m_iDeliveryRate; }
int bandwidth() const { return m_iBandwidth; }
int64_t maxBandwidth() const { return m_config.llMaxBW; }
int MSS() const { return m_config.iMSS; }

uint32_t peerLatency_us() const { return m_iPeerTsbPdDelay_ms * 1000; }
int peerIdleTimeout_ms() const { return m_config.iPeerIdleTimeout; }
size_t maxPayloadSize() const { return m_iMaxSRTPayloadSize; }
size_t OPT_PayloadSize() const { return m_config.zExpPayloadSize; }
int sndLossLength() { return m_pSndLossList->getLossLength(); }
int32_t ISN() const { return m_iISN; }
int32_t peerISN() const { return m_iPeerISN; }
duration minNAKInterval() const { return m_tdMinNakInterval; }
sockaddr_any peerAddr() const { return m_PeerAddr; }

/// Returns the number of packets in flight (sent, but not yet acknowledged).
/// @param lastack is the sequence number of the first unacknowledged packet.
Expand Down Expand Up @@ -691,28 +691,28 @@ class CUDT
static loss_seqs_t defaultPacketArrival(void* vself, CPacket& pkt);
static loss_seqs_t groupPacketArrival(void* vself, CPacket& pkt);

static CUDTUnited s_UDTUnited; // UDT global management base
static CUDTUnited s_UDTUnited; // UDT global management base

private: // Identification
CUDTSocket* const m_parent; // temporary, until the CUDTSocket class is merged with CUDT
SRTSOCKET m_SocketID; // UDT socket number
SRTSOCKET m_PeerID; // peer id, for multiplexer
CUDTSocket* const m_parent; // Temporary, until the CUDTSocket class is merged with CUDT
SRTSOCKET m_SocketID; // UDT socket number
SRTSOCKET m_PeerID; // Peer ID, for multiplexer

// HSv4 (legacy handshake) support)
time_point m_tsSndHsLastTime; //Last SRT handshake request time
int m_iSndHsRetryCnt; //SRT handshake retries left
time_point m_tsSndHsLastTime; // Last SRT handshake request time
int m_iSndHsRetryCnt; // SRT handshake retries left

#if ENABLE_EXPERIMENTAL_BONDING
SRT_GROUP_TYPE m_HSGroupType; // group type about-to-be-set in the handshake
SRT_GROUP_TYPE m_HSGroupType; // Group type about-to-be-set in the handshake
#endif

private:
int m_iMaxSRTPayloadSize; // Maximum/regular payload size, in bytes
int m_iTsbPdDelay_ms; // Rx delay to absorb burst in milliseconds
int m_iPeerTsbPdDelay_ms; // Tx delay that the peer uses to absorb burst in milliseconds
bool m_bTLPktDrop; // Enable Too-late Packet Drop
UniquePtr<CCryptoControl> m_pCryptoControl; // congestion control SRT class (small data extension)
CCache<CInfoBlock>* m_pCache; // network information cache
int m_iMaxSRTPayloadSize; // Maximum/regular payload size, in bytes
int m_iTsbPdDelay_ms; // Rx delay to absorb burst, in milliseconds
int m_iPeerTsbPdDelay_ms; // Tx delay that the peer uses to absorb burst, in milliseconds
bool m_bTLPktDrop; // Enable Too-late Packet Drop
UniquePtr<CCryptoControl> m_pCryptoControl; // Congestion control SRT class (small data extension)
CCache<CInfoBlock>* m_pCache; // Network information cache

// Congestion control
std::vector<EventSlot> m_Slots[TEV_E_SIZE];
Expand All @@ -727,7 +727,7 @@ class CUDT
void EmitSignal(ETransmissionEvent tev, EventVariant var);

// Internal state
volatile bool m_bListening; // If the UDT entit is listening to connection
volatile bool m_bListening; // If the UDT entity is listening to connection
volatile bool m_bConnecting; // The short phase when connect() is called but not yet completed
volatile bool m_bConnected; // Whether the connection is on or off
volatile bool m_bClosing; // If the UDT entity is closing
Expand All @@ -736,7 +736,7 @@ class CUDT
volatile bool m_bPeerHealth; // If the peer status is normal
volatile int m_RejectReason;
bool m_bOpened; // If the UDT entity has been opened
int m_iBrokenCounter; // a counter (number of GC checks) to let the GC tag this socket as disconnected
int m_iBrokenCounter; // A counter (number of GC checks) to let the GC tag this socket as disconnected

int m_iEXPCount; // Expiration counter
int m_iBandwidth; // Estimated bandwidth, number of packets per second
Expand All @@ -746,8 +746,8 @@ class CUDT
int m_iByteDeliveryRate; // Byte arrival rate at the receiver side


CHandShake m_ConnReq; // connection request
CHandShake m_ConnRes; // connection response
CHandShake m_ConnReq; // Connection request
CHandShake m_ConnRes; // Connection response
CHandShake::RendezvousState m_RdvState; // HSv5 rendezvous state
HandshakeSide m_SrtHsSide; // HSv5 rendezvous handshake side resolved from cookie contest (DRAW if not yet resolved)

Expand All @@ -758,32 +758,32 @@ class CUDT

/*volatile*/ duration m_tdSendInterval; // Inter-packet time, in CPU clock cycles

/*volatile*/ duration m_tdSendTimeDiff; // aggregate difference in inter-packet sending time
/*volatile*/ duration m_tdSendTimeDiff; // Aggregate difference in inter-packet sending time

volatile int m_iFlowWindowSize; // Flow control window size
volatile double m_dCongestionWindow; // congestion window size
volatile double m_dCongestionWindow; // Congestion window size

private: // Timers
/*volatile*/ time_point m_tsNextACKTime; // Next ACK time, in CPU clock cycles, same below
/*volatile*/ time_point m_tsNextNAKTime; // Next NAK time

/*volatile*/ duration m_tdACKInterval; // ACK interval
/*volatile*/ duration m_tdNAKInterval; // NAK interval
/*volatile*/ time_point m_tsLastRspTime; // time stamp of last response from the peer
/*volatile*/ time_point m_tsLastRspAckTime; // time stamp of last ACK from the peer
/*volatile*/ time_point m_tsLastSndTime; // time stamp of last data/ctrl sent (in system ticks)
time_point m_tsLastWarningTime; // Last time that a warning message is sent
time_point m_tsLastReqTime; // last time when a connection request is sent
/*volatile*/ time_point m_tsNextACKTime; // Next ACK time, in CPU clock cycles, same below
/*volatile*/ time_point m_tsNextNAKTime; // Next NAK time

/*volatile*/ duration m_tdACKInterval; // ACK interval
/*volatile*/ duration m_tdNAKInterval; // NAK interval
/*volatile*/ time_point m_tsLastRspTime; // Timestamp of last response from the peer
/*volatile*/ time_point m_tsLastRspAckTime; // Timestamp of last ACK from the peer
/*volatile*/ time_point m_tsLastSndTime; // Timestamp of last data/ctrl sent (in system ticks)
time_point m_tsLastWarningTime; // Last time that a warning message is sent
time_point m_tsLastReqTime; // last time when a connection request is sent
time_point m_tsRcvPeerStartTime;
time_point m_tsLingerExpiration; // Linger expiration time (for GC to close a socket with data in sending buffer)
time_point m_tsLastAckTime; // Timestamp of last ACK
duration m_tdMinNakInterval; // NAK timeout lower bound; too small value can cause unnecessary retransmission
duration m_tdMinExpInterval; // timeout lower bound threshold: too small timeout can cause problem
time_point m_tsLingerExpiration; // Linger expiration time (for GC to close a socket with data in sending buffer)
time_point m_tsLastAckTime; // Timestamp of last ACK
duration m_tdMinNakInterval; // NAK timeout lower bound; too small value can cause unnecessary retransmission
duration m_tdMinExpInterval; // Timeout lower bound threshold: too small timeout can cause problem

int m_iPktCount; // packet counter for ACK
int m_iLightACKCount; // light ACK counter
int m_iPktCount; // Packet counter for ACK
int m_iLightACKCount; // Light ACK counter

time_point m_tsNextSendTime; // scheduled time of next packet sending
time_point m_tsNextSendTime; // Scheduled time of next packet sending

volatile int32_t m_iSndLastFullAck; // Last full ACK received
volatile int32_t m_iSndLastAck; // Last ACK received
Expand Down Expand Up @@ -843,17 +843,17 @@ class CUDT
int32_t m_iReXmitCount; // Re-Transmit Count since last ACK

private: // Receiving related data
CRcvBuffer* m_pRcvBuffer; //< Receiver buffer
CRcvLossList* m_pRcvLossList; //< Receiver loss list
std::deque<CRcvFreshLoss> m_FreshLoss; //< Lost sequence already added to m_pRcvLossList, but not yet sent UMSG_LOSSREPORT for.
int m_iReorderTolerance; //< Current value of dynamic reorder tolerance
int m_iConsecEarlyDelivery; //< Increases with every OOO packet that came <TTL-2 time, resets with every increased reorder tolerance
int m_iConsecOrderedDelivery; //< Increases with every packet coming in order or retransmitted, resets with every out-of-order packet
CRcvBuffer* m_pRcvBuffer; // Receiver buffer
CRcvLossList* m_pRcvLossList; // Receiver loss list
std::deque<CRcvFreshLoss> m_FreshLoss; // Lost sequence already added to m_pRcvLossList, but not yet sent UMSG_LOSSREPORT for.
int m_iReorderTolerance; // Current value of dynamic reorder tolerance
int m_iConsecEarlyDelivery; // Increases with every OOO packet that came <TTL-2 time, resets with every increased reorder tolerance
int m_iConsecOrderedDelivery; // Increases with every packet coming in order or retransmitted, resets with every out-of-order packet

CACKWindow<1024> m_ACKWindow; //< ACK history window
CPktTimeWindow<16, 64> m_RcvTimeWindow; //< Packet arrival time window
CACKWindow<ACK_WND_SIZE> m_ACKWindow; // ACK history window
CPktTimeWindow<16, 64> m_RcvTimeWindow; // Packet arrival time window

int32_t m_iRcvLastAck; //< Last sent ACK
int32_t m_iRcvLastAck; // Last sent ACK
#ifdef ENABLE_LOGGING
int32_t m_iDebugPrevLastAck;
#endif
Expand All @@ -869,10 +869,10 @@ class CUDT
uint32_t m_uPeerSrtFlags;

bool m_bTsbPd; // Peer sends TimeStamp-Based Packet Delivery Packets
bool m_bGroupTsbPd; // TSBPD should be used for GROUP RECEIVER instead.
bool m_bGroupTsbPd; // TSBPD should be used for GROUP RECEIVER instead

srt::sync::CThread m_RcvTsbPdThread; // Rcv TsbPD Thread handle
srt::sync::Condition m_RcvTsbPdCond; // TSBPD signals if reading is ready. Use together with m_RecvLock.
srt::sync::Condition m_RcvTsbPdCond; // TSBPD signals if reading is ready. Use together with m_RecvLock
bool m_bTsbPdAckWakeup; // Signal TsbPd thread on Ack sent
srt::sync::Mutex m_RcvTsbPdStartupLock; // Protects TSBPD thread creating and joining

Expand Down
Loading

0 comments on commit 5a46839

Please sign in to comment.