Skip to content

Commit

Permalink
[core] Protecting rcv buffer access
Browse files Browse the repository at this point in the history
Fixes partially Haivision#486
  • Loading branch information
maxsharabayko committed Jun 9, 2020
1 parent 49f5ec6 commit 035004d
Showing 1 changed file with 25 additions and 6 deletions.
31 changes: 25 additions & 6 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6863,7 +6863,9 @@ int CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_excep
if (m_bBroken || m_bClosing)
{
HLOGC(mglog.Debug, log << CONID() << "receiveMessage: CONNECTION BROKEN - reading from recv buffer just for formality");
enterCS(m_RcvBufferLock);
int res = m_pRcvBuffer->readMsg(data, len);
leaveCS(m_RcvBufferLock);
w_mctrl.srctime = 0;

// Kick TsbPd thread to schedule next wakeup (if running)
Expand All @@ -6877,11 +6879,13 @@ int CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_excep
HLOGP(tslog.Debug, "NOT pinging TSBPD - not set");
}

enterCS(m_RcvBufferLock);
if (!m_pRcvBuffer->isRcvDataReady())
{
// read is not available any more
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);
}
leaveCS(m_RcvBufferLock);

if (res == 0)
{
Expand All @@ -6901,8 +6905,9 @@ int CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_excep
if (!m_bSynRecving)
{
HLOGC(dlog.Debug, log << CONID() << "receiveMessage: BEGIN ASYNC MODE. Going to extract payload size=" << len);

int res = m_pRcvBuffer->readMsg(data, len, (w_mctrl), seqdistance);
enterCS(m_RcvBufferLock);
const int res = m_pRcvBuffer->readMsg(data, len, (w_mctrl), seqdistance);
leaveCS(m_RcvBufferLock);
HLOGC(dlog.Debug, log << CONID() << "AFTER readMsg: (NON-BLOCKING) result=" << res);

if (res == 0)
Expand All @@ -6927,6 +6932,7 @@ int CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_excep
throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
}

enterCS(m_RcvBufferLock);
if (!m_pRcvBuffer->isRcvDataReady())
{
// Kick TsbPd thread to schedule next wakeup (if running)
Expand All @@ -6949,6 +6955,7 @@ int CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_excep
HLOGC(mglog.Debug, log << CONID() << "CURRENT BANDWIDTH: " << bw << "Mbps (" << m_iBandwidth << " buffers per second)");
#endif
}
leaveCS(m_RcvBufferLock);
return res;
}

Expand All @@ -6965,7 +6972,10 @@ int CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_excep
{
steady_clock::time_point tstime SRT_ATR_UNUSED;
int32_t seqno;
if (stillConnected() && !timeout && (!m_pRcvBuffer->isRcvDataReady((tstime), (seqno), seqdistance)))
enterCS(m_RcvBufferLock);
bool rcvready = m_pRcvBuffer->isRcvDataReady((tstime), (seqno), seqdistance);
leaveCS(m_RcvBufferLock);
if (stillConnected() && !timeout && !rcvready)
{
/* Kick TsbPd thread to schedule next wakeup (if running) */
if (m_bTsbPd)
Expand Down Expand Up @@ -7005,11 +7015,15 @@ int CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_excep
{
HLOGP(tslog.Debug, "receiveMessage: DATA COND: KICKED.");
}
} while (stillConnected() && !timeout && (!m_pRcvBuffer->isRcvDataReady()));

enterCS(m_RcvBufferLock);
rcvready = m_pRcvBuffer->isRcvDataReady();
leaveCS(m_RcvBufferLock);
} while (stillConnected() && !timeout && (!rcvready));

HLOGC(tslog.Debug,
log << CONID() << "receiveMessage: lock-waiting loop exited: stillConntected=" << stillConnected()
<< " timeout=" << timeout << " data-ready=" << m_pRcvBuffer->isRcvDataReady());
<< " timeout=" << timeout << " data-ready=" << rcvready);
}

/* XXX DEBUG STUFF - enable when required
Expand All @@ -7018,7 +7032,9 @@ int CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_excep
<< " NMSG " << m_pRcvBuffer->getRcvMsgNum());
*/

enterCS(m_RcvBufferLock);
res = m_pRcvBuffer->readMsg((data), len, (w_mctrl), seqdistance);
leaveCS(m_RcvBufferLock);
HLOGC(dlog.Debug, log << CONID() << "AFTER readMsg: (BLOCKING) result=" << res);

if (m_bBroken || m_bClosing)
Expand All @@ -7039,7 +7055,10 @@ int CUDT::receiveMessage(char* data, int len, SRT_MSGCTRL& w_mctrl, int by_excep
}
} while ((res == 0) && !timeout);

if (!m_pRcvBuffer->isRcvDataReady())
enterCS(m_RcvBufferLock);
const bool rcvready = m_pRcvBuffer->isRcvDataReady();
leaveCS(m_RcvBufferLock);
if (!rcvready)
{
// Falling here means usually that res == 0 && timeout == true.
// res == 0 would repeat the above loop, unless there was also a timeout.
Expand Down

0 comments on commit 035004d

Please sign in to comment.