Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TSBPD waits for 10 ms or next ACK if CRcvBuffer has no acknowledged packet #553

Closed
maxsharabayko opened this issue Feb 4, 2019 · 1 comment
Assignees
Labels
[core] Area: Changes in SRT library core Priority: Low Type: Bug Indicates an unexpected problem or unintended behavior
Milestone

Comments

@maxsharabayko
Copy link
Collaborator

maxsharabayko commented Feb 4, 2019

The issue is related to TSBP for the receiving thread.

At some point under certain conditions the receiving may get delayed for up tp 10 ms (usually less) on top of the existing delay of reading a packet. When this happens on the receiving side, the sender drops some packets due to the increased delay of the receiver. This happens when the data from the receiver buffer is read out faster than it is being actually acknowledged (while the data itself may have already been received). Then there will be a timeout for reading until the next number of packets is acknowledged. By acknowledgement I mean sending actual ACK message, because now this is what triggers the receiver buffer to allow you to read those data from it.

Let's have a look at receiveMessage(...) function. If there are no packets in the receiver buffer to read, it will wait for 1s (default) on the m_RecvDataCond. In TSBPD mode the isRcvDataReady(...) function returns false (no packet ready to be read) if there is no ACKnowledged packets in the receiver buffer or the TSBPD time of the next packet is less than current time. See m_iLastAckPos of the CRcvBuffer.
So, the CRcvBuffer will tell there is no packet ready to be read in case there are packets in it, but they are not ACKnowledged.

Reading packets from the receive buffer (click to expand)

int CUDT::receiveMessage(char* data, int len, ref_t<SRT_MSGCTRL> r_mctrl)
{
    // (...)
    do
    {
        if (stillConnected() && !timeout && (!m_pRcvBuffer->isRcvDataReady()))
        {
            /* Kick TsbPd thread to schedule next wakeup (if running) */
            if (m_bTsbPd)
            {
                pthread_cond_signal(&m_RcvTsbPdCond);
            }

            do
            {
                if (CTimer::condTimedWaitUS(&m_RecvDataCond, &m_RecvLock, 1s) == ETIMEDOUT)
                {
                    if (!(m_iRcvTimeOut < 0))
                        timeout = true;
                }
            } while (stillConnected() && !timeout && (!m_pRcvBuffer->isRcvDataReady()));
        }

        res = m_pRcvBuffer->readMsg(data, len, r_mctrl);

        // ...
    } while ((res == 0) && !timeout);
    // (...)
}

The acknowledgement of the packets in the CRcvBuffer happens when an actual ACK packet is sent (see CUDT::sendCtrl(...) calling m_pRcvBuffer->ackData(acksize);).

The acknowledment decision is made in CUDT::checkTimers() based on time (currtime_tk > m_ullNextACKTime_tk) or packet count (m_iPktCount >= m_Smoother->ACKInterval() - default 0).
So by default there is only time interval, that is 10 ms by default.

int ack_interval_tk = m_ullACKInt_tk;    // simplified
m_ullNextACKTime_tk = currtime_tk + ack_interval_tk;
static const int32_t COMM_SYN_INTERVAL_US = 10*1000;
m_ullACKInt_tk = COMM_SYN_INTERVAL_US * m_ullCPUFrequency;

The second way to acknowledge the packets is with quick ACK on every 64 packets if (refer to static const int SELF_CLOCK_INTERVAL = 64;). But it does not force to ACK data in the receiver buffer.
The following code shows the ACK sending function. Note the place where m_pRcvBuffer->ackData(acksize); is called and the condition is signaled pthread_cond_signal(&m_RcvTsbPdCond);.

Sending ACK (click to expand)

void CUDT::sendCtrl(UDTMessageType pkttype, void* lparam, void* rparam, int size)
{
   // (...)
   case UMSG_ACK: //010 - Acknowledgement
      {
      int32_t ack;

      // If there is no loss, the ACK is the current largest sequence number plus 1;
      // Otherwise it is the smallest sequence number in the receiver loss list.
      if (m_pRcvLossList->getLossLength() == 0)
         ack = CSeqNo::incseq(m_iRcvCurrSeqNo);
      else
         ack = m_pRcvLossList->getFirstLostSeq();

      if (m_iRcvLastAckAck == ack)
         break;

      // send out a lite ACK
      // to save time on buffer processing and bandwidth/AS measurement, a lite ACK only feeds back an ACK number
      if (size == SEND_LITE_ACK)
      {
         ctrlpkt.pack(pkttype, NULL, &ack, size);
         ctrlpkt.m_iID = m_PeerID;
         nbsent = m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);
         DebugAck("sendCtrl(lite):" + CONID(), local_prevack, ack);
         break;
      }

      uint64_t currtime_tk;
      CTimer::rdtsc(currtime_tk);

      // There are new received packets to acknowledge, update related information.
      /* tsbpd thread may also call ackData when skipping packet so protect code */
      CGuard::enterCS(m_AckLock);

      // IF ack > m_iRcvLastAck
      if (CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0)
      {
         int acksize = CSeqNo::seqoff(m_iRcvLastSkipAck, ack);

         m_iRcvLastAck = ack;
         m_iRcvLastSkipAck = ack;

         // XXX Unknown as to whether it matters.
         // This if (acksize) causes that ackData() won't be called.
         // With size == 0 it wouldn't do anything except calling CTimer::triggerEvent().
         // This, again, signals the condition, CTimer::m_EventCond.
         // This releases CTimer::waitForEvent() call used in CUDTUnited::selectEx().
         // Preventing to call this on zero size makes sense, if it prevents false alerts.
         if (acksize > 0)
             m_pRcvBuffer->ackData(acksize);
         CGuard::leaveCS(m_AckLock);

         // If TSBPD is enabled, then INSTEAD OF signaling m_RecvDataCond,
         // signal m_RcvTsbPdCond. This will kick in the tsbpd thread, which
         // will signal m_RecvDataCond when there's time to play for particular
         // data packet.

         if (m_bTsbPd)
         {
             /* Newly acknowledged data, signal TsbPD thread */
             pthread_mutex_lock(&m_RecvLock);
             if (m_bTsbPdAckWakeup)
                pthread_cond_signal(&m_RcvTsbPdCond);
             pthread_mutex_unlock(&m_RecvLock);
         }
         else
         {
             if (m_bSynRecving)
             {
                 // signal a waiting "recv" call if there is any data available
                 pthread_mutex_lock(&m_RecvDataLock);
                 pthread_cond_signal(&m_RecvDataCond);
                 pthread_mutex_unlock(&m_RecvDataLock);
             }
             // acknowledge any waiting epolls to read
             s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, true);
         }
         CGuard::enterCS(m_AckLock);
      }

Now let's check the TSBPD thread. It asks the m_pRcvBuffer for the next available packets. Intersting thing will happen, if there are packets in the buffer, but they are not acknowledged.
In this case the TSBPD thread starts waiting for m_RcvTsbPdCond, although there might be a packet in the receiver buffer, and we may retrieve its time. Instead we will effectively wait for the next ACK to happen, that might happen in up to 10 ms.

Sending ACK (click to expand)

void* CUDT::tsbpd(void* param)
{
   // (...)
   CGuard::enterCS(self->m_RecvLock);
   self->m_bTsbPdAckWakeup = true;
   while (!self->m_bClosing)
   {
      // (...)

      {
          // Here we get tsbpdtime
          rxready = self->m_pRcvBuffer->isRcvDataReady(Ref(tsbpdtime), Ref(current_pkt_seq));
      }
      CGuard::leaveCS(self->m_AckLock);

      if (rxready)
      {
         /*
         * There are packets ready to be delivered
         * signal a waiting "recv" call if there is any data available
         */
         if (self->m_bSynRecving)
         {
             pthread_cond_signal(&self->m_RecvDataCond);
         }
         /*
         * Set EPOLL_IN to wakeup any thread waiting on epoll
         */
         self->s_UDTUnited.m_EPoll.update_events(self->m_SocketID, self->m_sPollID, UDT_EPOLL_IN, true);
         CTimer::triggerEvent();
         tsbpdtime = 0;
      }

      if (tsbpdtime != 0)
      {
         int64_t timediff = int64_t(tsbpdtime) - int64_t(CTimer::getTime());
         /*
         * Buffer at head of queue is not ready to play.
         * Schedule wakeup when it will be.
         */
          self->m_bTsbPdAckWakeup = false;
          THREAD_PAUSED();
          HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << current_pkt_seq
              << " T=" << logging::FormatTime(tsbpdtime) << " - waiting " << (timediff/1000.0) << "ms");
          CTimer::condTimedWaitUS(&self->m_RcvTsbPdCond, &self->m_RecvLock, timediff);
          THREAD_RESUMED();
      }
      else
      {
         /*
         * We have just signaled epoll; or
         * receive queue is empty; or
         * next buffer to deliver is not in receive queue (missing packet in sequence).
         *
         * Block until woken up by one of the following event:
         * - All ready-to-play packets have been pulled and EPOLL_IN cleared (then loop to block until next pkt time if any)
         * - New buffers ACKed
         * - Closing the connection
         */
         self->m_bTsbPdAckWakeup = true;
         pthread_cond_wait(&self->m_RcvTsbPdCond, &self->m_RecvLock);
      }
   }
   // (...)
}

This situation, when it happens, increases the latency without any reason. Furthermore, increasing the latency increases the number of packets stored in the receiver buffer. At some point this value may reach the 8000 limit.

The situation was reproduced when sending 34 Mbps and above from Windows and receiving on CentOS with the default maxbw value.

Reproduction steps (click to expand)

tsp -I file --infinite "Eden_HD.ts" -P regulate --bitrate 34000000 -O ip localhost:4900

The tsp tool is TSDuck
File can be any.
So the command builds an internal pipeline with file reader (-I file --infinite "Eden_HD.ts"), bitrate regulator (-P regulate --bitrate 34000000) and output component (-O ip localhost:4900)

Windows 10 sender:

srt-test-live udp://:4900 srt://:4200

CentOS 7 receiver:

srt-test-live srt://192.168.0.10:4200 /dev/nul

After some time the sender starts to report SND_DROPPED. That was due to sender now acnowledging the packets and increasing the delay.

@maxsharabayko maxsharabayko added the Type: Bug Indicates an unexpected problem or unintended behavior label Feb 4, 2019
@maxsharabayko maxsharabayko added this to the v.1.3.3 milestone Feb 4, 2019
@maxsharabayko maxsharabayko modified the milestones: v.1.3.3, v.1.3.4 May 29, 2019
@maxsharabayko maxsharabayko modified the milestones: v1.3.4, v1.4.1 Aug 9, 2019
@ethouris ethouris added the [core] Area: Changes in SRT library core label Aug 13, 2019
@maxsharabayko maxsharabayko modified the milestones: v1.4.1, v1.4.2 Nov 4, 2019
@maxsharabayko maxsharabayko self-assigned this Dec 27, 2019
@maxsharabayko maxsharabayko modified the milestones: v1.5.0, v1.5.1 Jun 19, 2020
@mbakholdina mbakholdina modified the milestones: v1.5.1, Backlog May 26, 2021
@maxsharabayko
Copy link
Collaborator Author

Fixed by #1964.

@maxsharabayko maxsharabayko modified the milestones: Backlog, v1.4.5 Jan 26, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
[core] Area: Changes in SRT library core Priority: Low Type: Bug Indicates an unexpected problem or unintended behavior
Projects
None yet
Development

No branches or pull requests

3 participants