Skip to content

Commit

Permalink
[core] Added busy counter for sockets and various fixes for data race…
Browse files Browse the repository at this point in the history
… problems (#2893).
  • Loading branch information
ethouris authored Aug 20, 2024
1 parent 70e2127 commit eecc176
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 23 deletions.
90 changes: 84 additions & 6 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1914,11 +1914,28 @@ int srt::CUDTUnited::close(const SRTSOCKET u)
return 0;
}
#endif
CUDTSocket* s = locateSocket(u);
if (!s)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
#if ENABLE_HEAVY_LOGGING
// Wrapping the log into a destructor so that it
// is printed AFTER the destructor of SocketKeeper.
struct ScopedExitLog
{
const CUDTSocket* const ps;
ScopedExitLog(const CUDTSocket* p): ps(p){}
~ScopedExitLog()
{
if (ps) // Could be not acquired by SocketKeeper, occasionally
{
HLOGC(smlog.Debug, log << "CUDTUnited::close/end: @" << ps->m_SocketID << " busy=" << ps->isStillBusy());
}
}
};
#endif

SocketKeeper k(*this, u, ERH_THROW);
IF_HEAVY_LOGGING(ScopedExitLog slog(k.socket));
HLOGC(smlog.Debug, log << "CUDTUnited::close/begin: @" << u << " busy=" << k.socket->isStillBusy());

return close(s);
return close(k.socket);
}

#if ENABLE_BONDING
Expand Down Expand Up @@ -2547,6 +2564,45 @@ srt::CUDTGroup* srt::CUDTUnited::acquireSocketsGroup(CUDTSocket* s)
}
#endif

srt::CUDTSocket* srt::CUDTUnited::locateAcquireSocket(SRTSOCKET u, ErrorHandling erh)
{
ScopedLock cg(m_GlobControlLock);

CUDTSocket* s = locateSocket_LOCKED(u);
if (!s)
{
if (erh == ERH_THROW)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
return NULL;
}

s->apiAcquire();
return s;
}

bool srt::CUDTUnited::acquireSocket(CUDTSocket* s)
{
// Note that before using this function you must be certain
// that the socket isn't broken already and it still has at least
// one more GC cycle to live. In other words, you must be certain
// that this pointer passed here isn't dangling and was obtained
// directly from m_Sockets, or even better, has been acquired
// by some other functionality already, which is only about to
// be released earlier than you need.
ScopedLock cg(m_GlobControlLock);
s->apiAcquire();
// Keep the lock so that no one changes anything in the meantime.
// If the socket m_Status == SRTS_CLOSED (set by setClosed()), then
// this socket is no longer present in the m_Sockets container
if (s->m_Status >= SRTS_BROKEN)
{
s->apiRelease();
return false;
}

return true;
}

srt::CUDTSocket* srt::CUDTUnited::locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn)
{
ScopedLock cg(m_GlobControlLock);
Expand Down Expand Up @@ -2613,7 +2669,7 @@ void srt::CUDTUnited::checkBrokenSockets()

if (s->m_Status == SRTS_LISTENING)
{
const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp;
const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp.load();
// A listening socket should wait an extra 3 seconds
// in case a client is connecting.
if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS))
Expand Down Expand Up @@ -2672,6 +2728,20 @@ void srt::CUDTUnited::checkBrokenSockets()
for (sockets_t::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++j)
{
CUDTSocket* ps = j->second;

// NOTE: There is still a hypothetical risk here that ps
// was made busy while the socket was already moved to m_ClosedSocket,
// if the socket was acquired through CUDTUnited::acquireSocket (that is,
// busy flag acquisition was done through the CUDTSocket* pointer rather
// than through the numeric ID). Therefore this way of busy acquisition
// should be done only if at the moment of acquisition there are certainly
// other conditions applying on the socket that prevent it from being deleted.
if (ps->isStillBusy())
{
HLOGC(smlog.Debug, log << "checkBrokenSockets: @" << ps->m_SocketID << " is still busy, SKIPPING THIS CYCLE.");
continue;
}

CUDT& u = ps->core();

// HLOGC(smlog.Debug, log << "checking CLOSED socket: " << j->first);
Expand All @@ -2691,7 +2761,7 @@ void srt::CUDTUnited::checkBrokenSockets()
// timeout 1 second to destroy a socket AND it has been removed from
// RcvUList
const steady_clock::time_point now = steady_clock::now();
const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp;
const steady_clock::duration closed_ago = now - ps->m_tsClosureTimeStamp.load();
if (closed_ago > seconds_from(1))
{
CRNode* rnode = u.m_pRNode;
Expand Down Expand Up @@ -2741,6 +2811,14 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
if (rn && rn->m_bOnList)
return;

if (s->isStillBusy())
{
HLOGC(smlog.Debug, log << "@" << s->m_SocketID << " is still busy, NOT deleting");
return;
}

LOGC(smlog.Note, log << "@" << s->m_SocketID << " busy=" << s->isStillBusy());

#if ENABLE_BONDING
if (s->m_GroupOf)
{
Expand Down
63 changes: 59 additions & 4 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ class CUDTSocket

void construct();

private:
srt::sync::atomic<int> m_iBusy;
public:
void apiAcquire() { ++m_iBusy; }
void apiRelease() { --m_iBusy; }

int isStillBusy() const
{
return m_iBusy;
}


SRT_ATTR_GUARDED_BY(m_ControlLock)
sync::atomic<SRT_SOCKSTATUS> m_Status; //< current socket state

Expand All @@ -131,7 +143,8 @@ class CUDTSocket
/// of sockets in order to prevent other methods from accessing invalid address.
/// A timer is started and the socket will be removed after approximately
/// 1 second (see CUDTUnited::checkBrokenSockets()).
sync::steady_clock::time_point m_tsClosureTimeStamp;
//sync::steady_clock::time_point m_tsClosureTimeStamp;
sync::AtomicClock<sync::steady_clock> m_tsClosureTimeStamp;

sockaddr_any m_SelfAddr; //< local address of the socket
sockaddr_any m_PeerAddr; //< peer address of the socket
Expand Down Expand Up @@ -324,7 +337,7 @@ class CUDTUnited
int epoll_release(const int eid);

#if ENABLE_BONDING
// [[using locked(m_GlobControlLock)]]
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_GlobControlLock)
CUDTGroup& addGroup(SRTSOCKET id, SRT_GROUP_TYPE type)
{
// This only ensures that the element exists.
Expand All @@ -346,7 +359,7 @@ class CUDTUnited
void deleteGroup(CUDTGroup* g);
void deleteGroup_LOCKED(CUDTGroup* g);

// [[using locked(m_GlobControlLock)]]
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_GlobControlLock)
CUDTGroup* findPeerGroup_LOCKED(SRTSOCKET peergroup)
{
for (groups_t::iterator i = m_Groups.begin(); i != m_Groups.end(); ++i)
Expand Down Expand Up @@ -445,8 +458,50 @@ class CUDTUnited
}
}
};

#endif

CUDTSocket* locateAcquireSocket(SRTSOCKET u, ErrorHandling erh = ERH_RETURN);
bool acquireSocket(CUDTSocket* s);

public:
struct SocketKeeper
{
CUDTSocket* socket;

SocketKeeper(): socket(NULL) {}

// This is intended for API functions to lock the socket's existence
// for the lifetime of their call.
SocketKeeper(CUDTUnited& glob, SRTSOCKET id, ErrorHandling erh = ERH_RETURN) { socket = glob.locateAcquireSocket(id, erh); }

// This is intended for TSBPD thread that should lock the socket's
// existence until it exits.
SocketKeeper(CUDTUnited& glob, CUDTSocket* s)
{
acquire(glob, s);
}

// Note: acquire doesn't check if the keeper already keeps anything.
// This is only for a use together with an empty constructor.
bool acquire(CUDTUnited& glob, CUDTSocket* s)
{
const bool caught = glob.acquireSocket(s);
socket = caught ? s : NULL;
return caught;
}

~SocketKeeper()
{
if (socket)
{
SRT_ASSERT(socket->isStillBusy() > 0);
socket->apiRelease();
}
}
};

private:

void updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* = NULL);
bool updateListenerMux(CUDTSocket* s, const CUDTSocket* ls);

Expand Down
25 changes: 15 additions & 10 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5844,13 +5844,15 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any&
}

#if ENABLE_BONDING
m_ConnectionLock.unlock();
// The socket and the group are only linked to each other after interpretSrtHandshake(..) has been called.
// Keep the group alive for the lifetime of this function,
// and do it BEFORE acquiring m_ConnectionLock to avoid
// lock inversion.
// This will check if a socket belongs to a group and if so
// it will remember this group and keep it alive here.
CUDTUnited::GroupKeeper group_keeper(uglobal(), m_parent);
m_ConnectionLock.lock();
#endif

if (!prepareBuffers(NULL))
Expand Down Expand Up @@ -8759,7 +8761,7 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr
if (m_config.bDriftTracer)
{
#if ENABLE_BONDING
ScopedLock glock(uglobal().m_GlobControlLock);
ScopedLock glock(uglobal().m_GlobControlLock); // XXX not too excessive?
const bool drift_updated =
#endif
m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt);
Expand Down Expand Up @@ -11737,17 +11739,20 @@ void srt::CUDT::completeBrokenConnectionDependencies(int errorcode)
// Bound to one call because this requires locking
pg->updateFailedLink();
}
// Sockets that never succeeded to connect must be deleted
// explicitly, otherwise they will never be deleted. OTOH
// the socket can be on the path of deletion already, so
// this only makes sure that the socket will be deleted,
// one way or another.
if (pending_broken)
{
// XXX This somehow can cause a deadlock
// uglobal()->close(m_parent);
LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group.");
m_parent->setBrokenClosed();
}
}

// Sockets that never succeeded to connect must be deleted
// explicitly, otherwise they will never be deleted.
if (pending_broken)
{
// XXX This somehow can cause a deadlock
// uglobal()->close(m_parent);
LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group.");
m_parent->setBrokenClosed();
}
#endif
}

Expand Down
40 changes: 38 additions & 2 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,13 @@ void* srt::CSndQueue::worker(void* param)
continue;
}

CUDTUnited::SocketKeeper sk (CUDT::uglobal(), u->id());
if (!sk.socket)
{
HLOGC(qslog.Debug, log << "Socket to be processed was deleted in the meantime, not packing");
continue;
}

// pack a packet from the socket
CPacket pkt;
steady_clock::time_point next_send_time;
Expand Down Expand Up @@ -929,6 +936,16 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
EReadStatus read_st = rst;
EConnectStatus conn_st = cst;

CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id);
if (!sk.socket)
{
// Socket deleted already, so stop this and proceed to the next loop.
LOGC(cnlog.Error, log << "updateConnStatus: IPE: socket @" << i->id << " already closed, proceed to only removal from lists");
toRemove.push_back(*i);
continue;
}


if (cst != CONN_RENDEZVOUS && dest_id != 0)
{
if (i->id != dest_id)
Expand Down Expand Up @@ -974,14 +991,22 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
for (vector<LinkStatusInfo>::iterator i = toRemove.begin(); i != toRemove.end(); ++i)
{
HLOGC(cnlog.Debug, log << "updateConnStatus: COMPLETING dep objects update on failed @" << i->id);
//
remove(i->id);

CUDTUnited::SocketKeeper sk (CUDT::uglobal(), i->id);
if (!sk.socket)
{
// This actually shall never happen, so it's a kind of paranoid check.
LOGC(cnlog.Error, log << "updateConnStatus: IPE: socket @" << i->id << " already closed, NOT ACCESSING its contents");
continue;
}

// Setting m_bConnecting to false, and need to remove the socket from the rendezvous queue
// because the next CUDT::close will not remove it from the queue when m_bConnecting = false,
// and may crash on next pass.
//
// TODO: maybe lock i->u->m_ConnectionLock?
i->u->m_bConnecting = false;
remove(i->u->m_SocketID);

// DO NOT close the socket here because in this case it might be
// unable to get status from at the right moment. Also only member
Expand All @@ -992,6 +1017,11 @@ void srt::CRendezvousQueue::updateConnStatus(EReadStatus rst, EConnectStatus cst
CUDT::uglobal().m_EPoll.update_events(
i->u->m_SocketID, i->u->m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, true);

// Make sure that the socket wasn't deleted in the meantime.
// Skip this part if it was. Note also that if the socket was
// decided to be deleted, it's already moved to m_ClosedSockets
// and should have been therefore already processed for deletion.

i->u->completeBrokenConnectionDependencies(i->errorcode);
}

Expand Down Expand Up @@ -1442,6 +1472,12 @@ srt::EConnectStatus srt::CRcvQueue::worker_ProcessAddressedPacket(int32_t id, CU
HLOGC(cnlog.Debug, log << "worker_ProcessAddressedPacket: resending to QUEUED socket @" << id);
return worker_TryAsyncRend_OrStore(id, unit, addr);
}
// Although we don´t have an exclusive passing here,
// we can count on that when the socket was once present in the hash,
// it will not be deleted for at least one GC cycle. But we still need
// to maintain the object existence until it's in use.
// Note that here we are out of any locks, so m_GlobControlLock can be locked.
CUDTUnited::SocketKeeper sk (CUDT::uglobal(), u->m_parent);

// Found associated CUDT - process this as control or data packet
// addressed to an associated socket.
Expand Down
2 changes: 1 addition & 1 deletion test/test_fec_rebuilding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ TEST(TestFEC, ConnectionMess)

SRTSOCKET la[] = { l };
SRTSOCKET a = srt_accept_bond(la, 1, 2000);
ASSERT_NE(a, SRT_ERROR);
ASSERT_NE(a, SRT_ERROR) << srt_getlasterror_str();
EXPECT_EQ(connect_res.get(), SRT_SUCCESS);

// Now that the connection is established, check negotiated config
Expand Down

0 comments on commit eecc176

Please sign in to comment.