Skip to content

Commit

Permalink
Threads: Support circuit-breaker dying threshold
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent d6a92cb commit e15737f
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 10 deletions.
7 changes: 7 additions & 0 deletions trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ circuit_breaker {
# @remark 0 to disable the critical water-level.
# Default: 1
critical_pulse 1;
# If dying, also drop packets for players.
# Default: 99
dying_threshold 99;
# If CPU exceeds dying_threshold for dying_pulse consecutive times, enter dying.
# @remark 0 to disable the dying water-level.
# Default: 5
dying_pulse 5;
}

#############################################################################################
Expand Down
34 changes: 34 additions & 0 deletions trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4321,6 +4321,40 @@ int SrsConfig::get_critical_pulse()
return ::atoi(conf->arg0().c_str());
}

// Get the dying threshold of the circuit-breaker, read from
// circuit_breaker.dying_threshold in the config.
// @return The configured value, or 99 when the section or directive is absent,
//      or when the directive carries an empty argument.
int SrsConfig::get_dying_threshold()
{
    static const int DEFAULT = 99;

    SrsConfDirective* conf = root->get("circuit_breaker");
    if (!conf) {
        return DEFAULT;
    }

    conf = conf->get("dying_threshold");
    // An empty argument would be parsed by atoi() as 0, which silently turns
    // the threshold into "any CPU usage above zero"; use the default instead.
    if (!conf || conf->arg0().empty()) {
        return DEFAULT;
    }

    return ::atoi(conf->arg0().c_str());
}

// Get the dying pulse of the circuit-breaker, read from
// circuit_breaker.dying_pulse in the config.
// @return The configured value, or 5 when the section or directive is absent,
//      or when the directive carries an empty argument.
// @remark An explicit 0 is returned as-is, so it can still be used to disable
//      the dying water-level.
int SrsConfig::get_dying_pulse()
{
    static const int DEFAULT = 5;

    SrsConfDirective* conf = root->get("circuit_breaker");
    if (!conf) {
        return DEFAULT;
    }

    conf = conf->get("dying_pulse");
    // An empty argument would be parsed by atoi() as 0 and silently disable
    // the dying water-level; only an explicit "0" should do that.
    if (!conf || conf->arg0().empty()) {
        return DEFAULT;
    }

    return ::atoi(conf->arg0().c_str());
}

vector<SrsConfDirective*> SrsConfig::get_stream_casters()
{
srs_assert(root);
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,8 @@ class SrsConfig
virtual int get_high_pulse();
virtual int get_critical_threshold();
virtual int get_critical_pulse();
virtual int get_dying_threshold();
virtual int get_dying_pulse();
// stream_caster section
public:
// Get all stream_caster in config file.
Expand Down
12 changes: 11 additions & 1 deletion trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ srs_error_t SrsPlaintextTransport::on_dtls_alert(std::string type, std::string d

srs_error_t SrsPlaintextTransport::on_dtls_handshake_done()
{
srs_trace("RTC: DTLS handshake done.");
srs_trace("RTC: DTLS plaintext handshake done.");
return session_->on_connection_established();
}

Expand Down Expand Up @@ -650,6 +650,9 @@ srs_error_t SrsRtcPlayStream::cycle()
}
}

// Number of messages processed since the last yield.
uint32_t nn_msgs_for_yield = 0;

while (true) {
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "rtc sender thread");
Expand Down Expand Up @@ -677,6 +680,13 @@ srs_error_t SrsRtcPlayStream::cycle()
// Release the packet to cache.
// @remark Note that the pkt might be set to NULL.
_srs_rtp_cache->recycle(pkt);

// Yield to other coroutines.
// @see https://github.com/ossrs/srs/issues/2194#issuecomment-777485531
if (++nn_msgs_for_yield > 10) {
nn_msgs_for_yield = 0;
srs_thread_yield();
}
}
}

Expand Down
7 changes: 4 additions & 3 deletions trunk/src/app/srs_app_rtc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ extern SrsPps* _srs_pps_rmnack;
extern SrsPps* _srs_pps_rloss;
extern SrsPps* _srs_pps_sloss;
extern SrsPps* _srs_pps_aloss;
extern SrsPps* _srs_pps_aloss2;

SrsRtcBlackhole::SrsRtcBlackhole()
{
Expand Down Expand Up @@ -721,9 +722,9 @@ srs_error_t SrsRtcServer::on_timer(srs_utime_t interval, srs_utime_t tick)

// TODO: FIXME: Should move to Hybrid server stat.
string loss_desc;
_srs_pps_aloss->update();
if (_srs_pps_rloss->r1s() || _srs_pps_rloss->r10s() || _srs_pps_sloss->r10s() || _srs_pps_aloss->r10s()) {
snprintf(buf, sizeof(buf), ", loss=(r:%d/%d,s:%d,a:%d)", _srs_pps_rloss->r1s(), _srs_pps_rloss->r10s(), _srs_pps_sloss->r10s(), _srs_pps_aloss->r10s());
_srs_pps_aloss->update(); _srs_pps_aloss2->update();
if (_srs_pps_rloss->r1s() || _srs_pps_rloss->r10s() || _srs_pps_sloss->r10s() || _srs_pps_aloss->r10s() || _srs_pps_aloss2->r10s()) {
snprintf(buf, sizeof(buf), ", loss=(r:%d/%d,s:%d,a:%d/%d)", _srs_pps_rloss->r1s(), _srs_pps_rloss->r10s(), _srs_pps_sloss->r10s(), _srs_pps_aloss->r10s(), _srs_pps_aloss2->r10s());
loss_desc = buf;
}

Expand Down
9 changes: 9 additions & 0 deletions trunk/src/app/srs_app_rtc_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <srs_protocol_json.hpp>
#include <srs_app_pithy_print.hpp>
#include <srs_app_log.hpp>
#include <srs_app_threads.hpp>

#ifdef SRS_FFMPEG_FIT
#include <srs_app_rtc_codec.hpp>
Expand All @@ -63,6 +64,8 @@ SrsPps* _srs_pps_rnack2 = new SrsPps();
SrsPps* _srs_pps_rhnack = new SrsPps();
SrsPps* _srs_pps_rmnack = new SrsPps();

extern SrsPps* _srs_pps_aloss2;

// Firefox defaults as 109, Chrome is 111.
const int kAudioPayloadType = 111;
const int kAudioChannel = 2;
Expand Down Expand Up @@ -568,6 +571,12 @@ srs_error_t SrsRtcStream::on_rtp(SrsRtpPacket2* pkt)
{
srs_error_t err = srs_success;

// If circuit-breaker is dying, drop packet.
if (_srs_thread_pool->hybrid_dying_water_level()) {
_srs_pps_aloss2->sugar += (int64_t)consumers.size();
return err;
}

for (int i = 0; i < (int)consumers.size(); i++) {
SrsRtcConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue(pkt->copy())) != srs_success) {
Expand Down
30 changes: 24 additions & 6 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ using namespace std;

extern SrsPps* _srs_pps_rloss;
extern SrsPps* _srs_pps_aloss;
extern SrsPps* _srs_pps_aloss2;

extern SrsPps* _srs_pps_snack2;
extern SrsPps* _srs_pps_snack3;
Expand Down Expand Up @@ -155,13 +156,16 @@ SrsThreadPool::SrsThreadPool()
hybrid_ = NULL;
hybrid_high_water_level_ = 0;
hybrid_critical_water_level_ = 0;
hybrid_dying_water_level_ = 0;

trd_ = new SrsFastCoroutine("pool", this);

high_threshold_ = 0;
high_pulse_ = 0;
critical_threshold_ = 0;
critical_pulse_ = 0;
dying_threshold_ = 0;
dying_pulse_ = 0;

// Add primordial thread, current thread itself.
SrsThreadEntry* entry = new SrsThreadEntry();
Expand Down Expand Up @@ -191,12 +195,17 @@ SrsThreadPool::~SrsThreadPool()

// Whether the hybrid server is at the high water-level or a more severe one.
// NOTE(review): this diff hunk lists two return statements; presumably the first is
// the removed pre-commit line and the second is its replacement, which goes through
// hybrid_critical_water_level() instead of reading the counter directly - confirm
// against the repository.
bool SrsThreadPool::hybrid_high_water_level()
{
return hybrid_critical_water_level_ || hybrid_high_water_level_;
return hybrid_critical_water_level() || hybrid_high_water_level_;
}

// Whether the hybrid server is at the critical water-level or a more severe one.
// NOTE(review): this diff hunk lists two return statements; presumably the first is
// the removed pre-commit line and the second is its replacement, which also reports
// true when hybrid_dying_water_level() is true - confirm against the repository.
bool SrsThreadPool::hybrid_critical_water_level()
{
return hybrid_critical_water_level_;
return hybrid_dying_water_level() || hybrid_critical_water_level_;
}

// Whether the hybrid server has reached the dying water-level.
// @return true only when dying_pulse_ is non-zero and the dying counter
//      has reached at least dying_pulse_.
bool SrsThreadPool::hybrid_dying_water_level()
{
    // A zero pulse means the dying water-level never triggers.
    if (!dying_pulse_) {
        return false;
    }

    const bool reached = (hybrid_dying_water_level_ >= dying_pulse_);
    return reached;
}

// Thread local objects.
Expand Down Expand Up @@ -243,6 +252,8 @@ srs_error_t SrsThreadPool::initialize()
high_pulse_ = _srs_config->get_high_pulse();
critical_threshold_ = _srs_config->get_critical_threshold();
critical_pulse_ = _srs_config->get_critical_pulse();
dying_threshold_ = _srs_config->get_dying_threshold();
dying_pulse_ = _srs_config->get_dying_pulse();
bool async_srtp = _srs_config->get_threads_async_srtp();

int recv_queue = _srs_config->get_threads_max_recv_queue();
Expand All @@ -255,10 +266,10 @@ srs_error_t SrsThreadPool::initialize()
_srs_async_recv->set_tunnel_enabled(async_tunnel);
_srs_async_srtp->set_tunnel_enabled(async_tunnel);

srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", water_level=%dx%d,%dx%d, recvQ=%d, aSend=%d, tunnel=%d",
srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", water_level=%dx%d,%dx%d,%dx%d recvQ=%d, aSend=%d, tunnel=%d",
entry->num, entry->label.c_str(), entry->name.c_str(), srsu2msi(interval_), async_srtp,
entry->cpuset_ok, r0, srs_covert_cpuset(entry->cpuset), r1, srs_covert_cpuset(entry->cpuset2),
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_,
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, dying_pulse_, dying_threshold_,
recv_queue, async_send, async_tunnel);

return err;
Expand Down Expand Up @@ -368,6 +379,13 @@ srs_error_t SrsThreadPool::run()
} else if (hybrid_critical_water_level_ > 0) {
hybrid_critical_water_level_--;
}

// Raise the dying water-level while CPU is above the dying threshold; reset it to zero as soon as CPU drops back.
if (hybrid_->stat->percent * 100 > dying_threshold_) {
hybrid_dying_water_level_ = srs_min(dying_pulse_ + 1, hybrid_dying_water_level_ + 1);
} else if (hybrid_dying_water_level_ > 0) {
hybrid_dying_water_level_ = 0;
}
}

sleep(1);
Expand Down Expand Up @@ -419,8 +437,8 @@ srs_error_t SrsThreadPool::run()

string circuit_breaker;
if (hybrid_high_water_level() || hybrid_critical_water_level() || _srs_pps_aloss->r1s() || _srs_pps_rloss->r1s() || _srs_pps_snack2->r10s()) {
snprintf(buf, sizeof(buf), ", break=%d,%d, cond=%d,%d,%.2f%%, snk=%d,%d,%d",
hybrid_high_water_level(), hybrid_critical_water_level(), // Whether Circuit-Break is enable.
snprintf(buf, sizeof(buf), ", break=%d,%d,%d, cond=%d,%d,%.2f%%, snk=%d,%d,%d",
hybrid_high_water_level(), hybrid_critical_water_level(), hybrid_dying_water_level(), // Whether Circuit-Break is enable.
_srs_pps_rloss->r1s(), _srs_pps_aloss->r1s(), thread_percent, // The conditions to enable Circuit-Breaker.
_srs_pps_snack2->r10s(), _srs_pps_snack3->r10s(), // NACK packet,seqs sent.
_srs_pps_snack4->r10s() // NACK drop by Circuit-Break.
Expand Down
4 changes: 4 additions & 0 deletions trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,15 @@ class SrsThreadPool : public ISrsCoroutineHandler
// @note To avoid the CPU change rapidly.
int hybrid_high_water_level_;
int hybrid_critical_water_level_;
int hybrid_dying_water_level_;
private:
// The config for high/critical/dying water level.
int high_threshold_;
int high_pulse_;
int critical_threshold_;
int critical_pulse_;
int dying_threshold_;
int dying_pulse_;
private:
// A coroutine to consume cooked packets.
SrsFastCoroutine* trd_;
Expand All @@ -143,6 +146,7 @@ class SrsThreadPool : public ISrsCoroutineHandler
// Whether hybrid server water-level is high.
bool hybrid_high_water_level();
bool hybrid_critical_water_level();
bool hybrid_dying_water_level();
// Setup the thread-local variables.
static void setup();
// Initialize the thread pool.
Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_utility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ using namespace std;
SrsPps* _srs_pps_rloss = new SrsPps();
SrsPps* _srs_pps_sloss = new SrsPps();
SrsPps* _srs_pps_aloss = new SrsPps();
SrsPps* _srs_pps_aloss2 = new SrsPps();

// the longest time to wait for a process to quit.
#define SRS_PROCESS_QUIT_TIMEOUT_MS 1000
Expand Down

0 comments on commit e15737f

Please sign in to comment.