Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor CSndBuffer edits. #2430

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 52 additions & 52 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ void CRateEstimator::updateInputRate(const time_point& time, int pkts, int bytes
}
}

CSndBuffer::CSndBuffer(int size, int mss)
CSndBuffer::CSndBuffer(int size, int maxpld)
: m_BufLock()
, m_pBlock(NULL)
, m_pFirstBlock(NULL)
Expand All @@ -171,35 +171,34 @@ CSndBuffer::CSndBuffer(int size, int mss)
, m_pBuffer(NULL)
, m_iNextMsgNo(1)
, m_iSize(size)
, m_iMSS(mss)
, m_iBlockLen(maxpld)
, m_iCount(0)
, m_iBytesCount(0)
{
// initial physical buffer of "size"
m_pBuffer = new Buffer;
m_pBuffer->m_pcData = new char[m_iSize * m_iMSS];
m_pBuffer->m_pcData = new char[m_iSize * m_iBlockLen];
m_pBuffer->m_iSize = m_iSize;
m_pBuffer->m_pNext = NULL;

// circular linked list for out bound packets
m_pBlock = new Block;
Block* pb = m_pBlock;
for (int i = 1; i < m_iSize; ++i)
{
pb->m_pNext = new Block;
pb->m_iMsgNoBitset = 0;
pb = pb->m_pNext;
}
pb->m_pNext = m_pBlock;
char* pc = m_pBuffer->m_pcData;

pb = m_pBlock;
char* pc = m_pBuffer->m_pcData;
for (int i = 0; i < m_iSize; ++i)
{
pb->m_pcData = pc;
pb = pb->m_pNext;
pc += m_iMSS;
pb->m_iMsgNoBitset = 0;
pb->m_pcData = pc;
pc += m_iBlockLen;

if (i < m_iSize - 1)
{
pb->m_pNext = new Block;
pb = pb->m_pNext;
}
}
pb->m_pNext = m_pBlock;

m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock;

Expand Down Expand Up @@ -230,31 +229,31 @@ CSndBuffer::~CSndBuffer()

void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
{
int32_t& w_msgno = w_mctrl.msgno;
int32_t& w_seqno = w_mctrl.pktseq;
int32_t& w_msgno = w_mctrl.msgno;
int32_t& w_seqno = w_mctrl.pktseq;
int64_t& w_srctime = w_mctrl.srctime;
const int& ttl = w_mctrl.msgttl;
int size = len / m_iMSS;
if ((len % m_iMSS) != 0)
size++;
const int iPktLen = m_iBlockLen; // Payload length per packet.
int iNumBlocks = len / iPktLen;
if ((len % m_iBlockLen) != 0)
++iNumBlocks;

HLOGC(bslog.Debug,
log << "addBuffer: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size << " buffers for "
<< len << " bytes");
log << "addBuffer: needs=" << iNumBlocks << " buffers for " << len << " bytes. Taken=" << m_iCount << "/" << m_iSize);
// Retrieve current time before locking the mutex to be closer to packet submission event.
const steady_clock::time_point tnow = steady_clock::now();

ScopedLock bufferguard(m_BufLock);
// Dynamically increase sender buffer if there is not enough room.
while (size + m_iCount >= m_iSize)
while (iNumBlocks + m_iCount >= m_iSize)
{
HLOGC(bslog.Debug, log << "addBuffer: ... still lacking " << (size + m_iCount - m_iSize) << " buffers...");
HLOGC(bslog.Debug, log << "addBuffer: ... still lacking " << (iNumBlocks + m_iCount - m_iSize) << " buffers...");
increase();
}

const int32_t inorder = w_mctrl.inorder ? MSGNO_PACKET_INORDER::mask : 0;
HLOGC(bslog.Debug,
log << CONID() << "addBuffer: adding " << size << " packets (" << len << " bytes) to send, msgno="
log << CONID() << "addBuffer: adding " << iNumBlocks << " packets (" << len << " bytes) to send, msgno="
<< (w_msgno > 0 ? w_msgno : m_iNextMsgNo) << (inorder ? "" : " NOT") << " in order");

// Calculate origin time (same for all blocks of the message).
Expand All @@ -281,16 +280,16 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
m_iNextMsgNo = w_msgno;
}

for (int i = 0; i < size; ++i)
for (int i = 0; i < iNumBlocks; ++i)
{
int pktlen = len - i * m_iMSS;
if (pktlen > m_iMSS)
pktlen = m_iMSS;
int pktlen = len - i * iPktLen;
if (pktlen > iPktLen)
pktlen = iPktLen;

HLOGC(bslog.Debug,
log << "addBuffer: %" << w_seqno << " #" << w_msgno << " spreading from=" << (i * m_iMSS)
log << "addBuffer: %" << w_seqno << " #" << w_msgno << " offset=" << (i * iPktLen)
<< " size=" << pktlen << " TO BUFFER:" << (void*)s->m_pcData);
memcpy((s->m_pcData), data + i * m_iMSS, pktlen);
memcpy((s->m_pcData), data + i * iPktLen, pktlen);
s->m_iLength = pktlen;

s->m_iSeqNo = w_seqno;
Expand All @@ -299,7 +298,7 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
s->m_iMsgNoBitset = m_iNextMsgNo | inorder;
if (i == 0)
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_FIRST);
if (i == size - 1)
if (i == iNumBlocks - 1)
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_LAST);
// NOTE: if i is neither 0 nor size-1, it resuls with PB_SUBSEQUENT.
// if i == 0 == size-1, it results with PB_SOLO.
Expand All @@ -318,10 +317,10 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)
}
m_pLastBlock = s;

m_iCount += size;
m_iCount += iNumBlocks;
m_iBytesCount += len;

m_rateEstimator.updateInputRate(m_tsLastOriginTime, size, len);
m_rateEstimator.updateInputRate(m_tsLastOriginTime, iNumBlocks, len);
updAvgBufSize(m_tsLastOriginTime);

// MSGNO_SEQ::mask has a form: 00000011111111...
Expand All @@ -337,39 +336,40 @@ void CSndBuffer::addBuffer(const char* data, int len, SRT_MSGCTRL& w_mctrl)

int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
{
int size = len / m_iMSS;
if ((len % m_iMSS) != 0)
size++;
const int iPktLen = m_iBlockLen; // Payload length per packet.
int iNumBlocks = len / iPktLen;
if ((len % m_iBlockLen) != 0)
++iNumBlocks;

HLOGC(bslog.Debug,
log << "addBufferFromFile: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << size
log << "addBufferFromFile: size=" << m_iCount << " reserved=" << m_iSize << " needs=" << iPktLen
<< " buffers for " << len << " bytes");

// dynamically increase sender buffer
while (size + m_iCount >= m_iSize)
while (iPktLen + m_iCount >= m_iSize)
{
HLOGC(bslog.Debug,
log << "addBufferFromFile: ... still lacking " << (size + m_iCount - m_iSize) << " buffers...");
log << "addBufferFromFile: ... still lacking " << (iPktLen + m_iCount - m_iSize) << " buffers...");
increase();
}

HLOGC(bslog.Debug,
log << CONID() << "addBufferFromFile: adding " << size << " packets (" << len
log << CONID() << "addBufferFromFile: adding " << iPktLen << " packets (" << len
<< " bytes) to send, msgno=" << m_iNextMsgNo);

Block* s = m_pLastBlock;
int total = 0;
for (int i = 0; i < size; ++i)
for (int i = 0; i < iPktLen; ++i)
{
if (ifs.bad() || ifs.fail() || ifs.eof())
break;

int pktlen = len - i * m_iMSS;
if (pktlen > m_iMSS)
pktlen = m_iMSS;
int pktlen = len - i * iPktLen;
if (pktlen > iPktLen)
pktlen = iPktLen;

HLOGC(bslog.Debug,
log << "addBufferFromFile: reading from=" << (i * m_iMSS) << " size=" << pktlen
log << "addBufferFromFile: reading from=" << (i * iPktLen) << " size=" << pktlen
<< " TO BUFFER:" << (void*)s->m_pcData);
ifs.read(s->m_pcData, pktlen);
if ((pktlen = int(ifs.gcount())) <= 0)
Expand All @@ -379,7 +379,7 @@ int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
s->m_iMsgNoBitset = m_iNextMsgNo | MSGNO_PACKET_INORDER::mask;
if (i == 0)
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_FIRST);
if (i == size - 1)
if (i == iPktLen - 1)
s->m_iMsgNoBitset |= PacketBoundaryBits(PB_LAST);
// NOTE: PB_FIRST | PB_LAST == PB_SOLO.
// none of PB_FIRST & PB_LAST == PB_SUBSEQUENT.
Expand All @@ -393,7 +393,7 @@ int CSndBuffer::addBufferFromFile(fstream& ifs, int len)
m_pLastBlock = s;

enterCS(m_BufLock);
m_iCount += size;
m_iCount += iPktLen;
m_iBytesCount += total;

leaveCS(m_BufLock);
Expand Down Expand Up @@ -588,7 +588,7 @@ int CSndBuffer::readData(const int offset, CPacket& w_packet, steady_clock::time
}

w_packet.m_pcData = p->m_pcData;
int readlen = p->m_iLength;
const int readlen = p->m_iLength;
w_packet.setLength(readlen);

// XXX Here the value predicted to be applied to PH_MSGNO field is extracted.
Expand Down Expand Up @@ -750,7 +750,7 @@ void CSndBuffer::increase()
try
{
nbuf = new Buffer;
nbuf->m_pcData = new char[unitsize * m_iMSS];
nbuf->m_pcData = new char[unitsize * m_iBlockLen];
}
catch (...)
{
Expand Down Expand Up @@ -794,13 +794,13 @@ void CSndBuffer::increase()
{
pb->m_pcData = pc;
pb = pb->m_pNext;
pc += m_iMSS;
pc += m_iBlockLen;
}

m_iSize += unitsize;

HLOGC(bslog.Debug,
log << "CSndBuffer: BUFFER FULL - adding " << (unitsize * m_iMSS) << " bytes spread to " << unitsize
log << "CSndBuffer: BUFFER FULL - adding " << (unitsize * m_iBlockLen) << " bytes spread to " << unitsize
<< " blocks"
<< " (total size: " << m_iSize << " bytes)");
}
Expand Down
9 changes: 6 additions & 3 deletions srtcore/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ class CSndBuffer
// Currently just "unimplemented".
std::string CONID() const { return ""; }

CSndBuffer(int size = 32, int mss = 1500);
/// @brief CSndBuffer constructor.
/// @param size initial number of blocks (each block to store one packet payload).
/// @param maxpld maximum packet payload.
CSndBuffer(int size = 32, int maxpld = 1500);
~CSndBuffer();

public:
Expand Down Expand Up @@ -256,7 +259,7 @@ class CSndBuffer
struct Block
{
char* m_pcData; // pointer to the data block
int m_iLength; // length of the block
int m_iLength; // payload length of the block.

int32_t m_iMsgNoBitset; // message number
int32_t m_iSeqNo; // sequence number for scheduling
Expand Down Expand Up @@ -292,7 +295,7 @@ class CSndBuffer
int32_t m_iNextMsgNo; // next message number

int m_iSize; // buffer size (number of packets)
int m_iMSS; // maximum seqment/packet size
const int m_iBlockLen; // maximum length of a block holding packet payload (excluding packet header).
int m_iCount; // number of used blocks

int m_iBytesCount; // number of payload bytes in queue
Expand Down