Skip to content

Commit

Permalink
[core] Added maximum BW limit for retransmissions (#2714).
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko authored Aug 16, 2023
1 parent e9eb8b3 commit 78a1020
Show file tree
Hide file tree
Showing 13 changed files with 394 additions and 10 deletions.
8 changes: 8 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ option(ENABLE_GETNAMEINFO "In-logs sockaddr-to-string should do rev-dns" OFF)
option(ENABLE_UNITTESTS "Enable unit tests" OFF)
option(ENABLE_ENCRYPTION "Enable encryption in SRT" ON)
option(ENABLE_AEAD_API_PREVIEW "Enable AEAD API preview in SRT" Off)
option(ENABLE_MAXREXMITBW "Enable SRTO_MAXREXMITBW (v1.6.0 API preview)" Off)
option(ENABLE_CXX_DEPS "Extra library dependencies in srt.pc for the CXX libraries useful with C language" ON)
option(USE_STATIC_LIBSTDCXX "Should use static rather than shared libstdc++" OFF)
option(ENABLE_INET_PTON "Set to OFF to prevent usage of inet_pton when building against modern SDKs while still requiring compatibility with older Windows versions, such as Windows XP, Windows Server 2003 etc." ON)
Expand Down Expand Up @@ -466,6 +467,13 @@ if (USE_GNUSTL)
set (SRT_LIBS_PRIVATE ${SRT_LIBS_PRIVATE} ${GNUSTL_LIBRARIES} ${GNUSTL_LDFLAGS})
endif()

if (ENABLE_MAXREXMITBW)
add_definitions(-DENABLE_MAXREXMITBW)
message(STATUS "MAXREXMITBW API: ENABLED")
else()
message(STATUS "MAXREXMITBW API: DISABLED")
endif()

if (USING_DEFAULT_COMPILER_PREFIX)
# Detect if the compiler is GNU compatible for flags
if (CMAKE_CXX_COMPILER_ID MATCHES "GNU|Intel|Clang|AppleClang")
Expand Down
3 changes: 3 additions & 0 deletions apps/socketoptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ const SocketOption srt_options [] {
#ifdef ENABLE_AEAD_API_PREVIEW
,{ "cryptomode", 0, SRTO_CRYPTOMODE, SocketOption::PRE, SocketOption::INT, nullptr }
#endif
#ifdef ENABLE_MAXREXMITBW
,{ "maxrexmitbw", 0, SRTO_MAXREXMITBW, SocketOption::POST, SocketOption::INT64, nullptr }
#endif
};
}

Expand Down
17 changes: 17 additions & 0 deletions docs/API/API-socket-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ The following table lists SRT API socket options in alphabetical order. Option d
| [`SRTO_LINGER`](#SRTO_LINGER) | | post | `linger` | s | off \* | 0.. | RW | GSD |
| [`SRTO_LOSSMAXTTL`](#SRTO_LOSSMAXTTL) | 1.2.0 | post | `int32_t` | packets | 0 | 0.. | RW | GSD+ |
| [`SRTO_MAXBW`](#SRTO_MAXBW) | | post | `int64_t` | B/s | -1 | -1.. | RW | GSD |
| [`SRTO_MAXREXMITBW`](#SRTO_MAXREXMITBW) | 1.5.3 | post | `int64_t` | B/s | -1 | -1.. | RW | GSD |
| [`SRTO_MESSAGEAPI`](#SRTO_MESSAGEAPI) | 1.3.0 | pre | `bool` | | true | | W | GSD |
| [`SRTO_MININPUTBW`](#SRTO_MININPUTBW) | 1.4.3 | post | `int64_t` | B/s | 0 | 0.. | RW | GSD |
| [`SRTO_MINVERSION`](#SRTO_MINVERSION) | 1.3.0 | pre | `int32_t` | version | 0x010000 | \* | RW | GSD |
Expand Down Expand Up @@ -846,6 +847,22 @@ therefore the default -1 remains even in live mode.

---

#### SRTO_MAXREXMITBW

| OptName | Since | Restrict | Type | Units | Default | Range | Dir | Entity |
| -------------------- | ----- | -------- | ---------- | ------- | -------- | ------ | --- | ------ |
| `SRTO_MAXREXMITBW` | 1.5.3 | post | `int64_t` | B/s | -1 | -1.. | RW | GSD |

Maximum BW limit for retransmissions:

- `-1`: unlimited;
- `0`: do not allow retransmissions.
- `>0`: BW usage limit in Bytes/sec for packet retransmissions (including 16 bytes of SRT header).

[Return to list](#list-of-options)

---

#### SRTO_MESSAGEAPI

| OptName | Since | Restrict | Type | Units | Default | Range | Dir | Entity |
Expand Down
6 changes: 6 additions & 0 deletions docs/build/build-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Option details are given further below.
| [`ENABLE_DEBUG`](#enable_debug) | 1.2.0 | `INT` | ON | Allows release/debug control through the `CMAKE_BUILD_TYPE` variable. |
| [`ENABLE_ENCRYPTION`](#enable_encryption) | 1.3.3 | `BOOL` | ON | Enables encryption feature, with dependency on an external encryption library. |
| [`ENABLE_AEAD_API_PREVIEW`](#enable_aead_api_preview) | 1.5.2 | `BOOL` | OFF | Enables AEAD preview API (encryption with integrity check). |
| [`ENABLE_MAXREXMITBW`](#enable_maxrexmitbw) | 1.5.3 | `BOOL` | OFF | Enables SRTO_MAXREXMITBW (v1.6.0 API). |
| [`ENABLE_GETNAMEINFO`](#enable_getnameinfo) | 1.3.0 | `BOOL` | OFF | Enables the use of `getnameinfo` to allow using reverse DNS to resolve an internal IP address into a readable internet domain name. |
| [`ENABLE_HAICRYPT_LOGGING`](#enable_haicrypt_logging) | 1.3.1 | `BOOL` | OFF | Enables logging in the *haicrypt* module, which serves as a connector to an encryption library. |
| [`ENABLE_HEAVY_LOGGING`](#enable_heavy_logging) | 1.3.0 | `BOOL` | OFF | Enables heavy logging instructions in the code that occur often and cover many detailed aspects of library behavior. Default: OFF in release mode. |
Expand Down Expand Up @@ -279,6 +280,11 @@ build option should be set to `USE_ENCLIB=openssl-evp`.

The AEAD API is to be official in SRT v1.6.0.

#### ENABLE_MAXREXMITBW
**`--enable-maxrexmitbw`** (default: OFF)

When ON, the `SRTO_MAXREXMITBW` is enabled (to become official in SRT v1.6.0).


#### ENABLE_GETNAMEINFO
**`--enable-getnameinfo`** (default: OFF)
Expand Down
119 changes: 117 additions & 2 deletions srtcore/buffer_tools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,131 @@ void CRateEstimator::updateInputRate(const time_point& time, int pkts, int bytes
m_iInRateBytesCount += (m_iInRatePktsCount * CPacket::SRT_DATA_HDR_SIZE);
m_iInRateBps = (int)(((int64_t)m_iInRateBytesCount * 1000000) / period_us);
HLOGC(bslog.Debug,
log << "updateInputRate: pkts:" << m_iInRateBytesCount << " bytes:" << m_iInRatePktsCount
<< " rate=" << (m_iInRateBps * 8) / 1000 << "kbps interval=" << period_us);
log << "updateInputRate: pkts:" << m_iInRateBytesCount << " bytes:" << m_iInRatePktsCount
<< " rate=" << (m_iInRateBps * 8) / 1000 << "kbps interval=" << period_us);
m_iInRatePktsCount = 0;
m_iInRateBytesCount = 0;
m_tsInRateStartTime = time;

setInputRateSmpPeriod(INPUTRATE_RUNNING_US);
}

CSndRateEstimator::CSndRateEstimator(const time_point& tsNow)
: m_tsFirstSampleTime(tsNow)
, m_iFirstSampleIdx(0)
, m_iCurSampleIdx(0)
, m_iRateBps(0)
{

}

void CSndRateEstimator::addSample(const time_point& ts, int pkts, size_t bytes)
{
const int iSampleDeltaIdx = (int) count_milliseconds(ts - m_tsFirstSampleTime) / SAMPLE_DURATION_MS;
const int delta = NUM_PERIODS - iSampleDeltaIdx;

// TODO: -delta <= NUM_PERIODS, then just reset the state on the estimator.

if (iSampleDeltaIdx >= 2 * NUM_PERIODS)
{
// Just reset the estimator and start like if new.
for (int i = 0; i < NUM_PERIODS; ++i)
{
const int idx = incSampleIdx(m_iFirstSampleIdx, i);
m_Samples[idx].reset();

if (idx == m_iCurSampleIdx)
break;
}

m_iFirstSampleIdx = 0;
m_iCurSampleIdx = 0;
m_iRateBps = 0;
m_tsFirstSampleTime += milliseconds_from(iSampleDeltaIdx * SAMPLE_DURATION_MS);
}
else if (iSampleDeltaIdx > NUM_PERIODS)
{
// In run-time a constant flow of samples is expected. Once all periods are filled (after 1 second of sampling),
// the iSampleDeltaIdx should be either (NUM_PERIODS - 1),
// or NUM_PERIODS. In the later case it means the start of a new sampling period.
int d = delta;
while (d < 0)
{
m_Samples[m_iFirstSampleIdx].reset();
m_iFirstSampleIdx = incSampleIdx(m_iFirstSampleIdx);
m_tsFirstSampleTime += milliseconds_from(SAMPLE_DURATION_MS);
m_iCurSampleIdx = incSampleIdx(m_iCurSampleIdx);
++d;
}
}

// Check if the new sample period has started.
const int iNewDeltaIdx = (int) count_milliseconds(ts - m_tsFirstSampleTime) / SAMPLE_DURATION_MS;
if (incSampleIdx(m_iFirstSampleIdx, iNewDeltaIdx) != m_iCurSampleIdx)
{
// Now there should be some periods (at most last NUM_PERIODS) ready to be summed,
// rate estimation updated, after which all the new entry should be added.
Sample sum;
int iNumPeriods = 0;
bool bMetNonEmpty = false;
for (int i = 0; i < NUM_PERIODS; ++i)
{
const int idx = incSampleIdx(m_iFirstSampleIdx, i);
const Sample& s = m_Samples[idx];
sum += s;
if (bMetNonEmpty || !s.empty())
{
++iNumPeriods;
bMetNonEmpty = true;
}

if (idx == m_iCurSampleIdx)
break;
}

if (iNumPeriods == 0)
{
m_iRateBps = 0;
}
else
{
m_iRateBps = sum.m_iBytesCount * 1000 / (iNumPeriods * SAMPLE_DURATION_MS);
}

HLOGC(bslog.Note,
log << "CSndRateEstimator: new rate estimation :" << (m_iRateBps * 8) / 1000 << " kbps. Based on "
<< iNumPeriods << " periods, " << sum.m_iPktsCount << " packets, " << sum.m_iBytesCount << " bytes.");

// Shift one sampling period to start collecting the new one.
m_iCurSampleIdx = incSampleIdx(m_iCurSampleIdx);
m_Samples[m_iCurSampleIdx].reset();

// If all NUM_SAMPLES are recorded, the first position has to be shifted as well.
if (delta <= 0)
{
m_iFirstSampleIdx = incSampleIdx(m_iFirstSampleIdx);
m_tsFirstSampleTime += milliseconds_from(SAMPLE_DURATION_MS);
}
}

m_Samples[m_iCurSampleIdx].m_iBytesCount += bytes;
m_Samples[m_iCurSampleIdx].m_iPktsCount += pkts;
}

int CSndRateEstimator::getCurrentRate() const
{
SRT_ASSERT(m_iCurSampleIdx >= 0 && m_iCurSampleIdx < NUM_PERIODS);
return (int) avg_iir<16, unsigned long long>(m_iRateBps, m_Samples[m_iCurSampleIdx].m_iBytesCount * 1000 / SAMPLE_DURATION_MS);
}

int CSndRateEstimator::incSampleIdx(int val, int inc) const
{
SRT_ASSERT(inc >= 0 && inc <= NUM_PERIODS);
val += inc;
while (val >= NUM_PERIODS)
val -= NUM_PERIODS;
return val;
}

}

83 changes: 76 additions & 7 deletions srtcore/buffer_tools.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ modified by

#include "common.h"

namespace srt {
namespace srt
{

/// The AvgBufSize class is used to calculate moving average of the buffer (RCV or SND)
class AvgBufSize
Expand Down Expand Up @@ -104,11 +105,9 @@ class CRateEstimator
void setInputRateSmpPeriod(int period);

/// Update input rate calculation.
/// @param [in] time current time in microseconds
/// @param [in] time current time
/// @param [in] pkts number of packets newly added to the buffer
/// @param [in] bytes number of payload bytes in those newly added packets
///
/// @return Current size of the data in the sending list.
void updateInputRate(const time_point& time, int pkts = 0, int bytes = 0);

void resetInputRateSmpPeriod(bool disable = false) { setInputRateSmpPeriod(disable ? 0 : INPUTRATE_FAST_START_US); }
Expand All @@ -123,10 +122,80 @@ class CRateEstimator
int m_iInRatePktsCount; // number of payload packets added since InRateStartTime.
int m_iInRateBytesCount; // number of payload bytes added since InRateStartTime.
time_point m_tsInRateStartTime;
uint64_t m_InRatePeriod; // usec
int m_iInRateBps; // Input Rate in Bytes/sec
uint64_t m_InRatePeriod; // usec
int m_iInRateBps; // Input Rate in Bytes/sec
};


class CSndRateEstimator
{
typedef sync::steady_clock::time_point time_point;

public:
CSndRateEstimator(const time_point& tsNow);

/// Add sample.
/// @param [in] time sample (sending) time.
/// @param [in] pkts number of packets in the sample.
/// @param [in] bytes number of payload bytes in the sample.
void addSample(const time_point& time, int pkts = 0, size_t bytes = 0);

/// Retrieve estimated bitrate in bytes per second
int getRate() const { return m_iRateBps; }

/// Retrieve estimated bitrate in bytes per second inluding the current sampling interval.
int getCurrentRate() const;

private:
static const int NUM_PERIODS = 10;
static const int SAMPLE_DURATION_MS = 100; // 100 ms
struct Sample
{
int m_iPktsCount; // number of payload packets
int m_iBytesCount; // number of payload bytes

void reset()
{
m_iPktsCount = 0;
m_iBytesCount = 0;
}

Sample()
: m_iPktsCount(0)
, m_iBytesCount(0)
{
}

Sample(int iPkts, int iBytes)
: m_iPktsCount(iPkts)
, m_iBytesCount(iBytes)
{
}

Sample operator+(const Sample& other)
{
return Sample(m_iPktsCount + other.m_iPktsCount, m_iBytesCount + other.m_iBytesCount);
}

Sample& operator+=(const Sample& other)
{
*this = *this + other;
return *this;
}

bool empty() const { return m_iPktsCount == 0; }
};

int incSampleIdx(int val, int inc = 1) const;

Sample m_Samples[NUM_PERIODS];

time_point m_tsFirstSampleTime; //< Start time of the first sameple.
int m_iFirstSampleIdx; //< Index of the first sample.
int m_iCurSampleIdx; //< Index of the current sample being collected.
int m_iRateBps; // Input Rate in Bytes/sec
};

}
} // namespace srt

#endif
22 changes: 22 additions & 0 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,9 @@ void srt::CUDT::construct()

srt::CUDT::CUDT(CUDTSocket* parent)
: m_parent(parent)
#ifdef ENABLE_MAXREXMITBW
, m_SndRexmitRate(sync::steady_clock::now())
#endif
, m_iISN(-1)
, m_iPeerISN(-1)
{
Expand All @@ -333,6 +336,9 @@ srt::CUDT::CUDT(CUDTSocket* parent)

srt::CUDT::CUDT(CUDTSocket* parent, const CUDT& ancestor)
: m_parent(parent)
#ifdef ENABLE_MAXREXMITBW
, m_SndRexmitRate(sync::steady_clock::now())
#endif
, m_iISN(-1)
, m_iPeerISN(-1)
{
Expand Down Expand Up @@ -9274,6 +9280,10 @@ int srt::CUDT::packLostData(CPacket& w_packet)
}
setDataPacketTS(w_packet, tsOrigin);

#ifdef ENABLE_MAXREXMITBW
m_SndRexmitRate.addSample(time_now, 1, w_packet.getLength());
#endif

return payload;
}

Expand Down Expand Up @@ -9435,6 +9445,18 @@ bool srt::CUDT::isRetransmissionAllowed(const time_point& tnow SRT_ATR_UNUSED)
return false;
}

#ifdef ENABLE_MAXREXMITBW
m_SndRexmitRate.addSample(tnow, 0, 0); // Update the estimation.
const int64_t iRexmitRateBps = m_SndRexmitRate.getRate();
const int64_t iRexmitRateLimitBps = m_config.llMaxRexmitBW;
if (iRexmitRateLimitBps >= 0 && iRexmitRateBps > iRexmitRateLimitBps)
{
// Too many retransmissions, so don't send anything.
// TODO: When to wake up next time?
return false;
}
#endif

#if SRT_DEBUG_TRACE_SND
g_snd_logger.state.canRexmit = true;
#endif
Expand Down
5 changes: 4 additions & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ class CUDT

/// Create the CryptoControl object based on the HS packet.
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
bool prepareConnectionObjects(const CHandShake &hs, HandshakeSide hsd, CUDTException *eout);
bool prepareConnectionObjects(const CHandShake &hs, HandshakeSide hsd, CUDTException* eout);

/// Allocates sender and receiver buffers and loss lists.
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
Expand Down Expand Up @@ -822,6 +822,9 @@ class CUDT
CSndBuffer* m_pSndBuffer; // Sender buffer
CSndLossList* m_pSndLossList; // Sender loss list
CPktTimeWindow<16, 16> m_SndTimeWindow; // Packet sending time window
#ifdef ENABLE_MAXREXMITBW
CSndRateEstimator m_SndRexmitRate; // Retransmission retae estimation.
#endif

atomic_duration m_tdSendInterval; // Inter-packet time, in CPU clock cycles

Expand Down
Loading

0 comments on commit 78a1020

Please sign in to comment.