From 85ea39d397b715ca6667a86e919d767eb9708a7d Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 15 Mar 2021 22:51:58 +0800 Subject: [PATCH] Threads-RECV: Support dedicate thread to recv UDP packets. 1. Use SrsUdpMuxSocket::raw_recvfrom to read, without ST. 2. Start a UDP recv thread, to recv packets. 3. Consume UDP packets in RTC server timer. --- trunk/conf/full.conf | 3 + trunk/src/app/srs_app_config.cpp | 17 +++++ trunk/src/app/srs_app_config.hpp | 1 + trunk/src/app/srs_app_hourglass.cpp | 7 +- trunk/src/app/srs_app_listener.cpp | 55 ++++++++++++++ trunk/src/app/srs_app_listener.hpp | 6 ++ trunk/src/app/srs_app_threads.cpp | 112 ++++++++++++++++++++++++++++ trunk/src/app/srs_app_threads.hpp | 39 ++++++++++ trunk/src/main/srs_main_server.cpp | 5 ++ 9 files changed, 244 insertions(+), 1 deletion(-) diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 8e70de013d..12321914a5 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -124,6 +124,9 @@ threads { # Whether enable the ASYNC SRTP, codec in dedicate threads. # Default: off async_srtp off; + # Whether enable the ASYNC RECV, recv udp packets in dedicate threads. + # Default: off + async_recv off; } ############################################################################################# diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index eeb959e34c..bfb767a49c 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -4144,6 +4144,23 @@ bool SrsConfig::get_threads_async_srtp() return SRS_CONF_PERFER_FALSE(conf->arg0()); } +bool SrsConfig::get_threads_async_recv() +{ + static bool DEFAULT = false; + + SrsConfDirective* conf = root->get("threads"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("async_recv"); + if (!conf) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + vector SrsConfig::get_stream_casters() { srs_assert(root); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index f05676a397..50ebc4731c 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -479,6 +479,7 @@ class SrsConfig public: virtual srs_utime_t get_threads_interval(); virtual bool get_threads_async_srtp(); + virtual bool get_threads_async_recv(); // stream_caster section public: // Get all stream_caster in config file. diff --git a/trunk/src/app/srs_app_hourglass.cpp b/trunk/src/app/srs_app_hourglass.cpp index 32df83cd32..1d5900e9c2 100644 --- a/trunk/src/app/srs_app_hourglass.cpp +++ b/trunk/src/app/srs_app_hourglass.cpp @@ -261,7 +261,12 @@ srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval, srs_utime_t tick ++_srs_pps_timer_s->sugar; } - // Consume the cooked async SRTP packets. + // Consume the async received UDP packets. + if ((err = _srs_async_recv->consume()) != srs_success) { + srs_error_reset(err); // Ignore any error. + } + + // Consume the async cooked SRTP packets. if ((err = _srs_async_srtp->consume()) != srs_success) { srs_error_reset(err); // Ignore any error. } diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index c38609c9f5..13994c714c 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -42,6 +42,8 @@ using namespace std; #include #include #include +#include +#include #include @@ -323,6 +325,24 @@ int SrsUdpMuxSocket::recvfrom(srs_utime_t timeout) return nread; } + return on_recvfrom(); +} + +int SrsUdpMuxSocket::raw_recvfrom() +{ + int osfd = srs_netfd_fileno(lfd); + + fromlen = sizeof(from); + nread = ::recvfrom(osfd, buf, nb_buf, 0, (sockaddr*)&from, (socklen_t*)&fromlen); + if (nread <= 0) { + return nread; + } + + return on_recvfrom(); +} + +int SrsUdpMuxSocket::on_recvfrom() +{ // Reset the fast cache buffer size. cache_buffer_->set_size(nread); cache_buffer_->skip(-1 * cache_buffer_->pos()); @@ -494,6 +514,29 @@ SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly() return sendonly; } +SrsUdpMuxSocket* SrsUdpMuxSocket::copy() +{ + SrsUdpMuxSocket* cp = new SrsUdpMuxSocket(lfd); + + cp->nb_buf = nb_buf; + if (nread) { + memcpy(cp->buf, buf, nread); + } + cp->nread = nread; + cp->lfd = lfd; + cp->from = from; + cp->fromlen = fromlen; + cp->peer_ip = peer_ip; + cp->peer_port = peer_port; + + // Copy the fast id. + cp->peer_id_ = peer_id_; + cp->fast_id_ = fast_id_; + cp->address_changed_ = address_changed_; + + return cp; +} + SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p) { handler = h; @@ -601,6 +644,18 @@ srs_error_t SrsUdpMuxListener::cycle() set_socket_buffer(); + // Sleep infinite if use async_recv. + if (_srs_config->get_threads_async_recv()) { + SrsThreadUdpListener* listener = new SrsThreadUdpListener(lfd); + + _srs_async_recv->add_listener(listener); + _srs_async_recv->set_handler(handler); + + srs_usleep(SRS_UTIME_NO_TIMEOUT); + + return trd->pull(); + } + // Because we have to decrypt the cipher of received packet payload, // and the size is not determined, so we think there is at least one copy, // and we can reuse the plaintext h264/opus with players when got plaintext. diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index e3ba3e975b..57d0ff63c8 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -164,6 +164,10 @@ class SrsUdpMuxSocket virtual ~SrsUdpMuxSocket(); public: int recvfrom(srs_utime_t timeout); + int raw_recvfrom(); +private: + int on_recvfrom(); +public: srs_error_t sendto(void* data, int size, srs_utime_t timeout); srs_netfd_t stfd(); sockaddr_in* peer_addr(); @@ -176,6 +180,8 @@ class SrsUdpMuxSocket uint64_t fast_id(); SrsBuffer* buffer(); SrsUdpMuxSocket* copy_sendonly(); +public: + SrsUdpMuxSocket* copy(); }; class SrsUdpMuxListener : public ISrsCoroutineHandler diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index bfbfd8d5b3..24afae9278 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -646,6 +646,7 @@ void SrsAsyncSRTPManager::remove_task(SrsAsyncSRTPTask* task) } } +// TODO: FIXME: We could use a coroutine queue, then cook all packet in RTC server timer. void SrsAsyncSRTPManager::add_packet(SrsAsyncSRTPPacket* pkt) { packets_->push_back(pkt); @@ -661,7 +662,9 @@ srs_error_t SrsAsyncSRTPManager::do_start() { srs_error_t err = srs_success; + // TODO: FIXME: Config it? srs_utime_t interval = 10 * SRS_UTIME_MILLISECONDS; + while (true) { vector flying; packets_->swap(flying); @@ -719,3 +722,112 @@ srs_error_t SrsAsyncSRTPManager::consume() } SrsAsyncSRTPManager* _srs_async_srtp = new SrsAsyncSRTPManager(); + +SrsThreadUdpListener::SrsThreadUdpListener(srs_netfd_t fd) +{ + skt_ = new SrsUdpMuxSocket(fd); +} + +SrsThreadUdpListener::~SrsThreadUdpListener() +{ +} + +SrsAsyncRecvManager::SrsAsyncRecvManager() +{ + lock_ = new SrsThreadMutex(); + packets_ = new SrsThreadQueue(); + handler_ = NULL; +} + +// TODO: FIXME: We should stop the thread first, then free the manager. +SrsAsyncRecvManager::~SrsAsyncRecvManager() +{ + srs_freep(lock_); + srs_freep(packets_); + + vector::iterator it; + for (it = listeners_.begin(); it != listeners_.end(); ++it) { + SrsThreadUdpListener* listener = *it; + srs_freep(listener); + } +} + +void SrsAsyncRecvManager::set_handler(ISrsUdpMuxHandler* v) +{ + handler_ = v; +} + +void SrsAsyncRecvManager::add_listener(SrsThreadUdpListener* listener) +{ + SrsThreadLocker(lock_); + listeners_.push_back(listener); +} + +srs_error_t SrsAsyncRecvManager::start(void* arg) +{ + SrsAsyncRecvManager* recv = (SrsAsyncRecvManager*)arg; + return recv->do_start(); +} + +srs_error_t SrsAsyncRecvManager::do_start() +{ + srs_error_t err = srs_success; + + // TODO: FIXME: Config it? + srs_utime_t interval = 10 * SRS_UTIME_MILLISECONDS; + + while (true) { + vector listeners; + if (true) { + SrsThreadLocker(lock_); + listeners = listeners_; + } + + bool got_packet = false; + for (int i = 0; i < (int)listeners.size(); i++) { + SrsThreadUdpListener* listener = listeners.at(i); + + // TODO: FIXME: Use st_recvfrom to recv if thread-safe ST is ok. + int nread = listener->skt_->raw_recvfrom(); + if (nread > 0) { + got_packet = true; + packets_->push_back(listener->skt_->copy()); + } + } + + // If got packets, maybe more packets in queue. + if (got_packet) { + continue; + } + + // TODO: FIXME: Maybe we should use cond wait? + timespec tv = {0}; + tv.tv_sec = interval / SRS_UTIME_SECONDS; + tv.tv_nsec = (interval % SRS_UTIME_SECONDS) * 1000; + nanosleep(&tv, NULL); + } + + return err; +} + +srs_error_t SrsAsyncRecvManager::consume() +{ + srs_error_t err = srs_success; + + vector flying; + packets_->swap(flying); + + for (int i = 0; i < (int)flying.size(); i++) { + SrsUdpMuxSocket* pkt = flying.at(i); + + if (handler_ && (err = handler_->on_udp_packet(pkt)) != srs_success) { + srs_error_reset(err); // Ignore any error. + } + + srs_freep(pkt); + } + + return err; +} + +SrsAsyncRecvManager* _srs_async_recv = new SrsAsyncRecvManager(); diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp index 2c6867446a..975f79d0c3 100644 --- a/trunk/src/app/srs_app_threads.hpp +++ b/trunk/src/app/srs_app_threads.hpp @@ -30,6 +30,7 @@ #include #include #include +#include #include @@ -336,4 +337,42 @@ class SrsAsyncSRTPManager // The global async SRTP manager. extern SrsAsyncSRTPManager* _srs_async_srtp; +// A thread-safe UDP listener. +// TODO: FIXME: Use st_recvfrom to recv if thread-safe ST is ok. +class SrsThreadUdpListener +{ +public: + SrsUdpMuxSocket* skt_; +public: + SrsThreadUdpListener(srs_netfd_t fd); + virtual ~SrsThreadUdpListener(); +}; + +// The async RECV manager, to recv UDP packets. +class SrsAsyncRecvManager +{ +private: + ISrsUdpMuxHandler* handler_; +private: + SrsThreadQueue* packets_; +private: + std::vector listeners_; + SrsThreadMutex* lock_; +public: + SrsAsyncRecvManager(); + virtual ~SrsAsyncRecvManager(); +public: + void set_handler(ISrsUdpMuxHandler* v); + void add_listener(SrsThreadUdpListener* listener); + static srs_error_t start(void* arg); +private: + srs_error_t do_start(); +public: + // Consume received UDP packets. Must call in worker/service thread. + virtual srs_error_t consume(); +}; + +// The global async RECV manager. +extern SrsAsyncRecvManager* _srs_async_recv; + #endif diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 882d43d768..45dc2d9fb2 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -492,6 +492,11 @@ srs_error_t run_in_thread_pool() return srs_error_wrap(err, "start async srtp thread"); } + // Start the async RECV worker thread, to recv UDP packets. + if ((err = _srs_thread_pool->execute("recv", SrsAsyncRecvManager::start, _srs_async_recv)) != srs_success) { + return srs_error_wrap(err, "start async recv thread"); + } + // Start the service worker thread, for RTMP and RTC server, etc. if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) { return srs_error_wrap(err, "start hybrid server thread");