Skip to content

Commit

Permalink
[core] Fixed new RCV buffer in stream mode
Browse files Browse the repository at this point in the history
(reading fractional packets)
  • Loading branch information
maxsharabayko committed Nov 5, 2021
1 parent 3c3824f commit ec571a0
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 40 deletions.
54 changes: 15 additions & 39 deletions srtcore/buffer_rcv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,10 @@ namespace {
/// @brief Writes bytes to file stream.
/// @param data pointer to data to write.
/// @param len the number of bytes to write
/// @param dst_offset ignored
/// @param arg a void pointer to the fstream to write to.
/// @return true on success, false on failure
bool writeBytesToFile(char* data, int len, void* arg)
bool writeBytesToFile(char* data, int len, int dst_offset SRT_ATR_UNUSED, void* arg)
{
fstream* pofs = reinterpret_cast<fstream*>(arg);
pofs->write(data, len);
Expand All @@ -381,11 +382,12 @@ namespace {
/// @brief Copies bytes to the destination buffer.
/// @param data pointer to data to copy.
/// @param len the number of bytes to copy
/// @param dst_offset offset in destination buffer
/// @param arg A pointer to the destination buffer
/// @return true on success, false on failure
bool copyBytesToBuf(char* data, int len, void* arg)
bool copyBytesToBuf(char* data, int len, int dst_offset, void* arg)
{
char* dst = reinterpret_cast<char*>(arg);
char* dst = reinterpret_cast<char*>(arg) + dst_offset;
memcpy(dst, data, len);
return true;
}
Expand Down Expand Up @@ -427,7 +429,7 @@ int CRcvBufferNew::readBufferTo(int len, copy_to_dst_f funcCopyToDst, void* arg)
const int remain_pktlen = pktlen - m_iNotch;
const int unitsize = std::min(remain_pktlen, rs);

if (!funcCopyToDst(pkt.m_pcData + m_iNotch, unitsize, arg))
if (!funcCopyToDst(pkt.m_pcData + m_iNotch, unitsize, len - rs, arg))
break;

if (rs >= remain_pktlen)
Expand Down Expand Up @@ -622,45 +624,23 @@ void CRcvBufferNew::updateNonreadPos()
if (m_iMaxPosInc == 0)
return;

// const PacketBoundary boundary = packet.getMsgBoundary();

//// The simplest case is when inserting a sequential PB_SOLO packet.
// if (boundary == PB_SOLO && (m_iFirstNonreadPos + 1) % m_szSize == pos)
//{
// m_iFirstNonreadPos = pos;
// return;
//}
const int end_pos = incPos(m_iStartPos, m_iMaxPosInc); // The empty position right after the last valid entry.

int pos = m_iFirstNonreadPos;
while (m_entries[pos].pUnit && m_entries[pos].status == EntryState_Avail && (m_entries[pos].pUnit->m_Packet.getMsgBoundary() & PB_FIRST))
while (m_entries[pos].pUnit && m_entries[pos].status == EntryState_Avail)
{
// bool good = true;

// look ahead for the whole message

// We expect to see either of:
// [PB_FIRST] [PB_SUBSEQUENT] [PB_SUBSEQUENT] [PB_LAST]
// [PB_SOLO]
// but not:
// [PB_FIRST] NULL ...
// [PB_FIRST] FREE/PASSACK/DROPPED...
// If the message didn't look as expected, interrupt this.

// This begins with a message starting at m_iStartPos
// up to end_pos (excluding) OR until the PB_LAST message is found.
// If any of the units on this way isn't good, this OUTER loop
// will be interrupted.
for (int i = pos; i != end_pos; i = (i + 1) % m_szSize)
if (m_bMessageAPI && (m_entries[pos].pUnit->m_Packet.getMsgBoundary() & PB_FIRST) == 0)
break;

for (int i = pos; i != end_pos; i = incPos(i))
{
if (!m_entries[i].pUnit || m_entries[pos].status != EntryState_Avail)
{
// good = false;
break;
}

// Likewise, boundary() & PB_LAST will be satisfied for last OR solo.
if (m_entries[i].pUnit->m_Packet.getMsgBoundary() & PB_LAST)
// Check PB_LAST only in message mode.
if (!m_bMessageAPI || m_entries[i].pUnit->m_Packet.getMsgBoundary() & PB_LAST)
{
m_iFirstNonreadPos = incPos(i);
break;
Expand All @@ -672,11 +652,6 @@ void CRcvBufferNew::updateNonreadPos()

pos = m_iFirstNonreadPos;
}

// 1. If there is a gap between this packet and m_iLastReadablePos
// then no sense to update m_iLastReadablePos.

// 2. The simplest case is when this is the first sequential packet
}

int CRcvBufferNew::findLastMessagePkt()
Expand Down Expand Up @@ -929,9 +904,10 @@ string CRcvBufferNew::strFullnessState(int iFirstUnackSeqNo, const time_point& t
{
ss << ":n/a ms";
}
ss << ". ";
}

ss << ". " SRT_SYNC_CLOCK_STR " drift " << getDrift() / 1000 << " ms.";
ss << SRT_SYNC_CLOCK_STR " drift " << getDrift() / 1000 << " ms.";
return ss.str();
}

Expand Down
2 changes: 1 addition & 1 deletion srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class CRcvBufferNew
int scanNotInOrderMessageRight(int startPos, int msgNo) const;
int scanNotInOrderMessageLeft(int startPos, int msgNo) const;

typedef bool copy_to_dst_f(char* data, int len, void* arg);
typedef bool copy_to_dst_f(char* data, int len, int dst_offset, void* arg);

/// Read acknowledged data directly into file.
/// @param [in] ofs C++ file stream.
Expand Down

0 comments on commit ec571a0

Please sign in to comment.