From 616aa2af398454458f7c51a787c476b4a1df09ea Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Tue, 9 Nov 2021 16:42:24 +0100 Subject: [PATCH] [core] Fixed read-ready epoll event in stream (file) mode. Only the new RCV buffer (PR #1964) is affected. --- srtcore/core.cpp | 80 ++++++++++++++++++++++++++---------------------- 1 file changed, 43 insertions(+), 37 deletions(-) diff --git a/srtcore/core.cpp b/srtcore/core.cpp index d0849c6b8..9aa2c5eb2 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -6275,8 +6275,8 @@ int srt::CUDT::receiveBuffer(char *data, int len) return 0; } HLOGC(arlog.Debug, - log << (m_config.bMessageAPI ? "MESSAGE" : "STREAM") << " API, " << (m_bShutdown ? "" : "no") - << " SHUTDOWN. Reporting as BROKEN."); + log << (m_config.bMessageAPI ? "MESSAGE" : "STREAM") << " API, " << (m_bShutdown ? "" : "no") + << " SHUTDOWN. Reporting as BROKEN."); throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0); } @@ -6288,31 +6288,29 @@ int srt::CUDT::receiveBuffer(char *data, int len) { throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0); } - else + + // Kick TsbPd thread to schedule next wakeup (if running) + if (m_config.iRcvTimeOut < 0) { - /* Kick TsbPd thread to schedule next wakeup (if running) */ - if (m_config.iRcvTimeOut < 0) + THREAD_PAUSED(); + while (stillConnected() && !isRcvBufferReady()) { - THREAD_PAUSED(); - while (stillConnected() && !isRcvBufferReady()) - { - // Do not block forever, check connection status each 1 sec. - rcond.wait_for(seconds_from(1)); - } - THREAD_RESUMED(); + // Do not block forever, check connection status each 1 sec. + rcond.wait_for(seconds_from(1)); } - else + THREAD_RESUMED(); + } + else + { + const steady_clock::time_point exptime = + steady_clock::now() + milliseconds_from(m_config.iRcvTimeOut); + THREAD_PAUSED(); + while (stillConnected() && !isRcvBufferReady()) { - const steady_clock::time_point exptime = - steady_clock::now() + milliseconds_from(m_config.iRcvTimeOut); - THREAD_PAUSED(); - while (stillConnected() && !isRcvBufferReady()) - { - if (!rcond.wait_until(exptime)) // NOT means "not received a signal" - break; // timeout - } - THREAD_RESUMED(); + if (!rcond.wait_until(exptime)) // NOT means "not received a signal" + break; // timeout } + THREAD_RESUMED(); } } @@ -8066,23 +8064,31 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) { UniqueLock rdlock (m_RecvLock); CSync rdcond (m_RecvDataCond, rdlock); - if (m_config.bSynRecving) + +#if ENABLE_NEW_RCVBUFFER + // Locks m_RcvBufferLock, which is unlocked above by InvertedLock un_bufflock. + // Must check read-readiness under m_RecvLock to protect the epoll from concurrent changes in readBuffer() + if (isRcvBufferReady()) +#endif { - // signal a waiting "recv" call if there is any data available - rdcond.signal_locked(rdlock); + if (m_config.bSynRecving) + { + // signal a waiting "recv" call if there is any data available + rdcond.signal_locked(rdlock); + } + // acknowledge any waiting epolls to read + // fix SRT_EPOLL_IN event loss but rcvbuffer still have data: + // 1. user call receive/receivemessage(about line number:6482) + // 2. after read/receive, if rcvbuffer is empty, will set SRT_EPOLL_IN event to false + // 3. but if we do not do some lock work here, will cause some sync problems between threads: + // (1) user thread: call receive/receivemessage + // (2) user thread: read data + // (3) user thread: no data in rcvbuffer, set SRT_EPOLL_IN event to false + // (4) receive thread: receive data and set SRT_EPOLL_IN to true + // (5) user thread: set SRT_EPOLL_IN to false + // 4. so , m_RecvLock must be used here to protect epoll event + uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, true); } - // acknowledge any waiting epolls to read - // fix SRT_EPOLL_IN event loss but rcvbuffer still have data: - // 1. user call receive/receivemessage(about line number:6482) - // 2. after read/receive, if rcvbuffer is empty, will set SRT_EPOLL_IN event to false - // 3. but if we do not do some lock work here, will cause some sync problems between threads: - // (1) user thread: call receive/receivemessage - // (2) user thread: read data - // (3) user thread: no data in rcvbuffer, set SRT_EPOLL_IN event to false - // (4) receive thread: receive data and set SRT_EPOLL_IN to true - // (5) user thread: set SRT_EPOLL_IN to false - // 4. so , m_RecvLock must be used here to protect epoll event - uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, true); } #if ENABLE_EXPERIMENTAL_BONDING if (m_parent->m_GroupOf)