Skip to content

Commit

Permalink
Threads: Use coroutine to consume recv/srtp packets.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent 957034e commit 1c90497
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 48 deletions.
11 changes: 0 additions & 11 deletions trunk/src/app/srs_app_hourglass.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_app_threads.hpp>

#include <srs_protocol_kbps.hpp>

Expand Down Expand Up @@ -261,16 +260,6 @@ srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval, srs_utime_t tick
++_srs_pps_timer_s->sugar;
}

// Consume the async received UDP packets.
if ((err = _srs_async_recv->consume()) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

// Consume the async cooked SRTP packets.
if ((err = _srs_async_srtp->consume()) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

return err;
}

10 changes: 10 additions & 0 deletions trunk/src/app/srs_app_hybrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,16 @@ srs_error_t SrsHybridServer::initialize()
// A monitor to check the clock wall deviation, per clock tick.
timer_->subscribe(20 * SRS_UTIME_MILLISECONDS, clock_monitor_);

// Consume the async cooked SRTP packets.
if ((err = _srs_async_srtp->consume()) != srs_success) {
return srs_error_wrap(err, "srtp");
}

// Consume the async received UDP packets.
if ((err = _srs_async_recv->consume()) != srs_success) {
return srs_error_wrap(err, "recv");
}

vector<ISrsHybridServer*>::iterator it;
for (it = servers.begin(); it != servers.end(); ++it) {
ISrsHybridServer* server = *it;
Expand Down
127 changes: 93 additions & 34 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,6 @@ srs_error_t SrsThreadPool::initialize()
r1 = pthread_getaffinity_np(pthread_self(), sizeof(entry->cpuset2), &entry->cpuset2);
#endif

if ((err = _srs_async_recv->initialize()) != srs_success) {
return srs_error_wrap(err, "init async recv");
}

interval_ = _srs_config->get_threads_interval();
bool async_srtp = _srs_config->get_threads_async_srtp();
srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64,
Expand Down Expand Up @@ -705,11 +701,14 @@ SrsAsyncSRTPManager::SrsAsyncSRTPManager()
lock_ = new SrsThreadMutex();
packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
cooked_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
trd_ = new SrsFastCoroutine("srtp", this);
}

// TODO: FIXME: We should stop the thread first, then free the manager.
SrsAsyncSRTPManager::~SrsAsyncSRTPManager()
{
srs_freep(trd_);

srs_freep(lock_);
srs_freep(packets_);
srs_freep(cooked_packets_);
Expand Down Expand Up @@ -806,24 +805,56 @@ srs_error_t SrsAsyncSRTPManager::consume()
{
srs_error_t err = srs_success;

vector<SrsAsyncSRTPPacket*> flying;
cooked_packets_->swap(flying);
if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "start");
}

for (int i = 0; i < (int)flying.size(); i++) {
SrsAsyncSRTPPacket* pkt = flying.at(i);
SrsSecurityTransport* transport = pkt->task_->codec_->transport_;
char* payload = pkt->msg_->payload;
return err;
}

if (pkt->do_decrypt_) {
if (pkt->is_rtp_) {
err = transport->on_rtp_plaintext(payload, pkt->nb_consumed_);
}
srs_error_t SrsAsyncSRTPManager::cycle()
{
srs_error_t err = srs_success;

// How many messages to run a yield.
uint32_t nn_msgs_for_yield = 0;

while (true) {
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "pull");
}
if (err != srs_success) {
srs_error_reset(err); // Ignore any error.

vector<SrsAsyncSRTPPacket*> flying;
cooked_packets_->swap(flying);

if (flying.empty()) {
srs_usleep(20 * SRS_UTIME_MILLISECONDS);
continue;
}

srs_freep(pkt);
for (int i = 0; i < (int)flying.size(); i++) {
SrsAsyncSRTPPacket* pkt = flying.at(i);
SrsSecurityTransport* transport = pkt->task_->codec_->transport_;
char* payload = pkt->msg_->payload;

if (pkt->do_decrypt_) {
if (pkt->is_rtp_) {
err = transport->on_rtp_plaintext(payload, pkt->nb_consumed_);
}
}
if (err != srs_success) {
srs_error_reset(err); // Ignore any error.
}

srs_freep(pkt);

// Yield to another coroutines.
// @see https://github.com/ossrs/srs/issues/2194#issuecomment-777485531
if (++nn_msgs_for_yield > 10) {
nn_msgs_for_yield = 0;
srs_thread_yield();
}
}
}

return err;
Expand All @@ -846,11 +877,14 @@ SrsAsyncRecvManager::SrsAsyncRecvManager()
packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
handler_ = NULL;
max_recv_queue_ = 0;
trd_ = new SrsFastCoroutine("recv", this);
}

// TODO: FIXME: We should stop the thread first, then free the manager.
SrsAsyncRecvManager::~SrsAsyncRecvManager()
{
srs_freep(trd_);

srs_freep(lock_);
srs_freep(packets_);

Expand All @@ -861,16 +895,6 @@ SrsAsyncRecvManager::~SrsAsyncRecvManager()
}
}

srs_error_t SrsAsyncRecvManager::initialize()
{
srs_error_t err = srs_success;

max_recv_queue_ = _srs_config->get_threads_max_recv_queue();
srs_trace("AsyncRecv: Set max_queue=%d", max_recv_queue_);

return err;
}

void SrsAsyncRecvManager::set_handler(ISrsUdpMuxHandler* v)
{
handler_ = v;
Expand Down Expand Up @@ -946,17 +970,52 @@ srs_error_t SrsAsyncRecvManager::consume()
{
srs_error_t err = srs_success;

vector<SrsUdpMuxSocket*> flying;
packets_->swap(flying);
max_recv_queue_ = _srs_config->get_threads_max_recv_queue();
srs_trace("AsyncRecv: Set max_queue=%d", max_recv_queue_);

if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "start");
}

for (int i = 0; i < (int)flying.size(); i++) {
SrsUdpMuxSocket* pkt = flying.at(i);
return err;
}

srs_error_t SrsAsyncRecvManager::cycle()
{
srs_error_t err = srs_success;

if (handler_ && (err = handler_->on_udp_packet(pkt)) != srs_success) {
srs_error_reset(err); // Ignore any error.
// How many messages to run a yield.
uint32_t nn_msgs_for_yield = 0;

while (true) {
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "pull");
}

vector<SrsUdpMuxSocket*> flying;
packets_->swap(flying);

if (flying.empty()) {
srs_usleep(20 * SRS_UTIME_MILLISECONDS);
continue;
}

srs_freep(pkt);
for (int i = 0; i < (int)flying.size(); i++) {
SrsUdpMuxSocket* pkt = flying.at(i);

if (handler_ && (err = handler_->on_udp_packet(pkt)) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

srs_freep(pkt);

// Yield to another coroutines.
// @see https://github.com/ossrs/srs/issues/2194#issuecomment-777485531
if (++nn_msgs_for_yield > 10) {
nn_msgs_for_yield = 0;
srs_thread_yield();
}
}
}

return err;
Expand Down
16 changes: 13 additions & 3 deletions trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <srs_app_rtc_dtls.hpp>
#include <srs_app_rtc_conn.hpp>
#include <srs_app_listener.hpp>
#include <srs_app_st.hpp>

#include <pthread.h>

Expand Down Expand Up @@ -321,14 +322,16 @@ class SrsAsyncSRTPPacket
};

// The async SRTP manager, to start a thread to consume packets.
class SrsAsyncSRTPManager
class SrsAsyncSRTPManager : public ISrsCoroutineHandler
{
private:
std::vector<SrsAsyncSRTPTask*> tasks_;
SrsThreadMutex* lock_;
private:
SrsThreadQueue<SrsAsyncSRTPPacket>* packets_;
private:
// A coroutine to consume cooked packets.
SrsFastCoroutine* trd_;
// The packets cooked by async SRTP manager.
SrsThreadQueue<SrsAsyncSRTPPacket>* cooked_packets_;
public:
Expand All @@ -346,6 +349,8 @@ class SrsAsyncSRTPManager
public:
// Consume cooked SRTP packets. Must call in worker/service thread.
virtual srs_error_t consume();
private:
srs_error_t cycle();
};

// The global async SRTP manager.
Expand All @@ -363,12 +368,16 @@ class SrsThreadUdpListener
};

// The async RECV manager, to recv UDP packets.
class SrsAsyncRecvManager
class SrsAsyncRecvManager : public ISrsCoroutineHandler
{
private:
ISrsUdpMuxHandler* handler_;
private:
// A coroutine to consume received packets.
SrsFastCoroutine* trd_;
// If exceed max queue, drop packet.
int max_recv_queue_;
// The received UDP packets.
SrsThreadQueue<SrsUdpMuxSocket>* packets_;
private:
std::vector<SrsThreadUdpListener*> listeners_;
Expand All @@ -377,7 +386,6 @@ class SrsAsyncRecvManager
SrsAsyncRecvManager();
virtual ~SrsAsyncRecvManager();
public:
srs_error_t initialize();
void set_handler(ISrsUdpMuxHandler* v);
void add_listener(SrsThreadUdpListener* listener);
int size();
Expand All @@ -387,6 +395,8 @@ class SrsAsyncRecvManager
public:
// Consume received UDP packets. Must call in worker/service thread.
virtual srs_error_t consume();
private:
srs_error_t cycle();
};

// The global async RECV manager.
Expand Down

0 comments on commit 1c90497

Please sign in to comment.