Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prefer Using Source Time if Provided #2185

Merged
merged 2 commits into from
Nov 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 24 additions & 33 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,21 +193,28 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
HLOGC(bslog.Debug,
log << "addBuffer: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size << " buffers for "
<< len << " bytes");
// Retrieve current time before locking the mutex to be closer to packet submission event.
const steady_clock::time_point tnow = steady_clock::now();

// dynamically increase sender buffer
ScopedLock bufferguard(m_BufLock);
// Dynamically increase sender buffer if there is not enough room.
while (size + m_iCount >= m_iSize)
{
HLOGC(bslog.Debug, log << "addBuffer: ... still lacking " << (size + m_iCount - m_iSize) << " buffers...");
increase();
}

const steady_clock::time_point time = steady_clock::now();
const int32_t inorder = w_mctrl.inorder ? MSGNO_PACKET_INORDER::mask : 0;

HLOGC(bslog.Debug,
log << CONID() << "addBuffer: adding " << size << " packets (" << len << " bytes) to send, msgno="
<< (w_msgno > 0 ? w_msgno : m_iNextMsgNo) << (inorder ? "" : " NOT") << " in order");

// Calculate origin time (same for all blocks of the message).
m_tsLastOriginTime = w_srctime ? time_point() + microseconds_from(w_srctime) : tnow;
// Rewrite back the actual value, even if it stays the same, so that the calling facilities can reuse it.
// May also be a subject to conversion error, thus the actual value is signalled back.
w_srctime = count_microseconds(m_tsLastOriginTime.time_since_epoch());

// The sequence number passed to this function is the sequence number
// that the very first packet from the packet series should get here.
// If there's more than one packet, this function must increase it by itself
Expand Down Expand Up @@ -253,33 +260,21 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
// [PB_FIRST] [PB_LAST] - 2 packets per message
// [PB_SOLO] - 1 packet per message

s->m_llSourceTime_us = w_srctime;
s->m_tsOriginTime = time;
s->m_tsRexmitTime = time_point();
s->m_iTTL = ttl;
// Rewrite the actual sending time back into w_srctime
// so that the calling facilities can reuse it
if (!w_srctime)
w_srctime = count_microseconds(s->m_tsOriginTime.time_since_epoch());

// XXX unchecked condition: s->m_pNext == NULL.
s->m_tsRexmitTime = time_point();
s->m_tsOriginTime = m_tsLastOriginTime;

// Should never happen, as the call to increase() should ensure enough buffers.
SRT_ASSERT(s->m_pNext);
s = s->m_pNext;
}
m_pLastBlock = s;

enterCS(m_BufLock);
m_iCount += size;

m_iBytesCount += len;
m_tsLastOriginTime = time;

updateInputRate(time, size, len);

updAvgBufSize(time);

leaveCS(m_BufLock);
updateInputRate(m_tsLastOriginTime, size, len);
updAvgBufSize(m_tsLastOriginTime);

// MSGNO_SEQ::mask has a form: 00000011111111...
// At least it's known that it's from some index inside til the end (to bit 0).
Expand Down Expand Up @@ -402,16 +397,6 @@ int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
return total;
}

steady_clock::time_point CSndBuffer::getSourceTime(const CSndBuffer::Block& block)
{
if (block.m_llSourceTime_us)
{
return steady_clock::time_point() + microseconds_from(block.m_llSourceTime_us);
}

return block.m_tsOriginTime;
}

int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime, int kflgs)
{
// No data to read
Expand Down Expand Up @@ -459,7 +444,7 @@ int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime,
}

w_packet.m_iMsgNo = m_pCurrBlock->m_iMsgNoBitset;
w_srctime = getSourceTime(*m_pCurrBlock);
w_srctime = m_pCurrBlock->m_tsOriginTime;
m_pCurrBlock = m_pCurrBlock->m_pNext;

HLOGC(bslog.Debug, log << CONID() << "CSndBuffer: extracting packet size=" << readlen << " to send");
Expand Down Expand Up @@ -593,7 +578,7 @@ int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time
// the packet originally (the other overload of this function) must set these
// flags.
w_packet.m_iMsgNo = p->m_iMsgNoBitset;
w_srctime = getSourceTime(*p);
w_srctime = p->m_tsOriginTime;

// This function is called when packet retransmission is triggered.
// Therefore we are setting the rexmit time.
Expand Down Expand Up @@ -683,11 +668,17 @@ int CSndBuffer::getCurrBufSize(int& w_bytes, int& w_timespan)
* Also, if there is only one pkt in buffer, the time difference will be 0.
* Therefore, always add 1 ms if not empty.
*/
w_timespan = 0 < m_iCount ? count_milliseconds(m_tsLastOriginTime - m_pFirstBlock->m_tsOriginTime) + 1 : 0;
w_timespan = 0 < m_iCount ? (int) count_milliseconds(m_tsLastOriginTime - m_pFirstBlock->m_tsOriginTime) + 1 : 0;

return m_iCount;
}

CSndBuffer::time_point CSndBuffer::getOldestTime() const
{
SRT_ASSERT(m_pFirstBlock);
return m_pFirstBlock->m_tsOriginTime;
}

int CSndBuffer::dropLateData(int& w_bytes, int32_t& w_first_msgno, const steady_clock::time_point& too_late_time)
{
int dpkts = 0;
Expand Down
8 changes: 3 additions & 5 deletions srtcore/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ class CSndBuffer
int getAvgBufSize(int& bytes, int& timespan);
int getCurrBufSize(int& bytes, int& timespan);

time_point getOldestTime() const;

uint64_t getInRatePeriod() const { return m_InRatePeriod; }

/// Retrieve input bitrate in bytes per second
Expand All @@ -197,9 +199,6 @@ class CSndBuffer
void increase();
void setInputRateSmpPeriod(int period);

struct Block; // Defined below
static time_point getSourceTime(const CSndBuffer::Block& block);

private: // Constants
static const uint64_t INPUTRATE_FAST_START_US = 500000; // 500 ms
static const uint64_t INPUTRATE_RUNNING_US = 1000000; // 1000 ms
Expand All @@ -216,9 +215,8 @@ class CSndBuffer

int32_t m_iMsgNoBitset; // message number
int32_t m_iSeqNo; // sequence number for scheduling
time_point m_tsOriginTime; // original request time
time_point m_tsOriginTime; // block origin time (either provided from above or equials the time a message was submitted for sending.
time_point m_tsRexmitTime; // packet retransmission time
uint64_t m_llSourceTime_us;
int m_iTTL; // time to live (milliseconds)

Block* m_pNext; // next block
Expand Down
14 changes: 7 additions & 7 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6362,10 +6362,10 @@ int srt::CUDT::receiveBuffer(char *data, int len)

// [[using maybe_locked(CUDTGroup::m_GroupLock, m_parent->m_GroupOf != NULL)]];
// [[using locked(m_SendLock)]];
void srt::CUDT::checkNeedDrop(bool& w_bCongestion)
bool srt::CUDT::checkNeedDrop()
{
if (!m_bPeerTLPktDrop)
return;
return false;

if (!m_config.bMessageAPI)
{
Expand All @@ -6390,6 +6390,7 @@ void srt::CUDT::checkNeedDrop(bool& w_bCongestion)
(2 * COMM_SYN_INTERVAL_US / 1000);
}

bool bCongestion = false;
if (threshold_ms && timespan_ms > threshold_ms)
{
// protect packet retransmission
Expand Down Expand Up @@ -6447,16 +6448,17 @@ void srt::CUDT::checkNeedDrop(bool& w_bCongestion)
}
#endif
}
w_bCongestion = true;
bCongestion = true;
leaveCS(m_RecvAckLock);
}
else if (timespan_ms > (m_iPeerTsbPdDelay_ms / 2))
{
HLOGC(aslog.Debug,
log << "cong, BYTES " << bytes << ", TMSPAN " << timespan_ms << "ms");

w_bCongestion = true;
bCongestion = true;
}
return bCongestion;
}

int srt::CUDT::sendmsg(const char *data, int len, int msttl, bool inorder, int64_t srctime)
Expand All @@ -6473,8 +6475,6 @@ int srt::CUDT::sendmsg(const char *data, int len, int msttl, bool inorder, int64
// which is the only case when the m_parent->m_GroupOf is not NULL.
int srt::CUDT::sendmsg2(const char *data, int len, SRT_MSGCTRL& w_mctrl)
{
bool bCongestion = false;

// throw an exception if not connected
if (m_bBroken || m_bClosing)
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
Expand Down Expand Up @@ -6563,7 +6563,7 @@ int srt::CUDT::sendmsg2(const char *data, int len, SRT_MSGCTRL& w_mctrl)

// checkNeedDrop(...) may lock m_RecvAckLock
// to modify m_pSndBuffer and m_pSndLossList
checkNeedDrop((bCongestion));
const bool bCongestion = checkNeedDrop();

int minlen = 1; // Minimum sender buffer space required for STREAM API
if (m_config.bMessageAPI)
Expand Down
2 changes: 1 addition & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ class CUDT

void updateIdleLinkFrom(CUDT* source);

void checkNeedDrop(bool& bCongestion);
bool checkNeedDrop();

/// Connect to a UDT entity as per hs request. This will update
/// required data in the entity, then update them also in the hs structure,
Expand Down