Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

{FEC} Random refactoring and small bugfixes during FEC works. #852

Merged
merged 4 commits into from
Sep 6, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions configure-data.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ set cmake_options {
enable-static "Should libsrt be built as a static library (default: ON)"
enable-suflip "Should suflip tool be built (default: OFF)"
enable-getnameinfo "In-logs sockaddr-to-string should do rev-dns (default: OFF)"
enable-unittests "Enable unit tests (default: OFF)"
enable-thread-check "Enable #include <threadcheck.h> that implements THREAD_* macros"
openssl-crypto-library=<filepath> "Path to a library."
openssl-include-dir=<path> "Path to a file."
Expand Down
2 changes: 1 addition & 1 deletion docs/API-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,7 @@ connection parties.

#### SRT_REJ_FILTER

The `SRTO_FILTER` option has been set differently on both connection
The `SRTO_PACKETFILTER` option has been set differently on both connection
parties (NOTE: this flag may not exist yet in this version).


Expand Down
137 changes: 89 additions & 48 deletions srtcore/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -769,74 +769,78 @@ int CRcvBuffer::addData(CUnit* unit, int offset)
m_iMaxPos = offset + 1;

if (m_pUnit[pos] != NULL) {
HLOGC(dlog.Debug, log << "addData: unit %" << unit->m_Packet.m_iSeqNo
<< " rejected, already exists");
return -1;
}
m_pUnit[pos] = unit;
countBytes(1, (int) unit->m_Packet.getLength());

m_pUnitQueue->makeUnitGood(unit);

HLOGC(dlog.Debug, log << "addData: unit %" << unit->m_Packet.m_iSeqNo
<< " accepted, off=" << offset << " POS=" << pos);
return 0;
}

int CRcvBuffer::readBuffer(char* data, int len)
{
int p = m_iStartPos;
int lastack = m_iLastAckPos;
int rs = len;
int p = m_iStartPos;
int lastack = m_iLastAckPos;
int rs = len;
#if ENABLE_HEAVY_LOGGING
char* begin = data;
char* begin = data;
#endif

const uint64_t now = (m_bTsbPdMode ? CTimer::getTime() : uint64_t());
const uint64_t now = (m_bTsbPdMode ? CTimer::getTime() : uint64_t());

HLOGC(dlog.Debug, log << CONID() << "readBuffer: start=" << p << " lastack=" << lastack);
while ((p != lastack) && (rs > 0))
{
if (m_pUnit[p] == NULL)
{
LOGC(dlog.Error, log << CONID() << " IPE readBuffer on null packet pointer");
return -1;
}
HLOGC(dlog.Debug, log << CONID() << "readBuffer: start=" << p << " lastack=" << lastack);
while ((p != lastack) && (rs > 0))
{
if (m_pUnit[p] == NULL)
{
LOGC(dlog.Error, log << CONID() << " IPE readBuffer on null packet pointer");
return -1;
}

if (m_bTsbPdMode)
{
HLOGC(dlog.Debug, log << CONID() << "readBuffer: chk if time2play: NOW=" << now << " PKT TS=" << getPktTsbPdTime(m_pUnit[p]->m_Packet.getMsgTimeStamp()));
if ((getPktTsbPdTime(m_pUnit[p]->m_Packet.getMsgTimeStamp()) > now))
break; /* too early for this unit, return whatever was copied */
}
if (m_bTsbPdMode)
{
HLOGC(dlog.Debug, log << CONID() << "readBuffer: chk if time2play: NOW=" << now << " PKT TS=" << getPktTsbPdTime(m_pUnit[p]->m_Packet.getMsgTimeStamp()));
if ((getPktTsbPdTime(m_pUnit[p]->m_Packet.getMsgTimeStamp()) > now))
break; /* too early for this unit, return whatever was copied */
}

int unitsize = (int) m_pUnit[p]->m_Packet.getLength() - m_iNotch;
if (unitsize > rs)
unitsize = rs;
int unitsize = (int) m_pUnit[p]->m_Packet.getLength() - m_iNotch;
if (unitsize > rs)
unitsize = rs;

HLOGC(dlog.Debug, log << CONID() << "readBuffer: copying buffer #" << p
<< " targetpos=" << int(data-begin) << " sourcepos=" << m_iNotch << " size=" << unitsize << " left=" << (unitsize-rs));
memcpy(data, m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
data += unitsize;
HLOGC(dlog.Debug, log << CONID() << "readBuffer: copying buffer #" << p
<< " targetpos=" << int(data-begin) << " sourcepos=" << m_iNotch << " size=" << unitsize << " left=" << (unitsize-rs));
memcpy(data, m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
data += unitsize;

if ((rs > unitsize) || (rs == int(m_pUnit[p]->m_Packet.getLength()) - m_iNotch))
{
CUnit* tmp = m_pUnit[p];
m_pUnit[p] = NULL;
m_pUnitQueue->makeUnitFree(tmp);
if ((rs > unitsize) || (rs == int(m_pUnit[p]->m_Packet.getLength()) - m_iNotch))
{
CUnit* tmp = m_pUnit[p];
m_pUnit[p] = NULL;
m_pUnitQueue->makeUnitFree(tmp);

if (++ p == m_iSize)
p = 0;
if (++ p == m_iSize)
p = 0;

m_iNotch = 0;
}
else
m_iNotch += rs;
m_iNotch = 0;
}
else
m_iNotch += rs;

rs -= unitsize;
}
rs -= unitsize;
}

/* we removed acked bytes form receive buffer */
countBytes(-1, -(len - rs), true);
m_iStartPos = p;
/* we removed acked bytes form receive buffer */
countBytes(-1, -(len - rs), true);
m_iStartPos = p;

return len - rs;
return len - rs;
}

int CRcvBuffer::readBufferToFile(fstream& ofs, int len)
Expand Down Expand Up @@ -936,10 +940,12 @@ bool CRcvBuffer::getRcvFirstMsg(ref_t<uint64_t> r_tsbpdtime, ref_t<bool> r_passa
/* Check the acknowledged packets */
if (getRcvReadyMsg(r_tsbpdtime, r_curpktseq))
{
HLOGC(dlog.Debug, log << "getRcvFirstMsg: ready CONTIG packet: %" << (*r_curpktseq));
return true;
}
else if (*r_tsbpdtime != 0)
{
HLOGC(dlog.Debug, log << "getRcvFirstMsg: no packets found");
return false;
}

Expand Down Expand Up @@ -993,6 +999,7 @@ bool CRcvBuffer::getRcvFirstMsg(ref_t<uint64_t> r_tsbpdtime, ref_t<bool> r_passa
{
/* There are packets in the sequence not received yet */
haslost = true;
HLOGC(dlog.Debug, log << "getRcvFirstMsg: empty hole at *" << i);
}
else
{
Expand All @@ -1011,13 +1018,18 @@ bool CRcvBuffer::getRcvFirstMsg(ref_t<uint64_t> r_tsbpdtime, ref_t<bool> r_passa
*r_curpktseq = skipseqno;
}

HLOGC(dlog.Debug, log << "getRcvFirstMsg: found ready packet, nSKIPPED: "
<< ((i - m_iLastAckPos + m_iSize) % m_iSize));

// NOTE: if haslost is not set, it means that this is the VERY FIRST
// packet, that is, packet currently at pos = m_iLastAckPos. There's no
// possibility that it is so otherwise because:
// - if this first good packet is ready to play, THIS HERE RETURNS NOW.
// ...
return true;
}
HLOGC(dlog.Debug, log << "getRcvFirstMsg: found NOT READY packet, nSKIPPED: "
<< ((i - m_iLastAckPos + m_iSize) % m_iSize));
// ... and if this first good packet WASN'T ready to play, THIS HERE RETURNS NOW, TOO,
// just states that there's no ready packet to play.
// ...
Expand All @@ -1027,6 +1039,7 @@ bool CRcvBuffer::getRcvFirstMsg(ref_t<uint64_t> r_tsbpdtime, ref_t<bool> r_passa
// the 'haslost' is set, which means that it continues only to find the first valid
// packet after stating that the very first packet isn't valid.
}
HLOGC(dlog.Debug, log << "getRcvFirstMsg: found NO PACKETS");
return false;
}

Expand All @@ -1048,6 +1061,9 @@ bool CRcvBuffer::getRcvReadyMsg(ref_t<uint64_t> tsbpdtime, ref_t<int32_t> curpkt
/* Skip any invalid skipped/dropped packets */
if (m_pUnit[i] == NULL)
{
HLOGC(mglog.Debug, log << "getRcvReadyMsg: POS=" << i
<< " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
<< " SKIPPED - no unit there");
if (++ m_iStartPos == m_iSize)
m_iStartPos = 0;
continue;
Expand All @@ -1057,6 +1073,9 @@ bool CRcvBuffer::getRcvReadyMsg(ref_t<uint64_t> tsbpdtime, ref_t<int32_t> curpkt

if (m_pUnit[i]->m_iFlag != CUnit::GOOD)
{
HLOGC(mglog.Debug, log << "getRcvReadyMsg: POS=" << i
<< " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
<< " SKIPPED - unit not good");
freeunit = true;
}
else
Expand All @@ -1065,7 +1084,10 @@ bool CRcvBuffer::getRcvReadyMsg(ref_t<uint64_t> tsbpdtime, ref_t<int32_t> curpkt
int64_t towait = (*tsbpdtime - CTimer::getTime());
if (towait > 0)
{
HLOGC(mglog.Debug, log << "getRcvReadyMsg: found packet, but not ready to play (only in " << (towait/1000.0) << "ms)");
HLOGC(mglog.Debug, log << "getRcvReadyMsg: POS=" << i
<< " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
<< " pkt %" << curpktseq.get()
<< " NOT ready to play (only in " << (towait/1000.0) << "ms)");
return false;
}

Expand All @@ -1076,13 +1098,17 @@ bool CRcvBuffer::getRcvReadyMsg(ref_t<uint64_t> tsbpdtime, ref_t<int32_t> curpkt
}
else
{
HLOGC(mglog.Debug, log << "getRcvReadyMsg: packet seq=" << curpktseq.get() << " ready to play (delayed " << (-towait/1000.0) << "ms)");
HLOGC(mglog.Debug, log << "getRcvReadyMsg: POS=" << i
<< " +" << ((i - m_iStartPos + m_iSize) % m_iSize)
<< " pkt %" << curpktseq.get()
<< " ready to play (delayed " << (-towait/1000.0) << "ms)");
return true;
}
}

if (freeunit)
{
HLOGC(mglog.Debug, log << "getRcvReadyMsg: POS=" << i << " FREED");
/* removed skipped, dropped, undecryptable bytes from rcv buffer */
const int rmbytes = (int)m_pUnit[i]->m_Packet.getLength();
countBytes(-1, -rmbytes, true);
Expand Down Expand Up @@ -1598,6 +1624,8 @@ int CRcvBuffer::readMsg(char* data, int len, ref_t<SRT_MSGCTRL> r_msgctl)

const int unitsize = ((rs >= 0) && (unitsize > rs)) ? rs : pktlen;

HLOGC(mglog.Debug, log << "readMsg: checking unit POS=" << p);

if (unitsize > 0)
{
memcpy(data, m_pUnit[p]->m_Packet.m_pcData, unitsize);
Expand All @@ -1611,8 +1639,9 @@ int CRcvBuffer::readMsg(char* data, int len, ref_t<SRT_MSGCTRL> r_msgctl)
{
static uint64_t prev_now;
static uint64_t prev_srctime;
CPacket& pkt = m_pUnit[p]->m_Packet;

int32_t seq = m_pUnit[p]->m_Packet.m_iSeqNo;
int32_t seq = pkt.m_iSeqNo;

uint64_t nowtime = CTimer::getTime();
//CTimer::rdtsc(nowtime);
Expand All @@ -1622,23 +1651,35 @@ int CRcvBuffer::readMsg(char* data, int len, ref_t<SRT_MSGCTRL> r_msgctl)
int64_t nowdiff = prev_now ? (nowtime - prev_now) : 0;
uint64_t srctimediff = prev_srctime ? (srctime - prev_srctime) : 0;

HLOGC(dlog.Debug, log << CONID() << "readMsg: DELIVERED seq=" << seq << " T=" << FormatTime(srctime) << " in " << (timediff/1000.0) << "ms - "
"TIME-PREVIOUS: PKT: " << (srctimediff/1000.0) << " LOCAL: " << (nowdiff/1000.0));
HLOGC(dlog.Debug, log << CONID() << "readMsg: DELIVERED seq=" << seq
<< " from POS=" << p << " T="
<< FormatTime(srctime) << " in " << (timediff/1000.0)
<< "ms - TIME-PREVIOUS: PKT: " << (srctimediff/1000.0)
<< " LOCAL: " << (nowdiff/1000.0)
<< " !" << BufferStamp(pkt.m_pcData, pkt.getLength()));

prev_now = nowtime;
prev_srctime = srctime;
}
#endif
}
else
{
HLOGC(dlog.Debug, log << CONID() << "readMsg: SKIPPED POS=" << p << " - ZERO SIZE UNIT");
}

if (!passack)
{
HLOGC(dlog.Debug, log << CONID() << "readMsg: FREEING UNIT POS=" << p);
CUnit* tmp = m_pUnit[p];
m_pUnit[p] = NULL;
m_pUnitQueue->makeUnitFree(tmp);
}
else
{
HLOGC(dlog.Debug, log << CONID() << "readMsg: PASSACK UNIT POS=" << p);
m_pUnit[p]->m_iFlag = CUnit::PASSACK;
}

if (++ p == m_iSize)
p = 0;
Expand Down
8 changes: 8 additions & 0 deletions srtcore/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,14 @@ class CRcvBuffer
uint64_t getPktTsbPdTime(uint32_t timestamp);
int debugGetSize() const;
bool empty() const;

// Required by PacketFilter facility to use as a storage
// for provided packets
CUnitQueue* getUnitQueue()
{
return m_pUnitQueue;
}

private:

/// thread safe bytes counter of the Recv & Ack buffer
Expand Down
60 changes: 60 additions & 0 deletions srtcore/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,66 @@ int CChannel::sendto(const sockaddr* addr, CPacket& packet) const
<< spec.str());
#endif

#ifdef SRT_TEST_FAKE_LOSS

#define FAKELOSS_STRING_0(x) #x
#define FAKELOSS_STRING(x) FAKELOSS_STRING_0(x)
const char* fakeloss_text = FAKELOSS_STRING(SRT_TEST_FAKE_LOSS);
#undef FAKELOSS_STRING
#undef FAKELOSS_WRAP

static int dcounter = 0;
static int flwcounter = 0;

struct FakelossConfig
{
pair<int,int> config;
FakelossConfig(const char* f)
{
vector<string> out;
Split(f, '+', back_inserter(out));

config.first = atoi(out[0].c_str());
config.second = out.size() > 1 ? atoi(out[1].c_str()) : 8;
}
};
static FakelossConfig fakeloss = fakeloss_text;

if (!packet.isControl())
{
if (dcounter == 0)
{
timeval tv;
gettimeofday(&tv, 0);
srand(tv.tv_usec & 0xFFFF);
}
++dcounter;

if (flwcounter)
{
// This is a counter of how many packets in a row shall be lost
--flwcounter;
HLOGC(mglog.Debug, log << "CChannel: TEST: FAKE LOSS OF %" << packet.getSeqNo() << " (" << flwcounter << " more to drop)");
return packet.getLength(); // fake successful sendinf
}

if (dcounter > 8)
{
// Make a random number in the range between 8 and 24
int rnd = rand() % 16 + SRT_TEST_FAKE_LOSS;

if (dcounter > rnd)
{
dcounter = 1;
HLOGC(mglog.Debug, log << "CChannel: TEST: FAKE LOSS OF %" << packet.getSeqNo() << " (will drop " << fakeloss.config.first << " more)");
flwcounter = fakeloss.config.first;
return packet.getLength(); // fake successful sendinf
}
}
}

#endif

// convert control information into network order
// XXX USE HtoNLA!
if (packet.isControl())
Expand Down
Loading