From 281350da03bbafc268f5d3c7e26598a6d28d00fa Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 5 Apr 2021 18:53:32 +0800 Subject: [PATCH] Threads-Hybrid: Extract pipe and pair to communicate between threads 1. Pipe can be open by any threads, because it's only os FDs. 2. If pipe is open read/write, it's associated by ST, so we MUST free it by the same thread. 3. If open pipe in one thread, it's ok to free it directly, without close pipe. 4. If open read in a thread, then open write in another thread, user MUST close it correctly. --- trunk/src/app/srs_app_server.cpp | 66 +++++++------ trunk/src/app/srs_app_server.hpp | 5 +- trunk/src/app/srs_app_threads.cpp | 137 ++++++++++++++++++++++++++ trunk/src/app/srs_app_threads.hpp | 68 +++++++++++++ trunk/src/kernel/srs_kernel_error.hpp | 3 + trunk/src/protocol/srs_service_st.cpp | 5 + trunk/src/protocol/srs_service_st.hpp | 1 + 7 files changed, 252 insertions(+), 33 deletions(-) diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 002d37f587..0e85c0024f 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -404,39 +404,42 @@ SrsSignalManager* SrsSignalManager::instance = NULL; SrsSignalManager::SrsSignalManager(SrsServer* s) { SrsSignalManager::instance = this; - + + pipe_ = new SrsThreadPipePair(); + server = s; - sig_pipe[0] = sig_pipe[1] = -1; trd = new SrsSTCoroutine("signal", this, _srs_context->get_id()); - signal_read_stfd = NULL; } SrsSignalManager::~SrsSignalManager() { - srs_close_stfd(signal_read_stfd); - - if (sig_pipe[0] > 0) { - ::close(sig_pipe[0]); - } - if (sig_pipe[1] > 0) { - ::close(sig_pipe[1]); - } - srs_freep(trd); + + // Note that it's optional, because the read/write pair is in the same thread. + pipe_->close_read(); + pipe_->close_write(); + + // If in the same thread, we could directly free the pipe, which will close all FDs. + srs_freep(pipe_); } srs_error_t SrsSignalManager::initialize() { - /* Create signal pipe */ - if (pipe(sig_pipe) < 0) { - return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "create pipe"); + srs_error_t err = srs_success; + + if ((err = pipe_->initialize()) != srs_success) { + return srs_error_wrap(err, "init pipe"); } - - if ((signal_read_stfd = srs_netfd_open(sig_pipe[0])) == NULL) { - return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "open pipe"); + + if ((err = pipe_->open_read()) != srs_success) { + return srs_error_wrap(err, "init read pipe"); } - - return srs_success; + + if ((err = pipe_->open_write()) != srs_success) { + return srs_error_wrap(err, "init write pipe"); + } + + return err; } srs_error_t SrsSignalManager::start() @@ -496,11 +499,12 @@ srs_error_t SrsSignalManager::cycle() } int signo; + // Read the next signal from the pipe + if ((err = pipe_->read(&signo, sizeof(int), NULL)) != srs_success) { + srs_freep(err); // Ignore any error. + } - /* Read the next signal from the pipe */ - srs_read(signal_read_stfd, &signo, sizeof(int), SRS_UTIME_NO_TIMEOUT); - - /* Process signal synchronously */ + // Process signal synchronously server->on_signal(signo); } @@ -511,13 +515,15 @@ void SrsSignalManager::sig_catcher(int signo) { int err; - /* Save errno to restore it after the write() */ + // Save errno to restore it after the write() err = errno; - - /* write() is reentrant/async-safe */ - int fd = SrsSignalManager::instance->sig_pipe[1]; - write(fd, &signo, sizeof(int)); - + + // write() is reentrant/async-safe + srs_error_t r0 = SrsSignalManager::instance->pipe_->write(&signo, sizeof(int), NULL); + if (r0 != srs_success) { + srs_freep(r0); // Ignore any error. + } + errno = err; } diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 9afbcf1697..caf7a0500e 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -57,7 +57,7 @@ class SrsAppCasterFlv; class SrsRtspCaster; class SrsResourceManager; class SrsGb28181Caster; - +class SrsThreadPipePair; // The listener type for server to identify the connection, // that is, use different type to process the connection. @@ -215,8 +215,7 @@ class SrsSignalManager : public ISrsCoroutineHandler private: // Per-process pipe which is used as a signal queue. // Up to PIPE_BUF/sizeof(int) signals can be queued up. - int sig_pipe[2]; - srs_netfd_t signal_read_stfd; + SrsThreadPipePair* pipe_; private: SrsServer* server; SrsCoroutine* trd; diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index f63cf108e8..3c8c299f9f 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -83,6 +83,143 @@ uint64_t srs_covert_cpuset(cpu_set_t v) #endif } +SrsPipe::SrsPipe() +{ + pipes_[0] = pipes_[1] = -1; +} + +SrsPipe::~SrsPipe() +{ + // Close the FDs because we might not open it as stfd. + if (pipes_[0] > 0) { + ::close(pipes_[0]); + } + if (pipes_[1] > 0) { + ::close(pipes_[1]); + } +} + +srs_error_t SrsPipe::initialize() +{ + srs_error_t err = srs_success; + + if (pipe(pipes_) < 0) { + return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "create pipe"); + } + + return err; +} + +int SrsPipe::read_fd() +{ + return pipes_[0]; +} + +int SrsPipe::write_fd() +{ + return pipes_[1]; +} + +SrsThreadPipe::SrsThreadPipe() +{ + stfd_ = NULL; +} + +SrsThreadPipe::~SrsThreadPipe() +{ + srs_close_stfd(stfd_); +} + +srs_error_t SrsThreadPipe::initialize(int fd) +{ + srs_error_t err = srs_success; + + if ((stfd_ = srs_netfd_open(fd)) == NULL) { + return srs_error_new(ERROR_PIPE_OPEN, "open pipe"); + } + + return err; +} + +srs_error_t SrsThreadPipe::read(void* buf, size_t size, ssize_t* nread) +{ + ssize_t nn = srs_read(stfd_, buf, size, SRS_UTIME_NO_TIMEOUT); + + if (nread) { + *nread = nn; + } + + if (nn < 0) { + return srs_error_new(ERROR_PIPE_READ, "read"); + } + + return srs_success; +} + +srs_error_t SrsThreadPipe::write(void* buf, size_t size, ssize_t* nwrite) +{ + ssize_t nn = srs_write(stfd_, buf, size, SRS_UTIME_NO_TIMEOUT); + + if (nwrite) { + *nwrite = nn; + } + + if (nn < 0) { + return srs_error_new(ERROR_PIPE_WRITE, "write"); + } + + return srs_success; +} + +SrsThreadPipePair::SrsThreadPipePair() +{ + pipe_ = new SrsPipe(); + rpipe_ = new SrsThreadPipe(); + wpipe_ = new SrsThreadPipe(); +} + +SrsThreadPipePair::~SrsThreadPipePair() +{ + close_read(); + close_write(); + srs_freep(pipe_); +} + +srs_error_t SrsThreadPipePair::initialize() +{ + return pipe_->initialize(); +} + +srs_error_t SrsThreadPipePair::open_read() +{ + return rpipe_->initialize(pipe_->read_fd()); +} + +srs_error_t SrsThreadPipePair::open_write() +{ + return wpipe_->initialize(pipe_->write_fd()); +} + +void SrsThreadPipePair::close_read() +{ + srs_freep(rpipe_); +} + +void SrsThreadPipePair::close_write() +{ + srs_freep(wpipe_); +} + +srs_error_t SrsThreadPipePair::read(void* buf, size_t size, ssize_t* nread) +{ + return rpipe_->read(buf, size, nread); +} + +srs_error_t SrsThreadPipePair::write(void* buf, size_t size, ssize_t* nwrite) +{ + return wpipe_->write(buf, size, nwrite); +} + SrsThreadMutex::SrsThreadMutex() { // https://man7.org/linux/man-pages/man3/pthread_mutexattr_init.3.html diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp index 317cc99afc..8ac0243217 100644 --- a/trunk/src/app/srs_app_threads.hpp +++ b/trunk/src/app/srs_app_threads.hpp @@ -44,6 +44,74 @@ class SrsAsyncSRTPPacket; class SrsSecurityTransport; class SrsProcSelfStat; +// The pipe wraps the os pipes(fds). +class SrsPipe +{ +private: + // The max buffer size of pipe is PIPE_BUF, so if we used to transmit signals(int), + // up to PIPE_BUF/sizeof(int) signals can be queued up. + // @see https://man7.org/linux/man-pages/man2/pipe.2.html + int pipes_[2]; +public: + SrsPipe(); + virtual ~SrsPipe(); +public: + srs_error_t initialize(); +public: + int read_fd(); + int write_fd(); +}; + +// The pipe to communicate between thread-local ST of threads. +class SrsThreadPipe +{ +private: + srs_netfd_t stfd_; +public: + SrsThreadPipe(); + virtual ~SrsThreadPipe(); +public: + // Open fd by ST, should be free by the same thread. + srs_error_t initialize(int fd); +public: + // Note that the pipe is unidirectional data channel, so only one of + // read/write is available. + srs_error_t read(void* buf, size_t size, ssize_t* nread); + srs_error_t write(void* buf, size_t size, ssize_t* nwrite); +}; + +// A thread pipe pair, to communicate between threads. +// @remark If thread A open read, then it MUST close the read. +class SrsThreadPipePair +{ +private: + // Per-process pipe which is used as a signal queue. + // Up to PIPE_BUF/sizeof(int) signals can be queued up. + SrsPipe* pipe_; + SrsThreadPipe* rpipe_; + SrsThreadPipe* wpipe_; +public: + SrsThreadPipePair(); + virtual ~SrsThreadPipePair(); +public: + // It's ok to initialize pipe in another threads. + srs_error_t initialize(); +public: + // It's ok to open read/write in one or two threads. + srs_error_t open_read(); + srs_error_t open_write(); +public: + // For multiple-threading, if a thread open the pipe, it MUST close it, never close it by + // another thread which has not open it. + // If pair(read/write) alive in one thread, user can directly free the pair, without closing + // the read/write, because it's in the same thread. + void close_read(); + void close_write(); +public: + srs_error_t read(void* buf, size_t size, ssize_t* nread); + srs_error_t write(void* buf, size_t size, ssize_t* nwrite); +}; + // The thread mutex wrapper, without error. class SrsThreadMutex { diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index ece24b0430..7558c00ac2 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -120,6 +120,9 @@ #define ERROR_SOCKET_ACCEPT 1081 #define ERROR_THREAD_CREATE 1082 #define ERROR_SYSTEM_LOGFILE 1083 +#define ERROR_PIPE_OPEN 1084 +#define ERROR_PIPE_READ 1085 +#define ERROR_PIPE_WRITE 1086 /////////////////////////////////////////////////////// // RTMP protocol error. diff --git a/trunk/src/protocol/srs_service_st.cpp b/trunk/src/protocol/srs_service_st.cpp index 1547858eb3..1c206e965a 100644 --- a/trunk/src/protocol/srs_service_st.cpp +++ b/trunk/src/protocol/srs_service_st.cpp @@ -457,6 +457,11 @@ ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout) return st_read((st_netfd_t)stfd, buf, nbyte, (st_utime_t)timeout); } +ssize_t srs_write(srs_netfd_t stfd, const void *buf, size_t nbyte, srs_utime_t timeout) +{ + return st_write((st_netfd_t)stfd, buf, nbyte, (st_utime_t)timeout); +} + bool srs_is_never_timeout(srs_utime_t tm) { return tm == SRS_UTIME_NO_TIMEOUT; diff --git a/trunk/src/protocol/srs_service_st.hpp b/trunk/src/protocol/srs_service_st.hpp index a82aa41d1f..ffeb5c32fd 100644 --- a/trunk/src/protocol/srs_service_st.hpp +++ b/trunk/src/protocol/srs_service_st.hpp @@ -102,6 +102,7 @@ extern int srs_sendmsg(srs_netfd_t stfd, const struct msghdr *msg, int flags, sr extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout); extern ssize_t srs_read(srs_netfd_t stfd, void *buf, size_t nbyte, srs_utime_t timeout); +extern ssize_t srs_write(srs_netfd_t stfd, const void *buf, size_t nbyte, srs_utime_t timeout); extern bool srs_is_never_timeout(srs_utime_t tm);