Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed crash found with non-blocking linger in srt-live-transmit #627

Merged
merged 4 commits into from
Apr 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/srt-live-transmit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ int parse_args(LiveTransmitConfig &cfg, int argc, char** argv)

int main(int argc, char** argv)
{
srt_startup();
// This is mainly required on Windows to initialize the network system,
// for a case when the instance would use UDP. SRT does it on its own, independently.
if (!SysInitializeNetwork())
Expand All @@ -324,6 +325,7 @@ int main(int argc, char** argv)
{
~NetworkCleanup()
{
srt_cleanup();
SysCleanupNetwork();
}
} cleanupobj;
Expand Down
29 changes: 17 additions & 12 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ std::string CUDTUnited::CONID(SRTSOCKET sock)
return "";

std::ostringstream os;
os << "%" << sock << ":";
os << "@" << sock << ":";
return os.str();
}

Expand Down Expand Up @@ -908,8 +908,6 @@ int CUDTUnited::close(const SRTSOCKET u)
bool synch_close_snd = s->m_pUDT->m_bSynSending;
//bool synch_close_rcv = s->m_pUDT->m_bSynRecving;

int id = s->m_SocketID;

if (s->m_Status == SRTS_LISTENING)
{
if (s->m_pUDT->m_bBroken)
Expand Down Expand Up @@ -945,14 +943,17 @@ int CUDTUnited::close(const SRTSOCKET u)
s->m_pUDT->close();

// synchronize with garbage collection.
HLOGC(mglog.Debug, log << "%" << id << " CUDT::close done. GLOBAL CLOSE: " << s->m_pUDT->CONID() << ". Acquiring GLOBAL control lock");
HLOGC(mglog.Debug, log << "@" << u << "U::close done. GLOBAL CLOSE: " << s->m_pUDT->CONID() << ". Acquiring GLOBAL control lock");
CGuard manager_cg(m_ControlLock);

// since "s" is located before m_ControlLock, locate it again in case
// it became invalid
map<SRTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);
if ((i == m_Sockets.end()) || (i->second->m_Status == SRTS_CLOSED))
{
HLOGC(mglog.Debug, log << "@" << u << "U::close: NOT AN ACTIVE SOCKET, returning.");
return 0;
}
s = i->second;

s->m_Status = SRTS_CLOSED;
Expand All @@ -964,35 +965,36 @@ int CUDTUnited::close(const SRTSOCKET u)
s->m_TimeStamp = CTimer::getTime();

m_Sockets.erase(s->m_SocketID);
m_ClosedSockets.insert(pair<SRTSOCKET, CUDTSocket*>(s->m_SocketID, s));
m_ClosedSockets[s->m_SocketID] = s;
HLOGC(mglog.Debug, log << "@" << u << "U::close: Socket MOVED TO CLOSED for collecting later.");

CTimer::triggerEvent();
}

HLOGC(mglog.Debug, log << "%" << id << ": GLOBAL: CLOSING DONE");
HLOGC(mglog.Debug, log << "%" << u << ": GLOBAL: CLOSING DONE");

// Check if the ID is still in closed sockets before you access it
// (the last triggerEvent could have deleted it).
if ( synch_close_snd )
{
#if SRT_ENABLE_CLOSE_SYNCH

HLOGC(mglog.Debug, log << "%" << id << " GLOBAL CLOSING: sync-waiting for releasing sender resources...");
HLOGC(mglog.Debug, log << "@" << u << " GLOBAL CLOSING: sync-waiting for releasing sender resources...");
for (;;)
{
CSndBuffer* sb = s->m_pUDT->m_pSndBuffer;

// Disconnected from buffer - nothing more to check.
if (!sb)
{
HLOGC(mglog.Debug, log << "%" << id << " GLOBAL CLOSING: sending buffer disconnected. Allowed to close.");
HLOGC(mglog.Debug, log << "@" << u << " GLOBAL CLOSING: sending buffer disconnected. Allowed to close.");
break;
}

// Sender buffer empty
if (sb->getCurrBufSize() == 0)
{
HLOGC(mglog.Debug, log << "%" << id << " GLOBAL CLOSING: sending buffer depleted. Allowed to close.");
HLOGC(mglog.Debug, log << "@" << u << " GLOBAL CLOSING: sending buffer depleted. Allowed to close.");
break;
}

Expand All @@ -1004,19 +1006,19 @@ int CUDTUnited::close(const SRTSOCKET u)
bool isgone = false;
{
CGuard manager_cg(m_ControlLock);
isgone = m_ClosedSockets.count(id) == 0;
isgone = m_ClosedSockets.count(u) == 0;
}
if (!isgone)
{
isgone = !s->m_pUDT->m_bOpened;
}
if (isgone)
{
HLOGC(mglog.Debug, log << "%" << id << " GLOBAL CLOSING: ... gone in the meantime, whatever. Exiting close().");
HLOGC(mglog.Debug, log << "@" << u << " GLOBAL CLOSING: ... gone in the meantime, whatever. Exiting close().");
break;
}

HLOGC(mglog.Debug, log << "%" << id << " GLOBAL CLOSING: ... still waiting for any update.");
HLOGC(mglog.Debug, log << "@" << u << " GLOBAL CLOSING: ... still waiting for any update.");
CTimer::EWait wt = CTimer::waitForEvent();

if ( wt == CTimer::WT_ERROR )
Expand Down Expand Up @@ -1831,11 +1833,13 @@ void* CUDTUnited::garbageCollect(void* p)
timeout.tv_sec = now.tv_sec + 1;
timeout.tv_nsec = now.tv_usec * 1000;

HLOGC(mglog.Debug, log << "GC: sleep until " << FormatTime(uint64_t(now.tv_usec) + 1000000*(timeout.tv_sec)));
pthread_cond_timedwait(
&self->m_GCStopCond, &self->m_GCStopLock, &timeout);
}

// remove all sockets and multiplexers
HLOGC(mglog.Debug, log << "GC: GLOBAL EXIT - releasing all pending sockets. Acquring control lock...");
CGuard::enterCS(self->m_ControlLock);
for (map<SRTSOCKET, CUDTSocket*>::iterator i = self->m_Sockets.begin();
i != self->m_Sockets.end(); ++ i)
Expand Down Expand Up @@ -1870,6 +1874,7 @@ void* CUDTUnited::garbageCollect(void* p)
}
CGuard::leaveCS(self->m_ControlLock);

HLOGC(mglog.Debug, log << "GC: GLOBAL EXIT - releasing all CLOSED sockets.");
while (true)
{
self->checkBrokenSockets();
Expand Down
2 changes: 1 addition & 1 deletion srtcore/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ std::string ConnectStatusStr(EConnectStatus cst)

std::string TransmissionEventStr(ETransmissionEvent ev)
{
static const std::string vals [] =
static const char* const vals [] =
{
"init",
"ack",
Expand Down
20 changes: 13 additions & 7 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ CUDT::CUDT(const CUDT& ancestor)
#endif
m_iSndTimeOut = ancestor.m_iSndTimeOut;
m_iRcvTimeOut = ancestor.m_iRcvTimeOut;
m_bReuseAddr = true; // this must be true, because all accepted sockets shared the same port with the listener
m_bReuseAddr = true; // this must be true, because all accepted sockets share the same port with the listener
m_llMaxBW = ancestor.m_llMaxBW;
#ifdef SRT_ENABLE_IPOPTS
m_iIpTTL = ancestor.m_iIpTTL;
Expand Down Expand Up @@ -4702,7 +4702,8 @@ void CUDT::checkSndTimers(Whether2RegenKm regen)
// Don't call this function in "non-regen mode" (sending only),
// if this side is RESPONDER. This shall be called only with
// regeneration request, which is required by the sender.
m_pCryptoControl->sendKeysToPeer(regen);
if (m_pCryptoControl)
m_pCryptoControl->sendKeysToPeer(regen);
}
}

Expand All @@ -4717,13 +4718,13 @@ void CUDT::addressAndSend(CPacket& pkt)
}


void CUDT::close()
bool CUDT::close()
{
// NOTE: this function is called from within the garbage collector thread.

if (!m_bOpened)
{
return;
return false;
}

HLOGC(mglog.Debug, log << CONID() << " - closing socket:");
Expand All @@ -4746,7 +4747,10 @@ void CUDT::close()
if (m_ullLingerExpiration == 0)
m_ullLingerExpiration = entertime + m_Linger.l_linger * uint64_t(1000000);

return;
HLOGC(mglog.Debug, log << "CUDT::close: linger-nonblocking, setting expire time T="
<< FormatTime(m_ullLingerExpiration));

return false;
}

#ifndef _WIN32
Expand Down Expand Up @@ -4786,7 +4790,7 @@ void CUDT::close()
// XXX What's this, could any of the above actions make it !m_bOpened?
if (!m_bOpened)
{
return;
return true;
}

// Inform the threads handler to stop.
Expand Down Expand Up @@ -4857,6 +4861,8 @@ void CUDT::close()
m_ullRcvPeerStartTime = 0;

m_bOpened = false;

return true;
}

/*
Expand Down Expand Up @@ -7608,7 +7614,7 @@ int CUDT::processData(CUnit* unit)
}

int pktrexmitflag = m_bPeerRexmitFlag ? (int)packet.getRexmitFlag() : 2;
static const string rexmitstat [] = {"ORIGINAL", "REXMITTED", "RXS-UNKNOWN"};
static const char* const rexmitstat [] = {"ORIGINAL", "REXMITTED", "RXS-UNKNOWN"};
string rexmit_reason;


Expand Down
2 changes: 1 addition & 1 deletion srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ class CUDT

/// Close the opened UDT entity.

void close();
bool close();

/// Request UDT to send out a data block "data" with size of "len".
/// @param data [in] The address of the application data to be sent.
Expand Down
8 changes: 4 additions & 4 deletions srtcore/packet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,10 +480,10 @@ std::string CPacket::MessageFlagStr()

stringstream out;

static const string boundary [] = { "PB_SUBSEQUENT", "PB_LAST", "PB_FIRST", "PB_SOLO" };
static const string order [] = { "ORD_RELAXED", "ORD_REQUIRED" };
static const string crypto [] = { "EK_NOENC", "EK_EVEN", "EK_ODD", "EK*ERROR" };
static const string rexmit [] = { "SN_ORIGINAL", "SN_REXMIT" };
static const char* const boundary [] = { "PB_SUBSEQUENT", "PB_LAST", "PB_FIRST", "PB_SOLO" };
static const char* const order [] = { "ORD_RELAXED", "ORD_REQUIRED" };
static const char* const crypto [] = { "EK_NOENC", "EK_EVEN", "EK_ODD", "EK*ERROR" };
static const char* const rexmit [] = { "SN_ORIGINAL", "SN_REXMIT" };

out << boundary[int(getMsgBoundary())] << " ";
out << order[int(getMsgOrderFlag())] << " ";
Expand Down
15 changes: 15 additions & 0 deletions srtcore/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,21 @@ int CSndUList::pop(sockaddr*& addr, CPacket& pkt)
CUDT* u = m_pHeap[0]->m_pUDT;
remove_(u);

#define UST(field) ( (u->m_b##field) ? "+" : "-" ) << #field << " "

HLOGC(mglog.Debug, log << "SND:pop: requesting packet from @" << u->socketID()
<< " STATUS: "
<< UST(Listening)
<< UST(Connecting)
<< UST(Connected)
<< UST(Closing)
<< UST(Shutdown)
<< UST(Broken)
<< UST(PeerHealth)
<< UST(Opened)
);
#undef UST

if (!u->m_bConnected || u->m_bBroken)
return -1;

Expand Down