diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 3391c3a194..174af939d5 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -175,6 +175,13 @@ circuit_breaker { # @remark 0 to disable the critical water-level. # Default: 1 critical_pulse 1; + # If dying, also drop packets for players. + # Default: 99 + dying_threshold 99; + # If CPU exceed the dying_pulse times, enter dying. + # @remark 0 to disable the dying water-level. + # Default: 5 + dying_pulse 5; } ############################################################################################# diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 75c4b46ceb..d4977a645f 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -4321,6 +4321,40 @@ int SrsConfig::get_critical_pulse() return ::atoi(conf->arg0().c_str()); } +int SrsConfig::get_dying_threshold() +{ + static int DEFAULT = 99; + + SrsConfDirective* conf = root->get("circuit_breaker"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("dying_threshold"); + if (!conf) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_dying_pulse() +{ + static int DEFAULT = 5; + + SrsConfDirective* conf = root->get("circuit_breaker"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("dying_pulse"); + if (!conf) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + vector SrsConfig::get_stream_casters() { srs_assert(root); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 8ea82f296d..73f6255393 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -488,6 +488,8 @@ class SrsConfig virtual int get_high_pulse(); virtual int get_critical_threshold(); virtual int get_critical_pulse(); + virtual int get_dying_threshold(); + virtual int get_dying_pulse(); // stream_caster section public: // Get all stream_caster in config file. diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index ed85e9fdec..bfb26686ff 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -315,7 +315,7 @@ srs_error_t SrsPlaintextTransport::on_dtls_alert(std::string type, std::string d srs_error_t SrsPlaintextTransport::on_dtls_handshake_done() { - srs_trace("RTC: DTLS handshake done."); + srs_trace("RTC: DTLS plaintext handshake done."); return session_->on_connection_established(); } @@ -650,6 +650,9 @@ srs_error_t SrsRtcPlayStream::cycle() } } + // How many messages to run a yield. + uint32_t nn_msgs_for_yield = 0; + while (true) { if ((err = trd_->pull()) != srs_success) { return srs_error_wrap(err, "rtc sender thread"); @@ -677,6 +680,13 @@ srs_error_t SrsRtcPlayStream::cycle() // Release the packet to cache. // @remark Note that the pkt might be set to NULL. _srs_rtp_cache->recycle(pkt); + + // Yield to another coroutines. + // @see https://github.com/ossrs/srs/issues/2194#issuecomment-777485531 + if (++nn_msgs_for_yield > 10) { + nn_msgs_for_yield = 0; + srs_thread_yield(); + } } } diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index 5f16d1a85e..ab0f42e3d4 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -80,6 +80,7 @@ extern SrsPps* _srs_pps_rmnack; extern SrsPps* _srs_pps_rloss; extern SrsPps* _srs_pps_sloss; extern SrsPps* _srs_pps_aloss; +extern SrsPps* _srs_pps_aloss2; SrsRtcBlackhole::SrsRtcBlackhole() { @@ -721,9 +722,9 @@ srs_error_t SrsRtcServer::on_timer(srs_utime_t interval, srs_utime_t tick) // TODO: FIXME: Should move to Hybrid server stat. string loss_desc; - _srs_pps_aloss->update(); - if (_srs_pps_rloss->r1s() || _srs_pps_rloss->r10s() || _srs_pps_sloss->r10s() || _srs_pps_aloss->r10s()) { - snprintf(buf, sizeof(buf), ", loss=(r:%d/%d,s:%d,a:%d)", _srs_pps_rloss->r1s(), _srs_pps_rloss->r10s(), _srs_pps_sloss->r10s(), _srs_pps_aloss->r10s()); + _srs_pps_aloss->update(); _srs_pps_aloss2->update(); + if (_srs_pps_rloss->r1s() || _srs_pps_rloss->r10s() || _srs_pps_sloss->r10s() || _srs_pps_aloss->r10s() || _srs_pps_aloss2->r10s()) { + snprintf(buf, sizeof(buf), ", loss=(r:%d/%d,s:%d,a:%d/%d)", _srs_pps_rloss->r1s(), _srs_pps_rloss->r10s(), _srs_pps_sloss->r10s(), _srs_pps_aloss->r10s(), _srs_pps_aloss2->r10s()); loss_desc = buf; } diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index 75187f14c9..4a47797f31 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -43,6 +43,7 @@ #include #include #include +#include #ifdef SRS_FFMPEG_FIT #include @@ -63,6 +64,8 @@ SrsPps* _srs_pps_rnack2 = new SrsPps(); SrsPps* _srs_pps_rhnack = new SrsPps(); SrsPps* _srs_pps_rmnack = new SrsPps(); +extern SrsPps* _srs_pps_aloss2; + // Firefox defaults as 109, Chrome is 111. const int kAudioPayloadType = 111; const int kAudioChannel = 2; @@ -568,6 +571,12 @@ srs_error_t SrsRtcStream::on_rtp(SrsRtpPacket2* pkt) { srs_error_t err = srs_success; + // If circuit-breaker is dying, drop packet. + if (_srs_thread_pool->hybrid_dying_water_level()) { + _srs_pps_aloss2->sugar += (int64_t)consumers.size(); + return err; + } + for (int i = 0; i < (int)consumers.size(); i++) { SrsRtcConsumer* consumer = consumers.at(i); if ((err = consumer->enqueue(pkt->copy())) != srs_success) { diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index bcc4fcbd65..499bcf9d13 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -49,6 +49,7 @@ using namespace std; extern SrsPps* _srs_pps_rloss; extern SrsPps* _srs_pps_aloss; +extern SrsPps* _srs_pps_aloss2; extern SrsPps* _srs_pps_snack2; extern SrsPps* _srs_pps_snack3; @@ -155,6 +156,7 @@ SrsThreadPool::SrsThreadPool() hybrid_ = NULL; hybrid_high_water_level_ = 0; hybrid_critical_water_level_ = 0; + hybrid_dying_water_level_ = 0; trd_ = new SrsFastCoroutine("pool", this); @@ -162,6 +164,8 @@ SrsThreadPool::SrsThreadPool() high_pulse_ = 0; critical_threshold_ = 0; critical_pulse_ = 0; + dying_threshold_ = 0; + dying_pulse_ = 0; // Add primordial thread, current thread itself. SrsThreadEntry* entry = new SrsThreadEntry(); @@ -191,12 +195,17 @@ SrsThreadPool::~SrsThreadPool() bool SrsThreadPool::hybrid_high_water_level() { - return hybrid_critical_water_level_ || hybrid_high_water_level_; + return hybrid_critical_water_level() || hybrid_high_water_level_; } bool SrsThreadPool::hybrid_critical_water_level() { - return hybrid_critical_water_level_; + return hybrid_dying_water_level() || hybrid_critical_water_level_; +} + +bool SrsThreadPool::hybrid_dying_water_level() +{ + return dying_pulse_ && hybrid_dying_water_level_ >= dying_pulse_; } // Thread local objects. @@ -243,6 +252,8 @@ srs_error_t SrsThreadPool::initialize() high_pulse_ = _srs_config->get_high_pulse(); critical_threshold_ = _srs_config->get_critical_threshold(); critical_pulse_ = _srs_config->get_critical_pulse(); + dying_threshold_ = _srs_config->get_dying_threshold(); + dying_pulse_ = _srs_config->get_dying_pulse(); bool async_srtp = _srs_config->get_threads_async_srtp(); int recv_queue = _srs_config->get_threads_max_recv_queue(); @@ -255,10 +266,10 @@ srs_error_t SrsThreadPool::initialize() _srs_async_recv->set_tunnel_enabled(async_tunnel); _srs_async_srtp->set_tunnel_enabled(async_tunnel); - srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", water_level=%dx%d,%dx%d, recvQ=%d, aSend=%d, tunnel=%d", + srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", water_level=%dx%d,%dx%d,%dx%d recvQ=%d, aSend=%d, tunnel=%d", entry->num, entry->label.c_str(), entry->name.c_str(), srsu2msi(interval_), async_srtp, entry->cpuset_ok, r0, srs_covert_cpuset(entry->cpuset), r1, srs_covert_cpuset(entry->cpuset2), - high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, + high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, dying_pulse_, dying_threshold_, recv_queue, async_send, async_tunnel); return err; @@ -368,6 +379,13 @@ srs_error_t SrsThreadPool::run() } else if (hybrid_critical_water_level_ > 0) { hybrid_critical_water_level_--; } + + // Reset the dying water-level when CPU is low for N times. + if (hybrid_->stat->percent * 100 > dying_threshold_) { + hybrid_dying_water_level_ = srs_min(dying_pulse_ + 1, hybrid_dying_water_level_ + 1); + } else if (hybrid_dying_water_level_ > 0) { + hybrid_dying_water_level_ = 0; + } } sleep(1); @@ -419,8 +437,8 @@ srs_error_t SrsThreadPool::run() string circuit_breaker; if (hybrid_high_water_level() || hybrid_critical_water_level() || _srs_pps_aloss->r1s() || _srs_pps_rloss->r1s() || _srs_pps_snack2->r10s()) { - snprintf(buf, sizeof(buf), ", break=%d,%d, cond=%d,%d,%.2f%%, snk=%d,%d,%d", - hybrid_high_water_level(), hybrid_critical_water_level(), // Whether Circuit-Break is enable. + snprintf(buf, sizeof(buf), ", break=%d,%d,%d, cond=%d,%d,%.2f%%, snk=%d,%d,%d", + hybrid_high_water_level(), hybrid_critical_water_level(), hybrid_dying_water_level(), // Whether Circuit-Break is enable. _srs_pps_rloss->r1s(), _srs_pps_aloss->r1s(), thread_percent, // The conditions to enable Circuit-Breaker. _srs_pps_snack2->r10s(), _srs_pps_snack3->r10s(), // NACK packet,seqs sent. _srs_pps_snack4->r10s() // NACK drop by Circuit-Break. diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp index c84a0e12b7..0c77b4eb03 100644 --- a/trunk/src/app/srs_app_threads.hpp +++ b/trunk/src/app/srs_app_threads.hpp @@ -127,12 +127,15 @@ class SrsThreadPool : public ISrsCoroutineHandler // @note To avoid the CPU change rapidly. int hybrid_high_water_level_; int hybrid_critical_water_level_; + int hybrid_dying_water_level_; private: // The config for high/critical water level. int high_threshold_; int high_pulse_; int critical_threshold_; int critical_pulse_; + int dying_threshold_; + int dying_pulse_; private: // A coroutine to consume cooked packets. SrsFastCoroutine* trd_; @@ -143,6 +146,7 @@ class SrsThreadPool : public ISrsCoroutineHandler // Whether hybrid server water-level is high. bool hybrid_high_water_level(); bool hybrid_critical_water_level(); + bool hybrid_dying_water_level(); // Setup the thread-local variables. static void setup(); // Initialize the thread pool. diff --git a/trunk/src/app/srs_app_utility.cpp b/trunk/src/app/srs_app_utility.cpp index a5132a60b3..3bcf8748f3 100644 --- a/trunk/src/app/srs_app_utility.cpp +++ b/trunk/src/app/srs_app_utility.cpp @@ -55,6 +55,7 @@ using namespace std; SrsPps* _srs_pps_rloss = new SrsPps(); SrsPps* _srs_pps_sloss = new SrsPps(); SrsPps* _srs_pps_aloss = new SrsPps(); +SrsPps* _srs_pps_aloss2 = new SrsPps(); // the longest time to wait for a process to quit. #define SRS_PROCESS_QUIT_TIMEOUT_MS 1000