Skip to content

Commit

Permalink
Threads: Support multiple SRTP/SEND/RECV threads.
Browse files Browse the repository at this point in the history
1. Move received and cooked packets queue to thread entry, that is, each thread has its own queue.
2. In RECV thread, push received packet to queue of source thread, in thread listener.
3. In SRTP thread, push cooked packet to queue of source thread, in asyn SRTP task.
4. In hybrid thread, directly and only consume the packets of self thread.
5. Sync between SRTP task by lock.
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent a505b6b commit 28d8f1a
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 142 deletions.
21 changes: 12 additions & 9 deletions trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,18 @@ threads {
# The thread pool manager cycle interval, in seconds.
# Default: 5
interval 5;
# Whether enable the ASYNC SRTP, codec in dedicate threads.
# Default: off
async_srtp off;
# Whether enable the ASYNC RECV, recv udp packets in dedicate threads.
# Default: off
async_recv off;
# Whether enable the ASYNC SEND, send udp packets in dedicate threads.
# Default: off
async_send off;
# The number of threads for SRTP, codec in dedicate threads.
# Note that 0 to disable it. Max to 64 threads.
# Default: 1
async_srtp 1;
# The number of threads for RECV, recv udp packets in dedicate threads.
# Note that 0 to disable it. Max to 64 threads.
# Default: 1
async_recv 1;
# The number of threads for SEND, send udp packets in dedicate threads.
# Note that 0 to disable it. Max to 64 threads.
# Default: 1
async_send 1;
# Whether enable the tunnel, to consume packets between srtp/recv/send threads,
# without proxy by hybrid(except the few head packets).
# Default: off
Expand Down
30 changes: 21 additions & 9 deletions trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4127,9 +4127,9 @@ srs_utime_t SrsConfig::get_threads_interval()
return v * SRS_UTIME_SECONDS;
}

bool SrsConfig::get_threads_async_srtp()
int SrsConfig::get_threads_async_srtp()
{
static bool DEFAULT = false;
static int DEFAULT = 1;

SrsConfDirective* conf = root->get("threads");
if (!conf) {
Expand All @@ -4141,12 +4141,16 @@ bool SrsConfig::get_threads_async_srtp()
return DEFAULT;
}

return SRS_CONF_PERFER_FALSE(conf->arg0());
int v = ::atoi(conf->arg0().c_str());
if (v < 0 || v > 64) {
return DEFAULT;
}
return v;
}

bool SrsConfig::get_threads_async_recv()
int SrsConfig::get_threads_async_recv()
{
static bool DEFAULT = false;
static int DEFAULT = 1;

SrsConfDirective* conf = root->get("threads");
if (!conf) {
Expand All @@ -4158,12 +4162,16 @@ bool SrsConfig::get_threads_async_recv()
return DEFAULT;
}

return SRS_CONF_PERFER_FALSE(conf->arg0());
int v = ::atoi(conf->arg0().c_str());
if (v < 0 || v > 64) {
return DEFAULT;
}
return v;
}

bool SrsConfig::get_threads_async_send()
int SrsConfig::get_threads_async_send()
{
static bool DEFAULT = false;
static int DEFAULT = 1;

SrsConfDirective* conf = root->get("threads");
if (!conf) {
Expand All @@ -4175,7 +4183,11 @@ bool SrsConfig::get_threads_async_send()
return DEFAULT;
}

return SRS_CONF_PERFER_FALSE(conf->arg0());
int v = ::atoi(conf->arg0().c_str());
if (v < 0 || v > 64) {
return DEFAULT;
}
return v;
}

bool SrsConfig::get_threads_async_tunnel()
Expand Down
6 changes: 3 additions & 3 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,9 @@ class SrsConfig
// Thread pool section.
public:
virtual srs_utime_t get_threads_interval();
virtual bool get_threads_async_srtp();
virtual bool get_threads_async_recv();
virtual bool get_threads_async_send();
virtual int get_threads_async_srtp();
virtual int get_threads_async_recv();
virtual int get_threads_async_send();
virtual bool get_threads_async_tunnel();
virtual bool get_threads_cpu_affinity(std::string label, int* start, int* end);
virtual int get_threads_max_recv_queue();
Expand Down
9 changes: 7 additions & 2 deletions trunk/src/app/srs_app_hybrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,17 +215,22 @@ srs_error_t SrsHybridServer::run()
}
}

// Get the entry of self thread.
SrsThreadEntry* entry = _srs_thread_pool->self();

// Consume the async UDP/SRTP packets.
while (true) {
int consumed = 0;

// Consume the received UDP packets.
if ((err = _srs_async_recv->consume(&consumed)) != srs_success) {
// Note that this might run in multiple threads, but it's ok.
if ((err = _srs_async_recv->consume(entry, &consumed)) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

// Consume the cooked SRTP packets.
if ((err = _srs_async_srtp->consume(&consumed)) != srs_success) {
// Note that this might run in multiple threads, but it's ok.
if ((err = _srs_async_srtp->consume(entry, &consumed)) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ srs_error_t SrsUdpMuxListener::cycle()
set_socket_buffer();

// Sleep infinite if use async_recv.
if (_srs_config->get_threads_async_recv()) {
if (_srs_config->get_threads_async_recv() > 0) {
SrsThreadUdpListener* listener = new SrsThreadUdpListener(lfd, handler);
_srs_async_recv->add_listener(listener);

Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ SrsSecurityTransport::SrsSecurityTransport(SrsRtcConnection* s)

dtls_ = new SrsDtls((ISrsDtlsCallback*)this);

bool async_srtp = _srs_config->get_threads_async_srtp();
if (!async_srtp) {
int async_srtp = _srs_config->get_threads_async_srtp();
if (async_srtp <= 0) {
srtp_ = new SrsSRTP();
} else {
srtp_ = new SrsAsyncSRTP(this);
Expand Down
Loading

0 comments on commit 28d8f1a

Please sign in to comment.