Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Replaced CGuard with corresponding lock: UniqueLock or ScopedLock #1418

Merged
merged 5 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 44 additions & 44 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ std::string CUDTUnited::CONID(SRTSOCKET sock)

int CUDTUnited::startup()
{
CGuard gcinit(m_InitLock);
ScopedLock gcinit(m_InitLock);

if (m_iInstanceCount++ > 0)
return 1;
Expand Down Expand Up @@ -269,7 +269,7 @@ int CUDTUnited::startup()

int CUDTUnited::cleanup()
{
CGuard gcinit(m_InitLock);
ScopedLock gcinit(m_InitLock);

if (--m_iInstanceCount > 0)
return 0;
Expand Down Expand Up @@ -309,7 +309,7 @@ int CUDTUnited::cleanup()

SRTSOCKET CUDTUnited::generateSocketID(bool for_group)
{
CGuard guard(m_IDLock);
ScopedLock guard(m_IDLock);

int sockval = m_SocketIDGenerator - 1;

Expand Down Expand Up @@ -444,7 +444,7 @@ SRTSOCKET CUDTUnited::newSocket(CUDTSocket** pps)
<< ns->m_SocketID);

// protect the m_Sockets structure.
CGuard cs(m_GlobControlLock);
ScopedLock cs(m_GlobControlLock);
m_Sockets[ns->m_SocketID] = ns;
}
catch (...)
Expand Down Expand Up @@ -491,7 +491,7 @@ int CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer,
ns->m_Status = SRTS_CLOSED;
ns->m_tsClosureTimeStamp = steady_clock::now();

CGuard acceptcg(ls->m_AcceptLock);
ScopedLock acceptcg(ls->m_AcceptLock);
ls->m_pQueuedSockets->erase(ns->m_SocketID);
ls->m_pAcceptSockets->erase(ns->m_SocketID);
}
Expand Down Expand Up @@ -593,7 +593,7 @@ int CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer,
"newConnection: incoming %s, mapping socket %d",
SockaddrToString(peer).c_str(), ns->m_SocketID);
{
CGuard cg(m_GlobControlLock);
ScopedLock cg(m_GlobControlLock);
m_Sockets[ns->m_SocketID] = ns;
}

Expand Down Expand Up @@ -652,7 +652,7 @@ int CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer,
// XXX this might require another check of group type.
// For redundancy group, at least, update the status in the group
CUDTGroup* g = ns->m_IncludedGroup;
CGuard glock (g->m_GroupLock);
ScopedLock glock (g->m_GroupLock);
CUDTGroup::gli_t gi;

// Check if this is the first socket in the group.
Expand Down Expand Up @@ -759,7 +759,7 @@ int CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer,
HLOGC(mglog.Debug, log << "ACCEPT: new socket @" << ns->m_SocketID
<< " NOT submitted to acceptance, another socket in the group is already connected");
{
CGuard cg (ls->m_AcceptLock);
ScopedLock cg (ls->m_AcceptLock);
ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), ns->m_SocketID);
}

Expand Down Expand Up @@ -792,7 +792,7 @@ int CUDTUnited::newConnection(const SRTSOCKET listen, const sockaddr_any& peer,
// connect() in UDT code) may fail, in which case this socket should not be
// further processed and should be removed.
{
CGuard cg(m_GlobControlLock);
ScopedLock cg(m_GlobControlLock);
m_Sockets.erase(id);
m_ClosedSockets[id] = ns;
}
Expand Down Expand Up @@ -828,7 +828,7 @@ int CUDTUnited::installAcceptHook(const SRTSOCKET lsn, srt_listen_callback_fn* h
SRT_SOCKSTATUS CUDTUnited::getStatus(const SRTSOCKET u)
{
// protects the m_Sockets structure
CGuard cg(m_GlobControlLock);
ScopedLock cg(m_GlobControlLock);

sockets_t::const_iterator i = m_Sockets.find(u);

Expand All @@ -844,7 +844,7 @@ SRT_SOCKSTATUS CUDTUnited::getStatus(const SRTSOCKET u)

int CUDTUnited::bind(CUDTSocket* s, const sockaddr_any& name)
{
CGuard cg(s->m_ControlLock);
ScopedLock cg(s->m_ControlLock);

// cannot bind a socket more than once
if (s->m_Status != SRTS_INIT)
Expand All @@ -862,7 +862,7 @@ int CUDTUnited::bind(CUDTSocket* s, const sockaddr_any& name)

int CUDTUnited::bind(CUDTSocket* s, UDPSOCKET udpsock)
{
CGuard cg(s->m_ControlLock);
ScopedLock cg(s->m_ControlLock);

// cannot bind a socket more than once
if (s->m_Status != SRTS_INIT)
Expand Down Expand Up @@ -903,7 +903,7 @@ int CUDTUnited::listen(const SRTSOCKET u, int backlog)
if (!s)
throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);

CGuard cg(s->m_ControlLock);
ScopedLock cg(s->m_ControlLock);

// NOTE: since now the socket is protected against simultaneous access.
// In the meantime the socket might have been closed, which means that
Expand Down Expand Up @@ -1018,7 +1018,7 @@ SRTSOCKET CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int* pw_
// !!only one conection can be set up each time!!
while (!accepted)
{
CGuard accept_lock(ls->m_AcceptLock);
UniqueLock accept_lock(ls->m_AcceptLock);
CSync accept_sync(ls->m_AcceptCond, accept_lock);

if ((ls->m_Status != SRTS_LISTENING) || ls->m_pUDT->m_bBroken)
Expand Down Expand Up @@ -1094,7 +1094,7 @@ SRTSOCKET CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int* pw_
s->core().m_OPT_GroupConnect = 0;
}

CGuard cg(s->m_ControlLock);
ScopedLock cg(s->m_ControlLock);

// Check if the length of the buffer to fill the name in
// was large enough.
Expand Down Expand Up @@ -1292,7 +1292,7 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar

if (targets[tii].errorcode != SRT_SUCCESS)
{
CGuard cs(m_GlobControlLock);
ScopedLock cs(m_GlobControlLock);
SRTSOCKET id = ns->m_SocketID;
delete ns;
m_Sockets.erase(id);
Expand Down Expand Up @@ -1367,7 +1367,7 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar
ns->removeFromGroup();
erc_rloc = e.getErrorCode();

CGuard cl (m_GlobControlLock);
ScopedLock cl (m_GlobControlLock);
m_Sockets.erase(ns->m_SocketID);
// Intercept to delete the socket on failure.
delete ns;
Expand All @@ -1377,7 +1377,7 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar
{
LOGC(mglog.Fatal, log << "groupConnect: IPE: UNKNOWN EXCEPTION from connectIn");
ns->removeFromGroup();
CGuard cl (m_GlobControlLock);
ScopedLock cl (m_GlobControlLock);
m_Sockets.erase(ns->m_SocketID);
// Intercept to delete the socket on failure.
delete ns;
Expand All @@ -1388,12 +1388,12 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar

SRT_SOCKSTATUS st;
{
CGuard grd (ns->m_ControlLock);
ScopedLock grd (ns->m_ControlLock);
st = ns->getStatus();
}

{
CGuard grd (g.m_GroupLock);
ScopedLock grd (g.m_GroupLock);

if (isn == 0)
{
Expand Down Expand Up @@ -1564,7 +1564,7 @@ int CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int ar

int CUDTUnited::connectIn(CUDTSocket* s, const sockaddr_any& target_addr, int32_t forced_isn)
{
CGuard cg(s->m_ControlLock);
ScopedLock cg(s->m_ControlLock);
// a socket can "connect" only if it is in the following states:
// - OPENED: assume the socket binding parameters are configured
// - INIT: configure binding parameters here
Expand Down Expand Up @@ -1619,7 +1619,7 @@ int CUDTUnited::connectIn(CUDTSocket* s, const sockaddr_any& target_addr, int32_
// record peer address
s->m_PeerAddr = target_addr;

// CGuard destructor will delete cg and unlock s->m_ControlLock
// ScopedLock destructor will delete cg and unlock s->m_ControlLock

return 0;
}
Expand Down Expand Up @@ -1648,7 +1648,7 @@ int CUDTUnited::close(CUDTSocket* s)
{
HLOGC(mglog.Debug, log << s->m_pUDT->CONID() << " CLOSE. Acquiring control lock");

CGuard socket_cg(s->m_ControlLock);
ScopedLock socket_cg(s->m_ControlLock);

HLOGC(mglog.Debug, log << s->m_pUDT->CONID() << " CLOSING (removing from listening, closing CUDT)");

Expand Down Expand Up @@ -1689,7 +1689,7 @@ int CUDTUnited::close(CUDTSocket* s)

// synchronize with garbage collection.
HLOGC(mglog.Debug, log << "@" << u << "U::close done. GLOBAL CLOSE: " << s->m_pUDT->CONID() << ". Acquiring GLOBAL control lock");
CGuard manager_cg(m_GlobControlLock);
ScopedLock manager_cg(m_GlobControlLock);

// since "s" is located before m_GlobControlLock, locate it again in case
// it became invalid
Expand Down Expand Up @@ -1758,7 +1758,7 @@ int CUDTUnited::close(CUDTSocket* s)
// Done the other way, but still done. You can stop waiting.
bool isgone = false;
{
CGuard manager_cg(m_GlobControlLock);
ScopedLock manager_cg(m_GlobControlLock);
isgone = m_ClosedSockets.count(u) == 0;
}
if (!isgone)
Expand Down Expand Up @@ -2169,7 +2169,7 @@ int CUDTUnited::epoll_release(const int eid)

CUDTSocket* CUDTUnited::locateSocket(const SRTSOCKET u, ErrorHandling erh)
{
CGuard cg (m_GlobControlLock);
ScopedLock cg (m_GlobControlLock);

sockets_t::iterator i = m_Sockets.find(u);

Expand All @@ -2185,7 +2185,7 @@ CUDTSocket* CUDTUnited::locateSocket(const SRTSOCKET u, ErrorHandling erh)

CUDTGroup* CUDTUnited::locateGroup(SRTSOCKET u, ErrorHandling erh)
{
CGuard cg (m_GlobControlLock);
ScopedLock cg (m_GlobControlLock);

const groups_t::iterator i = m_Groups.find(u);
if ( i == m_Groups.end() )
Expand All @@ -2203,7 +2203,7 @@ CUDTSocket* CUDTUnited::locatePeer(
const SRTSOCKET id,
int32_t isn)
{
CGuard cg(m_GlobControlLock);
ScopedLock cg(m_GlobControlLock);

map<int64_t, set<SRTSOCKET> >::iterator i = m_PeerRec.find(
CUDTSocket::getPeerSpec(id, isn));
Expand All @@ -2229,7 +2229,7 @@ CUDTSocket* CUDTUnited::locatePeer(

void CUDTUnited::checkBrokenSockets()
{
CGuard cg(m_GlobControlLock);
ScopedLock cg(m_GlobControlLock);

// set of sockets To Be Closed and To Be Removed
vector<SRTSOCKET> tbc;
Expand Down Expand Up @@ -2350,7 +2350,7 @@ void CUDTUnited::removeSocket(const SRTSOCKET u)

if (s->m_pQueuedSockets)
{
CGuard cg(s->m_AcceptLock);
ScopedLock cg(s->m_AcceptLock);

// if it is a listener, close all un-accepted sockets in its queue
// and remove them later
Expand Down Expand Up @@ -2441,7 +2441,7 @@ void CUDTUnited::removeSocket(const SRTSOCKET u)
void CUDTUnited::updateMux(
CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* udpsock /*[[nullable]]*/)
{
CGuard cg(m_GlobControlLock);
ScopedLock cg(m_GlobControlLock);

// Don't try to reuse given address, if udpsock was given.
// In such a case rely exclusively on that very socket and
Expand Down Expand Up @@ -2600,7 +2600,7 @@ void CUDTUnited::updateMux(
// socket.
void CUDTUnited::updateListenerMux(CUDTSocket* s, const CUDTSocket* ls)
{
CGuard cg(m_GlobControlLock);
ScopedLock cg(m_GlobControlLock);
const int port = ls->m_SelfAddr.hport();

// find the listener's address
Expand All @@ -2627,15 +2627,15 @@ void* CUDTUnited::garbageCollect(void* p)

THREAD_STATE_INIT("SRT:GC");

CGuard gcguard(self->m_GCStopLock);
UniqueLock gclock(self->m_GCStopLock);

while (!self->m_bClosing)
{
INCREMENT_THREAD_ITERATIONS();
self->checkBrokenSockets();

HLOGC(mglog.Debug, log << "GC: sleep 1 s");
self->m_GCStopCond.wait_for(gcguard, seconds_from(1));
self->m_GCStopCond.wait_for(gclock, seconds_from(1));
}

// remove all sockets and multiplexers
Expand Down Expand Up @@ -2804,7 +2804,7 @@ int CUDT::addSocketToGroup(SRTSOCKET socket, SRTSOCKET group)
g->set_managed(false);
}

CGuard cg (s->m_ControlLock);
ScopedLock cg (s->m_ControlLock);

// Check if the socket already is in the group
CUDTGroup::gli_t f = g->find(socket);
Expand All @@ -2831,7 +2831,7 @@ int CUDT::removeSocketFromGroup(SRTSOCKET socket)
if (!s->m_IncludedGroup)
return APIError(MJ_NOTSUP, MN_INVAL, 0);

CGuard grd (s->m_ControlLock);
ScopedLock grd (s->m_ControlLock);
s->removeFromGroup();
return 0;
}
Expand Down Expand Up @@ -4122,53 +4122,53 @@ namespace srt

void setloglevel(LogLevel::type ll)
{
CGuard gg(srt_logger_config.mutex);
ScopedLock gg(srt_logger_config.mutex);
srt_logger_config.max_level = ll;
}

void addlogfa(LogFA fa)
{
CGuard gg(srt_logger_config.mutex);
ScopedLock gg(srt_logger_config.mutex);
srt_logger_config.enabled_fa.set(fa, true);
}

void dellogfa(LogFA fa)
{
CGuard gg(srt_logger_config.mutex);
ScopedLock gg(srt_logger_config.mutex);
srt_logger_config.enabled_fa.set(fa, false);
}

void resetlogfa(set<LogFA> fas)
{
CGuard gg(srt_logger_config.mutex);
ScopedLock gg(srt_logger_config.mutex);
for (int i = 0; i <= SRT_LOGFA_LASTNONE; ++i)
srt_logger_config.enabled_fa.set(i, fas.count(i));
}

void resetlogfa(const int* fara, size_t fara_size)
{
CGuard gg(srt_logger_config.mutex);
ScopedLock gg(srt_logger_config.mutex);
srt_logger_config.enabled_fa.reset();
for (const int* i = fara; i != fara + fara_size; ++i)
srt_logger_config.enabled_fa.set(*i, true);
}

void setlogstream(std::ostream& stream)
{
CGuard gg(srt_logger_config.mutex);
ScopedLock gg(srt_logger_config.mutex);
srt_logger_config.log_stream = &stream;
}

void setloghandler(void* opaque, SRT_LOG_HANDLER_FN* handler)
{
CGuard gg(srt_logger_config.mutex);
ScopedLock gg(srt_logger_config.mutex);
srt_logger_config.loghandler_opaque = opaque;
srt_logger_config.loghandler_fn = handler;
}

void setlogflags(int flags)
{
CGuard gg(srt_logger_config.mutex);
ScopedLock gg(srt_logger_config.mutex);
srt_logger_config.flags = flags;
}

Expand Down
Loading