Skip to content

Commit

Permalink
[core] Refactoring: added packUniqueData(..) func
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed Jan 24, 2022
1 parent 8518558 commit a31e618
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 161 deletions.
37 changes: 13 additions & 24 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,30 +410,19 @@ int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime,
w_packet.setLength(readlen);
w_packet.m_iSeqNo = m_pCurrBlock->m_iSeqNo;

// XXX This is probably done because the encryption should happen
// just once, and so this sets the encryption flags to both msgno bitset
// IN THE PACKET and IN THE BLOCK. This is probably to make the encryption
// happen at the time when scheduling a new packet to send, but the packet
// must remain in the send buffer until it's ACKed. For the case of rexmit
// the packet will be taken "as is" (that is, already encrypted).
//
// The problem is in the order of things:
// 0. When the application stores the data, some of the flags for PH_MSGNO are set.
// 1. The readData() is called to get the original data sent by the application.
// 2. The data are original and must be encrypted. They WILL BE encrypted, later.
// 3. So far we are in readData() so the encryption flags must be updated NOW because
// later we won't have access to the block's data.
// 4. After exiting from readData(), the packet is being encrypted. It's immediately
// sent, however the data must remain in the sending buffer until they are ACKed.
// 5. In case when rexmission is needed, the second overloaded version of readData
// is being called, and the buffer + PH_MSGNO value is extracted. All interesting
// flags must be present and correct at that time.
//
// The only sensible way to fix this problem is to encrypt the packet not after
// extracting from here, but when the packet is stored into CSndBuffer. The appropriate
// flags for PH_MSGNO will be applied directly there. Then here the value for setting
// PH_MSGNO will be set as is.

// 1. On submission (addBuffer), the KK flag is set to EK_NOENC (0).
// 2. The readData() is called to get the original (unique) payload not ever sent yet.
// The payload must be encrypted for the first time if the encryption
// is enabled (arg kflgs != EK_NOENC). The KK encryption flag of the data packet
// header must be set and remembered accordingly (see EncryptionKeySpec).
// 3. The next time this packet is read (only for retransmission), the payload is already
// encrypted, and the proper flag value is already stored.

// TODO: Alternatively, encryption could happen before the packet is submitted to the buffer
// (before the addBuffer() call), and corresponding flags could be set accordingly.
// This may also put an encryption burden on the application thread, rather than the sending thread,
// which could be more efficient. Note that packet sequence number must be properly set in that case,
// as it is used as a counter for the AES encryption.
if (kflgs == -1)
{
HLOGC(bslog.Debug, log << CONID() << " CSndBuffer: ERROR: encryption required and not possible. NOT SENDING.");
Expand Down
256 changes: 126 additions & 130 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9189,16 +9189,14 @@ int srt::CUDT::packLostData(CPacket& w_packet, steady_clock::time_point& w_origi
return 0;
}

std::pair<int, steady_clock::time_point> srt::CUDT::packData(CPacket& w_packet)
std::pair<bool, steady_clock::time_point> srt::CUDT::packData(CPacket& w_packet)
{
int payload = 0;
bool probe = false;
steady_clock::time_point origintime;
bool new_packet_packed = false;
bool filter_ctl_pkt = false;

int kflg = EK_NOENC;

const steady_clock::time_point enter_time = steady_clock::now();

if (!is_zero(m_tsNextSendTime) && enter_time > m_tsNextSendTime)
Expand All @@ -9217,7 +9215,7 @@ std::pair<int, steady_clock::time_point> srt::CUDT::packData(CPacket& w_packet)
// start the dissolving process, this process will
// not be started until this function is finished.
if (!m_bOpened)
return std::make_pair(0, enter_time);
return std::make_pair(false, enter_time);

payload = packLostData((w_packet), (origintime));
if (payload > 0)
Expand All @@ -9233,122 +9231,24 @@ std::pair<int, steady_clock::time_point> srt::CUDT::packData(CPacket& w_packet)
filter_ctl_pkt = true; // Mark that this packet ALREADY HAS timestamp field and it should not be set

// Stats
{
ScopedLock lg(m_StatsLock);
m_stats.sndr.sentFilterExtra.count(1);
}
ScopedLock lg(m_StatsLock);
m_stats.sndr.sentFilterExtra.count(1);
}
else
{
// If no loss, and no packetfilter control packet, pack a new packet.

// Check the congestion/flow window limit
const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow));
const int flightspan = getFlightSpan();
if (cwnd > flightspan)
{
// XXX Here it's needed to set kflg to msgno_bitset in the block stored in the
// send buffer. This should be somehow avoided, the crypto flags should be set
// together with encrypting, and the packet should be sent as is, when rexmitting.
// It would be nice to research as to whether CSndBuffer::Block::m_iMsgNoBitset field
// isn't a useless redundant state copy. If it is, then taking the flags here can be removed.
kflg = m_pCryptoControl->getSndCryptoFlags();
int pktskipseqno = 0;
payload = m_pSndBuffer->readData((w_packet), (origintime), kflg, (pktskipseqno));
if (pktskipseqno)
{
// Some packets were skipped due to TTL expiry.
m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo, pktskipseqno);
}

if (payload)
{
// A CHANGE. The sequence number is currently added to the packet
// when scheduling, not when extracting. This is a inter-migration form,
// so still override the value, but trace it.
m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo);

// Do this checking only for groups and only at the very first moment,
// when there's still nothing in the buffer. Otherwise there will be
// a serious data discrepancy between the agent and the peer.
// After increasing by 1, but being previously set as ISN-1, this should be == ISN,
// if this is the very first packet to send.
#if ENABLE_EXPERIMENTAL_BONDING
// Fortunately here is only the procedure that verifies if the extraction
// sequence is moved due to the difference between ISN caught during the existing
// transmission and the first sequence possible to be used at the first sending
// instruction. The group itself isn't being accessed.
if (m_parent->m_GroupOf && m_iSndCurrSeqNo != w_packet.m_iSeqNo && m_iSndCurrSeqNo == m_iISN)
{
const int packetspan = CSeqNo::seqcmp(w_packet.m_iSeqNo, m_iSndCurrSeqNo);

HLOGC(qslog.Debug, log << CONID() << "packData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo
<< " from SCHEDULING sequence " << w_packet.m_iSeqNo
<< " DIFF: " << packetspan << " STAMP:" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));

// This is the very first packet to be sent; so there's nothing in
// the sending buffer yet, and therefore we are in a situation as just
// after connection. No packets in the buffer, no packets are sent,
// no ACK to be awaited. We can screw up all the variables that are
// initialized from ISN just after connection.
//
// Additionally send the drop request to the peer so that it
// won't stupidly request the packets to be retransmitted.
// Don't do it if the difference isn't positive or exceeds the threshold.
if (packetspan > 0)
{
int32_t seqpair[2];
seqpair[0] = m_iSndCurrSeqNo;
seqpair[1] = w_packet.m_iSeqNo;
HLOGC(qslog.Debug, log << "... sending INITIAL DROP (ISN FIX): "
<< "msg=" << MSGNO_SEQ::unwrap(w_packet.m_iMsgNo) << " SEQ:"
<< seqpair[0] << " - " << seqpair[1] << "(" << packetspan << " packets)");
sendCtrl(UMSG_DROPREQ, &w_packet.m_iMsgNo, seqpair, sizeof(seqpair));

// In case when this message is lost, the peer will still get the
// UMSG_DROPREQ message when the agent realizes that the requested
// packet are not present in the buffer (preadte the send buffer).
}
}
else
#endif
{
HLOGC(qslog.Debug, log << CONID() << "packData: Applying EXTRACTION sequence " << m_iSndCurrSeqNo
<< " over SCHEDULING sequence " << w_packet.m_iSeqNo
<< " DIFF: " << CSeqNo::seqcmp(m_iSndCurrSeqNo, w_packet.m_iSeqNo)
<< " STAMP:" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));

#if ENABLE_EXPERIMENTAL_BONDING
HLOGC(qslog.Debug, log << "... CONDITION: IN GROUP: " << (m_parent->m_GroupOf ? "yes":"no")
<< " extraction-seq=" << m_iSndCurrSeqNo << " scheduling-seq=" << w_packet.m_iSeqNo << " ISN=" << m_iISN);
#endif

// Do this always when not in a group,
w_packet.m_iSeqNo = m_iSndCurrSeqNo;
}

// every 16 (0xF) packets, a packet pair is sent
if ((w_packet.m_iSeqNo & PUMASK_SEQNO_PROBE) == 0)
probe = true;

new_packet_packed = true;
}
else
{
m_tsNextSendTime = steady_clock::time_point();
m_tdSendTimeDiff = steady_clock::duration();
return std::make_pair(0, enter_time);
}
}
else
if (!packUniqueData(w_packet, origintime))
{
HLOGC(qslog.Debug, log << "packData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_dCongestionWindow
<< ")=" << cwnd << " seqlen=(" << m_iSndLastAck << "-" << m_iSndCurrSeqNo << ")=" << flightspan);
m_tsNextSendTime = steady_clock::time_point();
m_tdSendTimeDiff = steady_clock::duration();
return std::make_pair(0, enter_time);
return std::make_pair(false, enter_time);
}
new_packet_packed = true;

// every 16 (0xF) packets, a packet pair is sent
if ((w_packet.m_iSeqNo & PUMASK_SEQNO_PROBE) == 0)
probe = true;

payload = (int) w_packet.getLength();
reason = "normal";
}

Expand Down Expand Up @@ -9389,23 +9289,6 @@ std::pair<int, steady_clock::time_point> srt::CUDT::packData(CPacket& w_packet)

w_packet.m_iID = m_PeerID;

/* Encrypt if 1st time this packet is sent and crypto is enabled */
if (kflg)
{
// XXX Encryption flags are already set on the packet before calling this.
// See readData() above.
if (m_pCryptoControl->encrypt((w_packet)))
{
// Encryption failed
//>>Add stats for crypto failure
LOGC(qslog.Warn, log << "ENCRYPT FAILED - packet won't be sent, size=" << payload);
// Encryption failed
return std::make_pair(-1, enter_time);
}
payload = (int) w_packet.getLength(); /* Cipher may change length */
reason += " (encrypted)";
}

if (new_packet_packed && m_PacketFilter)
{
HLOGC(qslog.Debug, log << "filter: Feeding packet for source clip");
Expand Down Expand Up @@ -9472,7 +9355,120 @@ std::pair<int, steady_clock::time_point> srt::CUDT::packData(CPacket& w_packet)
#endif
}

return std::make_pair(payload, m_tsNextSendTime);
return std::make_pair(payload >= 0, m_tsNextSendTime);
}

bool srt::CUDT::packUniqueData(CPacket& w_packet, time_point& w_origintime)
{
// Check the congestion/flow window limit
const int cwnd = std::min(int(m_iFlowWindowSize), int(m_dCongestionWindow));
const int flightspan = getFlightSpan();
if (cwnd <= flightspan)
{
HLOGC(qslog.Debug, log << "packData: CONGESTED: cwnd=min(" << m_iFlowWindowSize << "," << m_dCongestionWindow
<< ")=" << cwnd << " seqlen=(" << m_iSndLastAck << "-" << m_iSndCurrSeqNo << ")=" << flightspan);
return false;
}

// XXX Here it's needed to set kflg to msgno_bitset in the block stored in the
// send buffer. This should be somehow avoided, the crypto flags should be set
// together with encrypting, and the packet should be sent as is, when rexmitting.
// It would be nice to research as to whether CSndBuffer::Block::m_iMsgNoBitset field
// isn't a useless redundant state copy. If it is, then taking the flags here can be removed.
const int kflg = m_pCryptoControl->getSndCryptoFlags();
int pktskipseqno = 0;
const int pld_size = m_pSndBuffer->readData((w_packet), (w_origintime), kflg, (pktskipseqno));
if (pktskipseqno)
{
// Some packets were skipped due to TTL expiry.
m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo, pktskipseqno);
}

if (pld_size == 0)
{
return false;
}

// A CHANGE. The sequence number is currently added to the packet
// when scheduling, not when extracting. This is a inter-migration form,
// so still override the value, but trace it.
m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo);

// Do this checking only for groups and only at the very first moment,
// when there's still nothing in the buffer. Otherwise there will be
// a serious data discrepancy between the agent and the peer.
// After increasing by 1, but being previously set as ISN-1, this should be == ISN,
// if this is the very first packet to send.
#if ENABLE_EXPERIMENTAL_BONDING
// Fortunately here is only the procedure that verifies if the extraction
// sequence is moved due to the difference between ISN caught during the existing
// transmission and the first sequence possible to be used at the first sending
// instruction. The group itself isn't being accessed.
if (m_parent->m_GroupOf && m_iSndCurrSeqNo != w_packet.m_iSeqNo && m_iSndCurrSeqNo == m_iISN)
{
const int packetspan = CSeqNo::seqcmp(w_packet.m_iSeqNo, m_iSndCurrSeqNo);

HLOGC(qslog.Debug, log << CONID() << "packData: Fixing EXTRACTION sequence " << m_iSndCurrSeqNo
<< " from SCHEDULING sequence " << w_packet.m_iSeqNo
<< " DIFF: " << packetspan << " STAMP:" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));

// This is the very first packet to be sent; so there's nothing in
// the sending buffer yet, and therefore we are in a situation as just
// after connection. No packets in the buffer, no packets are sent,
// no ACK to be awaited. We can screw up all the variables that are
// initialized from ISN just after connection.
//
// Additionally send the drop request to the peer so that it
// won't stupidly request the packets to be retransmitted.
// Don't do it if the difference isn't positive or exceeds the threshold.
if (packetspan > 0)
{
int32_t seqpair[2];
seqpair[0] = m_iSndCurrSeqNo;
seqpair[1] = w_packet.m_iSeqNo;
HLOGC(qslog.Debug, log << "... sending INITIAL DROP (ISN FIX): "
<< "msg=" << MSGNO_SEQ::unwrap(w_packet.m_iMsgNo) << " SEQ:"
<< seqpair[0] << " - " << seqpair[1] << "(" << packetspan << " packets)");
sendCtrl(UMSG_DROPREQ, &w_packet.m_iMsgNo, seqpair, sizeof(seqpair));

// In case when this message is lost, the peer will still get the
// UMSG_DROPREQ message when the agent realizes that the requested
// packet are not present in the buffer (preadte the send buffer).
}
}
else
#endif
{
HLOGC(qslog.Debug, log << CONID() << "packData: Applying EXTRACTION sequence " << m_iSndCurrSeqNo
<< " over SCHEDULING sequence " << w_packet.m_iSeqNo
<< " DIFF: " << CSeqNo::seqcmp(m_iSndCurrSeqNo, w_packet.m_iSeqNo)
<< " STAMP:" << BufferStamp(w_packet.m_pcData, w_packet.getLength()));

#if ENABLE_EXPERIMENTAL_BONDING
HLOGC(qslog.Debug, log << "... CONDITION: IN GROUP: " << (m_parent->m_GroupOf ? "yes":"no")
<< " extraction-seq=" << m_iSndCurrSeqNo << " scheduling-seq=" << w_packet.m_iSeqNo << " ISN=" << m_iISN);
#endif

// Do this always when not in a group,
w_packet.m_iSeqNo = m_iSndCurrSeqNo;
}

// Encrypt if 1st time this packet is sent and crypto is enabled
if (kflg != EK_NOENC)
{
// Note that the packet header must have a valid seqno set, as it is used as a counter for encryption.
// Other fields of the data packet header (e.g. timestamp, destination socket ID) are not used for the counter.
// Cypher may change packet length!
if (m_pCryptoControl->encrypt((w_packet)))
{
// Encryption failed
//>>Add stats for crypto failure
LOGC(qslog.Warn, log << "ENCRYPT FAILED - packet won't be sent, size=" << pld_size);
return -1;
}
}

return true;
}

// This is a close request, but called from the
Expand Down
16 changes: 12 additions & 4 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -1030,15 +1030,23 @@ class CUDT
/// @return payload size on success, <=0 on failure
int packLostData(CPacket &packet, time_point &origintime);

/// Pack a unique data packet (never sent so far) in CPacket for sending.
///
/// @param packet [in, out] a CPacket structure to fill.
/// @param origintime [in, out] origin timestamp of the packet.
///
/// @return true if a packet has been packets; false otherwise.
bool packUniqueData(CPacket& packet, time_point& origintime);

/// Pack in CPacket the next data to be send.
///
/// @param packet [in, out] a CPacket structure to fill
///
/// @return A pair of values is returned (payload, timestamp).
/// The payload tells the size of the payload, packed in CPacket.
/// @return A pair of values is returned (is_payload_valid, timestamp).
/// If is_payload_valid is false, there was nothing packed for sending,
/// and the timestamp value should be ignored.
/// The timestamp is the full source/origin timestamp of the data.
/// If payload is <= 0, consider the timestamp value invalid.
std::pair<int, time_point> packData(CPacket& packet);
std::pair<bool, time_point> packData(CPacket& packet);

int processData(CUnit* unit);
void processClose();
Expand Down
4 changes: 2 additions & 2 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,10 +635,10 @@ void* srt::CSndQueue::worker(void* param)

// pack a packet from the socket
CPacket pkt;
const std::pair<int, steady_clock::time_point> res_time = u->packData((pkt));
const std::pair<bool, steady_clock::time_point> res_time = u->packData((pkt));

// Check if payload size is invalid.
if (res_time.first <= 0)
if (res_time.first == false)
{
#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lNotReadyPop++;
Expand Down
Loading

0 comments on commit a31e618

Please sign in to comment.