Skip to content

Commit

Permalink
Fixed crash found with non-blocking linger in srt-live-transmit (#627)
Browse files Browse the repository at this point in the history
* Fixed: called srt_cleanup at the end. Added some fixes around global variables. Added some logs.
  • Loading branch information
ethouris authored and rndi committed Apr 15, 2019
1 parent 686f72d commit 2bdebef
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 25 deletions.
2 changes: 2 additions & 0 deletions apps/srt-live-transmit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ int parse_args(LiveTransmitConfig &cfg, int argc, char** argv)

int main(int argc, char** argv)
{
srt_startup();
// This is mainly required on Windows to initialize the network system,
// for a case when the instance would use UDP. SRT does it on its own, independently.
if (!SysInitializeNetwork())
Expand All @@ -324,6 +325,7 @@ int main(int argc, char** argv)
{
~NetworkCleanup()
{
srt_cleanup();
SysCleanupNetwork();
}
} cleanupobj;
Expand Down
29 changes: 17 additions & 12 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ std::string CUDTUnited::CONID(SRTSOCKET sock)
return "";

std::ostringstream os;
os << "%" << sock << ":";
os << "@" << sock << ":";
return os.str();
}

Expand Down Expand Up @@ -908,8 +908,6 @@ int CUDTUnited::close(const SRTSOCKET u)
bool synch_close_snd = s->m_pUDT->m_bSynSending;
//bool synch_close_rcv = s->m_pUDT->m_bSynRecving;

int id = s->m_SocketID;

if (s->m_Status == SRTS_LISTENING)
{
if (s->m_pUDT->m_bBroken)
Expand Down Expand Up @@ -945,14 +943,17 @@ int CUDTUnited::close(const SRTSOCKET u)
s->m_pUDT->close();

// synchronize with garbage collection.
HLOGC(mglog.Debug, log << "%" << id << " CUDT::close done. GLOBAL CLOSE: " << s->m_pUDT->CONID() << ". Acquiring GLOBAL control lock");
HLOGC(mglog.Debug, log << "@" << u << "U::close done. GLOBAL CLOSE: " << s->m_pUDT->CONID() << ". Acquiring GLOBAL control lock");
CGuard manager_cg(m_ControlLock);

// since "s" is located before m_ControlLock, locate it again in case
// it became invalid
map<SRTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);
if ((i == m_Sockets.end()) || (i->second->m_Status == SRTS_CLOSED))
{
HLOGC(mglog.Debug, log << "@" << u << "U::close: NOT AN ACTIVE SOCKET, returning.");
return 0;
}
s = i->second;

s->m_Status = SRTS_CLOSED;
Expand All @@ -964,35 +965,36 @@ int CUDTUnited::close(const SRTSOCKET u)
s->m_TimeStamp = CTimer::getTime();

m_Sockets.erase(s->m_SocketID);
m_ClosedSockets.insert(pair<SRTSOCKET, CUDTSocket*>(s->m_SocketID, s));
m_ClosedSockets[s->m_SocketID] = s;
HLOGC(mglog.Debug, log << "@" << u << "U::close: Socket MOVED TO CLOSED for collecting later.");

CTimer::triggerEvent();
}

HLOGC(mglog.Debug, log << "%" << id << ": GLOBAL: CLOSING DONE");
HLOGC(mglog.Debug, log << "%" << u << ": GLOBAL: CLOSING DONE");

// Check if the ID is still in closed sockets before you access it
// (the last triggerEvent could have deleted it).
if ( synch_close_snd )
{
#if SRT_ENABLE_CLOSE_SYNCH

HLOGC(mglog.Debug, log << "%" << id << " GLOBAL CLOSING: sync-waiting for releasing sender resources...");
HLOGC(mglog.Debug, log << "@" << u << " GLOBAL CLOSING: sync-waiting for releasing sender resources...");
for (;;)
{
CSndBuffer* sb = s->m_pUDT->m_pSndBuffer;

// Disconnected from buffer - nothing more to check.
if (!sb)
{
HLOGC(mglog.Debug, log << "%" << id << " GLOBAL CLOSING: sending buffer disconnected. Allowed to close.");
HLOGC(mglog.Debug, log << "@" << u << " GLOBAL CLOSING: sending buffer disconnected. Allowed to close.");
break;
}

// Sender buffer empty
if (sb->getCurrBufSize() == 0)
{
HLOGC(mglog.Debug, log << "%" << id << " GLOBAL CLOSING: sending buffer depleted. Allowed to close.");
HLOGC(mglog.Debug, log << "@" << u << " GLOBAL CLOSING: sending buffer depleted. Allowed to close.");
break;
}

Expand All @@ -1004,19 +1006,19 @@ int CUDTUnited::close(const SRTSOCKET u)
bool isgone = false;
{
CGuard manager_cg(m_ControlLock);
isgone = m_ClosedSockets.count(id) == 0;
isgone = m_ClosedSockets.count(u) == 0;
}
if (!isgone)
{
isgone = !s->m_pUDT->m_bOpened;
}
if (isgone)
{
HLOGC(mglog.Debug, log << "%" << id << " GLOBAL CLOSING: ... gone in the meantime, whatever. Exiting close().");
HLOGC(mglog.Debug, log << "@" << u << " GLOBAL CLOSING: ... gone in the meantime, whatever. Exiting close().");
break;
}

HLOGC(mglog.Debug, log << "%" << id << " GLOBAL CLOSING: ... still waiting for any update.");
HLOGC(mglog.Debug, log << "@" << u << " GLOBAL CLOSING: ... still waiting for any update.");
CTimer::EWait wt = CTimer::waitForEvent();

if ( wt == CTimer::WT_ERROR )
Expand Down Expand Up @@ -1831,11 +1833,13 @@ void* CUDTUnited::garbageCollect(void* p)
timeout.tv_sec = now.tv_sec + 1;
timeout.tv_nsec = now.tv_usec * 1000;

HLOGC(mglog.Debug, log << "GC: sleep until " << FormatTime(uint64_t(now.tv_usec) + 1000000*(timeout.tv_sec)));
pthread_cond_timedwait(
&self->m_GCStopCond, &self->m_GCStopLock, &timeout);
}

// remove all sockets and multiplexers
HLOGC(mglog.Debug, log << "GC: GLOBAL EXIT - releasing all pending sockets. Acquring control lock...");
CGuard::enterCS(self->m_ControlLock);
for (map<SRTSOCKET, CUDTSocket*>::iterator i = self->m_Sockets.begin();
i != self->m_Sockets.end(); ++ i)
Expand Down Expand Up @@ -1870,6 +1874,7 @@ void* CUDTUnited::garbageCollect(void* p)
}
CGuard::leaveCS(self->m_ControlLock);

HLOGC(mglog.Debug, log << "GC: GLOBAL EXIT - releasing all CLOSED sockets.");
while (true)
{
self->checkBrokenSockets();
Expand Down
2 changes: 1 addition & 1 deletion srtcore/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ std::string ConnectStatusStr(EConnectStatus cst)

std::string TransmissionEventStr(ETransmissionEvent ev)
{
static const std::string vals [] =
static const char* const vals [] =
{
"init",
"ack",
Expand Down
20 changes: 13 additions & 7 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ CUDT::CUDT(const CUDT& ancestor)
#endif
m_iSndTimeOut = ancestor.m_iSndTimeOut;
m_iRcvTimeOut = ancestor.m_iRcvTimeOut;
m_bReuseAddr = true; // this must be true, because all accepted sockets shared the same port with the listener
m_bReuseAddr = true; // this must be true, because all accepted sockets share the same port with the listener
m_llMaxBW = ancestor.m_llMaxBW;
#ifdef SRT_ENABLE_IPOPTS
m_iIpTTL = ancestor.m_iIpTTL;
Expand Down Expand Up @@ -4702,7 +4702,8 @@ void CUDT::checkSndTimers(Whether2RegenKm regen)
// Don't call this function in "non-regen mode" (sending only),
// if this side is RESPONDER. This shall be called only with
// regeneration request, which is required by the sender.
m_pCryptoControl->sendKeysToPeer(regen);
if (m_pCryptoControl)
m_pCryptoControl->sendKeysToPeer(regen);
}
}

Expand All @@ -4717,13 +4718,13 @@ void CUDT::addressAndSend(CPacket& pkt)
}


void CUDT::close()
bool CUDT::close()
{
// NOTE: this function is called from within the garbage collector thread.

if (!m_bOpened)
{
return;
return false;
}

HLOGC(mglog.Debug, log << CONID() << " - closing socket:");
Expand All @@ -4746,7 +4747,10 @@ void CUDT::close()
if (m_ullLingerExpiration == 0)
m_ullLingerExpiration = entertime + m_Linger.l_linger * uint64_t(1000000);

return;
HLOGC(mglog.Debug, log << "CUDT::close: linger-nonblocking, setting expire time T="
<< FormatTime(m_ullLingerExpiration));

return false;
}

#ifndef _WIN32
Expand Down Expand Up @@ -4786,7 +4790,7 @@ void CUDT::close()
// XXX What's this, could any of the above actions make it !m_bOpened?
if (!m_bOpened)
{
return;
return true;
}

// Inform the threads handler to stop.
Expand Down Expand Up @@ -4857,6 +4861,8 @@ void CUDT::close()
m_ullRcvPeerStartTime = 0;

m_bOpened = false;

return true;
}

/*
Expand Down Expand Up @@ -7608,7 +7614,7 @@ int CUDT::processData(CUnit* unit)
}

int pktrexmitflag = m_bPeerRexmitFlag ? (int)packet.getRexmitFlag() : 2;
static const string rexmitstat [] = {"ORIGINAL", "REXMITTED", "RXS-UNKNOWN"};
static const char* const rexmitstat [] = {"ORIGINAL", "REXMITTED", "RXS-UNKNOWN"};
string rexmit_reason;


Expand Down
2 changes: 1 addition & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ class CUDT

/// Close the opened UDT entity.

void close();
bool close();

/// Request UDT to send out a data block "data" with size of "len".
/// @param data [in] The address of the application data to be sent.
Expand Down
8 changes: 4 additions & 4 deletions srtcore/packet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,10 +480,10 @@ std::string CPacket::MessageFlagStr()

stringstream out;

static const string boundary [] = { "PB_SUBSEQUENT", "PB_LAST", "PB_FIRST", "PB_SOLO" };
static const string order [] = { "ORD_RELAXED", "ORD_REQUIRED" };
static const string crypto [] = { "EK_NOENC", "EK_EVEN", "EK_ODD", "EK*ERROR" };
static const string rexmit [] = { "SN_ORIGINAL", "SN_REXMIT" };
static const char* const boundary [] = { "PB_SUBSEQUENT", "PB_LAST", "PB_FIRST", "PB_SOLO" };
static const char* const order [] = { "ORD_RELAXED", "ORD_REQUIRED" };
static const char* const crypto [] = { "EK_NOENC", "EK_EVEN", "EK_ODD", "EK*ERROR" };
static const char* const rexmit [] = { "SN_ORIGINAL", "SN_REXMIT" };

out << boundary[int(getMsgBoundary())] << " ";
out << order[int(getMsgOrderFlag())] << " ";
Expand Down
15 changes: 15 additions & 0 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,21 @@ int CSndUList::pop(sockaddr*& addr, CPacket& pkt)
CUDT* u = m_pHeap[0]->m_pUDT;
remove_(u);

#define UST(field) ( (u->m_b##field) ? "+" : "-" ) << #field << " "

HLOGC(mglog.Debug, log << "SND:pop: requesting packet from @" << u->socketID()
<< " STATUS: "
<< UST(Listening)
<< UST(Connecting)
<< UST(Connected)
<< UST(Closing)
<< UST(Shutdown)
<< UST(Broken)
<< UST(PeerHealth)
<< UST(Opened)
);
#undef UST

if (!u->m_bConnected || u->m_bBroken)
return -1;

Expand Down

0 comments on commit 2bdebef

Please sign in to comment.