Skip to content

Commit

Permalink
[core] Small refax of CUDTUnited::channelSettingsMatch(..)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed Oct 4, 2021
1 parent 5b0811c commit 790b783
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 68 deletions.
127 changes: 63 additions & 64 deletions srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1816,71 +1816,69 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i

int srt::CUDTUnited::connectIn(CUDTSocket* s, const sockaddr_any& target_addr, int32_t forced_isn)
{
ScopedLock cg(s->m_ControlLock);
// a socket can "connect" only if it is in the following states:
// - OPENED: assume the socket binding parameters are configured
// - INIT: configure binding parameters here
// - any other (meaning, already connected): report error

if (s->m_Status == SRTS_INIT)
{
if (s->core().m_config.bRendezvous)
throw CUDTException(MJ_NOTSUP, MN_ISRENDUNBOUND, 0);

// If bind() was done first on this socket, then the
// socket will not perform this step. This actually does the
// same thing as bind() does, just with empty address so that
// the binding parameters are autoselected.

s->core().open();
sockaddr_any autoselect_sa (target_addr.family());
// This will create such a sockaddr_any that
// will return true from empty().
updateMux(s, autoselect_sa); // <<---- updateMux
// -> C(Snd|Rcv)Queue::init
// -> pthread_create(...C(Snd|Rcv)Queue::worker...)
s->m_Status = SRTS_OPENED;
}
else
{
if (s->m_Status != SRTS_OPENED)
throw CUDTException(MJ_NOTSUP, MN_ISCONNECTED, 0);
ScopedLock cg(s->m_ControlLock);
// a socket can "connect" only if it is in the following states:
// - OPENED: assume the socket binding parameters are configured
// - INIT: configure binding parameters here
// - any other (meaning, already connected): report error

// status = SRTS_OPENED, so family should be known already.
if (target_addr.family() != s->m_SelfAddr.family())
{
LOGP(cnlog.Error, "srt_connect: socket is bound to a different family than target address");
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
}
}
if (s->m_Status == SRTS_INIT)
{
if (s->core().m_config.bRendezvous)
throw CUDTException(MJ_NOTSUP, MN_ISRENDUNBOUND, 0);

// If bind() was done first on this socket, then the
// socket will not perform this step. This actually does the
// same thing as bind() does, just with empty address so that
// the binding parameters are autoselected.

s->core().open();
sockaddr_any autoselect_sa (target_addr.family());
// This will create such a sockaddr_any that
// will return true from empty().
updateMux(s, autoselect_sa); // <<---- updateMux
// -> C(Snd|Rcv)Queue::init
// -> pthread_create(...C(Snd|Rcv)Queue::worker...)
s->m_Status = SRTS_OPENED;
}
else
{
if (s->m_Status != SRTS_OPENED)
throw CUDTException(MJ_NOTSUP, MN_ISCONNECTED, 0);

// status = SRTS_OPENED, so family should be known already.
if (target_addr.family() != s->m_SelfAddr.family())
{
LOGP(cnlog.Error, "srt_connect: socket is bound to a different family than target address");
throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
}
}

// connect_complete() may be called before connect() returns.
// So we need to update the status before connect() is called,
// otherwise the status may be overwritten with wrong value
// (CONNECTED vs. CONNECTING).
s->m_Status = SRTS_CONNECTING;

/*
* In blocking mode, connect can block for up to 30 seconds for
* rendez-vous mode. Holding the s->m_ControlLock prevent close
* from cancelling the connect
*/
try
{
// record peer address
s->m_PeerAddr = target_addr;
s->core().startConnect(target_addr, forced_isn);
}
catch (CUDTException& e) // Interceptor, just to change the state.
{
s->m_Status = SRTS_OPENED;
throw e;
}
// connect_complete() may be called before connect() returns.
// So we need to update the status before connect() is called,
// otherwise the status may be overwritten with wrong value
// (CONNECTED vs. CONNECTING).
s->m_Status = SRTS_CONNECTING;

// ScopedLock destructor will delete cg and unlock s->m_ControlLock
/*
* In blocking mode, connect can block for up to 30 seconds for
* rendez-vous mode. Holding the s->m_ControlLock prevent close
* from cancelling the connect
*/
try
{
// record peer address
s->m_PeerAddr = target_addr;
s->core().startConnect(target_addr, forced_isn);
}
catch (CUDTException& e) // Interceptor, just to change the state.
{
s->m_Status = SRTS_OPENED;
throw e;
}

return 0;
return 0;
}


Expand Down Expand Up @@ -2857,9 +2855,9 @@ uint16_t srt::CUDTUnited::installMuxer(CUDTSocket* w_s, CMultiplexer& fw_sm)
return sa.hport();
}

bool srt::CUDTUnited::channelSettingsMatch(const CMultiplexer& m, const CUDTSocket* s)
bool srt::CUDTUnited::channelSettingsMatch(const CSrtMuxerConfig& cfgMuxer, const CSrtConfig& cfgSocket)
{
return m.m_mcfg.bReuseAddr && m.m_mcfg == s->core().m_config;
return cfgMuxer.bReuseAddr && cfgMuxer == cfgSocket;
}

void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* udpsock /*[[nullable]]*/)
Expand All @@ -2876,6 +2874,7 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& addr, const U
// If not, we need to see if there exist already a multiplexer bound
// to the same endpoint.
const int port = addr.hport();
const CSrtConfig& cfgSocket = s->core().m_config;

bool reuse_attempt = false;
for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin();
Expand Down Expand Up @@ -2912,14 +2911,14 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& addr, const U

// Still, for ANY you need either the same family, or open
// for families.
if (m.m_mcfg.iIpV6Only != -1 && m.m_mcfg.iIpV6Only != s->core().m_config.iIpV6Only)
if (m.m_mcfg.iIpV6Only != -1 && m.m_mcfg.iIpV6Only != cfgSocket.iIpV6Only)
{
LOGC(smlog.Error, log << "bind: Address: " << addr.str()
<< " conflicts with existing IPv6 wildcard binding: " << sa.str());
throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
}

if ((m.m_mcfg.iIpV6Only == 0 || s->core().m_config.iIpV6Only == 0) && m.m_iIPversion != addr.family())
if ((m.m_mcfg.iIpV6Only == 0 || cfgSocket.iIpV6Only == 0) && m.m_iIPversion != addr.family())
{
LOGC(smlog.Error, log << "bind: Address: " << addr.str()
<< " conflicts with IPv6 wildcard binding: " << sa.str()
Expand Down Expand Up @@ -2955,7 +2954,7 @@ void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& addr, const U
if (reuse_attempt)
{
// - if the channel settings match, it can be reused
if (channelSettingsMatch(m, s))
if (channelSettingsMatch(m.m_mcfg, cfgSocket))
{
HLOGC(smlog.Debug, log << "bind: reusing multiplexer for port " << port);
// reuse the existing multiplexer
Expand Down
10 changes: 8 additions & 2 deletions srtcore/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ class CUDTSocket

void construct();

srt::sync::atomic<SRT_SOCKSTATUS> m_Status; //< current socket state
SRT_ATTR_GUARDED_BY(m_ControlLock)
sync::atomic<SRT_SOCKSTATUS> m_Status; //< current socket state

/// Time when the socket is closed.
/// When the socket is closed, it is not removed immediately from the list
Expand Down Expand Up @@ -441,7 +442,12 @@ friend class CRendezvousQueue;
// Utility functions for updateMux
void configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af);
uint16_t installMuxer(CUDTSocket* w_s, CMultiplexer& sm);
bool channelSettingsMatch(const CMultiplexer& m, const CUDTSocket* s);

/// @brief Checks if channel configuration matches the socket configuration.
/// @param cfgMuxer multiplexer configuration.
/// @param cfgSocket socket configuration.
/// @return tru if configurations match, false otherwise.
static bool channelSettingsMatch(const CSrtMuxerConfig& cfgMuxer, const CSrtConfig& cfgSocket);

private:
std::map<int, CMultiplexer> m_mMultiplexer; // UDP multiplexer
Expand Down
7 changes: 5 additions & 2 deletions srtcore/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -499,14 +499,17 @@ class CUDT
SRT_ATR_NODISCARD size_t fillSrtHandshake_HSRSP(uint32_t* srtdata, size_t srtlen, int hs_version);
SRT_ATR_NODISCARD size_t fillSrtHandshake(uint32_t* srtdata, size_t srtlen, int msgtype, int hs_version);

SRT_ATR_NODISCARD bool createSrtHandshake(int srths_cmd, int srtkm_cmd, const uint32_t* data, size_t datalen,
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
bool createSrtHandshake(int srths_cmd, int srtkm_cmd, const uint32_t* data, size_t datalen,
CPacket& w_reqpkt, CHandShake& w_hs);

SRT_ATR_NODISCARD size_t fillHsExtConfigString(uint32_t *pcmdspec, int cmd, const std::string &str);
#if ENABLE_EXPERIMENTAL_BONDING
SRT_ATR_NODISCARD size_t fillHsExtGroup(uint32_t *pcmdspec);
#endif
SRT_ATR_NODISCARD size_t fillHsExtKMREQ(uint32_t *pcmdspec, size_t ki);
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
size_t fillHsExtKMREQ(uint32_t *pcmdspec, size_t ki);

SRT_ATR_NODISCARD size_t fillHsExtKMRSP(uint32_t *pcmdspec, const uint32_t *kmdata, size_t kmdata_wordsize);

SRT_ATR_NODISCARD size_t prepareSrtHsMsg(int cmd, uint32_t* srtdata, size_t size);
Expand Down

0 comments on commit 790b783

Please sign in to comment.