Skip to content

Commit

Permalink
[core] Refactor CSndUList: pop CUDT, not a packet
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed Jul 12, 2021
1 parent 31efe89 commit ac07134
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 45 deletions.
4 changes: 2 additions & 2 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7974,14 +7974,14 @@ void srt::CUDT::updateSndLossListOnACK(int32_t ackdata_seqno)
#endif

// insert this socket to snd list if it is not on the list yet
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);
const steady_clock::time_point currtime = steady_clock::now();
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE, currtime);

if (m_config.bSynSending)
{
CSync::lock_signal(m_SendBlockCond, m_SendBlockLock);
}

const steady_clock::time_point currtime = steady_clock::now();
// record total time used for sending
enterCS(m_StatsLock);
m_stats.sndDuration += count_microseconds(currtime - m_stats.sndDurationCounter);
Expand Down
82 changes: 45 additions & 37 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ srt::CSndUList::~CSndUList()
delete[] m_pHeap;
}

void srt::CSndUList::update(const CUDT* u, EReschedule reschedule)
void srt::CSndUList::update(const CUDT* u, EReschedule reschedule, sync::steady_clock::time_point ts)
{
ScopedLock listguard(m_ListLock);

Expand All @@ -285,58 +285,33 @@ void srt::CSndUList::update(const CUDT* u, EReschedule reschedule)

if (n->m_iHeapLoc == 0)
{
n->m_tsTimeStamp = steady_clock::now();
n->m_tsTimeStamp = ts;
m_pTimer->interrupt();
return;
}

remove_(u);
insert_norealloc_(steady_clock::now(), u);
insert_norealloc_(ts, u);
return;
}

insert_(steady_clock::now(), u);
insert_(ts, u);
}

int srt::CSndUList::pop(sockaddr_any& w_addr, CPacket& w_pkt)
srt::CUDT* srt::CSndUList::pop()
{
ScopedLock listguard(m_ListLock);

if (-1 == m_iLastEntry)
return -1;
return NULL;

// no pop until the next scheduled time
if (m_pHeap[0]->m_tsTimeStamp > steady_clock::now())
return -1;
return NULL;

CUDT* u = m_pHeap[0]->m_pUDT;
remove_(u);

#define UST(field) ((u->m_b##field) ? "+" : "-") << #field << " "

HLOGC(qslog.Debug,
log << "SND:pop: requesting packet from @" << u->socketID() << " STATUS: " << UST(Listening)
<< UST(Connecting) << UST(Connected) << UST(Closing) << UST(Shutdown) << UST(Broken) << UST(PeerHealth)
<< UST(Opened));
#undef UST

if (!u->m_bConnected || u->m_bBroken)
return -1;

// pack a packet from the socket
const std::pair<int, steady_clock::time_point> res_time = u->packData((w_pkt));

if (res_time.first <= 0)
return -1;

w_addr = u->m_PeerAddr;

// insert a new entry, ts is the next processing time
const steady_clock::time_point send_time = res_time.second;
if (!is_zero(send_time))
insert_norealloc_(send_time, u);

return 1;
return u;
}

void srt::CSndUList::remove(const CUDT* u)
Expand Down Expand Up @@ -630,18 +605,51 @@ void* srt::CSndQueue::worker(void* param)
}
THREAD_RESUMED();

// it is time to send the next pkt
sockaddr_any addr;
CPacket pkt;
if (self->m_pSndUList->pop((addr), (pkt)) < 0)
// Get a socket with a send request if any.
CUDT* u = self->m_pSndUList->pop();
if (u == NULL)
{
#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lNotReadyPop++;
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */
continue;
}

#define UST(field) ((u->m_b##field) ? "+" : "-") << #field << " "
HLOGC(qslog.Debug,
log << "CSndQueue: requesting packet from @" << u->socketID() << " STATUS: " << UST(Listening)
<< UST(Connecting) << UST(Connected) << UST(Closing) << UST(Shutdown) << UST(Broken) << UST(PeerHealth)
<< UST(Opened));
#undef UST

if (!u->m_bConnected || u->m_bBroken)
{
#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lNotReadyPop++;
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */
continue;
}

// pack a packet from the socket
CPacket pkt;
const std::pair<int, steady_clock::time_point> res_time = u->packData((pkt));

// Check if payload size is invalid.
if (res_time.first <= 0)
{
#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lNotReadyPop++;
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */
continue;
}

const sockaddr_any addr = u->m_PeerAddr;
// Insert a new entry, send_time is the next processing time.
// TODO: maybe reschedule by taking the smaller time?
const steady_clock::time_point send_time = res_time.second;
if (!is_zero(send_time))
self->m_pSndUList->update(u, CSndUList::DONT_RESCHEDULE, send_time);

HLOGC(qslog.Debug, log << self->CONID() << "chn:SENDING: " << pkt.Info());
self->m_pChannel->sendto(addr, pkt);

Expand Down
11 changes: 5 additions & 6 deletions srtcore/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,12 @@ class CSndUList
/// Update the timestamp of the UDT instance on the list.
/// @param [in] u pointer to the UDT instance
/// @param [in] reschedule if the timestamp should be rescheduled
void update(const CUDT* u, EReschedule reschedule);
/// @param [in] ts the next time to trigger sending logic on the CUDT
void update(const CUDT* u, EReschedule reschedule, sync::steady_clock::time_point ts = sync::steady_clock::now());

/// Retrieve the next packet and peer address from the first entry, and reschedule it in the queue.
/// @param [out] addr destination address of the next packet
/// @param [out] pkt the next packet to be sent
/// @return 1 if successfully retrieved, -1 if no packet found.
int pop(sockaddr_any& addr, CPacket& pkt);
/// Retrieve the next (in time) socket from the heap to process its sending request.
/// @return a pointer to CUDT instance to process next.
CUDT* pop();

/// Remove UDT instance from the list.
/// @param [in] u pointer to the UDT instance
Expand Down

0 comments on commit ac07134

Please sign in to comment.