Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improving CUnitQueue performance: don't adjust mcount #2405

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1867,7 +1867,7 @@ int srt::CUDTUnited::connectIn(CUDTSocket* s, const sockaddr_any& target_addr, i
{
s->core().startConnect(target_addr, forced_isn);
}
catch (CUDTException& e) // Interceptor, just to change the state.
catch (const CUDTException&) // Interceptor, just to change the state.
{
s->m_Status = SRTS_OPENED;
throw;
Expand Down
32 changes: 5 additions & 27 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ srt::CUnitQueue::CUnitQueue()
, m_iSize(0)
, m_iCount(0)
, m_iMSS()
, m_iIPversion()
{
}

Expand All @@ -94,7 +93,7 @@ srt::CUnitQueue::~CUnitQueue()
}
}

int srt::CUnitQueue::init(int size, int mss, int version)
int srt::CUnitQueue::init(int size, int mss)
{
CQEntry* tempq = NULL;
CUnit* tempu = NULL;
Expand Down Expand Up @@ -131,7 +130,6 @@ int srt::CUnitQueue::init(int size, int mss, int version)

m_iSize = size;
m_iMSS = mss;
m_iIPversion = version;

return 0;
}
Expand All @@ -140,22 +138,6 @@ int srt::CUnitQueue::init(int size, int mss, int version)
// Consider merging.
int srt::CUnitQueue::increase()
{
// adjust/correct m_iCount
int real_count = 0;
CQEntry* p = m_pQEntry;
while (p != NULL)
{
CUnit* u = p->m_pUnit;
for (CUnit* end = u + p->m_iSize; u != end; ++u)
if (u->m_iFlag != CUnit::FREE)
++real_count;

if (p == m_pLastQueue)
p = NULL;
else
p = p->m_pNext;
}
m_iCount = real_count;
if (double(m_iCount) / m_iSize < 0.9)
return -1;

Expand Down Expand Up @@ -202,12 +184,6 @@ int srt::CUnitQueue::increase()
return 0;
}

int srt::CUnitQueue::shrink()
{
// currently queue cannot be shrunk.
return -1;
}

srt::CUnit* srt::CUnitQueue::getNextAvailUnit()
{
if (m_iCount * 10 > m_iSize * 9)
Expand Down Expand Up @@ -1139,6 +1115,7 @@ srt::CRcvQueue::CRcvQueue()
, m_pHash(NULL)
, m_pChannel(NULL)
, m_pTimer(NULL)
, m_iIPversion()
, m_szPayloadSize()
, m_bClosing(false)
, m_LSLock()
Expand Down Expand Up @@ -1186,9 +1163,10 @@ srt::sync::atomic<int> srt::CRcvQueue::m_counter(0);

void srt::CRcvQueue::init(int qsize, size_t payload, int version, int hsize, CChannel* cc, CTimer* t)
{
m_iIPversion = version;
m_szPayloadSize = payload;

m_UnitQueue.init(qsize, (int)payload, version);
m_UnitQueue.init(qsize, (int)payload);

m_pHash = new CHash;
m_pHash->init(hsize);
Expand All @@ -1215,7 +1193,7 @@ void srt::CRcvQueue::init(int qsize, size_t payload, int version, int hsize, CCh
void* srt::CRcvQueue::worker(void* param)
{
CRcvQueue* self = (CRcvQueue*)param;
sockaddr_any sa(self->m_UnitQueue.getIPversion());
sockaddr_any sa(self->getIPversion());
int32_t id = 0;

#if ENABLE_LOGGING
Expand Down
32 changes: 11 additions & 21 deletions srtcore/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,39 +90,29 @@ class CUnitQueue
~CUnitQueue();

public: // Storage size operations
/// Initialize the unit queue.
/// @param [in] size queue size
/// @param [in] mss maximum segment size
/// @param [in] version IP version
/// @return 0: success, -1: failure.
int init(int size, int mss, int version);
/// Initialize the unit queue.
/// @param [in] size queue size
/// @param [in] mss maximum segment size
/// @return 0: success, -1: failure.
int init(int size, int mss);

/// Increase (double) the unit queue size.
/// @return 0: success, -1: failure.

int increase();

/// Decrease (halve) the unit queue size.
/// @return 0: success, -1: failure.

int shrink();

public:
int size() const { return m_iSize - m_iCount; }
int capacity() const { return m_iSize; }

public: // Operations on units
/// find an available unit for incoming packet.
/// @return Pointer to the available unit, NULL if not found.
/// find an available unit for incoming packet.
/// @return Pointer to the available unit, NULL if not found.
CUnit* getNextAvailUnit();

void makeUnitFree(CUnit* unit);

void makeUnitGood(CUnit* unit);

public:
inline int getIPversion() const { return m_iIPversion; }

private:
struct CQEntry
{
Expand All @@ -141,7 +131,6 @@ class CUnitQueue
sync::atomic<int> m_iCount; // total number of valid (occupied) packets in the queue

int m_iMSS; // unit buffer size
int m_iIPversion; // IP version

private:
CUnitQueue(const CUnitQueue&);
Expand Down Expand Up @@ -510,20 +499,20 @@ class CRcvQueue
/// @param [in] hsize hash table size
/// @param [in] c UDP channel to be associated to the queue
/// @param [in] t timer

void init(int size, size_t payload, int version, int hsize, CChannel* c, sync::CTimer* t);

/// Read a packet for a specific UDT socket id.
/// @param [in] id Socket ID
/// @param [out] packet received packet
/// @return Data size of the packet

int recvfrom(int32_t id, CPacket& to_packet);

void stopWorker();

void setClosing() { m_bClosing = true; }

int getIPversion() { return m_iIPversion; }

private:
static void* worker(void* param);
sync::CThread m_WorkerThread;
Expand All @@ -540,7 +529,8 @@ class CRcvQueue
CChannel* m_pChannel; // UDP channel for receving packets
sync::CTimer* m_pTimer; // shared timer with the snd queue

size_t m_szPayloadSize; // packet payload size
int m_iIPversion; // IP version
size_t m_szPayloadSize; // packet payload size

sync::atomic<bool> m_bClosing; // closing the worker
#if ENABLE_LOGGING
Expand Down
2 changes: 1 addition & 1 deletion test/test_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class CRcvBufferReadMsg
// make_unique is unfortunatelly C++14
m_unit_queue = unique_ptr<CUnitQueue>(new CUnitQueue);
ASSERT_NE(m_unit_queue.get(), nullptr);
m_unit_queue->init(m_buff_size_pkts, 1500, AF_INET);
m_unit_queue->init(m_buff_size_pkts, 1500);

#if ENABLE_NEW_RCVBUFFER
const bool enable_msg_api = m_use_message_api;
Expand Down
6 changes: 3 additions & 3 deletions test/test_unitqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ TEST(CUnitQueue, Increase)
{
const int buffer_size_pkts = 4;
CUnitQueue unit_queue;
unit_queue.init(buffer_size_pkts, 1500, AF_INET);
unit_queue.init(buffer_size_pkts, 1500);

vector<CUnit*> taken_units;
for (int i = 0; i < 5 * buffer_size_pkts; ++i)
Expand All @@ -38,7 +38,7 @@ TEST(CUnitQueue, IncreaseAndFree)
{
const int buffer_size_pkts = 4;
CUnitQueue unit_queue;
unit_queue.init(buffer_size_pkts, 1500, AF_INET);
unit_queue.init(buffer_size_pkts, 1500);

CUnit* taken_unit = nullptr;
for (int i = 0; i < 5 * buffer_size_pkts; ++i)
Expand All @@ -63,7 +63,7 @@ TEST(CUnitQueue, IncreaseAndFreeGrouped)
{
const int buffer_size_pkts = 4;
CUnitQueue unit_queue;
unit_queue.init(buffer_size_pkts, 1500, AF_INET);
unit_queue.init(buffer_size_pkts, 1500);

vector<CUnit*> taken_units;
for (int i = 0; i < 5 * buffer_size_pkts; ++i)
Expand Down