Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added more logging around delivery #320

Merged
merged 10 commits into from
Nov 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,65 @@ CPacket* CRcvBuffer::getRcvReadyPacket()
return 0;
}

#if ENABLE_HEAVY_LOGGING
// This function is for debug purposes only and it's called only
// from within HLOG* macros.
void CRcvBuffer::reportBufferStats()
{
int nmissing = 0;
int32_t low_seq= -1, high_seq = -1;
int32_t low_ts = 0, high_ts = 0;

for (int i = m_iStartPos, n = m_iLastAckPos; i != n; i = (i + 1) % m_iSize)
{
if ( m_pUnit[i] && m_pUnit[i]->m_iFlag == CUnit::GOOD )
{
low_seq = m_pUnit[i]->m_Packet.m_iSeqNo;
low_ts = m_pUnit[i]->m_Packet.m_iTimeStamp;
break;
}
++nmissing;
}

// Not sure if a packet MUST BE at the last ack pos position, so check, just in case.
int n = m_iLastAckPos;
if (m_pUnit[n] && m_pUnit[n]->m_iFlag == CUnit::GOOD)
{
high_ts = m_pUnit[n]->m_Packet.m_iTimeStamp;
high_seq = m_pUnit[n]->m_Packet.m_iSeqNo;
}
else
{
// Possibilities are:
// m_iStartPos == m_iLastAckPos, high_ts == low_ts, defined.
// No packet: low_ts == 0, so high_ts == 0, too.
high_ts = low_ts;
}
// The 32-bit timestamps are relative and roll over oftten; what
// we really need is the timestamp difference. The only place where
// we can ask for the time base is the upper time because when trying
// to receive the time base for the lower time we'd break the requirement
// for monotonic clock.

uint64_t upper_time = high_ts;
uint64_t lower_time = low_ts;

if (lower_time > upper_time)
upper_time += uint64_t(CPacket::MAX_TIMESTAMP)+1;

int32_t timespan = upper_time - lower_time;
int seqspan = 0;
if (low_seq != -1 && high_seq != -1)
{
seqspan = CSeqNo::seqoff(low_seq, high_seq);
}

LOGC(dlog.Debug, log << "RCV BUF STATS: seqspan=%(" << low_seq << "-" << high_seq << ":" << seqspan << ") missing=" << nmissing << "pkts");
LOGC(dlog.Debug, log << "RCV BUF STATS: timespan=" << timespan << "us (lo=" << FormatTime(lower_time) << " hi=" << FormatTime(upper_time) << ")");
}

#endif // ENABLE_HEAVY_LOGGING

bool CRcvBuffer::isRcvDataReady()
{
uint64_t tsbpdtime;
Expand Down Expand Up @@ -1546,7 +1605,18 @@ void CRcvBuffer::addRcvTsbPdDriftSample(uint32_t timestamp, pthread_mutex_t& mut
printDriftOffset(m_DriftTracer.overdrift(), m_DriftTracer.drift());
#endif /* SRT_DEBUG_TSBPD_DRIFT */

#if ENABLE_HEAVY_LOGGING
uint64_t oldbase = m_ullTsbPdTimeBase;
#endif
m_ullTsbPdTimeBase += m_DriftTracer.overdrift();

HLOGC(dlog.Debug, log << "DRIFT=" << (iDrift/1000.0) << "ms AVG="
<< (m_DriftTracer.drift()/1000.0) << "ms, TB: "
<< FormatTime(oldbase) << " UPDATED TO: " << FormatTime(m_ullTsbPdTimeBase));
}
else
{
HLOGC(dlog.Debug, log << "DRIFT=" << (iDrift/1000.0) << "ms TB REMAINS: " << FormatTime(m_ullTsbPdTimeBase));
}

CGuard::leaveCS(mutex_to_lock);
Expand All @@ -1567,6 +1637,10 @@ int CRcvBuffer::readMsg(char* data, int len, ref_t<SRT_MSGCTRL> r_msgctl)
bool empty = true;
uint64_t& rplaytime = msgctl.srctime;

#ifdef ENABLE_HEAVY_LOGGING
reportBufferStats();
#endif

if (m_bTsbPdMode)
{
passack = false;
Expand Down
7 changes: 6 additions & 1 deletion srtcore/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,9 @@ class CRcvBuffer

void skipData(int len);

#if ENABLE_HEAVY_LOGGING
void reportBufferStats(); // Heavy logging Debug only
#endif

private:
/// Adjust receive queue to 1st ready to play message (tsbpdtime < now).
Expand All @@ -402,10 +405,12 @@ class CRcvBuffer

bool getRcvReadyMsg(ref_t<uint64_t> tsbpdtime, ref_t<int32_t> curpktseq);

public:

// (This is exposed as used publicly in logs)
/// Get packet delivery local time base (adjusted for wrap around)
/// @param [in] timestamp packet timestamp (relative to peer StartTime), wrapping around every ~72 min
/// @return local delivery time (usec)

uint64_t getTsbPdTimeBase(uint32_t timestamp_us);

/// Get packet local delivery time
Expand Down
17 changes: 10 additions & 7 deletions srtcore/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,20 +423,23 @@ int CChannel::sendto(const sockaddr* addr, CPacket& packet) const

if (packet.isControl())
{
spec << " CONTROL size=" << packet.getLength()
spec << " type=CONTROL"
<< " cmd=" << MessageTypeStr(packet.getType(), packet.getExtendedType())
<< " arg=" << packet.header(SRT_PH_MSGNO);
}
else
{
spec << " DATA size=" << packet.getLength()
<< " seq=" << packet.getSeqNo();
if (packet.getRexmitFlag())
spec << " [REXMIT]";
spec << " type=DATA"
<< " %" << packet.getSeqNo()
<< " msgno=" << MSGNO_SEQ::unwrap(packet.m_iMsgNo)
<< packet.MessageFlagStr()
<< " !" << BufferStamp(packet.m_pcData, packet.getLength());
}

HLOGC(mglog.Debug, log << "CChannel::sendto: SENDING NOW DST=" << SockaddrToString(addr)
<< " target=%" << packet.m_iID
LOGC(mglog.Debug, log << "CChannel::sendto: SENDING NOW DST=" << SockaddrToString(addr)
<< " target=@" << packet.m_iID
<< " size=" << packet.getLength()
<< " pkt.ts=" << FormatTime(packet.m_iTimeStamp)
<< spec.str());
#endif

Expand Down
12 changes: 4 additions & 8 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,6 @@ modified by

using namespace std;

#if ENABLE_HEAVY_LOGGING
#define IF_HEAVY_LOGGING(instr) instr
#else
#define IF_HEAVY_LOGGING(instr) (void)0
#endif

namespace srt_logging
{

Expand Down Expand Up @@ -8552,8 +8546,10 @@ int CUDT::processData(CUnit *in_unit)
}

HLOGC(mglog.Debug,
log << CONID() << "RECEIVED: seq=" << rpkt.m_iSeqNo << " offset=" << offset << " (" << exc_type << "/"
<< rexmitstat[pktrexmitflag] << rexmit_reason << ") FLAGS: " << packet.MessageFlagStr());
log << CONID() << "RECEIVED: seq=" << rpkt.m_iSeqNo << " offset=" << offset
<< " BUFr=" << avail_bufsize
<< " (" << exc_type << "/" << rexmitstat[pktrexmitflag] << rexmit_reason << ") FLAGS: "
<< packet.MessageFlagStr());

// Decryption should have made the crypto flags EK_NOENC.
// Otherwise it's an error.
Expand Down
50 changes: 48 additions & 2 deletions srtcore/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,22 @@ modified by

#include "common.h"
#include "epoll.h"
#include "logging.h"
#include "udt.h"

using namespace std;

namespace srt_logging
{
extern Logger mglog;
}

using namespace srt_logging;

#if ENABLE_HEAVY_LOGGING
#define IF_DIRNAME(tested, flag, name) (tested & flag ? name : "")
#endif

CEPoll::CEPoll():
m_iIDSeed(0)
{
Expand Down Expand Up @@ -456,6 +468,9 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
int total = 0;

int64_t entertime = CTimer::getTime();

HLOGC(mglog.Debug, log << "CEPoll::wait: START for eid=" << eid);

while (true)
{
{
Expand Down Expand Up @@ -485,10 +500,13 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
throw CUDTException(MJ_NOTSUP, MN_INVAL);
}

IF_HEAVY_LOGGING(int total_noticed);
IF_HEAVY_LOGGING(ostringstream debug_sockets);
// Sockets with exceptions are returned to both read and write sets.
for (CEPollDesc::enotice_t::iterator it = ed.enotice_begin(), it_next = it; it != ed.enotice_end(); it = it_next)
{
++it_next;
IF_HEAVY_LOGGING(++total_noticed);
if (readfds && ((it->events & UDT_EPOLL_IN) || (it->events & UDT_EPOLL_ERR)))
{
if (readfds->insert(it->fd).second)
Expand All @@ -501,16 +519,28 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
++total;
}

ed.checkEdge(it); // NOTE: potentially erases 'it'.
IF_HEAVY_LOGGING(debug_sockets << " " << it->fd << ":"
<< IF_DIRNAME(it->events, SRT_EPOLL_IN, "R")
<< IF_DIRNAME(it->events, SRT_EPOLL_OUT, "W")
<< IF_DIRNAME(it->events, SRT_EPOLL_ERR, "E"));

if (ed.checkEdge(it)) // NOTE: potentially erases 'it'.
{
IF_HEAVY_LOGGING(debug_sockets << "!");
}
}

HLOGC(mglog.Debug, log << "CEPoll::wait: REPORTED " << total << "/" << total_noticed
<< debug_sockets.str());

if (lrfds || lwfds)
{
#ifdef LINUX
const int max_events = ed.m_sLocals.size();
epoll_event ev[max_events];
int nfds = ::epoll_wait(ed.m_iLocalID, ev, max_events, 0);

IF_HEAVY_LOGGING(const int prev_total = total);
for (int i = 0; i < nfds; ++ i)
{
if ((NULL != lrfds) && (ev[i].events & EPOLLIN))
Expand All @@ -524,12 +554,15 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
++ total;
}
}
HLOGC(mglog.Debug, log << "CEPoll::wait: LINUX: picking up " << (total - prev_total) << " ready fds.");

#elif defined(BSD) || defined(OSX) || (TARGET_OS_IOS == 1) || (TARGET_OS_TV == 1)
struct timespec tmout = {0, 0};
const int max_events = ed.m_sLocals.size();
struct kevent ke[max_events];

int nfds = kevent(ed.m_iLocalID, NULL, 0, ke, max_events, &tmout);
IF_HEAVY_LOGGING(const int prev_total = total);

for (int i = 0; i < nfds; ++ i)
{
Expand All @@ -544,6 +577,9 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
++ total;
}
}

HLOGC(mglog.Debug, log << "CEPoll::wait: Darwin/BSD: picking up " << (total - prev_total) << " ready fds.");

#else
//currently "select" is used for all non-Linux platforms.
//faster approaches can be applied for specific systems in the future.
Expand All @@ -566,6 +602,7 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
max_fd = *i;
}

IF_HEAVY_LOGGING(const int prev_total = total);
timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 0;
Expand All @@ -585,18 +622,27 @@ int CEPoll::wait(const int eid, set<SRTSOCKET>* readfds, set<SRTSOCKET>* writefd
}
}
}

HLOGC(mglog.Debug, log << "CEPoll::wait: select(otherSYS): picking up " << (total - prev_total) << " ready fds.");
#endif
}

} // END-LOCK: m_EPollLock

HLOGC(mglog.Debug, log << "CEPoll::wait: Total of " << total << " READY SOCKETS");

if (total > 0)
return total;

if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * int64_t(1000)))
{
HLOGP(mglog.Debug, "... not waiting longer - timeout");
throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);
}

CTimer::waitForEvent();
CTimer::EWait wt ATR_UNUSED = CTimer::waitForEvent();
HLOGC(mglog.Debug, log << "CEPoll::wait: EVENT WAITING: "
<< (wt == CTimer::WT_TIMEOUT ? "CHECKPOINT" : wt == CTimer::WT_EVENT ? "TRIGGERED" : "ERROR"));
}

return 0;
Expand Down
6 changes: 5 additions & 1 deletion srtcore/epoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,18 @@ struct CEPollDesc
}
}

void checkEdge(enotice_t::iterator i)
bool checkEdge(enotice_t::iterator i)
{
// This function should check if this event was subscribed
// as edge-triggered, and if so, clear the event from the notice.
// Update events and check edge mode at the subscriber
i->events &= ~i->parent->edgeOnly();
if(!i->events)
{
removeExistingNotices(*i->parent);
return true;
}
return false;
}
};

Expand Down
4 changes: 4 additions & 0 deletions srtcore/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,16 @@ written by
#define HLOGP LOGP
#define HLOGF LOGF

#define IF_HEAVY_LOGGING(instr) instr

#else

#define HLOGC(...)
#define HLOGF(...)
#define HLOGP(...)

#define IF_HEAVY_LOGGING(instr) (void)0

#endif

#else
Expand Down