Skip to content

Commit

Permalink
For #2188: Run hybrid server in thread.
Browse files Browse the repository at this point in the history
1. Create thread when execute by thread pool.
2. The primordial thread check all threads status.
3. Have not complete the cleanup and stop.
  • Loading branch information
winlinvip committed Mar 12, 2021
1 parent 3b54c02 commit 4a43a47
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 10 deletions.
5 changes: 0 additions & 5 deletions trunk/src/app/srs_app_hybrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,6 @@ srs_error_t SrsHybridServer::initialize()
{
srs_error_t err = srs_success;

// init st
if ((err = srs_st_init()) != srs_success) {
return srs_error_wrap(err, "initialize st failed");
}

if ((err = setup_ticks()) != srs_success) {
return srs_error_wrap(err, "tick");
}
Expand Down
109 changes: 107 additions & 2 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,140 @@
#include <srs_app_threads.hpp>

#include <srs_kernel_error.hpp>
#include <srs_app_config.hpp>
#include <srs_app_log.hpp>
#include <srs_core_autofree.hpp>

#include <unistd.h>

using namespace std;

SrsThreadLock::SrsThreadLock()
{
// https://michaelkerrisk.com/linux/man-pages/man3/pthread_mutex_init.3p.html
int r0 = pthread_mutex_init(&lock_, NULL);
srs_assert(!r0);

// https://man7.org/linux/man-pages/man3/pthread_mutex_lock.3p.html
r0 = pthread_mutex_lock(&lock_);
srs_assert(!r0);
}

SrsThreadLock::~SrsThreadLock()
{
int r0 = pthread_mutex_unlock(&lock_);
srs_assert(!r0);

r0 = pthread_mutex_destroy(&lock_);
srs_assert(!r0);
}

SrsThreadEntry::SrsThreadEntry()
{
pool = NULL;
start = NULL;
arg = NULL;
num = 0;

err = srs_success;
}

SrsThreadPool::SrsThreadPool()
{
entry_ = NULL;
}

SrsThreadPool::~SrsThreadPool()
{
// TODO: FIXME: Implements it.
}

srs_error_t SrsThreadPool::initialize()
{
srs_error_t err = srs_success;

// TODO: FIXME: Should init ST for each thread.
if ((err = srs_st_init()) != srs_success) {
return srs_error_wrap(err, "initialize st failed");
}

// Add primordial thread, current thread itself.
SrsThreadEntry* entry = new SrsThreadEntry();
threads_.push_back(entry);
entry_ = entry;

entry->pool = this;
entry->label = "primordial";
entry->start = NULL;
entry->arg = NULL;
entry->num = 1;

srs_trace("Thread #%d: %s init", entry_->num, entry_->label.c_str());

return err;
}

srs_error_t SrsThreadPool::execute(srs_error_t (*start)(void* arg), void* arg)
srs_error_t SrsThreadPool::execute(string label, srs_error_t (*start)(void* arg), void* arg)
{
srs_error_t err = start(arg);
srs_error_t err = srs_success;

SrsThreadLock* lock = new SrsThreadLock();
SrsAutoFree(SrsThreadLock, lock);

static int num = entry_->num + 1;

SrsThreadEntry* entry = new SrsThreadEntry();
threads_.push_back(entry);

entry->pool = this;
entry->label = label;
entry->start = start;
entry->arg = arg;
entry->num = num++;

// https://man7.org/linux/man-pages/man3/pthread_create.3.html
pthread_t trd;
int r0 = pthread_create(&trd, NULL, SrsThreadPool::start, entry);
if (r0 != 0) {
return srs_error_new(ERROR_THREAD_CREATE, "create thread %s", label.c_str());
}

entry->trd = trd;

return err;
}

srs_error_t SrsThreadPool::run()
{
srs_error_t err = srs_success;

while (true) {
srs_trace("Thread #%d: %s run, threads=%d", entry_->num, entry_->label.c_str(),
(int)threads_.size());
sleep(60);
}

return err;
}

void SrsThreadPool::stop()
{
// TODO: FIXME: Implements it.
}

void* SrsThreadPool::start(void* arg)
{
srs_error_t err = srs_success;

SrsThreadEntry* entry = (SrsThreadEntry*)arg;
srs_trace("Thread #%d: %s run", entry->num, entry->label.c_str());

if ((err = entry->start(entry->arg)) != srs_success) {
entry->err = err;
}

// We do not use the return value, the err has been set to entry->err.
return NULL;
}

SrsThreadPool* _srs_thread_pool = new SrsThreadPool();
44 changes: 42 additions & 2 deletions trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,62 @@

#include <srs_core.hpp>

#include <pthread.h>

#include <vector>
#include <string>

class SrsThreadPool;

// The thread lock.
class SrsThreadLock
{
private:
pthread_mutex_t lock_;
public:
SrsThreadLock();
virtual ~SrsThreadLock();
};

// The information for a thread.
class SrsThreadEntry
{
public:
SrsThreadPool* pool;
std::string label;
srs_error_t (*start)(void* arg);
void* arg;
int num;
public:
// The thread object.
pthread_t trd;
// The exit error of thread.
srs_error_t err;

SrsThreadEntry();
};

// Allocate a(or almost) fixed thread poll to execute tasks,
// so that we can take the advantage of multiple CPUs.
class SrsThreadPool
{
private:
SrsThreadEntry* entry_;
std::vector<SrsThreadEntry*> threads_;
public:
SrsThreadPool();
virtual ~SrsThreadPool();
public:
// Initialize the thread pool.
srs_error_t initialize();
// Execute start function in thread.
srs_error_t execute(srs_error_t (*start)(void* arg), void* arg);
// Execute start function with label in thread.
srs_error_t execute(std::string label, srs_error_t (*start)(void* arg), void* arg);
// Run in the primordial thread, util stop or quit.
srs_error_t run();
// Stop the thread pool and quit the primordial thread.
void stop();
private:
static void* start(void* arg);
};

// The global thread pool.
Expand Down
1 change: 1 addition & 0 deletions trunk/src/kernel/srs_kernel_error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
#define ERROR_SOCKET_SETREUSEADDR 1079
#define ERROR_SOCKET_SETCLOSEEXEC 1080
#define ERROR_SOCKET_ACCEPT 1081
#define ERROR_THREAD_CREATE 1082

///////////////////////////////////////////////////////
// RTMP protocol error.
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/main/srs_main_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ srs_error_t run_in_thread_pool()
return srs_error_wrap(err, "init thread pool");
}

if ((err = _srs_thread_pool->execute(run_hybrid_server, NULL)) != srs_success) {
if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) {
return srs_error_wrap(err, "run hybrid server");
}

Expand Down

0 comments on commit 4a43a47

Please sign in to comment.