Skip to content

Commit

Permalink
[core] Minor internal logging format changes (#2939).
Browse files Browse the repository at this point in the history
Also renamed CUDT::m_bTsbPdAckWakeup.
Added some function to the CRcvBuffer.

Co-authored-by: Sektor van Skijlen <ethouris@gmail.com>
  • Loading branch information
maxsharabayko and ethouris committed Apr 24, 2024
1 parent 973be58 commit 882dff9
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 21 deletions.
19 changes: 16 additions & 3 deletions srtcore/buffer_rcv.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,20 @@ class CRcvBuffer
return (m_iMaxPosOff == 0);
}

/// Returns the currently used number of cells, including
/// gaps with empty cells, or in other words, the distance
/// between the initial position and the youngest received packet.
size_t size() const
{
return m_iMaxPosOff;
}

// Returns true if the buffer is full. Requires locking.
bool full() const
{
return size() == capacity();
}

/// Return buffer capacity.
/// One slot had to be empty in order to tell the difference between "empty buffer" and "full buffer".
/// E.g. m_iFirstNonreadPos would again point to m_iStartPos if m_szSize entries are added continiously.
Expand Down Expand Up @@ -333,9 +347,8 @@ class CRcvBuffer
EntryStatus status;
};

//static Entry emptyEntry() { return Entry { NULL, EntryState_Empty }; }

FixedArray<Entry> m_entries;
typedef FixedArray<Entry> entries_t;
entries_t m_entries;

const size_t m_szSize; // size of the array of units (buffer)
CUnitQueue* m_pUnitQueue; // the shared unit queue
Expand Down
54 changes: 40 additions & 14 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ void srt::CUDT::construct()
m_iPeerTsbPdDelay_ms = 0;
m_bPeerTsbPd = false;
m_bTsbPd = false;
m_bTsbPdAckWakeup = false;
m_bTsbPdNeedsWakeup = false;
m_bGroupTsbPd = false;
m_bPeerTLPktDrop = false;
m_bBufferWasFull = false;
Expand Down Expand Up @@ -5405,7 +5405,7 @@ void * srt::CUDT::tsbpd(void* param)
CUniqueSync recvdata_lcc (self->m_RecvLock, self->m_RecvDataCond);
CSync tsbpd_cc(self->m_RcvTsbPdCond, recvdata_lcc.locker());

self->m_bTsbPdAckWakeup = true;
self->m_bTsbPdNeedsWakeup = true;
while (!self->m_bClosing)
{
steady_clock::time_point tsNextDelivery; // Next packet delivery time
Expand All @@ -5425,6 +5425,21 @@ void * srt::CUDT::tsbpd(void* param)
const bool is_time_to_deliver = !is_zero(info.tsbpd_time) && (tnow >= info.tsbpd_time);
tsNextDelivery = info.tsbpd_time;

#if ENABLE_HEAVY_LOGGING
if (info.seqno == SRT_SEQNO_NONE)
{
HLOGC(tslog.Debug, log << self->CONID() << "sok/tsbpd: packet check: NO PACKETS");
}
else
{
HLOGC(tslog.Debug, log << self->CONID() << "sok/tsbpd: packet check: %"
<< info.seqno << " T=" << FormatTime(tsNextDelivery)
<< " diff-now-playtime=" << FormatDuration(tnow - tsNextDelivery)
<< " ready=" << is_time_to_deliver
<< " ondrop=" << info.seq_gap);
}
#endif

if (!self->m_bTLPktDrop)
{
rxready = !info.seq_gap && is_time_to_deliver;
Expand Down Expand Up @@ -5470,8 +5485,8 @@ void * srt::CUDT::tsbpd(void* param)
if (rxready)
{
HLOGC(tslog.Debug,
log << self->CONID() << "tsbpd: PLAYING PACKET seq=" << info.seqno << " (belated "
<< (count_milliseconds(steady_clock::now() - info.tsbpd_time)) << "ms)");
log << self->CONID() << "tsbpd: PLAYING PACKET seq=" << info.seqno << " (belated "
<< FormatDuration<DUNIT_MS>(steady_clock::now() - info.tsbpd_time) << ")");
/*
* There are packets ready to be delivered
* signal a waiting "recv" call if there is any data available
Expand Down Expand Up @@ -5534,19 +5549,21 @@ void * srt::CUDT::tsbpd(void* param)
if (self->m_bClosing)
break;

SRT_ATR_UNUSED bool bWokeUpOnSignal = true;

if (!is_zero(tsNextDelivery))
{
IF_HEAVY_LOGGING(const steady_clock::duration timediff = tsNextDelivery - tnow);
/*
* Buffer at head of queue is not ready to play.
* Schedule wakeup when it will be.
*/
self->m_bTsbPdAckWakeup = false;
self->m_bTsbPdNeedsWakeup = false;
HLOGC(tslog.Debug,
log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << info.seqno
<< " T=" << FormatTime(tsNextDelivery) << " - waiting " << count_milliseconds(timediff) << "ms");
log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << info.seqno
<< " T=" << FormatTime(tsNextDelivery) << " - waiting " << FormatDuration<DUNIT_MS>(timediff));
THREAD_PAUSED();
tsbpd_cc.wait_until(tsNextDelivery);
bWokeUpOnSignal = tsbpd_cc.wait_until(tsNextDelivery);
THREAD_RESUMED();
}
else
Expand All @@ -5563,13 +5580,15 @@ void * srt::CUDT::tsbpd(void* param)
* - Closing the connection
*/
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: no data, scheduling wakeup at ack");
self->m_bTsbPdAckWakeup = true;
self->m_bTsbPdNeedsWakeup = true;
THREAD_PAUSED();
tsbpd_cc.wait();
THREAD_RESUMED();
}

HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP!!!");
HLOGC(tslog.Debug,
log << self->CONID() << "tsbpd: WAKE UP [" << (bWokeUpOnSignal ? "signal" : "timeout") << "]!!! - "
<< "NOW=" << FormatTime(steady_clock::now()));
}
THREAD_EXIT();
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING");
Expand Down Expand Up @@ -6951,6 +6970,12 @@ bool srt::CUDT::isRcvBufferReadyNoLock() const
return m_pRcvBuffer->isRcvDataReady(steady_clock::now());
}

bool srt::CUDT::isRcvBufferFull() const
{
ScopedLock lck(m_RcvBufferLock);
return m_pRcvBuffer->full();
}

// int by_exception: accepts values of CUDTUnited::ErrorHandling:
// - 0 - by return value
// - 1 - by exception
Expand Down Expand Up @@ -7738,8 +7763,8 @@ bool srt::CUDT::updateCC(ETransmissionEvent evt, const EventVariant arg)
m_iCongestionWindow = cgwindow;
#if ENABLE_HEAVY_LOGGING
HLOGC(rslog.Debug,
log << CONID() << "updateCC: updated values from congctl: interval=" << count_microseconds(m_tdSendInterval) << " us ("
<< "tk (" << m_CongCtl->pktSndPeriod_us() << "us) cgwindow="
log << CONID() << "updateCC: updated values from congctl: interval=" << FormatDuration<DUNIT_US>(m_tdSendInterval)
<< " (cfg:" << m_CongCtl->pktSndPeriod_us() << "us) cgwindow="
<< std::setprecision(3) << cgwindow);
#endif
}
Expand Down Expand Up @@ -8141,7 +8166,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
/* Newly acknowledged data, signal TsbPD thread */
CUniqueSync tslcc (m_RecvLock, m_RcvTsbPdCond);
// m_bTsbPdAckWakeup is protected by m_RecvLock in the tsbpd() thread
if (m_bTsbPdAckWakeup)
if (m_bTsbPdNeedsWakeup)
tslcc.notify_one();
}
else
Expand Down Expand Up @@ -8204,7 +8229,8 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
else if (!bNeedFullAck)
{
// Not possible (m_iRcvCurrSeqNo+1 <% m_iRcvLastAck ?)
LOGC(xtlog.Error, log << CONID() << "sendCtrl(UMSG_ACK): IPE: curr %" << ack << " <% last %" << m_iRcvLastAck);
LOGC(xtlog.Error, log << CONID() << "sendCtrl(UMSG_ACK): IPE: curr(" << reason << ") %"
<< ack << " <% last %" << m_iRcvLastAck);
return nbsent;
}

Expand Down
12 changes: 10 additions & 2 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ class CUDT
#endif

int32_t rcvSeqNo() const { return m_iRcvCurrSeqNo; }
SRT_ATTR_REQUIRES(m_RecvAckLock)
int flowWindowSize() const { return m_iFlowWindowSize; }
int32_t deliveryRate() const { return m_iDeliveryRate; }
int bandwidth() const { return m_iBandwidth; }
Expand Down Expand Up @@ -388,6 +389,7 @@ class CUDT

/// Returns the number of packets in flight (sent, but not yet acknowledged).
/// @returns The number of packets in flight belonging to the interval [0; ...)
SRT_ATTR_REQUIRES(m_RecvAckLock)
int32_t getFlightSpan() const
{
return getFlightSpan(m_iSndLastAck, m_iSndCurrSeqNo);
Expand Down Expand Up @@ -697,6 +699,8 @@ class CUDT
/// the receiver fresh loss list.
void unlose(const CPacket& oldpacket);
void dropFromLossLists(int32_t from, int32_t to);

SRT_ATTR_REQUIRES(m_RecvAckLock)
bool getFirstNoncontSequence(int32_t& w_seq, std::string& w_log_reason);

SRT_ATTR_EXCLUDES(m_ConnectionLock)
Expand Down Expand Up @@ -752,6 +756,9 @@ class CUDT
SRT_ATTR_REQUIRES(m_RcvBufferLock)
bool isRcvBufferReadyNoLock() const;

SRT_ATTR_EXCLUDES(m_RcvBufferLock)
bool isRcvBufferFull() const;

// TSBPD thread main function.
static void* tsbpd(void* param);

Expand Down Expand Up @@ -987,7 +994,7 @@ class CUDT

sync::CThread m_RcvTsbPdThread; // Rcv TsbPD Thread handle
sync::Condition m_RcvTsbPdCond; // TSBPD signals if reading is ready. Use together with m_RecvLock
bool m_bTsbPdAckWakeup; // Signal TsbPd thread on Ack sent
bool m_bTsbPdNeedsWakeup; // Signal TsbPd thread to wake up on RCV buffer state change.
sync::Mutex m_RcvTsbPdStartupLock; // Protects TSBPD thread creating and joining

CallbackHolder<srt_listen_callback_fn> m_cbAcceptHook;
Expand Down Expand Up @@ -1136,7 +1143,8 @@ class CUDT
/// @return -2 The incoming packet exceeds the expected sequence by more than a length of the buffer (irrepairable discrepancy).
int handleSocketPacketReception(const std::vector<CUnit*>& incoming, bool& w_new_inserted, bool& w_was_sent_in_order, CUDT::loss_seqs_t& w_srt_loss_seqs);

/// Get the packet's TSBPD time.
/// Get the packet's TSBPD time -
/// the time when it is passed to the reading application.
/// The @a grp passed by void* is not used yet
/// and shall not be used when ENABLE_BONDING=0.
time_point getPktTsbPdTime(void* grp, const CPacket& packet);
Expand Down
4 changes: 2 additions & 2 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1092,8 +1092,8 @@ bool srt::CRendezvousQueue::qualifyToHandle(EReadStatus rst,
if ((rst == RST_AGAIN || i->m_iID != iDstSockID) && tsNow <= tsRepeat)
{
HLOGC(cnlog.Debug,
log << "RID:@" << i->m_iID << std::fixed << count_microseconds(tsNow - tsLastReq) / 1000.0
<< " ms passed since last connection request.");
log << "RID:@" << i->m_iID << " " << FormatDuration<DUNIT_MS>(tsNow - tsLastReq)
<< " passed since last connection request.");

continue;
}
Expand Down

0 comments on commit 882dff9

Please sign in to comment.