Skip to content

Commit

Permalink
Threads: Merge recv and srtp consume to one timer.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent 57b771a commit 198dca5
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 98 deletions.
8 changes: 2 additions & 6 deletions trunk/src/app/srs_app_hybrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,10 @@ srs_error_t SrsHybridServer::initialize()
// A monitor to check the clock wall deviation, per clock tick.
timer_->subscribe(20 * SRS_UTIME_MILLISECONDS, clock_monitor_);

// Consume the async cooked SRTP packets.
if ((err = _srs_async_srtp->consume()) != srs_success) {
// Consume the async UDP/SRTP packets.
if ((err = _srs_thread_pool->consume()) != srs_success) {
return srs_error_wrap(err, "srtp");
}

// Consume the async received UDP packets.
if ((err = _srs_async_recv->consume()) != srs_success) {
return srs_error_wrap(err, "recv");
}

vector<ISrsHybridServer*>::iterator it;
Expand Down
169 changes: 90 additions & 79 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ SrsThreadPool::SrsThreadPool()
hybrid_high_water_level_ = 0;
hybrid_critical_water_level_ = 0;

trd_ = new SrsFastCoroutine("pool", this);

high_threshold_ = 0;
high_pulse_ = 0;
critical_threshold_ = 0;
Expand All @@ -174,6 +176,8 @@ SrsThreadPool::SrsThreadPool()
// TODO: FIMXE: If free the pool, we should stop all threads.
SrsThreadPool::~SrsThreadPool()
{
srs_freep(trd_);

srs_freep(lock_);
}

Expand Down Expand Up @@ -232,10 +236,15 @@ srs_error_t SrsThreadPool::initialize()
critical_threshold_ = _srs_config->get_critical_threshold();
critical_pulse_ = _srs_config->get_critical_pulse();
bool async_srtp = _srs_config->get_threads_async_srtp();
srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", water_level=%dx%d,%dx%d",

int recv_queue = _srs_config->get_threads_max_recv_queue();
srs_trace("AsyncRecv: Set max_queue_size=%d", recv_queue);
_srs_async_recv->set_max_recv_queue(recv_queue);

srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", water_level=%dx%d,%dx%d, recvQ=%d",
entry->num, entry->label.c_str(), entry->name.c_str(), srsu2msi(interval_), async_srtp,
entry->cpuset_ok, r0, srs_covert_cpuset(entry->cpuset), r1, srs_covert_cpuset(entry->cpuset2),
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_);
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, recv_queue);

return err;
}
Expand Down Expand Up @@ -446,6 +455,49 @@ void* SrsThreadPool::start(void* arg)
return NULL;
}

srs_error_t SrsThreadPool::consume()
{
srs_error_t err = srs_success;

if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "start");
}

return err;
}

srs_error_t SrsThreadPool::cycle()
{
srs_error_t err = srs_success;

while (true) {
int consumed = 0;

// Check error before consume packets.
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "pull");
}
if ((err = _srs_async_recv->consume(&consumed)) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

// Check error before consume packets.
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "pull");
}
if ((err = _srs_async_srtp->consume(&consumed)) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

if (!consumed) {
srs_usleep(20 * SRS_UTIME_MILLISECONDS);
continue;
}
}

return err;
}

// TODO: FIXME: It should be thread-local or thread-safe.
SrsThreadPool* _srs_thread_pool = new SrsThreadPool();

Expand Down Expand Up @@ -873,14 +925,11 @@ SrsAsyncSRTPManager::SrsAsyncSRTPManager()
lock_ = new SrsThreadMutex();
srtp_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
cooked_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
trd_ = new SrsFastCoroutine("srtp", this);
}

// TODO: FIXME: We should stop the thread first, then free the manager.
SrsAsyncSRTPManager::~SrsAsyncSRTPManager()
{
srs_freep(trd_);

srs_freep(lock_);
srs_freep(srtp_packets_);
srs_freep(cooked_packets_);
Expand Down Expand Up @@ -975,52 +1024,36 @@ srs_error_t SrsAsyncSRTPManager::do_start()
return err;
}

srs_error_t SrsAsyncSRTPManager::consume()
{
srs_error_t err = srs_success;

if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "start");
}

return err;
}

srs_error_t SrsAsyncSRTPManager::cycle()
srs_error_t SrsAsyncSRTPManager::consume(int* nn_consumed)
{
srs_error_t err = srs_success;

// How many messages to run a yield.
uint32_t nn_msgs_for_yield = 0;

while (true) {
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "pull");
}
vector<SrsAsyncSRTPPacket*> flying_cooked_packets;
cooked_packets_->swap(flying_cooked_packets);

vector<SrsAsyncSRTPPacket*> flying_cooked_packets;
cooked_packets_->swap(flying_cooked_packets);
if (flying_cooked_packets.empty()) {
return err;
}

if (flying_cooked_packets.empty()) {
srs_usleep(20 * SRS_UTIME_MILLISECONDS);
continue;
}
*nn_consumed += (int)flying_cooked_packets.size();

for (int i = 0; i < (int)flying_cooked_packets.size(); i++) {
SrsAsyncSRTPPacket* pkt = flying_cooked_packets.at(i);
for (int i = 0; i < (int)flying_cooked_packets.size(); i++) {
SrsAsyncSRTPPacket* pkt = flying_cooked_packets.at(i);

if ((err = pkt->task_->consume(pkt)) != srs_success) {
srs_error_reset(err);
}
if ((err = pkt->task_->consume(pkt)) != srs_success) {
srs_error_reset(err);
}

srs_freep(pkt);
srs_freep(pkt);

// Yield to another coroutines.
// @see https://github.com/ossrs/srs/issues/2194#issuecomment-777485531
if (++nn_msgs_for_yield > 10) {
nn_msgs_for_yield = 0;
srs_thread_yield();
}
// Yield to another coroutines.
// @see https://github.com/ossrs/srs/issues/2194#issuecomment-777485531
if (++nn_msgs_for_yield > 10) {
nn_msgs_for_yield = 0;
srs_thread_yield();
}
}

Expand All @@ -1044,14 +1077,11 @@ SrsAsyncRecvManager::SrsAsyncRecvManager()
received_packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
handler_ = NULL;
max_recv_queue_ = 0;
trd_ = new SrsFastCoroutine("recv", this);
}

// TODO: FIXME: We should stop the thread first, then free the manager.
SrsAsyncRecvManager::~SrsAsyncRecvManager()
{
srs_freep(trd_);

srs_freep(lock_);
srs_freep(received_packets_);

Expand Down Expand Up @@ -1137,55 +1167,36 @@ srs_error_t SrsAsyncRecvManager::do_start()
return err;
}

srs_error_t SrsAsyncRecvManager::consume()
{
srs_error_t err = srs_success;

max_recv_queue_ = _srs_config->get_threads_max_recv_queue();
srs_trace("AsyncRecv: Set max_queue_size=%d", max_recv_queue_);

if ((err = trd_->start()) != srs_success) {
return srs_error_wrap(err, "start");
}

return err;
}

srs_error_t SrsAsyncRecvManager::cycle()
srs_error_t SrsAsyncRecvManager::consume(int* nn_consumed)
{
srs_error_t err = srs_success;

// How many messages to run a yield.
uint32_t nn_msgs_for_yield = 0;

while (true) {
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "pull");
}
vector<SrsUdpMuxSocket*> flying_received_packets;
received_packets_->swap(flying_received_packets);

vector<SrsUdpMuxSocket*> flying_received_packets;
received_packets_->swap(flying_received_packets);
if (flying_received_packets.empty()) {
return err;
}

if (flying_received_packets.empty()) {
srs_usleep(20 * SRS_UTIME_MILLISECONDS);
continue;
}
*nn_consumed += (int)flying_received_packets.size();

for (int i = 0; i < (int)flying_received_packets.size(); i++) {
SrsUdpMuxSocket* pkt = flying_received_packets.at(i);
for (int i = 0; i < (int)flying_received_packets.size(); i++) {
SrsUdpMuxSocket* pkt = flying_received_packets.at(i);

if (handler_ && (err = handler_->on_udp_packet(pkt)) != srs_success) {
srs_error_reset(err); // Ignore any error.
}
if (handler_ && (err = handler_->on_udp_packet(pkt)) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

srs_freep(pkt);
srs_freep(pkt);

// Yield to another coroutines.
// @see https://github.com/ossrs/srs/issues/2194#issuecomment-777485531
if (++nn_msgs_for_yield > 10) {
nn_msgs_for_yield = 0;
srs_thread_yield();
}
// Yield to another coroutines.
// @see https://github.com/ossrs/srs/issues/2194#issuecomment-777485531
if (++nn_msgs_for_yield > 10) {
nn_msgs_for_yield = 0;
srs_thread_yield();
}
}

Expand Down
33 changes: 20 additions & 13 deletions trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class SrsThreadEntry

// Allocate a(or almost) fixed thread poll to execute tasks,
// so that we can take the advantage of multiple CPUs.
class SrsThreadPool
class SrsThreadPool : public ISrsCoroutineHandler
{
private:
SrsThreadEntry* entry_;
Expand All @@ -133,6 +133,9 @@ class SrsThreadPool
int high_pulse_;
int critical_threshold_;
int critical_pulse_;
private:
// A coroutine to consume cooked packets.
SrsFastCoroutine* trd_;
public:
SrsThreadPool();
virtual ~SrsThreadPool();
Expand All @@ -152,6 +155,11 @@ class SrsThreadPool
void stop();
private:
static void* start(void* arg);
public:
// Consume packets. Must call in worker/service thread.
virtual srs_error_t consume();
private:
srs_error_t cycle();
};

// The global thread pool.
Expand Down Expand Up @@ -349,16 +357,14 @@ class SrsAsyncSRTPPacket
};

// The async SRTP manager, to start a thread to consume packets.
class SrsAsyncSRTPManager : public ISrsCoroutineHandler
class SrsAsyncSRTPManager
{
private:
std::vector<SrsAsyncSRTPTask*> tasks_;
SrsThreadMutex* lock_;
private:
SrsThreadQueue<SrsAsyncSRTPPacket>* srtp_packets_;
private:
// A coroutine to consume cooked packets.
SrsFastCoroutine* trd_;
// The packets cooked by async SRTP manager.
SrsThreadQueue<SrsAsyncSRTPPacket>* cooked_packets_;
public:
Expand All @@ -375,9 +381,7 @@ class SrsAsyncSRTPManager : public ISrsCoroutineHandler
srs_error_t do_start();
public:
// Consume cooked SRTP packets. Must call in worker/service thread.
virtual srs_error_t consume();
private:
srs_error_t cycle();
virtual srs_error_t consume(int* nn_consumed);
};

// The global async SRTP manager.
Expand All @@ -395,13 +399,11 @@ class SrsThreadUdpListener
};

// The async RECV manager, to recv UDP packets.
class SrsAsyncRecvManager : public ISrsCoroutineHandler
class SrsAsyncRecvManager
{
private:
ISrsUdpMuxHandler* handler_;
private:
// A coroutine to consume received packets.
SrsFastCoroutine* trd_;
// The received UDP packets.
SrsThreadQueue<SrsUdpMuxSocket>* received_packets_;
private:
Expand All @@ -416,16 +418,21 @@ class SrsAsyncRecvManager : public ISrsCoroutineHandler
public:
// Set the handler to process the received UDP packet.
void set_handler(ISrsUdpMuxHandler* v);
// Set the max queue size.
// SrsAsyncRecvManager::set_max_recv_queue()
void set_max_recv_queue(int v) { max_recv_queue_ =v; }
// Add listener to recv from.
void add_listener(SrsThreadUdpListener* listener);
// Get the size of packets queue.
int size();
public:
// Start the thread.
static srs_error_t start(void* arg);
private:
srs_error_t do_start();
public:
// Consume received UDP packets. Must call in worker/service thread.
virtual srs_error_t consume();
private:
srs_error_t cycle();
virtual srs_error_t consume(int* nn_consumed);
};

// The global async RECV manager.
Expand Down

0 comments on commit 198dca5

Please sign in to comment.