Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kickoff publisher when stream is idle, which means no players. v6.0.31, v5.0.144 #3105

Merged
merged 9 commits into from
Mar 6, 2023
5 changes: 5 additions & 0 deletions trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,11 @@ vhost publish.srs.com {
# Overwrite by env SRS_VHOST_PUBLISH_TRY_ANNEXB_FIRST for all vhosts.
# default: on
try_annexb_first on;
# The timeout time waiting for new consumers in ms. When the publisher is idle
# more than this config, it will be kicked off.
# Overwrite by env SRS_VHOST_PUBLISH_KICKOFF_FOR_IDLE for all vhosts.
# default: 0 (0 or negative means disabled)
kickoff_for_idle 0;
industriousonesoft marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
13 changes: 13 additions & 0 deletions trunk/conf/rtmp.kickoff.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# the config for srs to delivery RTMP with kicking off publish as no one watching.
# @see https://github.com/ossrs/srs/wiki/v1_CN_SampleRTMP
# @see full.conf for detail config.

listen 1935;
max_connections 1000;
daemon off;
srs_log_tank console;
vhost __defaultVhost__ {
publish {
kickoff_for_idle 60000; # default is 60s, 0 means disable
}
}
27 changes: 26 additions & 1 deletion trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2559,7 +2559,7 @@ srs_error_t SrsConfig::check_normal_config()
for (int j = 0; j < (int)conf->directives.size(); j++) {
string m = conf->at(j)->name;
if (m != "mr" && m != "mr_latency" && m != "firstpkt_timeout" && m != "normal_timeout"
&& m != "parse_sps" && m != "try_annexb_first") {
&& m != "parse_sps" && m != "try_annexb_first" && m != "kickoff_for_idle") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.publish.%s of %s", m.c_str(), vhost->arg0().c_str());
}
}
Expand Down Expand Up @@ -5340,6 +5340,31 @@ srs_utime_t SrsConfig::get_publish_normal_timeout(string vhost)
return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_MILLISECONDS);
}

srs_utime_t SrsConfig::get_publish_kickoff_for_idle(std::string vhost)
{
SRS_OVERWRITE_BY_ENV_MILLISECONDS("srs.vhost.publish.kickoff_for_idle"); // SRS_VHOST_PUBLISH_KICKOFF_FOR_IDLE
// the timeout to kickoff publish as no one watching,
// the default value is 0.
static srs_utime_t DEFAULT = 0 * SRS_UTIME_SECONDS;

SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return DEFAULT;
}

conf = conf->get("publish");
if (!conf) {
return DEFAULT;
}

conf = conf->get("kickoff_for_idle");
if (!conf || conf->arg0().empty()) {
return DEFAULT;
}

return (srs_utime_t)(::atoi(conf->arg0().c_str()) * SRS_UTIME_MILLISECONDS);
}

int SrsConfig::get_global_chunk_size()
{
SRS_OVERWRITE_BY_ENV_INT("srs.vhost.chunk_size"); // SRS_VHOST_CHUNK_SIZE
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,8 @@ class SrsConfig
virtual srs_utime_t get_publish_1stpkt_timeout(std::string vhost);
// The normal packet timeout in srs_utime_t for encoder.
virtual srs_utime_t get_publish_normal_timeout(std::string vhost);
// The kickoff timeout in srs_utime_t for publisher.
virtual srs_utime_t get_publish_kickoff_for_idle(std::string vhost);
private:
// Get the global chunk size.
virtual int get_global_chunk_size();
Expand Down
6 changes: 6 additions & 0 deletions trunk/src/app/srs_app_rtmp_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,7 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre
// initialize the publish timeout.
publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost);
srs_utime_t publish_kickoff_timeout = _srs_config->get_publish_kickoff_for_idle(req->vhost);

// set the sock options.
set_sock_options();
Expand All @@ -1008,6 +1009,11 @@ srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThre
return srs_error_wrap(err, "rtmp: thread quit");
}

// Check if the source is expired or idle (no consumers or players).
if (source->idle_for(publish_kickoff_timeout)) {
return srs_error_wrap(err, "rtmp: kickoff publisher as no one watching for a while, url=%s, timeout=%ds", req->tcUrl.c_str(), srsu2si(publish_kickoff_timeout));
winlinvip marked this conversation as resolved.
Show resolved Hide resolved
}

pprint->elapse();

// cond wait for timeout.
Expand Down
21 changes: 20 additions & 1 deletion trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1927,6 +1927,7 @@ SrsLiveSource::SrsLiveSource()

_can_publish = true;
die_at = 0;
idle_at = 0;

handler = NULL;
bridge_ = NULL;
Expand Down Expand Up @@ -2008,6 +2009,18 @@ bool SrsLiveSource::expired()
return false;
}

bool SrsLiveSource::idle_for(srs_utime_t timeout)
{
if (idle_at == 0 || timeout <= 0) {
return false;
}
srs_utime_t now = srs_get_system_time();
if (now > idle_at + timeout) {
return true;
}
return false;
}

srs_error_t SrsLiveSource::initialize(SrsRequest* r, ISrsLiveSourceHandler* h)
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -2634,6 +2647,10 @@ srs_error_t SrsLiveSource::on_publish()

SrsStatistic* stat = SrsStatistic::instance();
stat->on_stream_publish(req, _source_id.c_str());

if (consumers.empty()) {
idle_at = srs_get_system_time();
}

return err;
}
Expand Down Expand Up @@ -2690,7 +2707,8 @@ srs_error_t SrsLiveSource::create_consumer(SrsLiveConsumer*& consumer)

consumer = new SrsLiveConsumer(this);
consumers.push_back(consumer);

idle_at = 0;
winlinvip marked this conversation as resolved.
Show resolved Hide resolved

// for edge, when play edge stream, check the state
if (_srs_config->get_vhost_is_edge(req->vhost)) {
// notice edge to start for the first client.
Expand Down Expand Up @@ -2756,6 +2774,7 @@ void SrsLiveSource::on_consumer_destroy(SrsLiveConsumer* consumer)
if (consumers.empty()) {
play_edge->on_all_client_stop();
die_at = srs_get_system_time();
idle_at = srs_get_system_time();
}
}

Expand Down
3 changes: 3 additions & 0 deletions trunk/src/app/srs_app_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,8 @@ class SrsLiveSource : public ISrsReloadHandler
// The last die time, when all consumers quit and no publisher,
// We will remove the source when source die.
srs_utime_t die_at;
// The last idle time, when all consumers quit but at least one publisher is alive.
srs_utime_t idle_at;
public:
SrsLiveSource();
virtual ~SrsLiveSource();
Expand All @@ -544,6 +546,7 @@ class SrsLiveSource : public ISrsReloadHandler
virtual srs_error_t cycle();
// Remove source when expired.
virtual bool expired();
bool idle_for(srs_utime_t timeout);
public:
// Initialize the hls with handlers.
virtual srs_error_t initialize(SrsRequest* r, ISrsLiveSourceHandler* h);
Expand Down
3 changes: 3 additions & 0 deletions trunk/src/utest/srs_utest_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4461,6 +4461,9 @@ VOID TEST(ConfigEnvTest, CheckEnvValuesVhostPublish)

SrsSetEnvConfig(try_annexb_first, "SRS_VHOST_PUBLISH_TRY_ANNEXB_FIRST", "off");
EXPECT_FALSE(conf.try_annexb_first("__defaultVhost__"));

SrsSetEnvConfig(kickoff_for_idle, "SRS_VHOST_PUBLISH_KICKOFF_FOR_IDLE", "30");
EXPECT_EQ(30 * SRS_UTIME_MILLISECONDS, conf.get_publish_kickoff_for_idle("__defaultVhost__"));
}
}

Expand Down