Skip to content

Commit

Permalink
[core] Refax around Window measurement facility (#2857).
Browse files Browse the repository at this point in the history
  • Loading branch information
ethouris committed Jan 19, 2024
1 parent 633e3ea commit 2eb47e3
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 87 deletions.
128 changes: 128 additions & 0 deletions srtcore/utilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,134 @@ struct CallbackHolder
};

#define CALLBACK_CALL(holder,...) (*holder.fn)(holder.opaque, __VA_ARGS__)
// The version of std::tie from C++11, but for pairs only.
template <class T1, class T2>
struct PairProxy
{
T1& v1;
T2& v2;

PairProxy(T1& c1, T2& c2): v1(c1), v2(c2) {}

void operator=(const std::pair<T1, T2>& p)
{
v1 = p.first;
v2 = p.second;
}
};

template <class T1, class T2> inline
PairProxy<T1, T2> Tie2(T1& v1, T2& v2)
{
return PairProxy<T1, T2>(v1, v2);
}

template<class T>
struct PassFilter
{
T lower, median, upper;

bool encloses(const T& value)
{
// Throw away those that don't fit in the filter
return value > lower && value < upper;
}
};

// This utility is used in window.cpp where it is required to calculate
// the median value basing on the value in the very middle and filtered
// out values exceeding its range of 1/8 and 8 times. Returned is a structure
// that shows the median and also the lower and upper value used for filtering.
inline PassFilter<int> GetPeakRange(const int* window, int* replica, size_t size)
{
// This calculation does more-less the following:
//
// 1. Having example window:
// - 50, 51, 100, 55, 80, 1000, 600, 1500, 1200, 10, 90
// 2. This window is now sorted, but we only know the value in the middle:
// - 10, 50, 51, 55, 80, [[90]], 100, 600, 1000, 1200, 1500
// 3. Now calculate:
// - lower: 90/8 = 11.25
// - upper: 90*8 = 720
// 4. Now calculate the arithmetic median from all these values,
// but drop those from outside the <lower, upper> range:
// - 10, (11<) [ 50, 51, 55, 80, 90, 100, 600, ] (>720) 1000, 1200, 1500
// 5. Calculate the median from the extracted range,
// NOTE: the median is actually repeated once, so size is +1.
//
// values = { 50, 51, 55, 80, 90, 100, 600 };
// sum = 90 + accumulate(values); ==> 1026
// median = sum/(1 + values.size()); ==> 147
//
// For comparison: the overall arithmetic median from this window == 430
//
// 6. Returned value = 1M/median

// get median value, but cannot change the original value order in the window
std::copy(window, window + size, replica);
std::nth_element(replica, replica + (size / 2), replica + size);
//std::sort(replica, replica + psize); <--- was used for debug, just leave it as a mark

PassFilter<int> filter;
filter.median = replica[size / 2];
filter.upper = filter.median << 3; // median*8
filter.lower = filter.median >> 3; // median/8

return filter;
}

// This function sums up all values in the array (from p to end),
// except those that don't fit in the low- and high-pass filter.
// Returned is the sum and the number of elements taken into account.
inline std::pair<int, int> AccumulatePassFilter(const int* p, size_t size, PassFilter<int> filter)
{
int count = 0;
int sum = 0;
const int* const end = p + size;
for (; p != end; ++p)
{
// Throw away those that don't fit in the filter
if (!filter.encloses(*p))
continue;

sum += *p;
++count;
}

return std::make_pair(sum, count);
}

// This function sums up all values in the array (from p to end)
// and simultaneously elements from `para`, stated it points to
// an array of the same size. The first array is used as a driver
// for which elements to include and which to skip, and this is done
// for both arrays at particular index position. Returner is the sum
// of the elements passed from the first array and from the `para`
// array, as well as the number of included elements.
template <class IntCount, class IntParaCount>
inline void AccumulatePassFilterParallel(const int* p, size_t size, PassFilter<int> filter,
const int* para,
int& w_sum, IntCount& w_count, IntParaCount& w_paracount)
{
IntCount count = 0;
int sum = 0;
IntParaCount parasum = 0;
const int* const end = p + size;
for (; p != end; ++p, ++para)
{
// Throw away those that don't fit in the filter
if (!filter.encloses(*p))
continue;

sum += *p;
parasum += *para;
++count;
}
w_count = count;
w_sum = sum;
w_paracount = parasum;
}


inline std::string FormatBinaryString(const uint8_t* bytes, size_t size)
{
Expand Down
121 changes: 34 additions & 87 deletions srtcore/window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,100 +157,47 @@ void srt::CPktTimeWindowTools::initializeWindowArrays(int* r_pktWindow, int* r_p
r_bytesWindow[i] = max_payload_size; //based on 1 pkt/sec set in r_pktWindow[i]
}


int srt::CPktTimeWindowTools::getPktRcvSpeed_in(const int* window, int* replica, const int* abytes, size_t asize, size_t hdr_size, int& bytesps)
int srt::CPktTimeWindowTools::ceilPerMega(double value, double count)
{
// get median value, but cannot change the original value order in the window
std::copy(window, window + asize, replica);
std::nth_element(replica, replica + (asize / 2), replica + asize);
//std::sort(replica, replica + asize);
int median = replica[asize / 2];

unsigned count = 0;
int sum = 0;
int upper = median << 3;
int lower = median >> 3;

bytesps = 0;
unsigned long bytes = 0;
const int* bp = abytes;
// median filtering
const int* p = window;
for (int i = 0, n = (int)asize; i < n; ++ i)
{
if ((*p < upper) && (*p > lower))
{
++ count; //packet counter
sum += *p; //usec counter
bytes += (unsigned long)*bp; //byte counter
}
++ p; //advance packet pointer
++ bp; //advance bytes pointer
}
static const double MEGA = 1000.0 * 1000.0;
return ::ceil(MEGA / (value / count));
}

// claculate speed, or return 0 if not enough valid value
if (count > (asize >> 1))
{
bytes += (hdr_size * count); //Add protocol headers to bytes received
bytesps = (int)ceil(1000000.0 / (double(sum) / double(bytes)));
return (int)ceil(1000000.0 / (sum / count));
}
else
{
bytesps = 0;
return 0;
}
int srt::CPktTimeWindowTools::getPktRcvSpeed_in(const int* window, int* replica, const int* abytes, size_t asize, size_t hdr_size, int& w_bytesps)
{
PassFilter<int> filter = GetPeakRange(window, replica, asize);

unsigned count = 0;
int sum = 0;

w_bytesps = 0;
unsigned long bytes = 0;
// // (explicit specialization due to problems on MSVC 2013 and 2015)
AccumulatePassFilterParallel<unsigned, unsigned long>(window, asize, filter, abytes,
(sum), (count), (bytes));

// claculate speed, or return 0 if not enough valid value
if (count <= (asize/2))
{
w_bytesps = 0;
return 0;
}

bytes += (hdr_size * count); //Add protocol headers to bytes received
w_bytesps = ceilPerMega(sum, bytes);
return ceilPerMega(sum, count);
}

int srt::CPktTimeWindowTools::getBandwidth_in(const int* window, int* replica, size_t psize)
{
// This calculation does more-less the following:
//
// 1. Having example window:
// - 50, 51, 100, 55, 80, 1000, 600, 1500, 1200, 10, 90
// 2. This window is now sorted, but we only know the value in the middle:
// - 10, 50, 51, 55, 80, [[90]], 100, 600, 1000, 1200, 1500
// 3. Now calculate:
// - lower: 90/8 = 11.25
// - upper: 90*8 = 720
// 4. Now calculate the arithmetic median from all these values,
// but drop those from outside the <lower, upper> range:
// - 10, (11<) [ 50, 51, 55, 80, 90, 100, 600, ] (>720) 1000, 1200, 1500
// 5. Calculate the median from the extracted range,
// NOTE: the median is actually repeated once, so size is +1.
//
// values = { 50, 51, 55, 80, 90, 100, 600 };
// sum = 90 + accumulate(values); ==> 1026
// median = sum/(1 + values.size()); ==> 147
//
// For comparison: the overall arithmetic median from this window == 430
//
// 6. Returned value = 1M/median

// get median value, but cannot change the original value order in the window
std::copy(window, window + psize - 1, replica);
std::nth_element(replica, replica + (psize / 2), replica + psize - 1);
//std::sort(replica, replica + psize); <--- was used for debug, just leave it as a mark
int median = replica[psize / 2];

int count = 1;
int sum = median;
int upper = median << 3; // median*8
int lower = median >> 3; // median/8

// median filtering
const int* p = window;
for (int i = 0, n = (int)psize; i < n; ++ i)
{
if ((*p < upper) && (*p > lower))
{
++ count;
sum += *p;
}
++ p;
}
PassFilter<int> filter = GetPeakRange(window, replica, psize);

int sum, count;
Tie2(sum, count) = AccumulatePassFilter(window, psize, filter);
sum += filter.median;
count += 1;

return (int)ceil(1000000.0 / (double(sum) / double(count)));
return ceilPerMega(sum, count);
}


2 changes: 2 additions & 0 deletions srtcore/window.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ class CPktTimeWindowTools
static int getBandwidth_in(const int* window, int* replica, size_t psize);

static void initializeWindowArrays(int* r_pktWindow, int* r_probeWindow, int* r_bytesWindow, size_t asize, size_t psize, size_t max_payload_size);

static int ceilPerMega(double value, double count);
};

template <size_t ASIZE = 16, size_t PSIZE = 16>
Expand Down

0 comments on commit 2eb47e3

Please sign in to comment.