Skip to content

Commit

Permalink
Threads-Hybrid: Config auto generate stream config for hybrids.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent 11aada1 commit 50f03ad
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 44 deletions.
13 changes: 9 additions & 4 deletions trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,19 @@ threads {
# Note that 0 to disable it. Max to 64 threads.
# Default: 1
async_send 1;
# Whether enable the tunnel, to consume packets between srtp/recv/send threads,
# without proxy by hybrid(except the few head packets).
# Default: off
async_tunnel off;
# The number of hybrid threads to use, MUST >=1.
# Note that there MUST be same number of stream sections.
# Max to 64 threads.
# Default: 1
hybrids 1;
# Whether automatically generate stream config by hybrid.
# If off, user must create a number of streams, which is equal to the hybrids.
# Default: off
generate_streams off;
# Whether enable the tunnel, to consume packets between srtp/recv/send threads,
# without proxy by hybrid(except the few head packets).
# Default: off
async_tunnel off;
# CPU set for affinity, for example:
# 0 means CPU0
# 0-3 means CPU0, CPU1, CPU2
Expand Down
179 changes: 139 additions & 40 deletions trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,47 +515,132 @@ srs_error_t srs_config_transform_vhost(SrsConfDirective* root)
return err;
}

// To wrap directive temporally.
class SrsTempConfig : public SrsConfig
{
public:
SrsTempConfig(SrsConfDirective* r) {
root = r;
}
virtual ~SrsTempConfig() {
root = NULL;
}
};

srs_error_t srs_config_transform_vhost2(SrsConfDirective* root)
{
srs_error_t err = srs_success;

SrsConfDirective* stream = new SrsConfDirective();
SrsAutoFree(SrsConfDirective, stream);
// Transform streams for hybrids.
if (true) {
SrsConfDirective* stream = new SrsConfDirective();
SrsAutoFree(SrsConfDirective, stream);

// Collect directives which should be moved to stream.
for (int i = 0; i < (int)root->directives.size(); i++) {
SrsConfDirective* dir = root->directives.at(i);

// SRS5.0, move listen/max_connections/http_server/rtc_server to stream.
// SRS1/2/3/4:
// listen; max_connections;
// http_server {} rtc_server{}
// SRS5+:
// stream {
// listen; max_connections;
// http_server {} rtc_server{}
// }
if (dir->name == "listen" || dir->name == "max_connections"
|| dir->name == "http_server" || dir->name == "rtc_server") {
stream->directives.push_back(dir);
}
}

// Collect directives which should be moved to stream.
for (int i = 0; i < (int)root->directives.size(); i++) {
SrsConfDirective* dir = root->directives.at(i);
// Ignore if no directives for stream.
if (!stream->directives.empty()) {
// Remove the stream directives from root.
for (int i = 0; i < (int)stream->directives.size(); i++) {
SrsConfDirective* dir = stream->directives.at(i);
root->remove(dir);
}

// SRS5.0, move listen/max_connections/http_server/rtc_server to stream.
// SRS1/2/3/4:
// listen; max_connections;
// http_server {} rtc_server{}
// SRS5+:
// stream {
// listen; max_connections;
// http_server {} rtc_server{}
// }
if (dir->name == "listen" || dir->name == "max_connections"
|| dir->name == "http_server" || dir->name == "rtc_server") {
stream->directives.push_back(dir);
// Push the stream to root.
stream->name = "stream";
root->directives.push_back(stream);
stream = NULL;
}
}

// No directives for stream, ignore.
if (stream->directives.empty()) {
return err;
// Auto generate streams, if there is only one stream template.
SrsTempConfig config(root);
if (config.get_threads_generate_stream()) {
int nn_streams = 0;
SrsConfDirective* tmpl = NULL;
for (int i = 0; i < (int)root->directives.size(); i++) {
SrsConfDirective* dir = root->directives.at(i);
if (dir->name == "stream") {
tmpl = dir;
nn_streams++;
}
}

int nn_hybrids = config.get_threads_hybrids();
if (tmpl && nn_streams == 1 && nn_hybrids > 1) {
if ((err = srs_config_generate_stream(root, tmpl, nn_hybrids - nn_streams)) != srs_success) {
return srs_error_wrap(err, "generate stream");
}
}
}

return err;
}

srs_error_t srs_config_generate_stream(SrsConfDirective* root, SrsConfDirective* tmpl, int nn)
{
srs_error_t err = srs_success;

SrsConfDirective* p = NULL;

// stream.listen for RTMP.
int rtmp_port = 0;
if ((p = tmpl->get("listen")) != NULL) {
rtmp_port = ::atoi(p->arg0().c_str());
}

// Remove the stream directives from root.
for (int i = 0; i < (int)stream->directives.size(); i++) {
SrsConfDirective* dir = stream->directives.at(i);
root->remove(dir);
// stream.http_server.listen for HTTP.
int http_port = 0;
if ((p = tmpl->get("http_server")) != NULL) {
if ((p = p->get("listen")) != NULL) {
http_port = ::atoi(p->arg0().c_str());
}
}

// Push the stream to root.
stream->name = "stream";
root->directives.push_back(stream);
stream = NULL;
// stream.rtc_server.listen for RTC.
int rtc_port = 0;
if ((p = tmpl->get("rtc_server")) != NULL) {
if ((p = p->get("listen")) != NULL) {
rtc_port = ::atoi(p->arg0().c_str());
}
}

for (int i = 0; i < nn; i++) {
SrsConfDirective* stream = tmpl->copy();
root->directives.push_back(stream);

// stream.listen for RTMP.
if (rtmp_port) {
stream->get_or_create("listen")->set_arg0(srs_int2str(rtmp_port + i + 1));
}

// stream.http_server.listen for HTTP.
if (http_port) {
stream->get_or_create("http_server")->get_or_create("listen")->set_arg0(srs_int2str(http_port + i + 1));
}

// stream.rtc_server.listen for RTC.
if (http_port) {
stream->get_or_create("rtc_server")->get_or_create("listen")->set_arg0(srs_int2str(rtc_port + i + 1));
}
}

return err;
}
Expand Down Expand Up @@ -1894,18 +1979,14 @@ srs_error_t SrsConfig::parse_options(int argc, char** argv)
// the parse_file never check the config,
// we check it when user requires check config file.
if (err == srs_success && (err = srs_config_transform_vhost(root)) == srs_success) {
if (err == srs_success && (err = check_config()) == srs_success) {
srs_trace("config file is ok");
exit(0);
}
}
if (err == srs_success && (err = srs_config_transform_vhost2(root)) == srs_success) {
if (err == srs_success && (err = check_config()) == srs_success) {
srs_trace("config file is ok");
exit(0);
if (err == srs_success && (err = srs_config_transform_vhost2(root)) == srs_success) {
if (err == srs_success && (err = check_config()) == srs_success) {
srs_trace("config file is ok");
exit(0);
}
}
}

srs_error("invalid config, %s", srs_error_desc(err).c_str());
int ret = srs_error_code(err);
srs_freep(err);
Expand Down Expand Up @@ -3818,8 +3899,9 @@ srs_error_t SrsConfig::check_hybrids()
}
}

if (hybrids != (int)streams.size()) {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "hybrids=%d, streams=%d, not equal", hybrids, (int)streams.size());
if (hybrids > (int)streams.size()) {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "hybrids=%d requires %d streams, actual=%d",
hybrids, (int)streams.size(), (int)streams.size());
}

// For each stream, the UDP listen MUST not be the same.
Expand Down Expand Up @@ -4233,6 +4315,23 @@ int SrsConfig::get_threads_hybrids()
return ::atoi(conf->arg0().c_str());
}

bool SrsConfig::get_threads_generate_stream()
{
static bool DEFAULT = false;

SrsConfDirective* conf = root->get("threads");
if (!conf) {
return DEFAULT;
}

conf = conf->get("generate_streams");
if (!conf) {
return DEFAULT;
}

return SRS_CONF_PERFER_FALSE(conf->arg0());
}

bool SrsConfig::get_threads_cpu_affinity(std::string label, int* start, int* end)
{
static int DEFAULT_START = 0;
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ extern std::string srs_config_bool2switch(std::string sbool);
// @param root the root directive to transform, in and out parameter.
extern srs_error_t srs_config_transform_vhost(SrsConfDirective* root);
extern srs_error_t srs_config_transform_vhost2(SrsConfDirective* root);
extern srs_error_t srs_config_generate_stream(SrsConfDirective* root, SrsConfDirective* tmpl, int nn);

// TODO: FIXME: It should be thread-local or thread-safe.
// TODO: FIXME: We should use channel to deliver changes of config.
Expand Down Expand Up @@ -481,6 +482,7 @@ class SrsConfig
virtual int get_threads_async_send();
virtual bool get_threads_async_tunnel();
virtual int get_threads_hybrids();
virtual bool get_threads_generate_stream();
virtual bool get_threads_cpu_affinity(std::string label, int* start, int* end);
virtual int get_threads_max_recv_queue();
virtual int get_high_threshold();
Expand Down

0 comments on commit 50f03ad

Please sign in to comment.