Skip to content

Commit

Permalink
[core] CUnitQueue access via functions.
Browse files Browse the repository at this point in the history
CRcvBuffer and CRcvQueue were accessing and modifying state of the CUnitQueue directly (friend classes). Now they do it via public functions.
  • Loading branch information
maxsharabayko authored and rndi committed Feb 28, 2019
1 parent f865119 commit 04ade2e
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 39 deletions.
35 changes: 14 additions & 21 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -742,8 +742,7 @@ CRcvBuffer::~CRcvBuffer()
{
if (m_pUnit[i] != NULL)
{
m_pUnit[i]->m_iFlag = CUnit::FREE;
-- m_pUnitQueue->m_iCount;
m_pUnitQueue->makeUnitFree(m_pUnit[i]);
}
}

Expand Down Expand Up @@ -790,10 +789,9 @@ int CRcvBuffer::addData(CUnit* unit, int offset)
return -1;
}
m_pUnit[pos] = unit;
countBytes(1, unit->m_Packet.getLength());
countBytes(1, (int) unit->m_Packet.getLength());

unit->m_iFlag = CUnit::GOOD;
++ m_pUnitQueue->m_iCount;
m_pUnitQueue->makeUnitGood(unit);

return 0;
}
Expand All @@ -819,7 +817,7 @@ int CRcvBuffer::readBuffer(char* data, int len)
break; /* too early for this unit, return whatever was copied */
}

int unitsize = m_pUnit[p]->m_Packet.getLength() - m_iNotch;
int unitsize = (int) m_pUnit[p]->m_Packet.getLength() - m_iNotch;
if (unitsize > rs)
unitsize = rs;

Expand All @@ -832,8 +830,7 @@ int CRcvBuffer::readBuffer(char* data, int len)
{
CUnit* tmp = m_pUnit[p];
m_pUnit[p] = NULL;
tmp->m_iFlag = CUnit::FREE;
-- m_pUnitQueue->m_iCount;
m_pUnitQueue->makeUnitFree(tmp);

if (++ p == m_iSize)
p = 0;
Expand Down Expand Up @@ -861,7 +858,7 @@ int CRcvBuffer::readBufferToFile(fstream& ofs, int len)

while ((p != lastack) && (rs > 0))
{
int unitsize = m_pUnit[p]->m_Packet.getLength() - m_iNotch;
int unitsize = (int) m_pUnit[p]->m_Packet.getLength() - m_iNotch;
if (unitsize > rs)
unitsize = rs;

Expand All @@ -873,8 +870,7 @@ int CRcvBuffer::readBufferToFile(fstream& ofs, int len)
{
CUnit* tmp = m_pUnit[p];
m_pUnit[p] = NULL;
tmp->m_iFlag = CUnit::FREE;
-- m_pUnitQueue->m_iCount;
m_pUnitQueue->makeUnitFree(tmp);

if (++ p == m_iSize)
p = 0;
Expand Down Expand Up @@ -905,7 +901,7 @@ void CRcvBuffer::ackData(int len)
continue;

pkts++;
bytes += m_pUnit[i]->m_Packet.getLength();
bytes += (int) m_pUnit[i]->m_Packet.getLength();
}
if (pkts > 0) countBytes(pkts, bytes, true);
}
Expand Down Expand Up @@ -1094,9 +1090,8 @@ bool CRcvBuffer::getRcvReadyMsg(ref_t<uint64_t> tsbpdtime, ref_t<int32_t> curpkt
CUnit* tmp = m_pUnit[i];
m_pUnit[i] = NULL;
rmpkts++;
rmbytes += tmp->m_Packet.getLength();
tmp->m_iFlag = CUnit::FREE;
--m_pUnitQueue->m_iCount;
rmbytes += (int) tmp->m_Packet.getLength();
m_pUnitQueue->makeUnitFree(tmp);

if (++m_iStartPos == m_iSize)
m_iStartPos = 0;
Expand Down Expand Up @@ -1600,7 +1595,7 @@ int CRcvBuffer::readMsg(char* data, int len, ref_t<SRT_MSGCTRL> r_msgctl)
int rs = len;
while (p != (q + 1) % m_iSize)
{
int unitsize = m_pUnit[p]->m_Packet.getLength();
int unitsize = (int) m_pUnit[p]->m_Packet.getLength();
if ((rs >= 0) && (unitsize > rs))
unitsize = rs;

Expand Down Expand Up @@ -1641,8 +1636,7 @@ int CRcvBuffer::readMsg(char* data, int len, ref_t<SRT_MSGCTRL> r_msgctl)
{
CUnit* tmp = m_pUnit[p];
m_pUnit[p] = NULL;
tmp->m_iFlag = CUnit::FREE;
-- m_pUnitQueue->m_iCount;
m_pUnitQueue->makeUnitFree(tmp);
}
else
m_pUnit[p]->m_iFlag = CUnit::PASSACK;
Expand Down Expand Up @@ -1727,9 +1721,8 @@ bool CRcvBuffer::scanMsg(ref_t<int> r_p, ref_t<int> r_q, ref_t<bool> passack)
CUnit* tmp = m_pUnit[m_iStartPos];
m_pUnit[m_iStartPos] = NULL;
rmpkts++;
rmbytes += tmp->m_Packet.getLength();
tmp->m_iFlag = CUnit::FREE;
-- m_pUnitQueue->m_iCount;
rmbytes += (int) tmp->m_Packet.getLength();
m_pUnitQueue->makeUnitFree(tmp);

if (++ m_iStartPos == m_iSize)
m_iStartPos = 0;
Expand Down
19 changes: 10 additions & 9 deletions srtcore/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,12 @@ class CRcvBuffer
{
public:

// XXX There's currently no way to access the socket ID set for
// whatever the queue is currently working for. Required to find
// some way to do this, possibly by having a "reverse pointer".
// Currently just "unimplemented".
std::string CONID() const { return ""; }
// XXX There's currently no way to access the socket ID set for
// whatever the queue is currently working for. Required to find
// some way to do this, possibly by having a "reverse pointer".
// Currently just "unimplemented".
std::string CONID() const { return ""; }


CRcvBuffer(CUnitQueue* queue, int bufsize = 65536);
~CRcvBuffer();
Expand Down Expand Up @@ -394,14 +395,14 @@ class CRcvBuffer
private:
CUnit** m_pUnit; // pointer to the protocol buffer
int m_iSize; // size of the protocol buffer
CUnitQueue* m_pUnitQueue; // the shared unit queue
CUnitQueue* m_pUnitQueue; // the shared unit queue

int m_iStartPos; // the head position for I/O (inclusive)
int m_iLastAckPos; // the last ACKed position (exclusive)
// EMPTY: m_iStartPos = m_iLastAckPos FULL: m_iStartPos = m_iLastAckPos + 1
int m_iMaxPos; // the furthest data position
// EMPTY: m_iStartPos = m_iLastAckPos FULL: m_iStartPos = m_iLastAckPos + 1
int m_iMaxPos; // the furthest data position

int m_iNotch; // the starting read point of the first unit
int m_iNotch; // the starting read point of the first unit

pthread_mutex_t m_BytesCountLock; // used to protect counters operations
int m_iBytesCount; // Number of payload bytes in the buffer
Expand Down
9 changes: 9 additions & 0 deletions srtcore/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ modified by
#include "udt.h"
#include "utilities.h"


#ifdef _DEBUG
#include <assert.h>
#define SRT_ASSERT(cond) assert(cond)
#else
#define SRT_ASSERT(cond)
#endif


enum UDTSockType
{
UDT_UNDEFINED = 0, // initial trap representation
Expand Down
30 changes: 24 additions & 6 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,24 @@ CUnit* CUnitQueue::getNextAvailUnit()
}


void CUnitQueue::makeUnitFree(CUnit * unit)
{
SRT_ASSERT(unit != NULL);
SRT_ASSERT(unit->m_iFlag != CUnit::FREE);
unit->m_iFlag = CUnit::FREE;
--m_iCount;
}


void CUnitQueue::makeUnitGood(CUnit * unit)
{
SRT_ASSERT(unit != NULL);
SRT_ASSERT(unit->m_iFlag == CUnit::FREE);
unit->m_iFlag = CUnit::GOOD;
++m_iCount;
}


CSndUList::CSndUList():
m_pHeap(NULL),
m_iArrayLength(4096),
Expand Down Expand Up @@ -487,7 +505,7 @@ void CSndQueue::init(CChannel* c, CTimer* t)
ThreadName tn("SRT:SndQ:worker");
if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this))
{
m_WorkerThread = pthread_t();
m_WorkerThread = pthread_t();
throw CUDTException(MJ_SYSTEMRES, MN_THREAD);
}
}
Expand Down Expand Up @@ -609,7 +627,7 @@ int CSndQueue::sendto(const sockaddr* addr, CPacket& packet)
{
// send out the packet immediately (high priority), this is a control packet
m_pChannel->sendto(addr, packet);
return packet.getLength();
return (int) packet.getLength();
}


Expand Down Expand Up @@ -1027,7 +1045,7 @@ CRcvQueue::CRcvQueue():
CRcvQueue::~CRcvQueue()
{
m_bClosing = true;
if (!pthread_equal(m_WorkerThread, pthread_t()))
if (!pthread_equal(m_WorkerThread, pthread_t()))
pthread_join(m_WorkerThread, NULL);
pthread_mutex_destroy(&m_PassLock);
pthread_cond_destroy(&m_PassCond);
Expand Down Expand Up @@ -1069,15 +1087,15 @@ void CRcvQueue::init(int qsize, int payload, int version, int hsize, CChannel* c
ThreadName tn("SRT:RcvQ:worker");
if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this))
{
m_WorkerThread = pthread_t();
m_WorkerThread = pthread_t();
throw CUDTException(MJ_SYSTEMRES, MN_THREAD);
}
}

void* CRcvQueue::worker(void* param)
{
CRcvQueue* self = (CRcvQueue*)param;
sockaddr_any sa ( self->m_UnitQueue.m_iIPversion );
sockaddr_any sa (self->m_UnitQueue.getIPversion());
int32_t id = 0;

THREAD_STATE_INIT("SRT:RcvQ:worker");
Expand Down Expand Up @@ -1535,7 +1553,7 @@ int CRcvQueue::recvfrom(int32_t id, ref_t<CPacket> r_packet)
if (i->second.empty())
m_mBuffer.erase(i);

return packet.getLength();
return (int) packet.getLength();
}

int CRcvQueue::setListener(CUDT* u)
Expand Down
17 changes: 14 additions & 3 deletions srtcore/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,13 @@ struct CUnit

class CUnitQueue
{
friend class CRcvQueue;
friend class CRcvBuffer;

public:

CUnitQueue();
~CUnitQueue();

public:
public: // Storage size operations

/// Initialize the unit queue.
/// @param [in] size queue size
Expand All @@ -102,11 +101,23 @@ friend class CRcvBuffer;

int shrink();

public: // Operations on units

/// find an available unit for incoming packet.
/// @return Pointer to the available unit, NULL if not found.

CUnit* getNextAvailUnit();


void makeUnitFree(CUnit * unit);

void makeUnitGood(CUnit * unit);

public:

inline int getIPversion() const { return m_iIPversion; }


private:
struct CQEntry
{
Expand Down

0 comments on commit 04ade2e

Please sign in to comment.