From 790b7831fb7ec4851111e38ce601e4a13548847c Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Mon, 16 Aug 2021 16:18:26 +0200 Subject: [PATCH] [core] Small refax of CUDTUnited::channelSettingsMatch(..) --- srtcore/api.cpp | 127 ++++++++++++++++++++++++------------------------ srtcore/api.h | 10 +++- srtcore/core.h | 7 ++- 3 files changed, 76 insertions(+), 68 deletions(-) diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 38b86cf8c..dd8266f5c 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -1816,71 +1816,69 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i int srt::CUDTUnited::connectIn(CUDTSocket* s, const sockaddr_any& target_addr, int32_t forced_isn) { - ScopedLock cg(s->m_ControlLock); - // a socket can "connect" only if it is in the following states: - // - OPENED: assume the socket binding parameters are configured - // - INIT: configure binding parameters here - // - any other (meaning, already connected): report error - - if (s->m_Status == SRTS_INIT) - { - if (s->core().m_config.bRendezvous) - throw CUDTException(MJ_NOTSUP, MN_ISRENDUNBOUND, 0); - - // If bind() was done first on this socket, then the - // socket will not perform this step. This actually does the - // same thing as bind() does, just with empty address so that - // the binding parameters are autoselected. - - s->core().open(); - sockaddr_any autoselect_sa (target_addr.family()); - // This will create such a sockaddr_any that - // will return true from empty(). - updateMux(s, autoselect_sa); // <<---- updateMux - // -> C(Snd|Rcv)Queue::init - // -> pthread_create(...C(Snd|Rcv)Queue::worker...) - s->m_Status = SRTS_OPENED; - } - else - { - if (s->m_Status != SRTS_OPENED) - throw CUDTException(MJ_NOTSUP, MN_ISCONNECTED, 0); + ScopedLock cg(s->m_ControlLock); + // a socket can "connect" only if it is in the following states: + // - OPENED: assume the socket binding parameters are configured + // - INIT: configure binding parameters here + // - any other (meaning, already connected): report error - // status = SRTS_OPENED, so family should be known already. - if (target_addr.family() != s->m_SelfAddr.family()) - { - LOGP(cnlog.Error, "srt_connect: socket is bound to a different family than target address"); - throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); - } - } + if (s->m_Status == SRTS_INIT) + { + if (s->core().m_config.bRendezvous) + throw CUDTException(MJ_NOTSUP, MN_ISRENDUNBOUND, 0); + + // If bind() was done first on this socket, then the + // socket will not perform this step. This actually does the + // same thing as bind() does, just with empty address so that + // the binding parameters are autoselected. + + s->core().open(); + sockaddr_any autoselect_sa (target_addr.family()); + // This will create such a sockaddr_any that + // will return true from empty(). + updateMux(s, autoselect_sa); // <<---- updateMux + // -> C(Snd|Rcv)Queue::init + // -> pthread_create(...C(Snd|Rcv)Queue::worker...) + s->m_Status = SRTS_OPENED; + } + else + { + if (s->m_Status != SRTS_OPENED) + throw CUDTException(MJ_NOTSUP, MN_ISCONNECTED, 0); + // status = SRTS_OPENED, so family should be known already. + if (target_addr.family() != s->m_SelfAddr.family()) + { + LOGP(cnlog.Error, "srt_connect: socket is bound to a different family than target address"); + throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); + } + } - // connect_complete() may be called before connect() returns. - // So we need to update the status before connect() is called, - // otherwise the status may be overwritten with wrong value - // (CONNECTED vs. CONNECTING). - s->m_Status = SRTS_CONNECTING; - /* - * In blocking mode, connect can block for up to 30 seconds for - * rendez-vous mode. Holding the s->m_ControlLock prevent close - * from cancelling the connect - */ - try - { - // record peer address - s->m_PeerAddr = target_addr; - s->core().startConnect(target_addr, forced_isn); - } - catch (CUDTException& e) // Interceptor, just to change the state. - { - s->m_Status = SRTS_OPENED; - throw e; - } + // connect_complete() may be called before connect() returns. + // So we need to update the status before connect() is called, + // otherwise the status may be overwritten with wrong value + // (CONNECTED vs. CONNECTING). + s->m_Status = SRTS_CONNECTING; - // ScopedLock destructor will delete cg and unlock s->m_ControlLock + /* + * In blocking mode, connect can block for up to 30 seconds for + * rendez-vous mode. Holding the s->m_ControlLock prevent close + * from cancelling the connect + */ + try + { + // record peer address + s->m_PeerAddr = target_addr; + s->core().startConnect(target_addr, forced_isn); + } + catch (CUDTException& e) // Interceptor, just to change the state. + { + s->m_Status = SRTS_OPENED; + throw e; + } - return 0; + return 0; } @@ -2857,9 +2855,9 @@ uint16_t srt::CUDTUnited::installMuxer(CUDTSocket* w_s, CMultiplexer& fw_sm) return sa.hport(); } -bool srt::CUDTUnited::channelSettingsMatch(const CMultiplexer& m, const CUDTSocket* s) +bool srt::CUDTUnited::channelSettingsMatch(const CSrtMuxerConfig& cfgMuxer, const CSrtConfig& cfgSocket) { - return m.m_mcfg.bReuseAddr && m.m_mcfg == s->core().m_config; + return cfgMuxer.bReuseAddr && cfgMuxer == cfgSocket; } void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* udpsock /*[[nullable]]*/) @@ -2876,6 +2874,7 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& addr, const U // If not, we need to see if there exist already a multiplexer bound // to the same endpoint. const int port = addr.hport(); + const CSrtConfig& cfgSocket = s->core().m_config; bool reuse_attempt = false; for (map::iterator i = m_mMultiplexer.begin(); @@ -2912,14 +2911,14 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& addr, const U // Still, for ANY you need either the same family, or open // for families. - if (m.m_mcfg.iIpV6Only != -1 && m.m_mcfg.iIpV6Only != s->core().m_config.iIpV6Only) + if (m.m_mcfg.iIpV6Only != -1 && m.m_mcfg.iIpV6Only != cfgSocket.iIpV6Only) { LOGC(smlog.Error, log << "bind: Address: " << addr.str() << " conflicts with existing IPv6 wildcard binding: " << sa.str()); throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0); } - if ((m.m_mcfg.iIpV6Only == 0 || s->core().m_config.iIpV6Only == 0) && m.m_iIPversion != addr.family()) + if ((m.m_mcfg.iIpV6Only == 0 || cfgSocket.iIpV6Only == 0) && m.m_iIPversion != addr.family()) { LOGC(smlog.Error, log << "bind: Address: " << addr.str() << " conflicts with IPv6 wildcard binding: " << sa.str() @@ -2955,7 +2954,7 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& addr, const U if (reuse_attempt) { // - if the channel settings match, it can be reused - if (channelSettingsMatch(m, s)) + if (channelSettingsMatch(m.m_mcfg, cfgSocket)) { HLOGC(smlog.Debug, log << "bind: reusing multiplexer for port " << port); // reuse the existing multiplexer diff --git a/srtcore/api.h b/srtcore/api.h index a1bfbffd7..516322292 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -123,7 +123,8 @@ class CUDTSocket void construct(); - srt::sync::atomic m_Status; //< current socket state + SRT_ATTR_GUARDED_BY(m_ControlLock) + sync::atomic m_Status; //< current socket state /// Time when the socket is closed. /// When the socket is closed, it is not removed immediately from the list @@ -441,7 +442,12 @@ friend class CRendezvousQueue; // Utility functions for updateMux void configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af); uint16_t installMuxer(CUDTSocket* w_s, CMultiplexer& sm); - bool channelSettingsMatch(const CMultiplexer& m, const CUDTSocket* s); + + /// @brief Checks if channel configuration matches the socket configuration. + /// @param cfgMuxer multiplexer configuration. + /// @param cfgSocket socket configuration. + /// @return tru if configurations match, false otherwise. + static bool channelSettingsMatch(const CSrtMuxerConfig& cfgMuxer, const CSrtConfig& cfgSocket); private: std::map m_mMultiplexer; // UDP multiplexer diff --git a/srtcore/core.h b/srtcore/core.h index 1fd5f6730..e74c58c6e 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -499,14 +499,17 @@ class CUDT SRT_ATR_NODISCARD size_t fillSrtHandshake_HSRSP(uint32_t* srtdata, size_t srtlen, int hs_version); SRT_ATR_NODISCARD size_t fillSrtHandshake(uint32_t* srtdata, size_t srtlen, int msgtype, int hs_version); - SRT_ATR_NODISCARD bool createSrtHandshake(int srths_cmd, int srtkm_cmd, const uint32_t* data, size_t datalen, + SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock) + bool createSrtHandshake(int srths_cmd, int srtkm_cmd, const uint32_t* data, size_t datalen, CPacket& w_reqpkt, CHandShake& w_hs); SRT_ATR_NODISCARD size_t fillHsExtConfigString(uint32_t *pcmdspec, int cmd, const std::string &str); #if ENABLE_EXPERIMENTAL_BONDING SRT_ATR_NODISCARD size_t fillHsExtGroup(uint32_t *pcmdspec); #endif - SRT_ATR_NODISCARD size_t fillHsExtKMREQ(uint32_t *pcmdspec, size_t ki); + SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock) + size_t fillHsExtKMREQ(uint32_t *pcmdspec, size_t ki); + SRT_ATR_NODISCARD size_t fillHsExtKMRSP(uint32_t *pcmdspec, const uint32_t *kmdata, size_t kmdata_wordsize); SRT_ATR_NODISCARD size_t prepareSrtHsMsg(int cmd, uint32_t* srtdata, size_t size);