Skip to content

Commit

Permalink
Timedwait fix (#379)
Browse files Browse the repository at this point in the history
* Created condTimedWaitUS() and fixed CUDT::tsbpd() wait on OSX
  • Loading branch information
Gummilion authored and rndi committed May 11, 2018
1 parent b851180 commit cce679c
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 40 deletions.
11 changes: 11 additions & 0 deletions srtcore/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,17 @@ void CTimer::sleep()
#endif
}

int CTimer::condTimedWaitUS(pthread_cond_t* cond, pthread_mutex_t* mutex, uint64_t delay) {
timeval now;
gettimeofday(&now, 0);
uint64_t time_us = now.tv_sec * 1000000 + now.tv_usec + delay;
timespec timeout;
timeout.tv_sec = time_us / 1000000;
timeout.tv_nsec = (time_us % 1000000) * 1000;

return pthread_cond_timedwait(cond, mutex, &timeout);
}


// Automatically lock in constructor
CGuard::CGuard(pthread_mutex_t& lock, bool shouldwork):
Expand Down
9 changes: 9 additions & 0 deletions srtcore/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,15 @@ class CTimer
/// sleep for a short interval. exact sleep time does not matter

static void sleep();

/// Wait for condition with timeout
/// @param [in] cond Condition variable to wait for
/// @param [in] mutex locked mutex associated with the condition variable
/// @param [in] delay timeout in microseconds
/// @retval 0 Wait was successfull
/// @retval ETIMEDOUT The wait timed out

static int condTimedWaitUS(pthread_cond_t* cond, pthread_mutex_t* mutex, uint64_t delay);

private:
uint64_t getTimeInMicroSec();
Expand Down
43 changes: 10 additions & 33 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3919,18 +3919,15 @@ void* CUDT::tsbpd(void* param)
self->m_iRcvLastSkipAck = skiptoseqno;

#if ENABLE_LOGGING
uint64_t now = CTimer::getTime();

#if ENABLE_HEAVY_LOGGING
int64_t timediff = 0;
if ( tsbpdtime )
timediff = int64_t(now) - int64_t(tsbpdtime);

timediff = int64_t(tsbpdtime) - int64_t(CTimer::getTime());
#if ENABLE_HEAVY_LOGGING
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: DROPSEQ: up to seq=" << CSeqNo::decseq(skiptoseqno)
<< " (" << seqlen << " packets) playable at " << logging::FormatTime(tsbpdtime) << " delayed "
<< (timediff/1000) << "." << (timediff%1000) << " ms");
#endif
LOGC(dlog.Debug, log << "RCV-DROPPED packet delay=" << int64_t(now - tsbpdtime) << "ms");
LOGC(dlog.Debug, log << "RCV-DROPPED packet delay=" << (timediff%1000) << "ms");
#endif

tsbpdtime = 0; //Next sent ack will unblock
Expand Down Expand Up @@ -3972,18 +3969,16 @@ void* CUDT::tsbpd(void* param)

if (tsbpdtime != 0)
{
int64_t timediff = int64_t(tsbpdtime) - int64_t(CTimer::getTime());
/*
* Buffer at head of queue is not ready to play.
* Schedule wakeup when it will be.
*/
self->m_bTsbPdAckWakeup = false;
THREAD_PAUSED();
timespec locktime;
locktime.tv_sec = tsbpdtime / 1000000;
locktime.tv_nsec = (tsbpdtime % 1000000) * 1000;
HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << current_pkt_seq
<< " T=" << logging::FormatTime(tsbpdtime) << " - waiting " << ((tsbpdtime - CTimer::getTime())/1000.0) << "ms");
pthread_cond_timedwait(&self->m_RcvTsbPdCond, &self->m_RecvLock, &locktime);
<< " T=" << logging::FormatTime(tsbpdtime) << " - waiting " << (timediff/1000.0) << "ms");
CTimer::condTimedWaitUS(&self->m_RcvTsbPdCond, &self->m_RecvLock, timediff);
THREAD_RESUMED();
}
else
Expand Down Expand Up @@ -4624,24 +4619,15 @@ int CUDT::receiveBuffer(char* data, int len)
while (stillConnected() && !m_pRcvBuffer->isRcvDataReady())
{
//Do not block forever, check connection status each 1 sec.
uint64_t exptime = CTimer::getTime() + 1000000ULL;
timespec locktime;

locktime.tv_sec = exptime / 1000000;
locktime.tv_nsec = (exptime % 1000000) * 1000;
pthread_cond_timedwait(&m_RecvDataCond, &m_RecvLock, &locktime);
CTimer::condTimedWaitUS(&m_RecvDataCond, &m_RecvLock, 1000000ULL);
}
}
else
{
uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000;
timespec locktime;
locktime.tv_sec = exptime / 1000000;
locktime.tv_nsec = (exptime % 1000000) * 1000;

while (stillConnected() && !m_pRcvBuffer->isRcvDataReady())
{
pthread_cond_timedwait(&m_RecvDataCond, &m_RecvLock, &locktime);
CTimer::condTimedWaitUS(&m_RecvDataCond, &m_RecvLock, m_iRcvTimeOut * 1000);
if (CTimer::getTime() >= exptime)
break;
}
Expand Down Expand Up @@ -4878,16 +4864,12 @@ int CUDT::sendmsg2(const char* data, int len, ref_t<SRT_MSGCTRL> r_mctrl)
else
{
uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;
timespec locktime;

locktime.tv_sec = exptime / 1000000;
locktime.tv_nsec = (exptime % 1000000) * 1000;

while (stillConnected()
&& sndBuffersLeft() < minlen
&& m_bPeerHealth
&& exptime > CTimer::getTime())
pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &locktime);
CTimer::condTimedWaitUS(&m_SendBlockCond, &m_SendBlockLock, m_iSndTimeOut * 1000ULL);
}
}

Expand Down Expand Up @@ -5129,12 +5111,7 @@ int CUDT::receiveMessage(char* data, int len, ref_t<SRT_MSGCTRL> r_mctrl)

do
{
uint64_t exptime = CTimer::getTime() + (recvtmo * 1000ULL);
timespec locktime;

locktime.tv_sec = exptime / 1000000;
locktime.tv_nsec = (exptime % 1000000) * 1000;
if (pthread_cond_timedwait(&m_RecvDataCond, &m_RecvLock, &locktime) == ETIMEDOUT)
if (CTimer::condTimedWaitUS(&m_RecvDataCond, &m_RecvLock, recvtmo * 1000ULL) == ETIMEDOUT)
{
if (!(m_iRcvTimeOut < 0))
timeout = true;
Expand Down
8 changes: 1 addition & 7 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1379,13 +1379,7 @@ int CRcvQueue::recvfrom(int32_t id, ref_t<CPacket> r_packet)

if (i == m_mBuffer.end())
{
uint64_t now = CTimer::getTime();
timespec timeout;

timeout.tv_sec = now / 1000000 + 1;
timeout.tv_nsec = (now % 1000000) * 1000;

pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout);
CTimer::condTimedWaitUS(&m_PassCond, &m_PassLock, 1000000ULL);

i = m_mBuffer.find(id);
if (i == m_mBuffer.end())
Expand Down

0 comments on commit cce679c

Please sign in to comment.