Skip to content

Commit

Permalink
Fixed bugs in the tsbpd thread
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed Jul 21, 2021
1 parent 428fc95 commit 2efc5dd
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 26 deletions.
51 changes: 26 additions & 25 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ void srt::CUDT::getOpt(SRT_SOCKOPT optName, void *optval, int &optlen)
else
{
enterCS(m_RecvLock);
if (m_pRcvBuffer && m_pRcvBuffer->isRcvDataReady())
if (m_pRcvBuffer && isRcvBufferReady())
event |= SRT_EPOLL_IN;
leaveCS(m_RecvLock);
if (m_pSndBuffer && (m_config.iSndBufSize > m_pSndBuffer->getCurrBufSize()))
Expand Down Expand Up @@ -5152,7 +5152,7 @@ void * srt::CUDT::tsbpd(void* param)
self->m_bTsbPdAckWakeup = true;
while (!self->m_bClosing)
{
steady_clock::time_point tsbpd_time;
steady_clock::time_point tsNextDelivery; // Next packet delivery time
bool rxready = false;
#if ENABLE_EXPERIMENTAL_BONDING
bool shall_update_group = false;
Expand All @@ -5164,7 +5164,8 @@ void * srt::CUDT::tsbpd(void* param)
self->m_pRcvBuffer->updRcvAvgDataSize(tnow);
const srt::CRcvBufferNew::PacketInfo info = self->m_pRcvBuffer->getFirstValidPacketInfo();

const bool is_time_to_deliver = (tnow >= info.tsbpd_time);
const bool is_time_to_deliver = !is_zero(info.tsbpd_time) && (tnow >= info.tsbpd_time);
tsNextDelivery = info.tsbpd_time;

if (!self->m_bTLPktDrop)
{
Expand Down Expand Up @@ -5212,7 +5213,7 @@ void * srt::CUDT::tsbpd(void* param)
<< (timediff_us % 1000) << " ms");
#endif

tsbpd_time = steady_clock::time_point(); // Ready to read, nothing to wait for.
tsNextDelivery = steady_clock::time_point(); // Ready to read, nothing to wait for.
}
}
leaveCS(self->m_RcvBufferLock);
Expand Down Expand Up @@ -5277,22 +5278,22 @@ void * srt::CUDT::tsbpd(void* param)
}
#endif
CGlobEvent::triggerEvent();
tsbpd_time = steady_clock::time_point(); // Ready to read, nothing to wait for.
tsNextDelivery = steady_clock::time_point(); // Ready to read, nothing to wait for.
}

if (!is_zero(tsbpd_time))
if (!is_zero(tsNextDelivery))
{
IF_HEAVY_LOGGING(const steady_clock::duration timediff = tsbpd_time - tnow);
IF_HEAVY_LOGGING(const steady_clock::duration timediff = tsNextDelivery - tnow);
/*
* Buffer at head of queue is not ready to play.
* Schedule wakeup when it will be.
*/
self->m_bTsbPdAckWakeup = false;
HLOGC(tslog.Debug,
log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << info.seqno
<< " T=" << FormatTime(tsbpd_time) << " - waiting " << count_milliseconds(timediff) << "ms");
<< " T=" << FormatTime(tsNextDelivery) << " - waiting " << count_milliseconds(timediff) << "ms");
THREAD_PAUSED();
tsbpd_cc.wait_until(tsbpd_time);
tsbpd_cc.wait_until(tsNextDelivery);
THREAD_RESUMED();
}
else
Expand Down Expand Up @@ -6213,7 +6214,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)

UniqueLock recvguard(m_RecvLock);

if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady())
if ((m_bBroken || m_bClosing) && !isRcvBufferReady())
{
if (m_bShutdown)
{
Expand Down Expand Up @@ -6245,7 +6246,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)

CSync rcond (m_RecvDataCond, recvguard);
CSync tscond (m_RcvTsbPdCond, recvguard);
if (!m_pRcvBuffer->isRcvDataReady())
if (!isRcvBufferReady())
{
if (!m_config.bSynRecving)
{
Expand All @@ -6257,7 +6258,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)
if (m_config.iRcvTimeOut < 0)
{
THREAD_PAUSED();
while (stillConnected() && !m_pRcvBuffer->isRcvDataReady())
while (stillConnected() && !isRcvBufferReady())
{
// Do not block forever, check connection status each 1 sec.
rcond.wait_for(seconds_from(1));
Expand All @@ -6269,7 +6270,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)
const steady_clock::time_point exptime =
steady_clock::now() + milliseconds_from(m_config.iRcvTimeOut);
THREAD_PAUSED();
while (stillConnected() && !m_pRcvBuffer->isRcvDataReady())
while (stillConnected() && !isRcvBufferReady())
{
if (!rcond.wait_until(exptime)) // NOT means "not received a signal"
break; // timeout
Expand All @@ -6283,7 +6284,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)
if (!m_bConnected)
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);

if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady())
if ((m_bBroken || m_bClosing) && !isRcvBufferReady())
{
// See at the beginning
if (!m_config.bMessageAPI && m_bShutdown)
Expand Down Expand Up @@ -6317,7 +6318,7 @@ int srt::CUDT::receiveBuffer(char *data, int len)
HLOGP(tslog.Debug, "NOT pinging TSBPD - not set");
}

if (!m_pRcvBuffer->isRcvDataReady())
if (!isRcvBufferReady())
{
// read is not available any more
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);
Expand Down Expand Up @@ -6852,7 +6853,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
HLOGP(tslog.Debug, "NOT pinging TSBPD - not set");
}

if (!m_pRcvBuffer->isRcvDataReady())
if (!isRcvBufferReady())
{
// read is not available any more
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);
Expand Down Expand Up @@ -6999,7 +7000,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_

HLOGC(tslog.Debug,
log << CONID() << "receiveMessage: lock-waiting loop exited: stillConntected=" << stillConnected()
<< " timeout=" << timeout << " data-ready=" << m_pRcvBuffer->isRcvDataReady());
<< " timeout=" << timeout << " data-ready=" << isRcvBufferReady());
}

/* XXX DEBUG STUFF - enable when required
Expand Down Expand Up @@ -7035,7 +7036,7 @@ int srt::CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_
}
} while ((res == 0) && !timeout);

if (!m_pRcvBuffer->isRcvDataReady())
if (!isRcvBufferReady())
{
// Falling here means usually that res == 0 && timeout == true.
// res == 0 would repeat the above loop, unless there was also a timeout.
Expand Down Expand Up @@ -7196,7 +7197,7 @@ int64_t srt::CUDT::recvfile(fstream &ofs, int64_t &offset, int64_t size, int blo
{
if (!m_bConnected || !m_CongCtl.ready())
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
else if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady())
else if ((m_bBroken || m_bClosing) && !isRcvBufferReady())
{
if (!m_config.bMessageAPI && m_bShutdown)
return 0;
Expand Down Expand Up @@ -7278,14 +7279,14 @@ int64_t srt::CUDT::recvfile(fstream &ofs, int64_t &offset, int64_t size, int blo
CSync rcond (m_RecvDataCond, recvguard);

THREAD_PAUSED();
while (stillConnected() && !m_pRcvBuffer->isRcvDataReady())
while (stillConnected() && !isRcvBufferReady())
rcond.wait();
THREAD_RESUMED();
}

if (!m_bConnected)
throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
else if ((m_bBroken || m_bClosing) && !m_pRcvBuffer->isRcvDataReady())
else if ((m_bBroken || m_bClosing) && !isRcvBufferReady())
{
if (!m_config.bMessageAPI && m_bShutdown)
return 0;
Expand All @@ -7309,7 +7310,7 @@ int64_t srt::CUDT::recvfile(fstream &ofs, int64_t &offset, int64_t size, int blo
}
}

if (!m_pRcvBuffer->isRcvDataReady())
if (!isRcvBufferReady())
{
// read is not available any more
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);
Expand Down Expand Up @@ -9524,7 +9525,7 @@ void srt::CUDT::processClose()

if (m_bTsbPd)
{
HLOGP(smlog.Debug, "processClose: lock-and-signal TSBPD");
LOGP(smlog.Debug, "processClose: lock-and-signal TSBPD");
CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);
}

Expand Down Expand Up @@ -9641,7 +9642,7 @@ int srt::CUDT::processData(CUnit* in_unit)
if (m_bClosing) // Check again to protect join() in CUDT::releaseSync()
return -1;

HLOGP(qrlog.Debug, "Spawning Socket TSBPD thread");
LOGP(qrlog.Debug, "Spawning Socket TSBPD thread");
#if ENABLE_HEAVY_LOGGING
std::ostringstream tns1, tns2;
// Take the last 2 ciphers from the socket ID.
Expand Down Expand Up @@ -11357,7 +11358,7 @@ void srt::CUDT::addEPoll(const int eid)
return;

enterCS(m_RecvLock);
if (m_pRcvBuffer->isRcvDataReady())
if (isRcvBufferReady())
{
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, true);
}
Expand Down
2 changes: 1 addition & 1 deletion test/test_epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ class TestEPoll: public testing::Test

ASSERT_EQ(rlen, 1); // get exactly one read event without writes
ASSERT_EQ(wlen, 0); // get exactly one read event without writes
ASSERT_EQ(read[0], acpsock); // read event is for bind socket
ASSERT_EQ(read[0], acpsock); // read event is for bind socket
}

char buffer[1316];
Expand Down

0 comments on commit 2efc5dd

Please sign in to comment.