Skip to content

Commit

Permalink
[core] Fixed some problems in group code found during review (#1505)
Browse files Browse the repository at this point in the history
  • Loading branch information
ethouris authored Sep 2, 2020
1 parent 5c21170 commit 633da74
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 61 deletions.
8 changes: 6 additions & 2 deletions docs/API-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1558,8 +1558,12 @@ parties.

#### SRT_REJ_GROUP

The group type or some group settings are incompatible for both connection
parties.
The group type or some group settings are incompatible for both connection parties.
While every connection within a bonding group may have different target addresses,
they should all designate the same endpoint and the same SRT application. If this
condition isn't satisfied, then the peer will respond with a different peer group
ID for the connection that is trying to contact a machine/application that is
completely different from the existing connections in the bonding group.

#### SRT_REJ_TIMEOUT

Expand Down
8 changes: 4 additions & 4 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,7 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar
// connection is going to later succeed or fail (this will be
// known in the group state information).
bool block_new_opened = !g.m_bOpened && g.m_bSynRecving;
const bool was_empty = g.empty();
SRTSOCKET retval = -1;

int eid = -1;
Expand Down Expand Up @@ -1363,7 +1364,6 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar
isn = -1;
}


// Set the groupconnect option, as all in-group sockets should have it.
ns->m_pUDT->m_OPT_GroupConnect = 1;

Expand Down Expand Up @@ -1439,9 +1439,9 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar
{
ScopedLock grd (g.m_GroupLock);

if (isn == 0)
if (was_empty)
{
g.syncWithSocket(ns->core());
g.syncWithSocket(ns->core(), HSD_INITIATOR);
}

HLOGC(aclog.Debug, log << "groupConnect: @" << sid << " connection successful, setting group OPEN (was "
Expand Down Expand Up @@ -1488,7 +1488,7 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar
if (retval == -1)
{
HLOGC(aclog.Debug, log << "groupConnect: none succeeded as background-spawn, exit with error");
block_new_opened = false;
block_new_opened = false; // Avoid executing further while loop
}

vector<SRTSOCKET> broken;
Expand Down
41 changes: 15 additions & 26 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1862,28 +1862,15 @@ size_t CUDT::fillHsExtGroup(uint32_t* pcmdspec)
if (m_parent->m_IncludedGroup->synconmsgno())
flags |= SRT_GFLAG_SYNCONMSG;

SRTSOCKET master_peerid;
IF_HEAVY_LOGGING(steady_clock::duration master_tdiff);
steady_clock::time_point master_st;

// "Master" is the first found running connection. Will be false, if
// there's no other connection yet. When any connection is found, specify this
// as a determined master connection, and extract its id.
if ( !m_parent->m_IncludedGroup->getMasterData(m_SocketID, (master_peerid), (master_st)) )
{
master_peerid = -1;
IF_HEAVY_LOGGING(master_tdiff = steady_clock::duration());
HLOGC(cnlog.Debug, log << CONID() << "NO GROUP MASTER LINK found for group: $" << m_parent->m_IncludedGroup->id());
}
else
{
// The returned master_st is the master's start time. Calculate the
// differene time.
IF_HEAVY_LOGGING(master_tdiff = m_stats.tsStartTime - master_st);
HLOGC(cnlog.Debug, log << CONID() << "FOUND GROUP MASTER LINK: peer=$" << master_peerid
<< " - start time diff: " << FormatDuration<DUNIT_S>(master_tdiff));
}
// (this function will not fill the variables with anything, if no master is found)
// NOTE: this code remains as is for historical reasons.
// The initial implementation stated that the peer id be
// extracted so that it can be reported and possibly the
// start time somehow encoded and written into the group
// extension, but it was later seen not necessary. Therefore
// this code remains, but now it's informational only.
#if ENABLE_HEAVY_LOGGING
m_parent->m_IncludedGroup->debugMasterData(m_SocketID);
#endif

// See CUDT::interpretGroup()

Expand Down Expand Up @@ -3441,7 +3428,7 @@ bool CUDT::interpretGroup(const int32_t groupdata[], size_t data_size SRT_ATR_UN
return false;
}

// This is called when the group ID has come in in the handshake.
// This is called when the group type that has come in the handshake is invalid.
if (gtp >= SRT_GTYPE_E_END)
{
m_RejectReason = SRT_REJ_GROUP;
Expand Down Expand Up @@ -3515,8 +3502,10 @@ bool CUDT::interpretGroup(const int32_t groupdata[], size_t data_size SRT_ATR_UN
// different peers).
else if (pg->peerid() != grpid)
{
LOGC(cnlog.Error, log << "IPE: HS/RSP: group membership responded for peer $" << grpid << " but the current socket's group $" << pg->id()
<< " has already a peer $" << peer);
LOGC(cnlog.Error, log << "IPE: HS/RSP: group membership responded for peer $" << grpid
<< " but the current socket's group $" << pg->id() << " has already a peer $" << peer);
m_RejectReason = SRT_REJ_GROUP;
return false;
}
else
{
Expand Down Expand Up @@ -3620,7 +3609,7 @@ SRTSOCKET CUDT::makeMePeerOf(SRTSOCKET peergroup, SRT_GROUP_TYPE gtp, uint32_t l
if (was_empty)
{
ScopedLock glock (*gp->exp_groupLock());
gp->syncWithSocket(s->core());
gp->syncWithSocket(s->core(), HSD_RESPONDER);
}

// Setting non-blocking reading for group socket.
Expand Down
83 changes: 56 additions & 27 deletions srtcore/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,54 +133,76 @@ bool CUDTGroup::applyGroupSequences(SRTSOCKET target, int32_t& w_snd_isn, int32_
return true;
}

bool CUDTGroup::getMasterData(SRTSOCKET slave, SRTSOCKET& w_mpeer, steady_clock::time_point& w_st)
// NOTE: This function is now for DEBUG PURPOSES ONLY.
// Except for presenting the extracted data in the logs, there's no use of it now.
void CUDTGroup::debugMasterData(SRTSOCKET slave)
{
// Find at least one connection, which is running. Note that this function is called
// from within a handshake process, so the socket that undergoes this process is at best
// currently in SRT_GST_PENDING state and it's going to be in SRT_GST_IDLE state at the
// time when the connection process is done, until the first reading/writing happens.
ScopedLock cg(m_GroupLock);

SRTSOCKET mpeer;
steady_clock::time_point start_time;

bool found = false;

for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
if (gi->sndstate == SRT_GST_RUNNING)
{
// Found it. Get the socket's peer's ID and this socket's
// Start Time. Once it's delivered, this can be used to calculate
// the Master-to-Slave start time difference.
w_mpeer = gi->ps->m_PeerID;
w_st = gi->ps->core().socketStartTime();
mpeer = gi->ps->m_PeerID;
start_time = gi->ps->core().socketStartTime();
HLOGC(gmlog.Debug,
log << "getMasterData: found RUNNING master @" << gi->id << " - reporting master's peer $" << w_mpeer
<< " starting at " << FormatTime(w_st));
return true;
log << "getMasterData: found RUNNING master @" << gi->id << " - reporting master's peer $" << mpeer
<< " starting at " << FormatTime(start_time));
found = true;
break;
}
}

// If no running one found, then take the first socket in any other
// state than broken, except the slave. This is for a case when a user
// has prepared one link already, but hasn't sent anything through it yet.
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
if (!found)
{
if (gi->sndstate == SRT_GST_BROKEN)
continue;
// If no running one found, then take the first socket in any other
// state than broken, except the slave. This is for a case when a user
// has prepared one link already, but hasn't sent anything through it yet.
for (gli_t gi = m_Group.begin(); gi != m_Group.end(); ++gi)
{
if (gi->sndstate == SRT_GST_BROKEN)
continue;

if (gi->id == slave)
continue;
if (gi->id == slave)
continue;

// Found it. Get the socket's peer's ID and this socket's
// Start Time. Once it's delivered, this can be used to calculate
// the Master-to-Slave start time difference.
w_mpeer = gi->ps->core().m_PeerID;
w_st = gi->ps->core().socketStartTime();
HLOGC(gmlog.Debug,
log << "getMasterData: found IDLE/PENDING master @" << gi->id << " - reporting master's peer $" << w_mpeer
<< " starting at " << FormatTime(w_st));
return true;
// Found it. Get the socket's peer's ID and this socket's
// Start Time. Once it's delivered, this can be used to calculate
// the Master-to-Slave start time difference.
mpeer = gi->ps->core().m_PeerID;
start_time = gi->ps->core().socketStartTime();
HLOGC(gmlog.Debug,
log << "getMasterData: found IDLE/PENDING master @" << gi->id << " - reporting master's peer $" << mpeer
<< " starting at " << FormatTime(start_time));
found = true;
break;
}
}

HLOGC(gmlog.Debug, log << "getMasterData: no link found suitable as master for @" << slave);
return false;
if (!found)
{
LOGC(cnlog.Debug, log << CONID() << "NO GROUP MASTER LINK found for group: $" << id());
}
else
{
// start_time is the master's start time. Calculate the
// time difference.
steady_clock::duration master_tdiff = m_tsStartTime - start_time;
LOGC(cnlog.Debug, log << CONID() << "FOUND GROUP MASTER LINK: peer=$" << mpeer
<< " - start time diff: " << FormatDuration<DUNIT_S>(master_tdiff));
}
}

// GROUP
Expand Down Expand Up @@ -862,11 +884,18 @@ SRT_SOCKSTATUS CUDTGroup::getStatus()
return SRTS_BROKEN;
}

void CUDTGroup::syncWithSocket(const CUDT& core)
void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side)
{
// [[using locked(m_GroupLock)]];

set_currentSchedSequence(core.ISN());
if (side == HSD_RESPONDER)
{
// On the listener side you should synchronize ISN with the incoming
// socket, which is done immediately after creating the socket and
// adding it to the group. On the caller side the ISN is defined in
// the group directly, before any member socket is created.
set_currentSchedSequence(core.ISN());
}

// XXX
// Might need further investigation as to whether this isn't
Expand Down
4 changes: 2 additions & 2 deletions srtcore/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ class CUDTGroup

SRT_SOCKSTATUS getStatus();

bool getMasterData(SRTSOCKET slave, SRTSOCKET& w_mpeer, time_point& w_st);
void debugMasterData(SRTSOCKET slave);

bool isGroupReceiver()
{
Expand Down Expand Up @@ -299,7 +299,7 @@ class CUDTGroup
/// @param ack The past-the-last-received ACK sequence number
void readyPackets(CUDT* core, int32_t ack);

void syncWithSocket(const CUDT& core);
void syncWithSocket(const CUDT& core, const HandshakeSide side);
int getGroupData(SRT_SOCKGROUPDATA* pdata, size_t* psize);
int configure(const char* str);

Expand Down

0 comments on commit 633da74

Please sign in to comment.