diff --git a/trunk/src/app/srs_app_hybrid.cpp b/trunk/src/app/srs_app_hybrid.cpp index 7d691448ec..8df38c7adf 100644 --- a/trunk/src/app/srs_app_hybrid.cpp +++ b/trunk/src/app/srs_app_hybrid.cpp @@ -168,11 +168,6 @@ srs_error_t SrsHybridServer::initialize() { srs_error_t err = srs_success; - // init st - if ((err = srs_st_init()) != srs_success) { - return srs_error_wrap(err, "initialize st failed"); - } - // Create global shared timer. timer_ = new SrsFastTimer("hybrid", 20 * SRS_UTIME_MILLISECONDS); diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index 22a3dd088e..5ee814fd48 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -24,35 +24,149 @@ #include #include +#include +#include +#include + +#include + +using namespace std; + +SrsThreadMutex::SrsThreadMutex() +{ + // https://michaelkerrisk.com/linux/man-pages/man3/pthread_mutex_init.3p.html + int r0 = pthread_mutex_init(&lock_, NULL); + srs_assert(!r0); +} + +SrsThreadMutex::~SrsThreadMutex() +{ + int r0 = pthread_mutex_destroy(&lock_); + srs_assert(!r0); +} + +void SrsThreadMutex::lock() +{ + // https://man7.org/linux/man-pages/man3/pthread_mutex_lock.3p.html + int r0 = pthread_mutex_lock(&lock_); + srs_assert(!r0); +} + +void SrsThreadMutex::unlock() +{ + int r0 = pthread_mutex_unlock(&lock_); + srs_assert(!r0); +} + +SrsThreadEntry::SrsThreadEntry() +{ + pool = NULL; + start = NULL; + arg = NULL; + num = 0; + + err = srs_success; +} SrsThreadPool::SrsThreadPool() { + entry_ = NULL; + lock_ = new SrsThreadMutex(); } SrsThreadPool::~SrsThreadPool() { + srs_freep(lock_); } srs_error_t SrsThreadPool::initialize() { srs_error_t err = srs_success; + + // TODO: FIXME: Should init ST for each thread. + if ((err = srs_st_init()) != srs_success) { + return srs_error_wrap(err, "initialize st failed"); + } + + // Add primordial thread, current thread itself. + SrsThreadEntry* entry = new SrsThreadEntry(); + threads_.push_back(entry); + entry_ = entry; + + entry->pool = this; + entry->label = "primordial"; + entry->start = NULL; + entry->arg = NULL; + entry->num = 1; + + srs_trace("Thread #%d: %s init", entry_->num, entry_->label.c_str()); + return err; } -srs_error_t SrsThreadPool::execute(srs_error_t (*start)(void* arg), void* arg) +srs_error_t SrsThreadPool::execute(string label, srs_error_t (*start)(void* arg), void* arg) { - srs_error_t err = start(arg); + srs_error_t err = srs_success; + + static int num = entry_->num + 1; + + SrsThreadEntry* entry = new SrsThreadEntry(); + + if (true) { + SrsThreadLocker(lock_); + threads_.push_back(entry); + } + + entry->pool = this; + entry->label = label; + entry->start = start; + entry->arg = arg; + entry->num = num++; + + // https://man7.org/linux/man-pages/man3/pthread_create.3.html + pthread_t trd; + int r0 = pthread_create(&trd, NULL, SrsThreadPool::start, entry); + if (r0 != 0) { + entry->err = srs_error_new(ERROR_THREAD_CREATE, "create thread %s", label.c_str()); + return srs_error_copy(entry->err); + } + + entry->trd = trd; + return err; } srs_error_t SrsThreadPool::run() { srs_error_t err = srs_success; + + while (true) { + srs_trace("Thread #%d: %s run, threads=%d", entry_->num, entry_->label.c_str(), + (int)threads_.size()); + sleep(60); + } + return err; } void SrsThreadPool::stop() { + // TODO: FIXME: Implements it. +} + +void* SrsThreadPool::start(void* arg) +{ + srs_error_t err = srs_success; + + SrsThreadEntry* entry = (SrsThreadEntry*)arg; + srs_trace("Thread #%d: %s run", entry->num, entry->label.c_str()); + + if ((err = entry->start(entry->arg)) != srs_success) { + entry->err = err; + } + + // We do not use the return value, the err has been set to entry->err. + return NULL; } SrsThreadPool* _srs_thread_pool = new SrsThreadPool(); diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp index 5aa12582d3..ad86f58a70 100644 --- a/trunk/src/app/srs_app_threads.hpp +++ b/trunk/src/app/srs_app_threads.hpp @@ -26,22 +26,85 @@ #include +#include + +#include +#include + +class SrsThreadPool; + +// The thread mutex wrapper, without error. +class SrsThreadMutex +{ +private: + pthread_mutex_t lock_; +public: + SrsThreadMutex(); + virtual ~SrsThreadMutex(); +public: + void lock(); + void unlock(); +}; + +// The thread mutex locker. +#define SrsThreadLocker(instance) \ + impl__SrsThreadLocker _SRS_free_##instance(instance) + +class impl__SrsThreadLocker +{ +private: + SrsThreadMutex* lock; +public: + impl__SrsThreadLocker(SrsThreadMutex* l) { + lock = l; + lock->lock(); + } + virtual ~impl__SrsThreadLocker() { + lock->unlock(); + } +}; + +// The information for a thread. +class SrsThreadEntry +{ +public: + SrsThreadPool* pool; + std::string label; + srs_error_t (*start)(void* arg); + void* arg; + int num; +public: + // The thread object. + pthread_t trd; + // The exit error of thread. + srs_error_t err; + + SrsThreadEntry(); +}; + // Allocate a(or almost) fixed thread poll to execute tasks, // so that we can take the advantage of multiple CPUs. class SrsThreadPool { +private: + SrsThreadEntry* entry_; +private: + SrsThreadMutex* lock_; + std::vector threads_; public: SrsThreadPool(); virtual ~SrsThreadPool(); public: // Initialize the thread pool. srs_error_t initialize(); - // Execute start function in thread. - srs_error_t execute(srs_error_t (*start)(void* arg), void* arg); + // Execute start function with label in thread. + srs_error_t execute(std::string label, srs_error_t (*start)(void* arg), void* arg); // Run in the primordial thread, util stop or quit. srs_error_t run(); // Stop the thread pool and quit the primordial thread. void stop(); +private: + static void* start(void* arg); }; // The global thread pool. diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 2d97327bf4..d199a42ed5 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -118,6 +118,7 @@ #define ERROR_SOCKET_SETREUSEADDR 1079 #define ERROR_SOCKET_SETCLOSEEXEC 1080 #define ERROR_SOCKET_ACCEPT 1081 +#define ERROR_THREAD_CREATE 1082 /////////////////////////////////////////////////////// // RTMP protocol error. diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 426db4ff58..2cbb846f0b 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -469,7 +469,7 @@ srs_error_t run_in_thread_pool() return srs_error_wrap(err, "init thread pool"); } - if ((err = _srs_thread_pool->execute(run_hybrid_server, NULL)) != srs_success) { + if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) { return srs_error_wrap(err, "run hybrid server"); }