diff --git a/apps/srt-live-transmit.cpp b/apps/srt-live-transmit.cpp index 307a50a7a..9a7baef12 100644 --- a/apps/srt-live-transmit.cpp +++ b/apps/srt-live-transmit.cpp @@ -313,6 +313,7 @@ int parse_args(LiveTransmitConfig &cfg, int argc, char** argv) int main(int argc, char** argv) { + srt_startup(); // This is mainly required on Windows to initialize the network system, // for a case when the instance would use UDP. SRT does it on its own, independently. if (!SysInitializeNetwork()) @@ -324,6 +325,7 @@ int main(int argc, char** argv) { ~NetworkCleanup() { + srt_cleanup(); SysCleanupNetwork(); } } cleanupobj; diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 57f916491..c190e35ff 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -184,7 +184,7 @@ std::string CUDTUnited::CONID(SRTSOCKET sock) return ""; std::ostringstream os; - os << "%" << sock << ":"; + os << "@" << sock << ":"; return os.str(); } @@ -908,8 +908,6 @@ int CUDTUnited::close(const SRTSOCKET u) bool synch_close_snd = s->m_pUDT->m_bSynSending; //bool synch_close_rcv = s->m_pUDT->m_bSynRecving; - int id = s->m_SocketID; - if (s->m_Status == SRTS_LISTENING) { if (s->m_pUDT->m_bBroken) @@ -945,14 +943,17 @@ int CUDTUnited::close(const SRTSOCKET u) s->m_pUDT->close(); // synchronize with garbage collection. - HLOGC(mglog.Debug, log << "%" << id << " CUDT::close done. GLOBAL CLOSE: " << s->m_pUDT->CONID() << ". Acquiring GLOBAL control lock"); + HLOGC(mglog.Debug, log << "@" << u << "U::close done. GLOBAL CLOSE: " << s->m_pUDT->CONID() << ". Acquiring GLOBAL control lock"); CGuard manager_cg(m_ControlLock); // since "s" is located before m_ControlLock, locate it again in case // it became invalid map::iterator i = m_Sockets.find(u); if ((i == m_Sockets.end()) || (i->second->m_Status == SRTS_CLOSED)) + { + HLOGC(mglog.Debug, log << "@" << u << "U::close: NOT AN ACTIVE SOCKET, returning."); return 0; + } s = i->second; s->m_Status = SRTS_CLOSED; @@ -964,12 +965,13 @@ int CUDTUnited::close(const SRTSOCKET u) s->m_TimeStamp = CTimer::getTime(); m_Sockets.erase(s->m_SocketID); - m_ClosedSockets.insert(pair(s->m_SocketID, s)); + m_ClosedSockets[s->m_SocketID] = s; + HLOGC(mglog.Debug, log << "@" << u << "U::close: Socket MOVED TO CLOSED for collecting later."); CTimer::triggerEvent(); } - HLOGC(mglog.Debug, log << "%" << id << ": GLOBAL: CLOSING DONE"); + HLOGC(mglog.Debug, log << "%" << u << ": GLOBAL: CLOSING DONE"); // Check if the ID is still in closed sockets before you access it // (the last triggerEvent could have deleted it). @@ -977,7 +979,7 @@ int CUDTUnited::close(const SRTSOCKET u) { #if SRT_ENABLE_CLOSE_SYNCH - HLOGC(mglog.Debug, log << "%" << id << " GLOBAL CLOSING: sync-waiting for releasing sender resources..."); + HLOGC(mglog.Debug, log << "@" << u << " GLOBAL CLOSING: sync-waiting for releasing sender resources..."); for (;;) { CSndBuffer* sb = s->m_pUDT->m_pSndBuffer; @@ -985,14 +987,14 @@ int CUDTUnited::close(const SRTSOCKET u) // Disconnected from buffer - nothing more to check. if (!sb) { - HLOGC(mglog.Debug, log << "%" << id << " GLOBAL CLOSING: sending buffer disconnected. Allowed to close."); + HLOGC(mglog.Debug, log << "@" << u << " GLOBAL CLOSING: sending buffer disconnected. Allowed to close."); break; } // Sender buffer empty if (sb->getCurrBufSize() == 0) { - HLOGC(mglog.Debug, log << "%" << id << " GLOBAL CLOSING: sending buffer depleted. Allowed to close."); + HLOGC(mglog.Debug, log << "@" << u << " GLOBAL CLOSING: sending buffer depleted. Allowed to close."); break; } @@ -1004,7 +1006,7 @@ int CUDTUnited::close(const SRTSOCKET u) bool isgone = false; { CGuard manager_cg(m_ControlLock); - isgone = m_ClosedSockets.count(id) == 0; + isgone = m_ClosedSockets.count(u) == 0; } if (!isgone) { @@ -1012,11 +1014,11 @@ int CUDTUnited::close(const SRTSOCKET u) } if (isgone) { - HLOGC(mglog.Debug, log << "%" << id << " GLOBAL CLOSING: ... gone in the meantime, whatever. Exiting close()."); + HLOGC(mglog.Debug, log << "@" << u << " GLOBAL CLOSING: ... gone in the meantime, whatever. Exiting close()."); break; } - HLOGC(mglog.Debug, log << "%" << id << " GLOBAL CLOSING: ... still waiting for any update."); + HLOGC(mglog.Debug, log << "@" << u << " GLOBAL CLOSING: ... still waiting for any update."); CTimer::EWait wt = CTimer::waitForEvent(); if ( wt == CTimer::WT_ERROR ) @@ -1831,11 +1833,13 @@ void* CUDTUnited::garbageCollect(void* p) timeout.tv_sec = now.tv_sec + 1; timeout.tv_nsec = now.tv_usec * 1000; + HLOGC(mglog.Debug, log << "GC: sleep until " << FormatTime(uint64_t(now.tv_usec) + 1000000*(timeout.tv_sec))); pthread_cond_timedwait( &self->m_GCStopCond, &self->m_GCStopLock, &timeout); } // remove all sockets and multiplexers + HLOGC(mglog.Debug, log << "GC: GLOBAL EXIT - releasing all pending sockets. Acquring control lock..."); CGuard::enterCS(self->m_ControlLock); for (map::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++ i) @@ -1870,6 +1874,7 @@ void* CUDTUnited::garbageCollect(void* p) } CGuard::leaveCS(self->m_ControlLock); + HLOGC(mglog.Debug, log << "GC: GLOBAL EXIT - releasing all CLOSED sockets."); while (true) { self->checkBrokenSockets(); diff --git a/srtcore/common.cpp b/srtcore/common.cpp index 376688e38..ea8e168f4 100644 --- a/srtcore/common.cpp +++ b/srtcore/common.cpp @@ -820,7 +820,7 @@ std::string ConnectStatusStr(EConnectStatus cst) std::string TransmissionEventStr(ETransmissionEvent ev) { - static const std::string vals [] = + static const char* const vals [] = { "init", "ack", diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 4aa88bca9..73fc3b781 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -307,7 +307,7 @@ CUDT::CUDT(const CUDT& ancestor) #endif m_iSndTimeOut = ancestor.m_iSndTimeOut; m_iRcvTimeOut = ancestor.m_iRcvTimeOut; - m_bReuseAddr = true; // this must be true, because all accepted sockets shared the same port with the listener + m_bReuseAddr = true; // this must be true, because all accepted sockets share the same port with the listener m_llMaxBW = ancestor.m_llMaxBW; #ifdef SRT_ENABLE_IPOPTS m_iIpTTL = ancestor.m_iIpTTL; @@ -4702,7 +4702,8 @@ void CUDT::checkSndTimers(Whether2RegenKm regen) // Don't call this function in "non-regen mode" (sending only), // if this side is RESPONDER. This shall be called only with // regeneration request, which is required by the sender. - m_pCryptoControl->sendKeysToPeer(regen); + if (m_pCryptoControl) + m_pCryptoControl->sendKeysToPeer(regen); } } @@ -4717,13 +4718,13 @@ void CUDT::addressAndSend(CPacket& pkt) } -void CUDT::close() +bool CUDT::close() { // NOTE: this function is called from within the garbage collector thread. if (!m_bOpened) { - return; + return false; } HLOGC(mglog.Debug, log << CONID() << " - closing socket:"); @@ -4746,7 +4747,10 @@ void CUDT::close() if (m_ullLingerExpiration == 0) m_ullLingerExpiration = entertime + m_Linger.l_linger * uint64_t(1000000); - return; + HLOGC(mglog.Debug, log << "CUDT::close: linger-nonblocking, setting expire time T=" + << FormatTime(m_ullLingerExpiration)); + + return false; } #ifndef _WIN32 @@ -4786,7 +4790,7 @@ void CUDT::close() // XXX What's this, could any of the above actions make it !m_bOpened? if (!m_bOpened) { - return; + return true; } // Inform the threads handler to stop. @@ -4857,6 +4861,8 @@ void CUDT::close() m_ullRcvPeerStartTime = 0; m_bOpened = false; + + return true; } /* @@ -7608,7 +7614,7 @@ int CUDT::processData(CUnit* unit) } int pktrexmitflag = m_bPeerRexmitFlag ? (int)packet.getRexmitFlag() : 2; - static const string rexmitstat [] = {"ORIGINAL", "REXMITTED", "RXS-UNKNOWN"}; + static const char* const rexmitstat [] = {"ORIGINAL", "REXMITTED", "RXS-UNKNOWN"}; string rexmit_reason; diff --git a/srtcore/core.h b/srtcore/core.h index 1a78576f8..118a5f971 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -369,7 +369,7 @@ class CUDT /// Close the opened UDT entity. - void close(); + bool close(); /// Request UDT to send out a data block "data" with size of "len". /// @param data [in] The address of the application data to be sent. diff --git a/srtcore/packet.cpp b/srtcore/packet.cpp index 6720a1453..7ee8cda34 100644 --- a/srtcore/packet.cpp +++ b/srtcore/packet.cpp @@ -480,10 +480,10 @@ std::string CPacket::MessageFlagStr() stringstream out; - static const string boundary [] = { "PB_SUBSEQUENT", "PB_LAST", "PB_FIRST", "PB_SOLO" }; - static const string order [] = { "ORD_RELAXED", "ORD_REQUIRED" }; - static const string crypto [] = { "EK_NOENC", "EK_EVEN", "EK_ODD", "EK*ERROR" }; - static const string rexmit [] = { "SN_ORIGINAL", "SN_REXMIT" }; + static const char* const boundary [] = { "PB_SUBSEQUENT", "PB_LAST", "PB_FIRST", "PB_SOLO" }; + static const char* const order [] = { "ORD_RELAXED", "ORD_REQUIRED" }; + static const char* const crypto [] = { "EK_NOENC", "EK_EVEN", "EK_ODD", "EK*ERROR" }; + static const char* const rexmit [] = { "SN_ORIGINAL", "SN_REXMIT" }; out << boundary[int(getMsgBoundary())] << " "; out << order[int(getMsgOrderFlag())] << " "; diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index 02afc7367..31644d53c 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -340,6 +340,21 @@ int CSndUList::pop(sockaddr*& addr, CPacket& pkt) CUDT* u = m_pHeap[0]->m_pUDT; remove_(u); +#define UST(field) ( (u->m_b##field) ? "+" : "-" ) << #field << " " + + HLOGC(mglog.Debug, log << "SND:pop: requesting packet from @" << u->socketID() + << " STATUS: " + << UST(Listening) + << UST(Connecting) + << UST(Connected) + << UST(Closing) + << UST(Shutdown) + << UST(Broken) + << UST(PeerHealth) + << UST(Opened) + ); +#undef UST + if (!u->m_bConnected || u->m_bBroken) return -1;