diff --git a/CMakeLists.txt b/CMakeLists.txt index 0aa0d32eb..4611f0ca2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -108,6 +108,7 @@ endforeach() # SRT_DEBUG_BONDING_STATES 1 # SRT_DEBUG_RTT 1 /* RTT trace */ # SRT_MAVG_SAMPLING_RATE 40 /* Max sampling rate */ +# SRT_ENABLE_FREQUENT_LOG_TRACE 0 : set to 1 to enable printing reason for suppressed freq logs # option defaults set(ENABLE_HEAVY_LOGGING_DEFAULT OFF) diff --git a/srtcore/atomic.h b/srtcore/atomic.h index f1394a7da..f391be941 100644 --- a/srtcore/atomic.h +++ b/srtcore/atomic.h @@ -175,6 +175,22 @@ class atomic { #endif } + T operator|=(T i) { +#if defined(ATOMIC_USE_SRT_SYNC_MUTEX) && (ATOMIC_USE_SRT_SYNC_MUTEX == 1) + ScopedLock lg_(mutex_); + const T t = value_ |= i; + return t; +#elif defined(ATOMIC_USE_GCC_INTRINSICS) + return __atomic_or_fetch(&value_, i, __ATOMIC_SEQ_CST); +#elif defined(ATOMIC_USE_MSVC_INTRINSICS) + return msvc::interlocked::or_fetch(&value_, i); +#elif defined(ATOMIC_USE_CPP11_ATOMIC) + return value_ |= i; +#else + #error "Implement Me." +#endif + } + /// @brief Performs an atomic compare-and-swap (CAS) operation. /// /// The value of the atomic object is only updated to the new value if the diff --git a/srtcore/atomic_msvc.h b/srtcore/atomic_msvc.h index 9e4df2dbd..735c7a660 100644 --- a/srtcore/atomic_msvc.h +++ b/srtcore/atomic_msvc.h @@ -119,6 +119,11 @@ struct interlocked { return static_cast(_InterlockedExchange8( reinterpret_cast(x), static_cast(new_val))); } + + static inline T or_fetch(T volatile* x, const T val) { + return static_cast(_InterlockedOr8( + reinterpret_cast(x), static_cast(val))); + } }; template diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 1f5eed8ac..9587cc793 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -985,6 +985,7 @@ void srt::CUDT::open() #endif m_iReXmitCount = 1; + memset(&m_aSuppressedMsg, 0, sizeof m_aSuppressedMsg); m_iPktCount = 0; m_iLightACKCount = 1; m_tsNextSendTime = steady_clock::time_point(); @@ -5419,9 +5420,19 @@ void * srt::CUDT::tsbpd(void* param) << iDropCnt << " packets) playable at " << FormatTime(info.tsbpd_time) << " delayed " << (timediff_us / 1000) << "." << std::setw(3) << std::setfill('0') << (timediff_us % 1000) << " ms"); #endif - LOGC(brlog.Warn, log << self->CONID() << "RCV-DROPPED " << iDropCnt << " packet(s). Packet seqno %" << info.seqno - << " delayed for " << (timediff_us / 1000) << "." << std::setw(3) << std::setfill('0') - << (timediff_us % 1000) << " ms"); + string why; + if (self->frequentLogAllowed(FREQLOGFA_RCV_DROPPED, tnow, (why))) + { + LOGC(brlog.Warn, log << self->CONID() << "RCV-DROPPED " << iDropCnt << " packet(s). Packet seqno %" << info.seqno + << " delayed for " << (timediff_us / 1000) << "." << std::setw(3) << std::setfill('0') + << (timediff_us % 1000) << " ms " << why); + } +#if SRT_ENABLE_FREQUENT_LOG_TRACE + else + { + LOGC(brlog.Warn, log << "SUPPRESSED: RCV-DROPPED LOG: " << why); + } +#endif #endif tsNextDelivery = steady_clock::time_point(); // Ready to read, nothing to wait for. @@ -5878,13 +5889,44 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& addressAndSend((response)); } -bool srt::CUDT::frequentLogAllowed(const time_point& tnow) const +bool srt::CUDT::frequentLogAllowed(size_t logid, const time_point& tnow, std::string& w_why) { #ifndef SRT_LOG_SLOWDOWN_FREQ_MS #define SRT_LOG_SLOWDOWN_FREQ_MS 1000 #endif - return (m_tsLogSlowDown + milliseconds_from(SRT_LOG_SLOWDOWN_FREQ_MS)) <= tnow; + bool is_suppressed = IsSet(m_LogSlowDownExpired, BIT(logid)); + bool isnow = (m_tsLogSlowDown.load() + milliseconds_from(SRT_LOG_SLOWDOWN_FREQ_MS)) <= tnow; + if (isnow) + { + // Theoretically this should prevent other calls of this function to take + // set their values simultaneously, but if it happened that the time is + // also set, this section will not fire for the other log, if it didn't do + // the check yet. + m_LogSlowDownExpired.store(uint8_t(BIT(logid))); // Clear all other bits + + // Note: it may happen that two threads could intermix one another between + // the check and setting up, but this will at worst case set the slightly + // later time again. + m_tsLogSlowDown.store(tnow); + + is_suppressed = false; + + int supr = m_aSuppressedMsg[logid]; + + if (supr > 0) + w_why = Sprint("++SUPPRESSED: ", supr); + m_aSuppressedMsg[logid] = 0; + } + else + { + w_why = Sprint("Too early - last one was ", FormatDuration(tnow - m_tsLogSlowDown.load())); + // Set YOUR OWN bit, atomically. + m_LogSlowDownExpired |= uint8_t(BIT(logid)); + ++m_aSuppressedMsg[logid]; + } + + return !is_suppressed; } // This function is required to be called when a caller receives an INDUCTION @@ -8947,15 +8989,25 @@ void srt::CUDT::processCtrlDropReq(const CPacket& ctrlpkt) if (iDropCnt > 0) { - LOGC(brlog.Warn, log << CONID() << "RCV-DROPPED " << iDropCnt << " packet(s), seqno range %" - << dropdata[0] << "-%" << dropdata[1] << ", msgno " << ctrlpkt.getMsgSeq(using_rexmit_flag) - << " (SND DROP REQUEST)."); + ScopedLock lg (m_StatsLock); + const steady_clock::time_point tnow = steady_clock::now(); + string why; + if (frequentLogAllowed(FREQLOGFA_RCV_DROPPED, tnow, (why))) + { + LOGC(brlog.Warn, log << CONID() << "RCV-DROPPED " << iDropCnt << " packet(s), seqno range %" + << dropdata[0] << "-%" << dropdata[1] << ", msgno " << ctrlpkt.getMsgSeq(using_rexmit_flag) + << " (SND DROP REQUEST). " << why); + } +#if SRT_ENABLE_FREQUENT_LOG_TRACE + else + { + LOGC(brlog.Warn, log << "SUPPRESSED: RCV-DROPPED LOG: " << why); + } +#endif - enterCS(m_StatsLock); // Estimate dropped bytes from average payload size. const uint64_t avgpayloadsz = m_pRcvBuffer->getRcvAvgPayloadSize(); m_stats.rcvr.dropped.count(stats::BytesPacketsCount(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt)); - leaveCS(m_StatsLock); } } // When the drop request was received, it means that there are @@ -10073,12 +10125,19 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& ScopedLock lg(m_StatsLock); m_stats.rcvr.dropped.count(stats::BytesPacketsCount(iDropCnt * rpkt.getLength(), iDropCnt)); m_stats.rcvr.undecrypted.count(stats::BytesPacketsCount(rpkt.getLength(), 1)); - if (frequentLogAllowed(tnow)) + string why; + if (frequentLogAllowed(FREQLOGFA_ENCRYPTION_FAILURE, tnow, (why))) { LOGC(qrlog.Warn, log << CONID() << "Decryption failed (seqno %" << u->m_Packet.getSeqNo() << "), dropped " - << iDropCnt << ". pktRcvUndecryptTotal=" << m_stats.rcvr.undecrypted.total.count() << "."); - m_tsLogSlowDown = tnow; + << iDropCnt << ". pktRcvUndecryptTotal=" << m_stats.rcvr.undecrypted.total.count() << "." << why); } +#if SRT_ENABLE_FREQUENT_LOG_TRACE + else + { + + LOGC(qrlog.Warn, log << "SUPPRESSED: Decryption failed LOG: " << why); + } +#endif } } else if (m_pCryptoControl && m_pCryptoControl->getCryptoMode() == CSrtConfig::CIPHER_MODE_AES_GCM) @@ -10090,11 +10149,11 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& ScopedLock lg(m_StatsLock); m_stats.rcvr.dropped.count(stats::BytesPacketsCount(iDropCnt* rpkt.getLength(), iDropCnt)); m_stats.rcvr.undecrypted.count(stats::BytesPacketsCount(rpkt.getLength(), 1)); - if (frequentLogAllowed(tnow)) + string why; + if (frequentLogAllowed(FREQLOGFA_ENCRYPTION_FAILURE, tnow, (why))) { LOGC(qrlog.Warn, log << CONID() << "Packet not encrypted (seqno %" << u->m_Packet.getSeqNo() << "), dropped " << iDropCnt << ". pktRcvUndecryptTotal=" << m_stats.rcvr.undecrypted.total.count() << "."); - m_tsLogSlowDown = tnow; } } } diff --git a/srtcore/core.h b/srtcore/core.h index fadb64e23..d886caf4a 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -77,6 +77,10 @@ modified by #include +#ifndef SRT_ENABLE_FREQUENT_LOG_TRACE +#define SRT_ENABLE_FREQUENT_LOG_TRACE 0 +#endif + // TODO: Utility function - to be moved to utilities.h? template @@ -922,14 +926,20 @@ class CUDT SRT_ATTR_GUARDED_BY(m_RecvAckLock) int32_t m_iReXmitCount; // Re-Transmit Count since last ACK - time_point m_tsLogSlowDown; // The last time a log message from the "slow down" group was shown. + static const size_t + MAX_FREQLOGFA = 2, + FREQLOGFA_ENCRYPTION_FAILURE = 0, + FREQLOGFA_RCV_DROPPED = 1; + atomic_time_point m_tsLogSlowDown; // The last time a log message from the "slow down" group was shown. // The "slow down" group of logs are those that can be printed too often otherwise, but can't be turned off (warnings and errors). // Currently only used by decryption failure message, therefore no mutex protection needed. + sync::atomic m_LogSlowDownExpired; // Can't use bitset because atomic + sync::atomic m_aSuppressedMsg[MAX_FREQLOGFA]; /// @brief Check if a frequent log can be shown. /// @param tnow current time /// @return true if it is ok to print a frequent log message. - bool frequentLogAllowed(const time_point& tnow) const; + bool frequentLogAllowed(size_t logid, const time_point& tnow, std::string& why); private: // Receiving related data CRcvBuffer* m_pRcvBuffer; //< Receiver buffer