
[Epic] Fix the issues identified by the sanitizer #1813

Open · 23 tasks
mbakholdina opened this issue Feb 17, 2021 · 12 comments
Labels: [core] Area: Changes in SRT library core, Epic

mbakholdina (Collaborator) commented Feb 17, 2021:

This ticket is created to track the work on issues identified by the sanitizer.
Related tickets: #1547, #1609, #1610.

Findings

  • FINDING P00
    ==============================
    False positives or sanitizer collision.

SEVERITY: NONE.

  • FINDING P01-LastRspAckTime
    ==============================
    Accessing CUDT::m_tsLastRspAckTime without lock.

SEVERITY: LOW

  • FINDING P02-SndBuffer_count
    ==============================
    Unprotected access to CSndBuffer::m_iCount through getCurrBufSize
    with simultaneous modification from CSndBuffer::addBuffer

SEVERITY: MEDIUM
FOCUS: An atomic might fix it, but the size is usually modified
together with the buffer contents and internal pointers, so this
is likely a data integrity violation.

  • FINDING P03-CSndUList
    ==============================
    CSndUList against CSndQueue and m_WindowCond/m_WindowLock locking
    that caused accessing m_iLastEntry without locking. CV synchronization
    and access control use two different locks located in two different classes,
    shared by a plain pointer. Larger rework required.

SEVERITY: MEDIUM
FOCUS: This is a design flaw; two different mutexes in two different
classes (one class of interest accesses the other by pointer) guard the same data.
A planned rework is already in progress.

  • FINDING P04-Cycle3
    ==============================
Report #8: Cycled ordering between CUDT::m_ConnectionLock and CUDT::m_SendLock,
    with CSndUList::m_ListLock involved.

Report #28: Cycled ordering between CUDT::m_ConnectionLock and CUDTUnited::m_GlobControlLock
with CRcvQueue::m_LSLock involved.

SEVERITY: HIGH
FOCUS: This is a series of lock cycles; all of them must be resolved
with the highest priority, as they pose a high deadlock risk.
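For illustration only (not SRT code, all names hypothetical), a minimal sketch of how this kind of cycle is usually broken: instead of two threads acquiring the same pair of mutexes in opposite orders, both mutexes are acquired atomically, so no partial ordering can form. `std::scoped_lock` (C++17) applies a deadlock-avoidance algorithm internally:

```cpp
#include <mutex>
#include <thread>

// Hypothetical stand-ins for two mutexes involved in a lock cycle.
std::mutex connection_lock;
std::mutex send_lock;
int shared_state = 0;

// Deadlock-prone pattern: thread A would lock connection_lock then
// send_lock, thread B send_lock then connection_lock. If each grabs its
// first mutex before the other releases, both block forever.
//
// Fix sketched here: acquire BOTH mutexes atomically, so the argument
// order no longer matters (std::scoped_lock uses std::lock internally).
void touch_from_thread_a()
{
    std::scoped_lock guard(connection_lock, send_lock);
    ++shared_state;
}

void touch_from_thread_b()
{
    std::scoped_lock guard(send_lock, connection_lock); // reversed order is safe here
    ++shared_state;
}

int run_both(int iterations)
{
    shared_state = 0;
    std::thread a([&] { for (int i = 0; i < iterations; ++i) touch_from_thread_a(); });
    std::thread b([&] { for (int i = 0; i < iterations; ++i) touch_from_thread_b(); });
    a.join();
    b.join();
    return shared_state;
}
```

The alternative fix, also used in #1844, is temporary unlocking: release the first mutex before taking the second, at the cost of revalidating state afterwards.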

  • FINDING P05-m_dCongestionWindow
    ==============================
    Unguarded writing to CUDT::m_dCongestionWindow while reading
    (likely requires atomic)

SEVERITY: LOW
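A minimal sketch of the atomic fix suggested for findings like this one (names hypothetical; assumes C++11 `std::atomic` is available): wrapping the field makes an unsynchronized single-field read/write well-defined and free of torn values.

```cpp
#include <atomic>

// Hypothetical stand-in for CUDT::m_dCongestionWindow: a double written
// by one thread (congestion control update) and read by another thread
// (packet pacing), with no other shared state depending on it.
std::atomic<double> congestion_window{16.0};

// Writer side: publish the new window size computed by congestion control.
void update_window(double cwnd)
{
    congestion_window.store(cwnd, std::memory_order_relaxed);
}

// Reader side: always observes a complete value; no torn read is possible.
double read_window()
{
    return congestion_window.load(std::memory_order_relaxed);
}
```

This only helps when the field is independent; as noted for P02 and P03, a field modified together with other state still needs a mutex.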

  • FINDING P06-LastSendTime
    ==============================
    Unguarded read of CUDT::m_tsLastSndTime,
    likely should be under CUDT::m_ConnectionLock

SEVERITY: MEDIUM
FOCUS: Although this itself shouldn't be a big deal, the question of
exactly which data are guarded by m_ConnectionLock is worth attention.
Theoretically it should guard data used during the connection
process, but there are also data used simultaneously by the workers and the
main thread as part of the data transmission activities,
and these don't even have dedicated mutexes to protect them.
Not high priority, because this looks like a problem inherited from UDT.

  • FINDING P07-SendInterval
    ==============================
    Unguarded write to CUDT::m_tdSendInterval

SEVERITY: LOW

  • FINDING P08-m_pLastBlock
    ==============================
    Unguarded READING AND WRITING of CSndBuffer::m_pLastBlock.

Simultaneous access from CSndQueue::worker and CRcvQueue::worker.

Likely to be guarded by CSndBuffer::m_BufLock.

SEVERITY: MEDIUM
FOCUS: Likely a mutex that isn't consistently used.

  • FINDING P09-m_iFlowWindowSize
    ==============================
    Unguarded access to CUDT::m_iFlowWindowSize

Written by: ACK report
Read by: sender worker when preparing a packet to send.

Likely to be fixed by atomic.

SEVERITY: LOW

  • FINDING P11-m_iSndLastAck
    ==============================
    Unguarded read and write of CUDT::m_iSndLastAck

Likely to be solved by atomic

SEVERITY: LOW

  • FINDING P12-ACK-data
    ==============================
    Unguarded reading and writing of CUDT::m_iRTT and CUDT::m_iRTTVar,
    plus other fields belonging to those filled by incoming ACK message.

Likely atomic.

SEVERITY: LOW

  • FINDING P13-zPayloadSize
    ==============================
    Unguarded r/w of LiveCC::m_zSndAvgPayloadSize

Likely atomic

SEVERITY: LOW

  • FINDING P14-SndBuf_CurBlock
    ==============================
    Unguarded WRITE (!!!) to CSndBuffer::m_pCurrBlock, while RcvQueue::worker reads it with RecvAckLock and BufLock applied.

SEVERITY: MEDIUM
FOCUS: Likely the whole CSndBuffer should be under that protection,
although might be that atomic can fix it.

  • FINDING P15-SndBuf-BytesCount
    ==============================
    Unguarded read of CSndBuffer::m_iBytesCount in CSndBuffer::getCurrBufSize().

May lead to a data discrepancy, as this function also accesses CSndBuffer::m_pFirstBlock;
both fields are modified in CSndBuffer::ackData.

Likely the reading function requires a lock on m_BufLock.

SEVERITY: MEDIUM
FOCUS: Likely a series referring to guards for CSndBuffer.

  • FINDING P16-SndCurrSeqNo
    ==============================
    Unguarded reading and writing of CUDT::m_iSndCurrSeqNo.

Important: in CUDT::packData this field is READ AND MODIFIED.

Likely atomic.

SEVERITY: MEDIUM
FOCUS: Looks solvable by atomic, but the read-and-modify pattern is
more complicated.
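The read-and-modify complication can be sketched as follows (hypothetical names, not SRT code): a plain atomic `load()` followed by `store()` would lose updates under contention, so the combined step needs an atomic read-modify-write operation such as `fetch_add` (or a mutex).

```cpp
#include <atomic>
#include <thread>

// Hypothetical stand-in for a sequence-number counter that is read AND
// incremented in one step, as described above for CUDT::packData.
std::atomic<int> snd_curr_seq_no{0};

// fetch_add performs the read-modify-write atomically; a separate
// load() + store() pair could let two threads observe the same value
// and both write value+1, losing one increment.
int next_seq_no()
{
    return snd_curr_seq_no.fetch_add(1) + 1;
}

int count_from_two_threads(int per_thread)
{
    snd_curr_seq_no.store(0);
    auto work = [&] { for (int i = 0; i < per_thread; ++i) next_seq_no(); };
    std::thread a(work), b(work);
    a.join();
    b.join();
    return snd_curr_seq_no.load();
}
```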

  • FINDING P17-SndBuffer_Block
    ==============================
    Simultaneous read and write of CSndBuffer::Block::m_iLength.

This looks like a simple data race between CSndBuffer::readData() and CSndBuffer::addBuffer().
Likely to be protected by CSndBuffer::m_BufLock, but this looks much more complicated.

Report #27 additionally contains a possible conflict between CChannel::sendto and CSndBuffer::addBuffer,
which seem to write into the same block, although here the read and write actually appear to be
ordered by data dependencies, just not sanctioned by atomics or mutexes.

SEVERITY: MEDIUM
FOCUS: Likely a series referring to guards for CSndBuffer.

  • FINDING P18-pCryptoControl
    ==============================
    Unguarded access to CUDT::m_pCryptoControl in CRcvQueue::worker, while it might be
    simultaneously deleted through the call to srt_close() from the main thread.

SEVERITY: MEDIUM
FOCUS: Likely no mutex is employed to guard this particular data.

  • FINDING P19-CloseGC
    ==============================
    Unguarded (mutexes applied, but different) access to CUDTUnited::m_bClosing.

Likely atomic.

FOCUS: Likely to be merged with P21.

  • FINDING P21-bool-state-flags
    ==============================
    The CUDT::m_bConnected field is read without protection.

Similar reports about m_bClosing or m_bBroken.

Likely can be fixed by atomic, but might be that CUDT::m_ConnectionLock could be locked
for reading this field.

SEVERITY: LOW

  • FINDING P22-GC-against-RcvUList
    ==============================

CRNode::m_bOnList accessed for writing without protection. The same node
is accessed for reading in a GC thread in the loop rolling over closed sockets.

This could be a bigger problem related to the containment in m_pRcvUList, that is,
the loop over the CRcvQueue::m_pRcvUList might contain CUDT's that have been already
recycled into m_ClosedSockets. The case of m_bOnList is only a marker that the
node entry is invalid, as the entity has been removed from the container. The loop
in GC also checks the node itself. This is likely something old that has existed
since the beginning, so atomic might solve it painlessly.


CRcvQueue::m_pHash contains CUDT entities that may belong to sockets that are recycled
and that GC is going to delete. What happens here:

  1. The CRcvQueue::worker (worker_ProcessAddressedPacket) extracts the CUDT by the given ID from its m_pHash container.
  2. Then it performs any kind of operations (in this case, checkTimers -> checkNAKTimer -> m_pRcvLossList->getLossLength()).

Simultaneously:

The socket of this entity is being deleted, so the destructor is called:
- for CUDT
- because destroying CUDTSocket
- because the CUDTUnited::removeSocket is deleting RECYCLED sockets

Likely fix: the m_pHash container must be taken out and the entities
collected some other way. This could also be a container of the same kind and
purpose, just in a different place, and any activity performed on its contents
must be done strictly under a mutex that protects them. For that purpose
the sockets can use a busy flag to prevent their deletion. When the loop
in CRcvQueue::worker finds a socket that has been marked as closed, that
socket should be removed from the container immediately. The GC thread should
also remove objects from this container by itself, which is why the whole
operation on the container should be guarded by a dedicated mutex.

In #38 there's a similar problem, the entity is extracted from the RcvUList list,
while the alleged entity is at the same time being deleted.

SEVERITY: HIGH
FOCUS: The socket deletion is done in a rather dangerous way, and the conditions
under which a socket is allowed to be physically deleted are unclear.
In specific situations this may lead to deleting a socket that is under
operation in another thread, which may end in deleted-memory access
and a crash.
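A hedged sketch of the busy-flag scheme proposed above (all names hypothetical, not SRT code): entities live in a mutex-guarded container, a busy counter keeps an entity alive while a worker uses it, and the GC physically removes an entity only when it is idle.

```cpp
#include <map>
#include <memory>
#include <mutex>

// Hypothetical entity standing in for a CUDT tracked by a hash container.
struct Entity
{
    int id = 0;
    int busy = 0;       // > 0 while a worker thread operates on it
    bool closed = false;
};

std::mutex container_lock;  // guards BOTH the map and the flags inside entities
std::map<int, std::shared_ptr<Entity>> container;

// Worker side: look up and mark busy under the lock, so GC cannot delete
// the entity between lookup and use.
std::shared_ptr<Entity> acquire(int id)
{
    std::lock_guard<std::mutex> g(container_lock);
    auto it = container.find(id);
    if (it == container.end() || it->second->closed)
        return nullptr;
    ++it->second->busy;
    return it->second;
}

void release(const std::shared_ptr<Entity>& e)
{
    std::lock_guard<std::mutex> g(container_lock);
    --e->busy;
}

// GC side: always mark closed; physically remove only when not busy.
bool try_remove(int id)
{
    std::lock_guard<std::mutex> g(container_lock);
    auto it = container.find(id);
    if (it == container.end())
        return true;
    it->second->closed = true;
    if (it->second->busy > 0)
        return false;   // a worker still holds it; GC retries later
    container.erase(it);
    return true;
}
```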

  • FINDING P23-QueueClosingFlag
    ==============================
    m_bClosing flag in the queues.

Likely fixable by atomic.

SEVERITY: LOW

  • FINDING P24-iRexmitCount
    ==============================
    Unguarded reading of m_iReXmitCount in CRcvQueue::worker.

Likely needs guard of m_RcvAckLock, although atomic might help, too.

SEVERITY: LOW

Findings Related to Socket Groups

See below.

Attachments

The details are collected in directories in the attached archive.

TSAN_REPORT_PACK.tar.gz

ethouris (Collaborator) commented Mar 5, 2021:

A collective PR #1844 (required due to conflicting changes) was created to fix the following problems:

  1. Three mutexes cycled in a way that breaks the ordering - temporary unlocking added.
  2. Removed the lock on m_ControlLock while m_ConnectionLock is applied (counter-order).
  3. Refactoring of the mutex/CV in CSndQueue/CSndUList, fixing the locking problem there (two different locks guarding the same data, and a lack of locking on single-field changes).

ethouris (Collaborator) commented Mar 5, 2021:

This is a list of findings collected after the fix from #1844 (including a failed fix for UList nodes).

These are problems that were likely re-identified; the numbers match the findings with the previous P* prefix:

GP02-connect-ControlLock/  <--- NOTE: fix provided already in #1844.
GP05-dCongestionWindow/
GP07-SendInterval/
GP08-SndBuf-pLastBlock/
GP12-ACK-data/
GP13-zPayloadSize/
GP14-SndBuf-CurBlock/
GP15-SndBuf-BytesCount/
GP16-SndCurrSeqNo/
GP17-SndBuffer_Block/
GP18-connect-while-rcv-update/
GP19-delete-while-update/
GP20-closingQueues/
GP21-bool-state-flags/
GP22-GC-against-RcvUList/

New findings in the bonding tests:

GP01-syncWithSocket/
GP30-iRexmitCount/
GP31-tsLastSendTime/
GP32-SndBuffer-m_iCount/
GP33-ackMessage/
GP35-memberstate/
GP36-socketState/
GP37-ulist-node-nofix/
GP38-FATAL-epollEmpty/

The following items seem to require immediate attention:

  • GP01: syncWithSocket() is called at the wrong time, so the group takes "as a good deal" the latency value from the first connected socket at a moment when it's not really connected, potentially taking the latency value from BEFORE it has been negotiated in the handshake. In fact, the latency defined in the group should remain effectively undefined until the connection is confirmed.
  • GP38: This should be relatively easy to fix: the group keeps the internal data for the epoll container and tries to access its internal fields without locking, while all other references to it lock m_EPollLock. The best fix would be to introduce a new function that is explicitly qualified as allowed for "user body" access, and to somehow disallow access to private fields from methods, to avoid similar problems in the future.

Note: GP37 is introduced by the trial fix for GP18 and GP19 that has also failed. A more elaborate solution is required, although the GP37 problem can be ignored.
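The accessor pattern suggested for GP38 can be sketched as follows (hypothetical names except m_EPollLock, which is taken from the report; not SRT code): the container's fields are private, and every read goes through a method that locks the container's own mutex, so no caller can bypass the lock.

```cpp
#include <mutex>
#include <set>

// Hypothetical mini epoll container: every access to the subscription
// set, read or write, must go through a locking method.
class EPollSketch
{
    std::mutex m_EPollLock;     // name taken from the finding above
    std::set<int> m_subscribed; // hypothetical private state

public:
    void subscribe(int eid)
    {
        std::lock_guard<std::mutex> g(m_EPollLock);
        m_subscribed.insert(eid);
    }

    // The only sanctioned way for "user body" code to inspect the set;
    // the private field itself is unreachable from outside.
    bool is_subscribed(int eid)
    {
        std::lock_guard<std::mutex> g(m_EPollLock);
        return m_subscribed.count(eid) != 0;
    }
};
```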

ethouris (Collaborator) commented Mar 8, 2021:

#1848 submitted to address GP01. The PR for GP38 is already merged.

ethouris (Collaborator) commented Mar 8, 2021:

GP33 addressed by #1849.

ethouris (Collaborator) commented Mar 9, 2021:

More information about the GP30+ findings, except those that were already fixed:

  • GP30-iRexmitCount/

NOTE: Likely identical to P24

The call to CUDT::checkRexmitTimer() is reading from two fields as per these instructions:

    const uint64_t exp_int_us = (m_iReXmitCount * rtt_syn + COMM_SYN_INTERVAL_US);

    if (currtime <= (m_tsLastRspAckTime + microseconds_from(exp_int_us)))
        return;

while simultaneously they are being written to - likely (the stack was not recoverable in the test, but it is believed it could only happen here) in CUDT::sendmsg2:

    UniqueLock sendguard(m_SendLock);

    if (m_pSndBuffer->getCurrBufSize() == 0)
    {
        // delay the EXP timer to avoid mis-fired timeout
        ScopedLock ack_lock(m_RecvAckLock);
        m_tsLastRspAckTime = steady_clock::now();
        m_iReXmitCount   = 1;
    }

SEVERITY: important. Likely the values calculated from these fields may sometimes be wrong.
POSSIBLE FIX: If these fields are written under m_RecvAckLock, reading them should be protected by the same mutex.

  • GP31-tsLastSendTime/

A total of 5 different cases of reading and writing the same field in two different threads without protection.

SEVERITY: minor
POSSIBLE FIX: likely all could be fixed by adding atomic.

  • GP32-SndBuffer-m_iCount/

"Sender buffer group". Severity unknown, but this problem has always existed.

  • GP35-memberstate/

The state of a connection is being updated on closing, while another thread is reading the status. It's not a problem if the state reports connected while the socket is simultaneously being closed by another thread: the status will be read again and the closed socket will be noticed soon after.

SEVERITY: minor
POSSIBLE FIX: Likely the problem could be solved by atomic.

  • GP36-socketState/

The m_State field of the socket is updated when the socket is closed in the receiver thread, while it has just been qualified as existing in send_CheckValidSockets(). This shouldn't be a problem, because the socket will still remain in the trashcan until all other activities are finished; the send will simply fail and the socket will be closed.

In other reports there's e.g.:

  • write-read race on m_bBroken field
  • simultaneous call to setClosed() from GC and RcvQ:worker threads

SEVERITY: minor
POSSIBLE FIX: add atomic

ethouris (Collaborator) commented:

#1859 fixes the problem of updating 4 fields, the issue described in GP30.
#1863 submitted to address the atomic problem. Still a draft, as more tests need to be done. It also contains the fix for the deleted socket that had previously failed, but this time it's made together with atomic and now it seems to work.

ethouris (Collaborator) commented Mar 12, 2021:

After a test including all the above fixes, only the following findings are still collected:

  • AP01-unidentified-ostream/

This is extremely weird and its cause is completely unknown. The sanitizer reports that a "write happened" here (in the instruction inside the loop body):

void CUDT::EmitSignal(ETransmissionEvent tev, EventVariant var)
{
    for (std::vector<EventSlot>::iterator i = m_Slots[tev].begin(); i != m_Slots[tev].end(); ++i)
    {
        i->emit(tev, var);
    }
}

while simultaneously "previous write" happened here (case identified by referring to identical address):

    Verb() << "(#" << mctrl.msgno << " %" << mctrl.pktseq << "  " << BufferStamp(data.payload.data(), data.payload.size()) << ") " << VerbNoEOL;

The function that is next on stack towards this above one is:

    #2 std::ostream::operator<<(std::ios_base& (*)(std::ios_base&)) /home/sektor/repos/gcc.git/x86_64-pc-linux-gnu/libstdc++-v3/include/ostream:132:6 (libstdc++.so.6+0x12cfc0)

There's no pointer-to-function object visible in this call at all. Unknown as to what could have caused this.

This problem is much like others that have been fixed using atomic; however, this one is a double and as such cannot really be fixed in a portable way by atomic. There is, however, a possibility to resolve it with mutexes. The other thread that does the out-of-sync reading locks CUDT::m_ConnectionLock, so it might be enough to add this lock in CUDT::updateCC in this place, at least for modifying these fields:

    if (evt != TEV_ACKACK && evt != TEV_SEND && evt != TEV_RECEIVE)
    {
        m_tdSendInterval    = microseconds_from((int64_t)m_CongCtl->pktSndPeriod_us());
        m_dCongestionWindow = m_CongCtl->cgWindowSize();
    }
  • AP03-SenderBuffer/

This collects about 9 findings in total, mostly related to simultaneous reading and writing of the same object being an item in the sender buffer. Although there is a mutex intended to be used for the sender buffer, it is used at best partially and only for some operations. This is something that has existed since UDT times. Likely this problem couldn't be detected in file mode: there, the schedule window in the sender buffer immediately stretches to its maximum and the flight window eats it up much more slowly, so there's practically no way for the CSndQueue::worker thread to read the same buffer element that is simultaneously being filled by the main application thread. Even at the very end of a file, when the last packet buffer is being filled, it was unlikely to be simultaneously picked up to shift it from the schedule window to the flight window. In live mode it's different: if the thread layout is perfect and there's no choking in the network, the schedule window may at worst stretch to 2, and most of the time it's 1 or even 0, so a collision on the same packet being stored and read by the sender thread is highly probable.

This is a long-term task that likely needs a redesign of the sender buffer, or at least a tough and very detailed analysis of possible mutex locking to be added to the key operations in the sender buffer. For performance reasons this may also require a shared lock (aka read-write lock - not available even in C++11), or simply using both atomics for key fields and mutexes.

Likely in CUDT::completeBrokenConnectionDependencies(int errorcode), in the check for the group, a GroupLock should also be acquired after locking GlobControlLock to protect the fields being updated there. Theoretically, making these state fields atomic could help too, but the lock is already available there, it can be acquired, data are read and modified there, and locking it here is correct according to the ordering definitions.

ethouris (Collaborator) commented:

IMPORTANT!!! All the above tests have been done by testing the group sender in backup mode only. This means that this isn't the complete set of tests that could be performed with thread sanitizer and more will have to be done.

ethouris (Collaborator) commented:

For AP03, here is the list of sanitizer findings with extra description:

TSAN_ATM_REPORT_MP-part003.txt

Likely m_RcvAckLock could be locked when CUDT::packData calls CSndBuffer::readData, to prevent the data race here.
This mutex is locked in the main thread calling CUDT::sendmsg2 and inside CSndBuffer::addBuffer; likely it
is locked there because m_RecvAckLock mainly guards the "acking" activities and, as a result, the removal
of packets from the sender buffer.

This isn't certain, because the call to CSndBuffer::ackData locks both m_RcvAckLock and CSndBuffer::m_BufLock.
A better idea might be to refactor the sender buffer so that it maintains the overall ack-ing activity itself
and uses only one mutex for modifying itself.

TSAN_ATM_REPORT_MP-part004.txt

This is a collision between readData and ackData. If we pretend that m_BufLock is guarding the internal data
consistency, then readData using these data and depending on their value should definitely lock it as well.

TSAN_ATM_REPORT_MP-part005.txt

The problem is accessing the m_iCount field in the buffer, which is likely updated for a reason, although
in this particular case the check for buffer emptiness is done to decide whether to reset particular fields,
not to access the buffer's internals. Atomic might then suffice.

The only problem is that the buffer might potentially be updated both from an incoming ACK (the CRcvQueue::worker thread)
and from sending a packet (by the application). Whether this check is really correct is therefore questionable:
the buffer might be set empty by an incoming ACK "at some point" while the sending function is called, so the
check can miss the case where the buffer WASN'T empty at the time of checking but BECAME empty just after the check was done.

This fragment needs further research regarding the correct usage of the fields modified here:

    UniqueLock sendguard(m_SendLock);

    if (m_pSndBuffer->getCurrBufSize() == 0)
    {
        // delay the EXP timer to avoid mis-fired timeout
        ScopedLock ack_lock(m_RecvAckLock);
        m_tsLastRspAckTime = steady_clock::now();
        m_iReXmitCount   = 1;
    }

It is possible that this value could be protected by just an atomic modifier, but then a separate function
should likely be created to provide this information, as the caller is only interested in the emptiness
status, not the exact size.

On the other hand, it is important that this emptiness might be a temporary state that will definitely be
missed in strict cases. It needs to be decided whether this temporary state should be somehow notified to
make sure it's not missed, or notified only when the buffer has been empty for some longer time (in which
case the check must also involve a timer check), OR whether resetting this value must be done exclusively
by a function that is capable of REMOVING elements from the buffer.
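The check-then-act hazard described above persists even with an atomic counter: the read itself becomes safe, but "if empty, reset timers" can still race with a concurrent drain. A hedged sketch (hypothetical names, not SRT code) of the alternative where the check and the dependent reset happen under one lock, so nothing can change the count in between:

```cpp
#include <mutex>

// Hypothetical sketch: an atomic count would make the read itself safe,
// but "check emptiness, then reset timers" is still check-then-act.
// Holding ONE lock across both the check and the action closes the gap.
std::mutex buf_lock;
int buffer_count = 0;       // guarded by buf_lock
bool timers_reset = false;  // stands in for m_tsLastRspAckTime / m_iReXmitCount

void ack_removes_all()      // e.g. an incoming ACK drains the buffer
{
    std::lock_guard<std::mutex> g(buf_lock);
    buffer_count = 0;
}

// The emptiness test and the reset run under the SAME lock: no other
// thread can change the count between the check and the action.
void send_checks_and_resets()
{
    std::lock_guard<std::mutex> g(buf_lock);
    if (buffer_count == 0)
        timers_reset = true;
}
```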

TSAN_ATM_REPORT_MP-part006.txt

Similar to the problem in 005: getCurrBufSize - another overload - was called while the sizes being
extracted were modified; this could result in returning INCOMPATIBLE (!) packet-size and byte-size
values. It is quite probable that the call to getCurrBufSize should be protected by m_BufLock.

TSAN_ATM_REPORT_MP-part007.txt

idem

TSAN_ATM_REPORT_MP-part008.txt

Looks DANGEROUS: this modifies the block in the buffer directly assigned to m_pLastBlock in addBuffer,
while it is simultaneously read by readData.

CSndBuffer::addBuffer is protected by m_RecvAckLock, so maybe it is enough to embrace the call to
CSndBuffer::readData with it, as proposed in 003.

TSAN_ATM_REPORT_MP-part009.txt

idem (m_iSeqNo field)

TSAN_ATM_REPORT_MP-part010.txt

idem.

TSAN_ATM_REPORT_MP-part011.txt

idem, getSourceTime.

maxsharabayko modified the milestones: v1.4.3, v1.4.4 - Mar 17, 2021
ethouris (Collaborator) commented Mar 18, 2021:

OK, the test was conducted with all the previous fixes applied, and none of the issues described above are seen anymore. Here is the list of new findings extracted when the testing application was the listener and receiver:

TSAN_RTM_REPORT_MP-part001.txt, TSAN_RTM_REPORT_MP-part005.txt

One order must be selected between m_GroupLock and m_ConnectionLock.

Order G ; C --> locking the group container, then setting options. A single option locks m_ConnectionLock.
Order C ; G --> the handshake locks m_ConnectionLock for a socket, then this socket is added to the group.

In one of these cases, one of the locks must be lifted.

COMPLEXITY: HIGH
SEVERITY: LOW. This can only happen when you try to alter an option in the main thread while the receiver-worker thread is trying to add a socket to the group. This isn't physically possible, because it happens only on the accepted socket (listener side), and the adding to the group completes before the socket is handed over to the user. Hence the user wouldn't be able to execute the option-setting procedure before it completes.

TSAN_RTM_REPORT_MP-part002.txt

CUnitQueue::m_iCount should likely be atomic

COMPLEXITY: LOW
SEVERITY: LOW

TSAN_RTM_REPORT_MP-part003.txt, TSAN_RTM_REPORT_MP-part004.txt

Collision between:

  • removal of a packet in CRcvBuffer::ackData
  • checking reception-ready packets in CUDT::receiveMessage

Some mutex must guard these data and likely it should be m_RecvBufferLock.

COMPLEXITY: MEDIUM. This shouldn't be a problem if adding m_RecvBufferLock is safe and fixes the problem.
SEVERITY: MEDIUM. The only risk is that the check for a reception-ready packet reports "not ready" just after packets were signed off. This signoff, however, is also signaled through TSBPD.

TSAN_RTM_REPORT_MP-part006.txt

The official order so far is:

m_GlobControlLock
m_GroupLock
m_ConnectionLock

The ordering for m_LSLock is undefined, and likely it can't be treated as final. The problem
of unordered locking for m_LSLock has never been solved.

COMPLEXITY: HIGH
SEVERITY: UNKNOWN. Potential deadlock, but unknown how probable.
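One way to pin down the undefined ordering of m_LSLock is a debug-only lock-hierarchy check. A hypothetical sketch (not SRT code): each mutex gets a rank matching the documented order above (m_GlobControlLock=3, m_GroupLock=2, m_ConnectionLock=1), and each thread asserts that it only acquires locks in strictly descending rank, so a counter-order acquisition is flagged immediately instead of deadlocking rarely.

```cpp
#include <mutex>
#include <vector>

// Per-thread stack of ranks currently held (debug aid only).
thread_local std::vector<int> g_held_ranks;

class OrderedMutex
{
    std::mutex m_mtx;
    const int m_rank;   // higher rank must be taken first

public:
    explicit OrderedMutex(int rank) : m_rank(rank) {}

    // Returns false instead of locking when the acquisition would
    // violate the documented hierarchy (rank not strictly below the
    // most recently taken lock).
    bool try_ordered_lock()
    {
        if (!g_held_ranks.empty() && g_held_ranks.back() <= m_rank)
            return false;           // ordering violation detected
        m_mtx.lock();
        g_held_ranks.push_back(m_rank);
        return true;
    }

    void unlock()
    {
        g_held_ranks.pop_back();
        m_mtx.unlock();
    }
};
```

Assigning m_LSLock a fixed rank in such a scheme would force every counter-order path to surface in testing.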

maxsharabayko (Collaborator) commented:

Related issue #2273

maxsharabayko (Collaborator) commented:

Moving some old overview from #1620 here for future reference.

Core threads synchronization overview.

Note. CRcvQueue::m_pRcvUList may still have this socket, and may be processing it during releaseSynch(). Some synchronization is likely required.

tsbpd()

A separate internal thread per receiving socket.

CUDT::tsbpd():

{
    UniqueLock recv_lock  (self->m_RecvLock);
    CSync tsbpd_cc    (self->m_RcvTsbPdCond, recv_lock);
    
    while (!self->m_bClosing)
    {
        enterCS(self->m_RcvBufferLock);
        self->m_pRcvBuffer->getRcvFirstMsg();
        self->m_pRcvBuffer->skipData(seqlen);
        self->m_pRcvBuffer->isRcvDataReady(..);
        leaveCS(self->m_RcvBufferLock);
        
        if (self->m_bSynRecving)
           // TODO: [SYNC] Lock before signalling?
           self->m_ReadyToReadEvent.notify_one();
        self->s_UDTUnited.m_EPoll.update_events(self->m_SocketID, self->m_sPollID, SRT_EPOLL_IN, true);
        CGlobEvent::triggerEvent();
        
        if (tsbpdtime)
            tsbpd_cc.wait_for(timediff);
        else
            tsbpd_cc.wait();
    }
}

receiveMessage()

CUDT::receiveMessage() is called from the srt_recvmsg(..) thread.

CUDT::receiveMessage():

{
    UniqueLock recvguard (m_RecvLock);
    CSync tscond     (m_RcvTsbPdCond,  recvguard);
    
    if (m_bBroken || m_bClosing)
    {
        enterCS(m_RcvBufferLock);
        const int res = m_pRcvBuffer->readMsg(data, len);
        leaveCS(m_RcvBufferLock);
        if (m_bTsbPd)
        {
            HLOGP(tslog.Debug, "Ping TSBPD thread to schedule wakeup");
            tscond.signal_locked(recvguard);
        }
    }
    
    if (!m_bSynRecving)
    {
        enterCS(m_RcvBufferLock);
        const int res = m_pRcvBuffer->readMsg(data, len, (w_mctrl), seqdistance);
        leaveCS(m_RcvBufferLock);
        
        if (m_bTsbPd)
        {
            HLOGP(arlog.Debug, "receiveMessage: nothing to read, kicking TSBPD, return AGAIN");
            tscond.signal_locked(recvguard);
        }
        
        if (!m_pRcvBuffer->isRcvDataReady())
        {
            // Kick TsbPd thread to schedule next wakeup (if running)
            if (m_bTsbPd)
            {
                HLOGP(arlog.Debug, "receiveMessage: DATA READ, but nothing more - kicking TSBPD.");
                tscond.signal_locked(recvguard);
            }
        }
        return res;
    }
    
    do
    {
        if (stillConnected() && !timeout && !m_pRcvBuffer->isRcvDataReady(..))
        {
            /* Kick TsbPd thread to schedule next wakeup (if running) */
            if (m_bTsbPd)
                tscond.signal_locked(recvguard);

            do
            {
                if (!m_ReadyToReadEvent.lock_wait_until(exptime))
                {
                    if (m_iRcvTimeOut >= 0) // otherwise it's "no timeout set"
                        timeout = true;
                }
            } while (stillConnected() && !timeout && (!m_pRcvBuffer->isRcvDataReady()));
            
            
        }
        
        enterCS(m_RcvBufferLock);
        res = m_pRcvBuffer->readMsg((data), len, (w_mctrl), seqdistance);
        leaveCS(m_RcvBufferLock);

    } while ((res == 0) && !timeout);
            
    if (!m_pRcvBuffer->isRcvDataReady())
    {
        // Kick TsbPd thread to schedule next wakeup (if running)
        if (m_bTsbPd)
            tscond.signal_locked(recvguard);

        s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_IN, false);
    }
    
    return res;
}

processData(..)

CUDT::processData(CUnit* ) is called from the internal receiving thread.

CUDT::processData(..):

{
    const bool need_tsbpd = m_bTsbPd || m_bGroupTsbPd;

    // We are receiving data, start tsbpd thread if TsbPd is enabled
    if (need_tsbpd && !m_RcvTsbPdThread.joinable())
    {
        StartThread(m_RcvTsbPdThread, CUDT::tsbpd, this, thname);
    }
    
    {
        UniqueLock recvbuf_acklock(m_RcvBufferLock);
        m_pRcvBuffer->addData(*i, offset);
    }

    // Wake up TSBPD on loss to reschedule possible TL drop
    if (!srt_loss_seqs.empty() && m_bTsbPd)
        CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);

    if (!filter_loss_seqs.empty() && m_bTsbPd)
        CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);
}

releaseSynch()

CUDT::releaseSynch() is called on:

  • UMSG_SHUTDOWN
  • CUDT::checkExpTimer
  • in CUDT::processData (SEQUENCE DISCREPANCY. BREAKING CONNECTION)
  • in srt_close() or sending thread in non-blocking mode
  • in Garbage Collector thread (checkBrokenSockets(..) or makeClosed())
  • in srt_connect
CUDT::releaseSynch():

{
    // wake up user calls
    CSync::lock_signal(m_SendBlockCond, m_SendBlockLock);

    enterCS(m_SendLock);
    leaveCS(m_SendLock);

    m_ReadyToReadEvent.lock_notify_one();
    CSync::lock_signal(m_RcvTsbPdCond, m_RecvLock);

    // TODO: [SYNC] Protect TBBPD Thread join
    //enterCS(m_NewDataReceivedLock);
    if (m_RcvTsbPdThread.joinable())
    {
        m_RcvTsbPdThread.join();
    }
    //leaveCS(m_NewDataReceivedLock);

    enterCS(m_RecvLock);
    leaveCS(m_RecvLock);
}
