Skip to content

Commit

Permalink
[core] SND prioritize original packets in live configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed Jan 31, 2022
1 parent f372356 commit 08e6482
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 1 deletion.
9 changes: 9 additions & 0 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,15 @@ int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime,
return readlen;
}

CSndBuffer::time_point CSndBuffer::peekNextOriginal() const
{
ScopedLock bufferguard(m_BufLock);
if (m_pCurrBlock == m_pLastBlock)
return time_point();

return m_pCurrBlock->m_tsOriginTime;
}

int32_t CSndBuffer::getMsgNoAt(const int offset)
{
ScopedLock bufferguard(m_BufLock);
Expand Down
5 changes: 5 additions & 0 deletions srtcore/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ class CSndBuffer
SRT_ATTR_EXCLUDES(m_BufLock)
int readData(CPacket& w_packet, time_point& w_origintime, int kflgs, int& w_seqnoinc);

/// Peek an information on the next original data packet to send.
/// @return origin time stamp of the next packet; epoch start time otherwise.
SRT_ATTR_EXCLUDES(m_BufLock)
time_point peekNextOriginal() const;

/// Find data position to pack a DATA packet for a retransmission.
/// @param [in] offset offset from the last ACK point (backward sequence number difference)
/// @param [out] packet the packet to read.
Expand Down
136 changes: 135 additions & 1 deletion srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9178,6 +9178,131 @@ int srt::CUDT::packLostData(CPacket& w_packet, steady_clock::time_point& w_origi
return 0;
}

#if SRT_DEBUG_TRACE_SND
class snd_logger
{
typedef srt::sync::steady_clock steady_clock;

public:
snd_logger() {}

~snd_logger()
{
ScopedLock lck(m_mtx);
m_fout.close();
}

struct
{
typedef srt::sync::steady_clock steady_clock;
long long usElapsed;
steady_clock::time_point tsNow;
int usSRTT;
int usRTTVar;
int msSndBuffSpan;
int msTimespanTh;
int msNextUniqueToSend;
long long usElapsedLastDrop;
bool canRexmit;
int iPktSeqno;
bool isRetransmitted;
} state;

void trace()
{
using namespace srt::sync;
ScopedLock lck(m_mtx);
create_file();

m_fout << state.usElapsed << ",";
m_fout << state.usSRTT << ",";
m_fout << state.usRTTVar << ",";
m_fout << state.msSndBuffSpan << ",";
m_fout << state.msTimespanTh << ",";
m_fout << state.msNextUniqueToSend << ",";
m_fout << state.usElapsedLastDrop << ",";
m_fout << state.canRexmit << ",";
m_fout << state.iPktSeqno << ',';
m_fout << state.isRetransmitted << '\n';

m_fout.flush();
}

private:
void print_header()
{
m_fout << "usElapsed,usSRTT,usRTTVar,msSndBuffTimespan,msTimespanTh,msNextUniqueToSend,usDLastDrop,canRexmit,sndPktSeqno,isRexmit";
m_fout << "\n";
}

void create_file()
{
if (m_fout.is_open())
return;

m_start_time = srt::sync::steady_clock::now();
std::string str_tnow = srt::sync::FormatTimeSys(m_start_time);
str_tnow.resize(str_tnow.size() - 7); // remove trailing ' [SYST]' part
while (str_tnow.find(':') != std::string::npos)
{
str_tnow.replace(str_tnow.find(':'), 1, 1, '_');
}
const std::string fname = "snd_trace_" + str_tnow + ".csv";
m_fout.open(fname, std::ofstream::out);
if (!m_fout)
std::cerr << "IPE: Failed to open " << fname << "!!!\n";

print_header();
}

private:
srt::sync::Mutex m_mtx;
std::ofstream m_fout;
srt::sync::steady_clock::time_point m_start_time;
};

snd_logger g_snd_logger;
#endif // SRT_DEBUG_TRACE_SND

bool srt::CUDT::isRetransmissionAllowed(const time_point& tnow SRT_ATR_UNUSED)
{
// Prioritization of original packets only applies to Live CC.
if (!m_bPeerTLPktDrop || !m_config.bMessageAPI)
return true;

// TODO: lock sender buffer?
const time_point tsNextPacket = m_pSndBuffer->peekNextOriginal();

#if SRT_DEBUG_TRACE_SND
const int buffdelay_ms = count_milliseconds(m_pSndBuffer->getBufferingDelay(tnow));
// If there is a small loss, still better to retransmit. If timespan is already big,
// then consider sending original packets.
const int threshold_ms_min = (2 * m_iSRTT + 4 * m_iRTTVar + COMM_SYN_INTERVAL_US) / 1000;
const int msNextUniqueToSend = count_milliseconds(tnow - tsNextPacket) + m_iPeerTsbPdDelay_ms;

g_snd_logger.state.tsNow = tnow;
g_snd_logger.state.usElapsed = count_microseconds(tnow - m_stats.tsStartTime);
g_snd_logger.state.usSRTT = m_iSRTT;
g_snd_logger.state.usRTTVar = m_iRTTVar;
g_snd_logger.state.msSndBuffSpan = buffdelay_ms;
g_snd_logger.state.msTimespanTh = threshold_ms_min;
g_snd_logger.state.msNextUniqueToSend = msNextUniqueToSend;
g_snd_logger.state.usElapsedLastDrop = count_microseconds(tnow - m_tsLastTLDrop);
g_snd_logger.state.canRexmit = false;
#endif

if (tsNextPacket != time_point())
{
// Can send original packet, so just send it
return false;
}

#if SRT_DEBUG_TRACE_SND
g_snd_logger.state.canRexmit = true;
#endif
return true;
}

std::pair<bool, steady_clock::time_point> srt::CUDT::packData(CPacket& w_packet)
{
int payload = 0;
Expand Down Expand Up @@ -9206,7 +9331,10 @@ std::pair<bool, steady_clock::time_point> srt::CUDT::packData(CPacket& w_packet)
if (!m_bOpened)
return std::make_pair(false, enter_time);

payload = packLostData((w_packet), (origintime));
payload = isRetransmissionAllowed(enter_time)
? packLostData((w_packet), (origintime))
: 0;

if (payload > 0)
{
reason = "reXmit";
Expand Down Expand Up @@ -9459,6 +9587,12 @@ bool srt::CUDT::packUniqueData(CPacket& w_packet, time_point& w_origintime)
}
}

#if SRT_DEBUG_TRACE_SND
g_snd_logger.state.iPktSeqno = w_packet.m_iSeqNo;
g_snd_logger.state.isRetransmitted = w_packet.getRexmitFlag();
g_snd_logger.trace();
#endif

return true;
}

Expand Down
6 changes: 6 additions & 0 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,12 @@ class CUDT
SRT_ATTR_REQUIRES(m_RecvAckLock, m_StatsLock)
int sndDropTooLate();

/// @bried Allow packet retransmission.
/// Depending on the configuration mode (live / file), retransmission
/// can be blocked if e.g. there are original packets pending to be sent.
/// @return true if retransmission is allowed; false otherwise.
bool isRetransmissionAllowed(const time_point& tnow);

/// Connect to a UDT entity as per hs request. This will update
/// required data in the entity, then update them also in the hs structure,
/// and then send the response back to the caller.
Expand Down

0 comments on commit 08e6482

Please sign in to comment.