Skip to content

Commit

Permalink
[core] Update sender's loss list on lite ACK
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko authored and rndi committed Oct 15, 2019
1 parent 7870c8a commit 82158b9
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 87 deletions.
136 changes: 50 additions & 86 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5708,10 +5708,10 @@ int CUDT::sendmsg2(const char* data, int len, ref_t<SRT_MSGCTRL> r_mctrl)
// delay the EXP timer to avoid mis-fired timeout
uint64_t currtime_tk;
CTimer::rdtsc(currtime_tk);
// (fix keepalive) m_ullLastRspTime_tk = currtime_tk;
m_ullLastRspAckTime_tk = currtime_tk;

m_iReXmitCount = 1; // Is also modified in checkRexmitTimer and processCtrlAck (receiver's thread)
CGuard ack_lock(m_RecvAckLock);
m_ullLastRspAckTime_tk = currtime_tk; // (fix keepalive)
m_iReXmitCount = 1; // can be modified in checkRexmitTimer and processCtrlAck (receiver's thread)
}

// checkNeedDrop(...) may lock m_RecvAckLock
Expand Down Expand Up @@ -5817,20 +5817,24 @@ int CUDT::sendmsg2(const char* data, int len, ref_t<SRT_MSGCTRL> r_mctrl)
size = min(len, sndBuffersLeft() * m_iMaxSRTPayloadSize);
}

// insert the user buffer into the sending list
// This should be protected by a mutex. m_SendLock does this.
m_pSndBuffer->addBuffer(data, size, mctrl.msgttl, mctrl.inorder, mctrl.srctime, Ref(mctrl.msgno));
HLOGC(dlog.Debug, log << CONID() << "sock:SENDING srctime: " << mctrl.srctime << "us DATA SIZE: " << size);
{
CGuard recvAckLock(m_RecvAckLock);
// insert the user buffer into the sending list
// This should be protected by a mutex. m_SendLock does this.
m_pSndBuffer->addBuffer(data, size, mctrl.msgttl, mctrl.inorder, mctrl.srctime, Ref(mctrl.msgno));
HLOGC(dlog.Debug, log << CONID() << "sock:SENDING srctime: " << mctrl.srctime << "us DATA SIZE: " << size);

if (sndBuffersLeft() < 1) // XXX Not sure if it should test if any space in the buffer, or as requried.
{
// write is not available any more
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
}
}

// insert this socket to the snd list if it is not on the list yet
// m_pSndUList->pop may lock CSndUList::m_ListLock and then m_RecvAckLock
m_pSndQueue->m_pSndUList->update(this, CSndUList::rescheduleIf(bCongestion));

if (sndBuffersLeft() < 1) // XXX Not sure if it should test if any space in the buffer, or as requried.
{
// write is not available any more
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
}

#ifdef SRT_ENABLE_ECN
if (bCongestion)
throw CUDTException(MJ_AGAIN, MN_CONGESTION, 0);
Expand Down Expand Up @@ -6158,24 +6162,27 @@ int64_t CUDT::sendfile(fstream& ifs, int64_t& offset, int64_t size, int block)
CGuard::leaveCS(m_StatsLock);
}

int64_t sentsize = m_pSndBuffer->addBufferFromFile(ifs, unitsize);

if (sentsize > 0)
{
tosend -= sentsize;
offset += sentsize;
CGuard recvAckLock(m_RecvAckLock);
const int64_t sentsize = m_pSndBuffer->addBufferFromFile(ifs, unitsize);

if (sentsize > 0)
{
tosend -= sentsize;
offset += sentsize;
}

if (sndBuffersLeft() <= 0)
{
// write is not available any more
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
}
}

// insert this socket to snd list if it is not on the list yet
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);
}

if (sndBuffersLeft() <= 0)
{
// write is not available any more
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
}

return size - tosend;
}

Expand Down Expand Up @@ -7135,44 +7142,36 @@ void CUDT::sendCtrl(UDTMessageType pkttype, const void* lparam, void* rparam, in

void CUDT::updateSndLossListOnACK(int32_t ackdata_seqno)
{
int offset = 0;
// protect packet retransmission
// Update sender's loss list and acknowledge packets in the sender's buffer
{
// m_RecvAckLock protects sender's loss list
// m_RecvAckLock protects sender's loss list and epoll
CGuard ack_lock(m_RecvAckLock);

offset = CSeqNo::seqoff(m_iSndLastDataAck, ackdata_seqno);
const int offset = CSeqNo::seqoff(m_iSndLastDataAck, ackdata_seqno);
// IF distance between m_iSndLastDataAck and ack is nonempty...
if (offset <= 0)
return;

HLOGC(mglog.Debug, log << CONID() << "ACK covers: " << m_iSndLastDataAck << " - " << ackdata_seqno
<< " [ACK=" << m_iSndLastAck << "]");

// update sending variables
m_iSndLastDataAck = ackdata_seqno;

// remove any loss that predates 'ack' (not to be considered loss anymore)
m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck));
}

{
// acknowledge the sending buffer (remove data that predate 'ack')
m_pSndBuffer->ackData(offset);

CGuard ack_lock(m_SendLock);

if (m_bSynSending)
{
CGuard lk(m_SendBlockLock);
pthread_cond_signal(&m_SendBlockCond);
}

// acknowledde any waiting epolls to write
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
}

// insert this socket to snd list if it is not on the list yet
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);
// insert this socket to snd list if it is not on the list yet
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);

if (m_bSynSending)
{
CGuard lk(m_SendBlockLock);
pthread_cond_signal(&m_SendBlockCond);
}

const int64_t currtime = CTimer::getTime();
Expand All @@ -7182,28 +7181,31 @@ void CUDT::updateSndLossListOnACK(int32_t ackdata_seqno)
m_stats.m_sndDurationTotal += currtime - m_stats.sndDurationCounter;
m_stats.sndDurationCounter = currtime;
CGuard::leaveCS(m_StatsLock);

}

void CUDT::processCtrlAck(const CPacket& ctrlpkt, const uint64_t currtime_tk)
{
const int32_t* ackdata = (const int32_t*)ctrlpkt.m_pcData;
const int32_t ackdata_seqno = ackdata[ACKD_RCVLASTACK];

const bool isLiteAck = ctrlpkt.getLength() == (size_t)SEND_LITE_ACK;
HLOGC(mglog.Debug, log << CONID() << "ACK covers: " << m_iSndLastDataAck << " - " << ackdata_seqno
<< " [ACK=" << m_iSndLastAck << "]" << (isLiteAck ? "[LITE]" : "[FULL]"));

updateSndLossListOnACK(ackdata_seqno);

// Process a lite ACK
if (ctrlpkt.getLength() == (size_t)SEND_LITE_ACK)
if (isLiteAck)
{
if (CSeqNo::seqcmp(ackdata_seqno, m_iSndLastAck) >= 0)
{
CGuard ack_lock(m_RecvAckLock);
m_iFlowWindowSize -= CSeqNo::seqoff(m_iSndLastAck, ackdata_seqno);
HLOGC(mglog.Debug, log << CONID() << "ACK covers: " << m_iSndLastDataAck << " - " << ack << " [ACK=" << m_iSndLastAck << "] (FLW: " << m_iFlowWindowSize << ") [LITE]");

m_iSndLastAck = ackdata_seqno;

// TODO: m_ullLastRspAckTime_tk should be protected with m_RecvAckLock
// because the sendmsg2 may want to change it at the same time.
m_ullLastRspAckTime_tk = currtime_tk;

m_iReXmitCount = 1; // Reset re-transmit count since last ACK
}

Expand Down Expand Up @@ -7274,48 +7276,10 @@ void CUDT::processCtrlAck(const CPacket& ctrlpkt, const uint64_t currtime_tk)
}
m_iSndLastFullAck = ackdata_seqno;

int offset = CSeqNo::seqoff(m_iSndLastDataAck, ackdata_seqno);
// IF distance between m_iSndLastDataAck and ack is nonempty...
if (offset > 0) {
// acknowledge the sending buffer (remove data that predate 'ack')
m_pSndBuffer->ackData(offset);

const int64_t currtime = CTimer::getTime();
// record total time used for sending
CGuard::enterCS(m_StatsLock);
m_stats.sndDuration += currtime - m_stats.sndDurationCounter;
m_stats.m_sndDurationTotal += currtime - m_stats.sndDurationCounter;
m_stats.sndDurationCounter = currtime;
CGuard::leaveCS(m_StatsLock);

HLOGC(mglog.Debug, log << CONID() << "ACK covers: " << m_iSndLastDataAck << " - " << ackdata_seqno
<< " [ACK=" << m_iSndLastAck << "] BUFr=" << m_iFlowWindowSize
<< " RTT=" << ackdata[ACKD_RTT] << " RTT*=" << ackdata[ACKD_RTTVAR]
<< " BW=" << ackdata[ACKD_BANDWIDTH] << " Vrec=" << ackdata[ACKD_RCVSPEED]);
// update sending variables
m_iSndLastDataAck = ackdata_seqno;

// remove any loss that predates 'ack' (not to be considered loss anymore)
m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck));
}


//
// END of the new code with TLPKTDROP
//

CGuard::leaveCS(m_RecvAckLock);
if (m_bSynSending)
{
CGuard lk(m_SendBlockLock);
pthread_cond_signal(&m_SendBlockCond);
}

// acknowledde any waiting epolls to write
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);

// insert this socket to snd list if it is not on the list yet
m_pSndQueue->m_pSndUList->update(this, CSndUList::DONT_RESCHEDULE);

size_t acksize = ctrlpkt.getLength(); // TEMPORARY VALUE FOR CHECKING
bool wrongsize = 0 != (acksize % ACKD_FIELD_SIZE);
Expand Down
2 changes: 1 addition & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ class CUDT
pthread_mutex_t m_RcvBufferLock; // Protects the state of the m_pRcvBuffer

// Protects access to m_iSndCurrSeqNo, m_iSndLastAck
pthread_mutex_t m_RecvAckLock; // Protects the state changes while processing incomming ACK
pthread_mutex_t m_RecvAckLock; // Protects the state changes while processing incomming ACK (UDT_EPOLL_OUT)


pthread_cond_t m_RecvDataCond; // used to block "recv" when there is no data
Expand Down

0 comments on commit 82158b9

Please sign in to comment.