Backup link selection to crop-active improved to find oldest active #1473

Merged
1 change: 1 addition & 0 deletions srtcore/core.h
@@ -225,6 +225,7 @@ class CUDT
    friend class CRcvUList;
    friend class PacketFilter;
    friend class CUDTGroup;
    friend struct FByOldestActive; // this functional will use private fields

    typedef srt::sync::steady_clock::time_point time_point;
    typedef srt::sync::steady_clock::duration   duration;
128 changes: 84 additions & 44 deletions srtcore/group.cpp
@@ -3163,6 +3163,18 @@ void CUDTGroup::send_CloseBrokenSockets(vector<gli_t>& w_wipeme)
    w_wipeme.clear();
}

struct FByOldestActive
{
    typedef CUDTGroup::gli_t gli_t;
    bool operator()(gli_t a, gli_t b)
    {
        CUDT& x = a->ps->core();
        CUDT& y = b->ps->core();

        return x.m_tsTmpActiveTime < y.m_tsTmpActiveTime;
    }
};
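This functor is the reason for the friend declaration added to core.h above: FByOldestActive lives outside CUDT, yet compares the private m_tsTmpActiveTime field. For illustration, a minimal self-contained sketch of the same friend-comparator pattern, with a hypothetical Link type standing in for CUDT (all names here are illustrative, not SRT's):

#include <algorithm>
#include <chrono>
#include <vector>

class Link
{
    // Private activation timestamp, playing the role of CUDT::m_tsTmpActiveTime.
    std::chrono::steady_clock::time_point m_tsActive;

    friend struct ByOldestActive; // the comparator needs private access

public:
    explicit Link(std::chrono::steady_clock::time_point t) : m_tsActive(t) {}
};

struct ByOldestActive
{
    // Strict weak ordering: the earlier-activated link sorts first.
    bool operator()(const Link& a, const Link& b) const
    {
        return a.m_tsActive < b.m_tsActive;
    }
};

int main()
{
    const auto now = std::chrono::steady_clock::now();
    std::vector<Link> links;
    links.push_back(Link(now));
    links.push_back(Link(now - std::chrono::seconds(5))); // activated earlier
    std::sort(links.begin(), links.end(), ByOldestActive());
    // links.front() is now the oldest-activated link.
}

Friendship is granted to the struct as a whole, so its operator() may read the private timestamp; std::sort only needs the comparator to define a strict weak ordering, which operator< on time_point provides.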

void CUDTGroup::sendBackup_CheckParallelLinks(const vector<gli_t>& unstable,
                                              vector<gli_t>&       w_parallel,
                                              int&                 w_final_stat,
@@ -3190,7 +3202,7 @@ void CUDTGroup::sendBackup_CheckParallelLinks(const vector<gli_t>& unstable,
        if (std::find(unstable.begin(), unstable.end(), *p) != unstable.end())
        {
            LOGC(gslog.Debug,
                 log << "grp/sendBackup: IPE: parallel links enclose unstable link @" << (*p)->ps->m_SocketID);
        }
    }
#endif
@@ -3233,54 +3245,54 @@ void CUDTGroup::sendBackup_CheckParallelLinks(const vector<gli_t>& unstable,
    size_t nlinks = m_Group.size();
    size_t ndead  = 0;

RetryWaitBlocked:
    {
        // Some sockets could have been closed in the meantime.
        if (m_SndEpolld->watch_empty())
            throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);

        InvertedLock ug(m_GroupLock);
        HLOGC(gslog.Debug,
              log << "grp/sendBackup: swait call to get at least one link alive up to " << m_iSndTimeOut << "us");
        brdy = m_pGlobal->m_EPoll.swait(*m_SndEpolld, sready, m_iSndTimeOut);

        // Check if there's anything in the "error" section.
        // This must be cleared here before the lock on group is set again.
        // (This loop will not fire even once if no failed sockets were found.)
        for (CEPoll::fmap_t::const_iterator i = sready.begin(); i != sready.end(); ++i)
        {
            if (i->second & SRT_EPOLL_ERR)
                continue; // broken already

            if ((i->second & SRT_EPOLL_IN) == 0)
                continue; // not ready for reading

            // Check if this socket is in aheads.
            // If so, don't read from it; wait until the ahead is flushed.
            SRTSOCKET   id = i->first;
            CUDTSocket* s  = m_pGlobal->locateSocket(id);
            if (s)
            {
                HLOGC(gslog.Debug,
                      log << "grp/sendBackup: swait/ex on @" << (id)
                          << " while waiting for any writable socket - CLOSING");
                CUDT::s_UDTUnited.close(s);
            }
            else
            {
                HLOGC(gslog.Debug, log << "grp/sendBackup: swait/ex on @" << (id) << " - WAS DELETED IN THE MEANTIME");
            }

            ++ndead;
        }
        HLOGC(gslog.Debug, log << "grp/sendBackup: swait/?close done, re-acquiring GroupLock");
    }

    if (brdy == -1 || ndead >= nlinks)
    {
        LOGC(gslog.Error,
             log << "grp/sendBackup: swait=>" << brdy << " nlinks=" << nlinks << " ndead=" << ndead
                 << " - looxlike all links broken");
        m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_OUT, false);
        m_pGlobal->m_EPoll.update_events(id(), m_sPollID, SRT_EPOLL_ERR, true);
        // You can safely throw here - nothing to fill in when all sockets down.
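The InvertedLock guard above temporarily releases m_GroupLock for the duration of the blocking swait() call, then re-acquires it on scope exit, so other threads may use the group while this one waits. A minimal sketch of that RAII idiom over std::mutex (a simplification; SRT's actual class operates on its own sync primitives):

#include <mutex>

// Inverse of std::lock_guard: unlocks an already-held mutex on
// construction and re-locks it on destruction, so that a blocking
// call can run without the lock held.
class InvertedLock
{
    std::mutex& m_mtx;

public:
    explicit InvertedLock(std::mutex& mtx) : m_mtx(mtx) { m_mtx.unlock(); }
    ~InvertedLock() { m_mtx.lock(); }

    InvertedLock(const InvertedLock&) = delete;
    InvertedLock& operator=(const InvertedLock&) = delete;
};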
@@ -3315,7 +3327,7 @@ void CUDTGroup::sendBackup_CheckParallelLinks(const vector<gli_t>& unstable,
        if (d->sndstate == SRT_GST_RUNNING)
        {
            HLOGC(gslog.Debug,
                  log << "grp/sendBackup: link @" << d->id << " RUNNING - SKIPPING from activate and resend");
            continue;
        }

@@ -3373,6 +3385,34 @@ void CUDTGroup::sendBackup_CheckParallelLinks(const vector<gli_t>& unstable,
    steady_clock::time_point currtime = steady_clock::now();

    vector<gli_t>::iterator b = w_parallel.begin();

    // Additional criterion: if you have multiple links with the same weight,
    // check if you have at least one with m_tsTmpActiveTime == 0. If not,
    // sort them additionally by this time.

    vector<gli_t>::iterator b1 = b, e = ++b1;

    // Both e and b1 stand at the b+1 position.
    // We have a guarantee that b+1 still points to a valid element.
    while (e != w_parallel.end())
    {
        if ((*e)->weight != (*b)->weight)
            break;
        ++e;
    }

    if (b1 != e)
    {
        // More than one link with the same weight. Sorting them according
        // to a different criterion will not change the previous sorting order,
        // because the elements in this range are equal according to the
        // previous criterion.
        // Here find the link with the least time. The "trap" zero time
        // happens to satisfy this requirement as well.
        sort(b, e, FByOldestActive());
    }
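To restate the idea: w_parallel arrives here already ordered by its primary criterion, so only the leading run of equal-weight links needs the secondary ordering, and sorting that run alone cannot disturb the primary order. A simplified standalone sketch of the selection, using a hypothetical LinkInfo type with plain integer timestamps (0 standing in for the "trap" zero time):

#include <algorithm>
#include <cstddef>
#include <vector>

struct LinkInfo
{
    int       weight;       // primary criterion; the input is pre-sorted by it
    long long activeTimeUs; // activation time; 0 plays the "trap" zero role
};

// Returns the index of the link to keep active: within the leading run
// of equal-weight links, the one with the oldest (smallest) activation
// time wins. Assumes links is non-empty, mirroring the guarantee above.
std::size_t pickActive(std::vector<LinkInfo>& links)
{
    std::vector<LinkInfo>::iterator b = links.begin(), e = b + 1;

    // Extend e across the run of links sharing the front link's weight.
    while (e != links.end() && e->weight == b->weight)
        ++e;

    if (e - b > 1)
    {
        // Re-sorting only this run cannot disturb the primary order,
        // since all of its elements compare equal by weight; the zero
        // "trap" time naturally sorts to the front.
        std::sort(b, e, [](const LinkInfo& x, const LinkInfo& y)
                  { return x.activeTimeUs < y.activeTimeUs; });
    }
    return 0; // after the sort, the front link is the one to keep
}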

    // After finding the link to leave active, leave it behind.
    HLOGC(gslog.Debug, log << "grp/sendBackup: keeping parallel link @" << (*b)->id << " and silencing others:");
    ++b;
    for (; b != w_parallel.end(); ++b)
@@ -3381,17 +3421,17 @@ void CUDTGroup::sendBackup_CheckParallelLinks(const vector<gli_t>& unstable,
        if (d->sndstate != SRT_GST_RUNNING)
        {
            LOGC(gslog.Error,
                 log << "grp/sendBackup: IPE: parallel link container contains non-running link @" << d->id);
            continue;
        }
        CUDT& ce = d->ps->core();
        steady_clock::duration td(0);
        if (!is_zero(ce.m_tsTmpActiveTime) &&
            count_microseconds(td = currtime - ce.m_tsTmpActiveTime) < ce.m_uOPT_StabilityTimeout)
        {
            HLOGC(gslog.Debug,
                  log << "... not silencing @" << d->id << ": too early: " << FormatDuration(td) << " < "
                      << ce.m_uOPT_StabilityTimeout << "(stability timeout)");
            continue;
        }

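Condensed from the silencing loop above: a link escapes silencing only while its temporary activation is newer than the stability timeout, and a zero activation time never protects it. A small standalone sketch of that rule (hypothetical helper; the real code expresses it with is_zero() and count_microseconds()):

#include <chrono>

using steady_clock = std::chrono::steady_clock;

// A freshly activated link must not be silenced before it has had a
// chance to prove itself stable. A default-constructed (zero) activation
// time means the link was never temporarily activated, so it is always
// eligible for silencing.
bool canSilence(steady_clock::time_point tsTmpActive,
                steady_clock::time_point now,
                std::chrono::microseconds stabilityTimeout)
{
    if (tsTmpActive == steady_clock::time_point()) // the is_zero() case
        return true;
    return (now - tsTmpActive) >= stabilityTimeout;
}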