Skip to content

Commit

Permalink
[core] Fixed read-ready epoll event in stream (file) mode.
Browse files Browse the repository at this point in the history
Only the new RCV buffer (PR #1964) is affected.
  • Loading branch information
maxsharabayko committed Nov 10, 2021
1 parent 01ef57a commit e4a1d2b
Showing 1 changed file with 43 additions and 37 deletions.
80 changes: 43 additions & 37 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6275,8 +6275,8 @@ int srt::CUDT::receiveBuffer(char *data, int len)
return 0;
}
HLOGC(arlog.Debug,
log << (m_config.bMessageAPI ? "MESSAGE" : "STREAM") << " API, " << (m_bShutdown ? "" : "no")
<< " SHUTDOWN. Reporting as BROKEN.");
log << (m_config.bMessageAPI ? "MESSAGE" : "STREAM") << " API, " << (m_bShutdown ? "" : "no")
<< " SHUTDOWN. Reporting as BROKEN.");
throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
}

Expand All @@ -6288,31 +6288,29 @@ int srt::CUDT::receiveBuffer(char *data, int len)
{
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
}
else

// Kick TsbPd thread to schedule next wakeup (if running)
if (m_config.iRcvTimeOut < 0)
{
/* Kick TsbPd thread to schedule next wakeup (if running) */
if (m_config.iRcvTimeOut < 0)
THREAD_PAUSED();
while (stillConnected() && !isRcvBufferReady())
{
THREAD_PAUSED();
while (stillConnected() && !isRcvBufferReady())
{
// Do not block forever, check connection status each 1 sec.
rcond.wait_for(seconds_from(1));
}
THREAD_RESUMED();
// Do not block forever, check connection status each 1 sec.
rcond.wait_for(seconds_from(1));
}
else
THREAD_RESUMED();
}
else
{
const steady_clock::time_point exptime =
steady_clock::now() + milliseconds_from(m_config.iRcvTimeOut);
THREAD_PAUSED();
while (stillConnected() && !isRcvBufferReady())
{
const steady_clock::time_point exptime =
steady_clock::now() + milliseconds_from(m_config.iRcvTimeOut);
THREAD_PAUSED();
while (stillConnected() && !isRcvBufferReady())
{
if (!rcond.wait_until(exptime)) // NOT means "not received a signal"
break; // timeout
}
THREAD_RESUMED();
if (!rcond.wait_until(exptime)) // NOT means "not received a signal"
break; // timeout
}
THREAD_RESUMED();
}
}

Expand Down Expand Up @@ -8066,23 +8064,31 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
{
UniqueLock rdlock (m_RecvLock);
CSync rdcond (m_RecvDataCond, rdlock);
if (m_config.bSynRecving)

#if ENABLE_NEW_RCVBUFFER
// Locks m_RcvBufferLock, which is unlocked above by InvertedLock un_bufflock.
// Must check read-readiness under m_RecvLock to protect the epoll from concurrent changes in readBuffer()
if (isRcvBufferReady())
#endif
{
// signal a waiting "recv" call if there is any data available
rdcond.signal_locked(rdlock);
if (m_config.bSynRecving)
{
// signal a waiting "recv" call if there is any data available
rdcond.signal_locked(rdlock);
}
// acknowledge any waiting epolls to read
// fix SRT_EPOLL_IN event loss but rcvbuffer still have data:
// 1. user call receive/receivemessage(about line number:6482)
// 2. after read/receive, if rcvbuffer is empty, will set SRT_EPOLL_IN event to false
// 3. but if we do not do some lock work here, will cause some sync problems between threads:
// (1) user thread: call receive/receivemessage
// (2) user thread: read data
// (3) user thread: no data in rcvbuffer, set SRT_EPOLL_IN event to false
// (4) receive thread: receive data and set SRT_EPOLL_IN to true
// (5) user thread: set SRT_EPOLL_IN to false
// 4. so , m_RecvLock must be used here to protect epoll event
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, true);
}
// acknowledge any waiting epolls to read
// fix SRT_EPOLL_IN event loss but rcvbuffer still have data:
// 1. user call receive/receivemessage(about line number:6482)
// 2. after read/receive, if rcvbuffer is empty, will set SRT_EPOLL_IN event to false
// 3. but if we do not do some lock work here, will cause some sync problems between threads:
// (1) user thread: call receive/receivemessage
// (2) user thread: read data
// (3) user thread: no data in rcvbuffer, set SRT_EPOLL_IN event to false
// (4) receive thread: receive data and set SRT_EPOLL_IN to true
// (5) user thread: set SRT_EPOLL_IN to false
// 4. so , m_RecvLock must be used here to protect epoll event
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, true);
}
#if ENABLE_EXPERIMENTAL_BONDING
if (m_parent->m_GroupOf)
Expand Down

0 comments on commit e4a1d2b

Please sign in to comment.