Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring mutexes and evet around CSndUList #2045

Merged
merged 3 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ option(ENABLE_CXX_DEPS "Extra library dependencies in srt.pc for the CXX librari
option(USE_STATIC_LIBSTDCXX "Should use static rather than shared libstdc++" OFF)
option(ENABLE_INET_PTON "Set to OFF to prevent usage of inet_pton when building against modern SDKs while still requiring compatibility with older Windows versions, such as Windows XP, Windows Server 2003 etc." ON)
option(ENABLE_CODE_COVERAGE "Enable code coverage reporting" OFF)
option(ENABLE_MONOTONIC_CLOCK "Enforced clock_gettime with monotonic clock on GC CV /temporary fix for #729/" OFF)
option(ENABLE_MONOTONIC_CLOCK "Enforced clock_gettime with monotonic clock on GC CV" OFF)
option(ENABLE_STDCXX_SYNC "Use C++11 chrono and threads for timing instead of pthreads" OFF)
option(USE_OPENSSL_PC "Use pkg-config to find OpenSSL libraries" ON)
option(USE_BUSY_WAITING "Enable more accurate sending times at a cost of potentially higher CPU load" OFF)
Expand Down
4 changes: 2 additions & 2 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7974,14 +7974,14 @@ void srt::CUDT::updateSndLossListOnACK(int32_t ackdata_seqno)
#endif

// insert this socket to snd list if it is not on the list yet
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);
const steady_clock::time_point currtime = steady_clock::now();
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE, currtime);

if (m_config.bSynSending)
{
CSync::lock_signal(m_SendBlockCond, m_SendBlockLock);
}

const steady_clock::time_point currtime = steady_clock::now();
// record total time used for sending
enterCS(m_StatsLock);
m_stats.sndDuration += count_microseconds(currtime - m_stats.sndDurationCounter);
Expand Down
136 changes: 76 additions & 60 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,94 +255,71 @@ void srt::CUnitQueue::makeUnitGood(CUnit* unit)
unit->m_iFlag = CUnit::GOOD;
}

srt::CSndUList::CSndUList()
srt::CSndUList::CSndUList(sync::CTimer* pTimer)
: m_pHeap(NULL)
, m_iArrayLength(512)
, m_iLastEntry(-1)
, m_ListLock()
, m_pWindowLock(NULL)
, m_pWindowCond(NULL)
, m_pTimer(NULL)
, m_pTimer(pTimer)
{
setupCond(m_ListCond, "CSndUListCond");
m_pHeap = new CSNode*[m_iArrayLength];
}

srt::CSndUList::~CSndUList()
{
releaseCond(m_ListCond);
delete[] m_pHeap;
}

void srt::CSndUList::update(const CUDT* u, EReschedule reschedule)
void srt::CSndUList::update(const CUDT* u, EReschedule reschedule, sync::steady_clock::time_point ts)
{
ScopedLock listguard(m_ListLock);

CSNode* n = u->m_pSNode;

if (n->m_iHeapLoc >= 0)
{
if (!reschedule) // EReschedule to bool conversion, predicted.
if (reschedule == DONT_RESCHEDULE)
return;

if (n->m_tsTimeStamp <= ts)
return;

if (n->m_iHeapLoc == 0)
{
n->m_tsTimeStamp = steady_clock::now();
n->m_tsTimeStamp = ts;
m_pTimer->interrupt();
return;
}

remove_(u);
insert_norealloc_(steady_clock::now(), u);
insert_norealloc_(ts, u);
return;
}

insert_(steady_clock::now(), u);
insert_(ts, u);
}

int srt::CSndUList::pop(sockaddr_any& w_addr, CPacket& w_pkt)
srt::CUDT* srt::CSndUList::pop()
{
ScopedLock listguard(m_ListLock);

if (-1 == m_iLastEntry)
return -1;
return NULL;

// no pop until the next schedulled time
// no pop until the next scheduled time
if (m_pHeap[0]->m_tsTimeStamp > steady_clock::now())
return -1;
return NULL;

CUDT* u = m_pHeap[0]->m_pUDT;
remove_(u);

#define UST(field) ((u->m_b##field) ? "+" : "-") << #field << " "

HLOGC(qslog.Debug,
log << "SND:pop: requesting packet from @" << u->socketID() << " STATUS: " << UST(Listening)
<< UST(Connecting) << UST(Connected) << UST(Closing) << UST(Shutdown) << UST(Broken) << UST(PeerHealth)
<< UST(Opened));
#undef UST

if (!u->m_bConnected || u->m_bBroken)
return -1;

// pack a packet from the socket
const std::pair<int, steady_clock::time_point> res_time = u->packData((w_pkt));

if (res_time.first <= 0)
return -1;

w_addr = u->m_PeerAddr;

// insert a new entry, ts is the next processing time
const steady_clock::time_point send_time = res_time.second;
if (!is_zero(send_time))
insert_norealloc_(send_time, u);

return 1;
return u;
}

void srt::CSndUList::remove(const CUDT* u)
{
ScopedLock listguard(m_ListLock);

remove_(u);
}

Expand All @@ -356,6 +333,21 @@ steady_clock::time_point srt::CSndUList::getNextProcTime()
return m_pHeap[0]->m_tsTimeStamp;
}

void srt::CSndUList::waitNonEmpty() const
{
UniqueLock listguard(m_ListLock);
if (m_iLastEntry >= 0)
return;

m_ListCond.wait(listguard);
}

void srt::CSndUList::signalInterrupt() const
{
ScopedLock listguard(m_ListLock);
m_ListCond.notify_all();
}

void srt::CSndUList::realloc_()
{
CSNode** temp = NULL;
Expand Down Expand Up @@ -420,7 +412,8 @@ void srt::CSndUList::insert_norealloc_(const steady_clock::time_point& ts, const
// first entry, activate the sending queue
if (0 == m_iLastEntry)
{
CSync::lock_signal(*m_pWindowCond, *m_pWindowLock);
// m_ListLock is assumed to be locked.
m_ListCond.notify_all();
}
}

Expand Down Expand Up @@ -468,10 +461,8 @@ srt::CSndQueue::CSndQueue()
: m_pSndUList(NULL)
, m_pChannel(NULL)
, m_pTimer(NULL)
, m_WindowCond()
, m_bClosing(false)
{
setupCond(m_WindowCond, "Window");
}

srt::CSndQueue::~CSndQueue()
Expand All @@ -483,14 +474,14 @@ srt::CSndQueue::~CSndQueue()
m_pTimer->interrupt();
}

CSync::lock_signal(m_WindowCond, m_WindowLock);
// Unblock CSndQueue worker thread if it is waiting.
m_pSndUList->signalInterrupt();

if (m_WorkerThread.joinable())
{
HLOGC(rslog.Debug, log << "SndQueue: EXIT");
m_WorkerThread.join();
}
releaseCond(m_WindowCond);

delete m_pSndUList;
}
Expand All @@ -510,12 +501,9 @@ int srt::CSndQueue::m_counter = 0;

void srt::CSndQueue::init(CChannel* c, CTimer* t)
{
m_pChannel = c;
m_pTimer = t;
m_pSndUList = new CSndUList;
m_pSndUList->m_pWindowLock = &m_WindowLock;
m_pSndUList->m_pWindowCond = &m_WindowCond;
m_pSndUList->m_pTimer = m_pTimer;
m_pChannel = c;
m_pTimer = t;
m_pSndUList = new CSndUList(t);

#if ENABLE_LOGGING
++m_counter;
Expand Down Expand Up @@ -575,14 +563,11 @@ void* srt::CSndQueue::worker(void* param)
self->m_WorkerStats.lNotReadyTs++;
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */

UniqueLock windlock(self->m_WindowLock);
CSync windsync(self->m_WindowCond, windlock);

// wait here if there is no sockets with data to be sent
THREAD_PAUSED();
if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
if (!self->m_bClosing)
{
windsync.wait();
self->m_pSndUList->waitNonEmpty();

#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lCondWait++;
Expand Down Expand Up @@ -623,18 +608,49 @@ void* srt::CSndQueue::worker(void* param)
}
THREAD_RESUMED();

// it is time to send the next pkt
sockaddr_any addr;
CPacket pkt;
if (self->m_pSndUList->pop((addr), (pkt)) < 0)
// Get a socket with a send request if any.
CUDT* u = self->m_pSndUList->pop();
if (u == NULL)
{
#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lNotReadyPop++;
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */
continue;
}

#define UST(field) ((u->m_b##field) ? "+" : "-") << #field << " "
HLOGC(qslog.Debug,
log << "CSndQueue: requesting packet from @" << u->socketID() << " STATUS: " << UST(Listening)
<< UST(Connecting) << UST(Connected) << UST(Closing) << UST(Shutdown) << UST(Broken) << UST(PeerHealth)
<< UST(Opened));
#undef UST

if (!u->m_bConnected || u->m_bBroken)
{
#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lNotReadyPop++;
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */
continue;
}

// pack a packet from the socket
CPacket pkt;
const std::pair<int, steady_clock::time_point> res_time = u->packData((pkt));

// Check if payload size is invalid.
if (res_time.first <= 0)
{
#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lNotReadyPop++;
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */
continue;
}

const sockaddr_any addr = u->m_PeerAddr;
const steady_clock::time_point next_send_time = res_time.second;
if (!is_zero(next_send_time))
self->m_pSndUList->update(u, CSndUList::DO_RESCHEDULE, next_send_time);

HLOGC(qslog.Debug, log << self->CONID() << "chn:SENDING: " << pkt.Info());
self->m_pChannel->sendto(addr, pkt);

Expand Down
46 changes: 21 additions & 25 deletions srtcore/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,8 @@ struct CSNode

class CSndUList
{
friend class CSndQueue;

public:
CSndUList();
CSndUList(sync::CTimer* pTimer);
~CSndUList();

public:
Expand All @@ -175,30 +173,31 @@ class CSndUList
/// Update the timestamp of the UDT instance on the list.
/// @param [in] u pointer to the UDT instance
/// @param [in] reschedule if the timestamp should be rescheduled
/// @param [in] ts the next time to trigger sending logic on the CUDT
void update(const CUDT* u, EReschedule reschedule, sync::steady_clock::time_point ts = sync::steady_clock::now());

void update(const CUDT* u, EReschedule reschedule);

/// Retrieve the next packet and peer address from the first entry, and reschedule it in the queue.
/// @param [out] addr destination address of the next packet
/// @param [out] pkt the next packet to be sent
/// @return 1 if successfully retrieved, -1 if no packet found.

int pop(sockaddr_any& addr, CPacket& pkt);
/// Retrieve the next (in time) socket from the heap to process its sending request.
/// @return a pointer to CUDT instance to process next.
CUDT* pop();

/// Remove UDT instance from the list.
/// @param [in] u pointer to the UDT instance

void remove(const CUDT* u);
void remove(const CUDT* u);// EXCLUDES(m_ListLock);

/// Retrieve the next scheduled processing time.
/// @return Scheduled processing time of the first UDT socket in the list.

sync::steady_clock::time_point getNextProcTime();

/// Wait for the list to become non empty.
void waitNonEmpty() const;

/// Signal to stop waiting in waitNonEmpty().
void signalInterrupt() const;

private:
/// Doubles the size of the list.
///
void realloc_();
void realloc_();// REQUIRES(m_ListLock);

/// Insert a new UDT instance into the list with realloc if required.
///
Expand All @@ -211,21 +210,21 @@ class CSndUList
///
/// @param [in] ts time stamp: next processing time
/// @param [in] u pointer to the UDT instance
void insert_norealloc_(const sync::steady_clock::time_point& ts, const CUDT* u);
void insert_norealloc_(const sync::steady_clock::time_point& ts, const CUDT* u);// REQUIRES(m_ListLock);

/// Removes CUDT entry from the list.
/// If the last entry is removed, calls sync::CTimer::interrupt().
void remove_(const CUDT* u);

private:
CSNode** m_pHeap; // The heap array
int m_iArrayLength; // physical length of the array
int m_iLastEntry; // position of last entry on the heap array
int m_iLastEntry; // position of last entry on the heap array or -1 if empty.

sync::Mutex m_ListLock;
mutable sync::Mutex m_ListLock; // Protects the list (m_pHeap, m_iArrayLength, m_iLastEntry).
mutable sync::Condition m_ListCond;

sync::Mutex* m_pWindowLock;
sync::Condition* m_pWindowCond;

sync::CTimer* m_pTimer;
sync::CTimer* const m_pTimer;

private:
CSndUList(const CSndUList&);
Expand Down Expand Up @@ -462,9 +461,6 @@ class CSndQueue
CChannel* m_pChannel; // The UDP channel for data sending
srt::sync::CTimer* m_pTimer; // Timing facility

srt::sync::Mutex m_WindowLock;
srt::sync::Condition m_WindowCond;

srt::sync::atomic<bool> m_bClosing; // closing the worker

#if defined(SRT_DEBUG_SNDQ_HIGHRATE) //>>debug high freq worker
Expand Down