diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 0e75ad190..252663852 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -5708,10 +5708,10 @@ int CUDT::sendmsg2(const char* data, int len, ref_t r_mctrl) // delay the EXP timer to avoid mis-fired timeout uint64_t currtime_tk; CTimer::rdtsc(currtime_tk); - // (fix keepalive) m_ullLastRspTime_tk = currtime_tk; - m_ullLastRspAckTime_tk = currtime_tk; - m_iReXmitCount = 1; // Is also modified in checkRexmitTimer and processCtrlAck (receiver's thread) + CGuard ack_lock(m_RecvAckLock); + m_ullLastRspAckTime_tk = currtime_tk; // (fix keepalive) + m_iReXmitCount = 1; // can be modified in checkRexmitTimer and processCtrlAck (receiver's thread) } // checkNeedDrop(...) may lock m_RecvAckLock @@ -5817,20 +5817,24 @@ int CUDT::sendmsg2(const char* data, int len, ref_t r_mctrl) size = min(len, sndBuffersLeft() * m_iMaxSRTPayloadSize); } - // insert the user buffer into the sending list - // This should be protected by a mutex. m_SendLock does this. - m_pSndBuffer->addBuffer(data, size, mctrl.msgttl, mctrl.inorder, mctrl.srctime, Ref(mctrl.msgno)); - HLOGC(dlog.Debug, log << CONID() << "sock:SENDING srctime: " << mctrl.srctime << "us DATA SIZE: " << size); + { + CGuard recvAckLock(m_RecvAckLock); + // insert the user buffer into the sending list + // This should be protected by a mutex. m_SendLock does this. + m_pSndBuffer->addBuffer(data, size, mctrl.msgttl, mctrl.inorder, mctrl.srctime, Ref(mctrl.msgno)); + HLOGC(dlog.Debug, log << CONID() << "sock:SENDING srctime: " << mctrl.srctime << "us DATA SIZE: " << size); + + if (sndBuffersLeft() < 1) // XXX Not sure if it should test if any space in the buffer, or as requried. + { + // write is not available any more + s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false); + } + } // insert this socket to the snd list if it is not on the list yet + // m_pSndUList->pop may lock CSndUList::m_ListLock and then m_RecvAckLock m_pSndQueue->m_pSndUList->update(this, CSndUList::rescheduleIf(bCongestion)); - if (sndBuffersLeft() < 1) // XXX Not sure if it should test if any space in the buffer, or as requried. - { - // write is not available any more - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false); - } - #ifdef SRT_ENABLE_ECN if (bCongestion) throw CUDTException(MJ_AGAIN, MN_CONGESTION, 0); @@ -6158,24 +6162,27 @@ int64_t CUDT::sendfile(fstream& ifs, int64_t& offset, int64_t size, int block) CGuard::leaveCS(m_StatsLock); } - int64_t sentsize = m_pSndBuffer->addBufferFromFile(ifs, unitsize); - - if (sentsize > 0) { - tosend -= sentsize; - offset += sentsize; + CGuard recvAckLock(m_RecvAckLock); + const int64_t sentsize = m_pSndBuffer->addBufferFromFile(ifs, unitsize); + + if (sentsize > 0) + { + tosend -= sentsize; + offset += sentsize; + } + + if (sndBuffersLeft() <= 0) + { + // write is not available any more + s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false); + } } // insert this socket to snd list if it is not on the list yet m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); } - if (sndBuffersLeft() <= 0) - { - // write is not available any more - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false); - } - return size - tosend; } @@ -7135,44 +7142,36 @@ void CUDT::sendCtrl(UDTMessageType pkttype, const void* lparam, void* rparam, in void CUDT::updateSndLossListOnACK(int32_t ackdata_seqno) { - int offset = 0; - // protect packet retransmission + // Update sender's loss list and acknowledge packets in the sender's buffer { - // m_RecvAckLock protects sender's loss list + // m_RecvAckLock protects sender's loss list and epoll CGuard ack_lock(m_RecvAckLock); - offset = CSeqNo::seqoff(m_iSndLastDataAck, ackdata_seqno); + const int offset = CSeqNo::seqoff(m_iSndLastDataAck, ackdata_seqno); // IF distance between m_iSndLastDataAck and ack is nonempty... if (offset <= 0) return; - HLOGC(mglog.Debug, log << CONID() << "ACK covers: " << m_iSndLastDataAck << " - " << ackdata_seqno - << " [ACK=" << m_iSndLastAck << "]"); - // update sending variables m_iSndLastDataAck = ackdata_seqno; // remove any loss that predates 'ack' (not to be considered loss anymore) m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck)); - } - { // acknowledge the sending buffer (remove data that predate 'ack') m_pSndBuffer->ackData(offset); - CGuard ack_lock(m_SendLock); - - if (m_bSynSending) - { - CGuard lk(m_SendBlockLock); - pthread_cond_signal(&m_SendBlockCond); - } - // acknowledde any waiting epolls to write s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true); + } - // insert this socket to snd list if it is not on the list yet - m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); + // insert this socket to snd list if it is not on the list yet + m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); + + if (m_bSynSending) + { + CGuard lk(m_SendBlockLock); + pthread_cond_signal(&m_SendBlockCond); } const int64_t currtime = CTimer::getTime(); @@ -7182,7 +7181,6 @@ void CUDT::updateSndLossListOnACK(int32_t ackdata_seqno) m_stats.m_sndDurationTotal += currtime - m_stats.sndDurationCounter; m_stats.sndDurationCounter = currtime; CGuard::leaveCS(m_StatsLock); - } void CUDT::processCtrlAck(const CPacket& ctrlpkt, const uint64_t currtime_tk) @@ -7190,20 +7188,24 @@ void CUDT::processCtrlAck(const CPacket& ctrlpkt, const uint64_t currtime_tk) const int32_t* ackdata = (const int32_t*)ctrlpkt.m_pcData; const int32_t ackdata_seqno = ackdata[ACKD_RCVLASTACK]; + const bool isLiteAck = ctrlpkt.getLength() == (size_t)SEND_LITE_ACK; + HLOGC(mglog.Debug, log << CONID() << "ACK covers: " << m_iSndLastDataAck << " - " << ackdata_seqno + << " [ACK=" << m_iSndLastAck << "]" << (isLiteAck ? "[LITE]" : "[FULL]")); + + updateSndLossListOnACK(ackdata_seqno); + // Process a lite ACK - if (ctrlpkt.getLength() == (size_t)SEND_LITE_ACK) + if (isLiteAck) { if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0) { + CGuard ack_lock(m_RecvAckLock); m_iFlowWindowSize -= CSeqNo::seqoff(m_iSndLastAck, ackdata_seqno); - HLOGC(mglog.Debug, log << CONID() << "ACK covers: " << m_iSndLastDataAck << " - " << ack << " [ACK=" << m_iSndLastAck << "] (FLW: " << m_iFlowWindowSize << ") [LITE]"); - m_iSndLastAck = ackdata_seqno; // TODO: m_ullLastRspAckTime_tk should be protected with m_RecvAckLock // because the sendmsg2 may want to change it at the same time. m_ullLastRspAckTime_tk = currtime_tk; - m_iReXmitCount = 1; // Reset re-transmit count since last ACK } @@ -7274,48 +7276,10 @@ void CUDT::processCtrlAck(const CPacket& ctrlpkt, const uint64_t currtime_tk) } m_iSndLastFullAck = ackdata_seqno; - int offset = CSeqNo::seqoff(m_iSndLastDataAck, ackdata_seqno); - // IF distance between m_iSndLastDataAck and ack is nonempty... - if (offset > 0) { - // acknowledge the sending buffer (remove data that predate 'ack') - m_pSndBuffer->ackData(offset); - - const int64_t currtime = CTimer::getTime(); - // record total time used for sending - CGuard::enterCS(m_StatsLock); - m_stats.sndDuration += currtime - m_stats.sndDurationCounter; - m_stats.m_sndDurationTotal += currtime - m_stats.sndDurationCounter; - m_stats.sndDurationCounter = currtime; - CGuard::leaveCS(m_StatsLock); - - HLOGC(mglog.Debug, log << CONID() << "ACK covers: " << m_iSndLastDataAck << " - " << ackdata_seqno - << " [ACK=" << m_iSndLastAck << "] BUFr=" << m_iFlowWindowSize - << " RTT=" << ackdata[ACKD_RTT] << " RTT*=" << ackdata[ACKD_RTTVAR] - << " BW=" << ackdata[ACKD_BANDWIDTH] << " Vrec=" << ackdata[ACKD_RCVSPEED]); - // update sending variables - m_iSndLastDataAck = ackdata_seqno; - - // remove any loss that predates 'ack' (not to be considered loss anymore) - m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck)); - } - - // // END of the new code with TLPKTDROP // - CGuard::leaveCS(m_RecvAckLock); - if (m_bSynSending) - { - CGuard lk(m_SendBlockLock); - pthread_cond_signal(&m_SendBlockCond); - } - - // acknowledde any waiting epolls to write - s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true); - - // insert this socket to snd list if it is not on the list yet - m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE); size_t acksize = ctrlpkt.getLength(); // TEMPORARY VALUE FOR CHECKING bool wrongsize = 0 != (acksize % ACKD_FIELD_SIZE); diff --git a/srtcore/core.h b/srtcore/core.h index e711654ed..5b78e374a 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -705,7 +705,7 @@ class CUDT pthread_mutex_t m_RcvBufferLock; // Protects the state of the m_pRcvBuffer // Protects access to m_iSndCurrSeqNo, m_iSndLastAck - pthread_mutex_t m_RecvAckLock; // Protects the state changes while processing incomming ACK + pthread_mutex_t m_RecvAckLock; // Protects the state changes while processing incomming ACK (UDT_EPOLL_OUT) pthread_cond_t m_RecvDataCond; // used to block "recv" when there is no data