diff --git a/srtcore/buffer_snd.cpp b/srtcore/buffer_snd.cpp index 26f885dd6..39e476271 100644 --- a/srtcore/buffer_snd.cpp +++ b/srtcore/buffer_snd.cpp @@ -355,7 +355,11 @@ int CSndBuffer::readData(CPacket& w_packet, steady_clock::time_point& w_srctime, continue; } - HLOGC(bslog.Debug, log << CONID() << "CSndBuffer: extracting packet size=" << readlen << " to send"); + HLOGC(bslog.Debug, log << CONID() << "CSndBuffer: picked up packet to send: size=" << readlen + << " #" << w_packet.getMsgSeq() + << " %" << w_packet.m_iSeqNo + << " !" << BufferStamp(w_packet.m_pcData, w_packet.getLength())); + break; } diff --git a/srtcore/common.h b/srtcore/common.h index de42a7d83..bfc0c1d96 100644 --- a/srtcore/common.h +++ b/srtcore/common.h @@ -1430,6 +1430,23 @@ inline bool checkMappedIPv4(const sockaddr_in6& sa) return checkMappedIPv4(addr); } +inline std::string FormatLossArray(const std::vector< std::pair >& lra) +{ + std::ostringstream os; + + os << "[ "; + for (std::vector< std::pair >::const_iterator i = lra.begin(); i != lra.end(); ++i) + { + int len = CSeqNo::seqoff(i->first, i->second); + os << "%" << i->first; + if (len > 1) + os << "+" << len; + os << " "; + } + + os << "]"; + return os.str(); +} } // namespace srt diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 084cad670..38d59876f 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -11338,7 +11338,7 @@ bool srt::CUDT::checkExpTimer(const steady_clock::time_point& currtime, int chec // Application will detect this when it calls any UDT methods next time. // HLOGC(xtlog.Debug, - log << CONID() << "CONNECTION EXPIRED after " << count_milliseconds(currtime - last_rsp_time) << "ms"); + log << CONID() << "CONNECTION EXPIRED after " << FormatDuration(currtime - last_rsp_time) << " - BREAKING"); m_bClosing = true; m_bBroken = true; m_iBrokenCounter = 30; @@ -11566,6 +11566,7 @@ void srt::CUDT::completeBrokenConnectionDependencies(int errorcode) { // XXX This somehow can cause a deadlock // uglobal()->close(m_parent); + LOGC(smlog.Debug, log << "updateBrokenConnection...: BROKEN SOCKET @" << m_SocketID << " - CLOSING, to be removed from group."); m_parent->setBrokenClosed(); } #endif diff --git a/srtcore/core.h b/srtcore/core.h index 1d48436fb..2fdbe4d12 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -389,6 +389,11 @@ class CUDT return (int32_t) sync::count_microseconds(from_time - tsStartTime); } + static void setPacketTS(CPacket& p, const time_point& start_time, const time_point& ts) + { + p.m_iTimeStamp = makeTS(ts, start_time); + } + /// @brief Set the timestamp field of the packet using the provided value (no check) /// @param p the packet structure to set the timestamp on. /// @param ts timestamp to use as a source for packet timestamp. diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 001dd4802..c6b92a0ff 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -278,8 +278,8 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype) , m_iLastSchedMsgNo(SRT_MSGNO_NONE) { setupMutex(m_GroupLock, "Group"); - setupMutex(m_RcvDataLock, "RcvData"); - setupCond(m_RcvDataCond, "RcvData"); + setupMutex(m_RcvDataLock, "G/RcvData"); + setupCond(m_RcvDataCond, "G/RcvData"); m_RcvEID = m_Global.m_EPoll.create(&m_RcvEpolld); m_SndEID = m_Global.m_EPoll.create(&m_SndEpolld); @@ -861,7 +861,7 @@ void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side) // Get the latency (possibly fixed against the opposite side) // from the first socket (core.m_iTsbPdDelay_ms), // and set it on the current socket. - set_latency(core.m_iTsbPdDelay_ms * int64_t(1000)); + set_latency_us(core.m_iTsbPdDelay_ms * int64_t(1000)); } void CUDTGroup::close() diff --git a/srtcore/group.h b/srtcore/group.h index c2863b44e..247423da3 100644 --- a/srtcore/group.h +++ b/srtcore/group.h @@ -611,7 +611,7 @@ class CUDTGroup private: // Fields required for SRT_GTYPE_BACKUP groups. - senderBuffer_t m_SenderBuffer; + senderBuffer_t m_SenderBuffer; // This mechanism is to be removed on group-common sndbuf int32_t m_iSndOldestMsgNo; // oldest position in the sender buffer sync::atomic m_iSndAckedMsgNo; uint32_t m_uOPT_MinStabilityTimeout_us; @@ -800,7 +800,7 @@ class CUDTGroup SRTU_PROPERTY_RW_CHAIN(CUDTGroup, SRT_GROUP_TYPE, type, m_type); SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int32_t, currentSchedSequence, m_iLastSchedSeqNo); SRTU_PROPERTY_RRW(std::set&, epollset, m_sPollID); - SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int64_t, latency, m_iTsbPdDelay_us); + SRTU_PROPERTY_RW_CHAIN(CUDTGroup, int64_t, latency_us, m_iTsbPdDelay_us); SRTU_PROPERTY_RO(bool, closing, m_bClosing); }; diff --git a/srtcore/handshake.h b/srtcore/handshake.h index 93a351f39..c6a5731e4 100644 --- a/srtcore/handshake.h +++ b/srtcore/handshake.h @@ -311,6 +311,8 @@ class CHandShake // Applicable only when m_iVersion == HS_VERSION_SRT1 int32_t flags() { return m_iType; } + bool v5orHigher() { return m_iVersion > 4; } + public: int32_t m_iVersion; // UDT version (HS_VERSION_* symbols) int32_t m_iType; // UDT4: socket type (only UDT_DGRAM is valid); SRT1: extension flags diff --git a/srtcore/list.cpp b/srtcore/list.cpp index b6e70d39d..0a175ee9d 100644 --- a/srtcore/list.cpp +++ b/srtcore/list.cpp @@ -99,16 +99,7 @@ srt::CSndLossList::~CSndLossList() void srt::CSndLossList::traceState() const { - int pos = m_iHead; - while (pos != SRT_SEQNO_NONE) - { - std::cout << pos << ":[" << m_caSeq[pos].seqstart; - if (m_caSeq[pos].seqend != SRT_SEQNO_NONE) - std::cout << ", " << m_caSeq[pos].seqend; - std::cout << "], "; - pos = m_caSeq[pos].inext; - } - std::cout << "\n"; + traceState(std::cout) << "\n"; } int srt::CSndLossList::insert(int32_t seqno1, int32_t seqno2) @@ -508,6 +499,10 @@ srt::CRcvLossList::~CRcvLossList() int srt::CRcvLossList::insert(int32_t seqno1, int32_t seqno2) { + SRT_ASSERT(seqno1 != SRT_SEQNO_NONE && seqno2 != SRT_SEQNO_NONE); + // Make sure that seqno2 isn't earlier than seqno1. + SRT_ASSERT(CSeqNo::seqcmp(seqno1, seqno2) <= 0); + // Data to be inserted must be larger than all those in the list if (m_iLargestSeq != SRT_SEQNO_NONE && CSeqNo::seqcmp(seqno1, m_iLargestSeq) <= 0) { diff --git a/srtcore/list.h b/srtcore/list.h index 8f921c698..40b8ee0c9 100644 --- a/srtcore/list.h +++ b/srtcore/list.h @@ -84,6 +84,25 @@ class CSndLossList /// @return The seq. no. or -1 if the list is empty. int32_t popLostSeq(); + template + Stream& traceState(Stream& sout) const + { + int pos = m_iHead; + while (pos != SRT_SEQNO_NONE) + { + sout << "[" << pos << "]:" << m_caSeq[pos].seqstart; + if (m_caSeq[pos].seqend != SRT_SEQNO_NONE) + sout << ":" << m_caSeq[pos].seqend; + if (m_caSeq[pos].inext == -1) + sout << "=|"; + else + sout << "->[" << m_caSeq[pos].inext << "]"; + sout << ", "; + pos = m_caSeq[pos].inext; + } + sout << " {len:" << m_iLength << " head:" << m_iHead << " last:" << m_iLastInsertPos << "}"; + return sout; + } void traceState() const; // Debug/unittest support. diff --git a/srtcore/logging.h b/srtcore/logging.h index e90ad4ac2..5867c41ae 100644 --- a/srtcore/logging.h +++ b/srtcore/logging.h @@ -75,7 +75,7 @@ written by #define HLOGP LOGP #define HLOGF LOGF -#define IF_HEAVY_LOGGING(instr) instr +#define IF_HEAVY_LOGGING(instr,...) instr,##__VA_ARGS__ #else diff --git a/srtcore/socketconfig.cpp b/srtcore/socketconfig.cpp index 5c5b8e090..bd76ccc19 100644 --- a/srtcore/socketconfig.cpp +++ b/srtcore/socketconfig.cpp @@ -52,6 +52,27 @@ written by #include "srt.h" #include "socketconfig.h" +namespace srt +{ +int RcvBufferSizeOptionToValue(int val, int flightflag, int mss) +{ + // Mimimum recv buffer size is 32 packets + const int mssin_size = mss - CPacket::UDP_HDR_SIZE; + + int bufsize; + if (val > mssin_size * CSrtConfig::DEF_MIN_FLIGHT_PKT) + bufsize = val / mssin_size; + else + bufsize = CSrtConfig::DEF_MIN_FLIGHT_PKT; + + // recv buffer MUST not be greater than FC size + if (bufsize > flightflag) + bufsize = flightflag; + + return bufsize; +} +} + using namespace srt; extern const int32_t SRT_DEF_VERSION = SrtParseVersion(SRT_VERSION); @@ -122,17 +143,7 @@ struct CSrtConfigSetter if (val <= 0) throw CUDTException(MJ_NOTSUP, MN_INVAL, 0); - // Mimimum recv buffer size is 32 packets - const int mssin_size = co.iMSS - CPacket::UDP_HDR_SIZE; - - if (val > mssin_size * co.DEF_MIN_FLIGHT_PKT) - co.iRcvBufSize = val / mssin_size; - else - co.iRcvBufSize = co.DEF_MIN_FLIGHT_PKT; - - // recv buffer MUST not be greater than FC size - if (co.iRcvBufSize > co.iFlightFlagSize) - co.iRcvBufSize = co.iFlightFlagSize; + co.iRcvBufSize = srt::RcvBufferSizeOptionToValue(val, co.iFlightFlagSize, co.iMSS); } }; diff --git a/srtcore/socketconfig.h b/srtcore/socketconfig.h index 403616edf..52e6101d4 100644 --- a/srtcore/socketconfig.h +++ b/srtcore/socketconfig.h @@ -379,6 +379,9 @@ inline bool cast_optval(const void* optval, int optlen) return false; } + +int RcvBufferSizeOptionToValue(int optval, int flightflag, int mss); + } // namespace srt struct SRT_SocketOptionObject diff --git a/srtcore/sync.h b/srtcore/sync.h index 87be6f458..c5a247f87 100644 --- a/srtcore/sync.h +++ b/srtcore/sync.h @@ -771,7 +771,7 @@ struct DurationUnitName template inline std::string FormatDuration(const steady_clock::duration& dur) { - return Sprint(DurationUnitName::count(dur)) + DurationUnitName::name(); + return Sprint(std::fixed, DurationUnitName::count(dur)) + DurationUnitName::name(); } inline std::string FormatDuration(const steady_clock::duration& dur) diff --git a/srtcore/tsbpd_time.cpp b/srtcore/tsbpd_time.cpp index 046c90b74..162fc7ac7 100644 --- a/srtcore/tsbpd_time.cpp +++ b/srtcore/tsbpd_time.cpp @@ -220,7 +220,16 @@ CTsbpdTime::time_point CTsbpdTime::getTsbPdTimeBase(uint32_t timestamp_us) const CTsbpdTime::time_point CTsbpdTime::getPktTsbPdTime(uint32_t usPktTimestamp) const { - return getPktTsbPdBaseTime(usPktTimestamp) + m_tdTsbPdDelay + microseconds_from(m_DriftTracer.drift()); + time_point value = getPktTsbPdBaseTime(usPktTimestamp) + m_tdTsbPdDelay + microseconds_from(m_DriftTracer.drift()); + + /* + HLOGC(brlog.Debug, log << "getPktTsbPdTime:" + << " BASE=" << FormatTime(m_tsTsbPdTimeBase) + << " TS=" << usPktTimestamp << "us, lat=" << FormatDuration(m_tdTsbPdDelay) + << " DRF=" << m_DriftTracer.drift() << "us = " << FormatTime(value)); + */ + + return value; } CTsbpdTime::time_point CTsbpdTime::getPktTsbPdBaseTime(uint32_t usPktTimestamp) const diff --git a/srtcore/utilities.h b/srtcore/utilities.h index 31e05b205..5f9d62898 100644 --- a/srtcore/utilities.h +++ b/srtcore/utilities.h @@ -432,7 +432,7 @@ class FixedArray const T& operator[](size_t index) const { if (index >= m_size) - raise_expection(index); + throw_invalid_index(index); return m_entries[index]; } @@ -440,7 +440,7 @@ class FixedArray T& operator[](size_t index) { if (index >= m_size) - raise_expection(index); + throw_invalid_index(index); return m_entries[index]; } @@ -448,7 +448,7 @@ class FixedArray const T& operator[](int index) const { if (index < 0 || static_cast(index) >= m_size) - raise_expection(index); + throw_invalid_index(index); return m_entries[index]; } @@ -456,7 +456,7 @@ class FixedArray T& operator[](int index) { if (index < 0 || static_cast(index) >= m_size) - raise_expection(index); + throw_invalid_index(index); return m_entries[index]; } @@ -478,7 +478,7 @@ class FixedArray FixedArray(const FixedArray& ); FixedArray& operator=(const FixedArray&); - void raise_expection(int i) const + void throw_invalid_index(int i) const { std::stringstream ss; ss << "Index " << i << "out of range"; @@ -534,6 +534,34 @@ namespace srt_pair_op } } +namespace any_op +{ + template + struct AnyProxy + { + const T& value; + bool result; + + AnyProxy(const T& x, bool res): value(x), result(res) {} + + AnyProxy& operator,(const T& val) + { + if (result) + return *this; + result = value == val; + return *this; + } + + operator bool() { return result; } + }; + + template inline + AnyProxy EqualAny(const T& checked_val) + { + return AnyProxy(checked_val, false); + } +} + #if HAVE_CXX11 template @@ -666,6 +694,15 @@ inline std::string Sprint(const Arg1& arg) return sout.str(); } +// Ok, let it be 2-arg, in case when a manipulator is needed +template +inline std::string Sprint(const Arg1& arg1, const Arg2& arg2) +{ + std::ostringstream sout; + sout << arg1 << arg2; + return sout.str(); +} + template inline std::string Printable(const Container& in) { @@ -748,6 +785,32 @@ inline void insert_uniq(std::vector& v, const ArgValue& val) v.push_back(val); } +template +inline std::pair Tie(Type1& var1, Type2& var2) +{ + return std::pair(var1, var2); +} + +// This can be used in conjunction with Tie to simplify the code +// in loops around a whole container: +// list::const_iterator it, end; +// Tie(it, end) = All(list_container); +template +std::pair +inline All(Container& c) { return std::make_pair(c.begin(), c.end()); } + +template +std::pair +inline All(const Container& c) { return std::make_pair(c.begin(), c.end()); } + + +template +inline void FringeValues(const Container& from, std::map& out) +{ + for (typename Container::const_iterator i = from.begin(); i != from.end(); ++i) + ++out[*i]; +} + template struct CallbackHolder { @@ -1058,11 +1121,11 @@ inline ValueType avg_iir_w(ValueType old_value, ValueType new_value, size_t new_ // This relies only on a convention, which is the following: // // V x = object.prop(); <-- get the property's value -// object.prop(x); <-- set the property a value +// object.set_prop(x); <-- set the property a value // // Properties might be also chained when setting: // -// object.prop1(v1).prop2(v2).prop3(v3); +// object.set_prop1(v1).set_prop2(v2).set_prop3(v3); // // Properties may be defined various even very complicated // ways, which is simply providing a method with body. In order diff --git a/testing/testmedia.cpp b/testing/testmedia.cpp index 6fd22e468..2d6635288 100755 --- a/testing/testmedia.cpp +++ b/testing/testmedia.cpp @@ -963,16 +963,36 @@ void TransmitGroupSocketConnect(void* srtcommon, SRTSOCKET sock, int error, cons Verb() << " IPE: LINK NOT FOUND???]"; } -void SrtCommon::OpenGroupClient() +SRT_GROUP_TYPE ResolveGroupType(const string& name) { - SRT_GROUP_TYPE type = SRT_GTYPE_UNDEFINED; + static struct + { + string name; + SRT_GROUP_TYPE type; + } table [] { +#define E(n) {#n, SRT_GTYPE_##n} + E(BROADCAST), + E(BACKUP) - // Resolve group type. - if (m_group_type == "broadcast") - type = SRT_GTYPE_BROADCAST; - else if (m_group_type == "backup") - type = SRT_GTYPE_BACKUP; - else +#undef E + }; + + typedef int charxform(int c); + + string uname; + transform(name.begin(), name.end(), back_inserter(uname), (charxform*)(&toupper)); + + for (auto& x: table) + if (x.name == uname) + return x.type; + + return SRT_GTYPE_UNDEFINED; +} + +void SrtCommon::OpenGroupClient() +{ + SRT_GROUP_TYPE type = ResolveGroupType(m_group_type); + if (type == SRT_GTYPE_UNDEFINED) { Error("With //group, type='" + m_group_type + "' undefined"); }