Skip to content

Commit

Permalink
Threads-RECV: Support dedicate thread to recv UDP packets.
Browse files Browse the repository at this point in the history
1. Use SrsUdpMuxSocket::raw_recvfrom to read, without ST.
2. Start a UDP recv thread, to recv packets.
3. Consume UDP packets in RTC server timer.
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent e3686a4 commit 85ea39d
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 1 deletion.
3 changes: 3 additions & 0 deletions trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ threads {
# Whether enable the ASYNC SRTP, codec in dedicate threads.
# Default: off
async_srtp off;
# Whether enable the ASYNC RECV, recv udp packets in dedicate threads.
# Default: off
async_recv off;
}

#############################################################################################
Expand Down
17 changes: 17 additions & 0 deletions trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4144,6 +4144,23 @@ bool SrsConfig::get_threads_async_srtp()
return SRS_CONF_PERFER_FALSE(conf->arg0());
}

bool SrsConfig::get_threads_async_recv()
{
static bool DEFAULT = false;

SrsConfDirective* conf = root->get("threads");
if (!conf) {
return DEFAULT;
}

conf = conf->get("async_recv");
if (!conf) {
return DEFAULT;
}

return SRS_CONF_PERFER_FALSE(conf->arg0());
}

vector<SrsConfDirective*> SrsConfig::get_stream_casters()
{
srs_assert(root);
Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ class SrsConfig
public:
virtual srs_utime_t get_threads_interval();
virtual bool get_threads_async_srtp();
virtual bool get_threads_async_recv();
// stream_caster section
public:
// Get all stream_caster in config file.
Expand Down
7 changes: 6 additions & 1 deletion trunk/src/app/srs_app_hourglass.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,12 @@ srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval, srs_utime_t tick
++_srs_pps_timer_s->sugar;
}

// Consume the cooked async SRTP packets.
// Consume the async received UDP packets.
if ((err = _srs_async_recv->consume()) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

// Consume the async cooked SRTP packets.
if ((err = _srs_async_srtp->consume()) != srs_success) {
srs_error_reset(err); // Ignore any error.
}
Expand Down
55 changes: 55 additions & 0 deletions trunk/src/app/srs_app_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ using namespace std;
#include <srs_app_utility.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_kernel_buffer.hpp>
#include <srs_app_config.hpp>
#include <srs_app_threads.hpp>

#include <srs_protocol_kbps.hpp>

Expand Down Expand Up @@ -323,6 +325,24 @@ int SrsUdpMuxSocket::recvfrom(srs_utime_t timeout)
return nread;
}

return on_recvfrom();
}

int SrsUdpMuxSocket::raw_recvfrom()
{
int osfd = srs_netfd_fileno(lfd);

fromlen = sizeof(from);
nread = ::recvfrom(osfd, buf, nb_buf, 0, (sockaddr*)&from, (socklen_t*)&fromlen);
if (nread <= 0) {
return nread;
}

return on_recvfrom();
}

int SrsUdpMuxSocket::on_recvfrom()
{
// Reset the fast cache buffer size.
cache_buffer_->set_size(nread);
cache_buffer_->skip(-1 * cache_buffer_->pos());
Expand Down Expand Up @@ -494,6 +514,29 @@ SrsUdpMuxSocket* SrsUdpMuxSocket::copy_sendonly()
return sendonly;
}

SrsUdpMuxSocket* SrsUdpMuxSocket::copy()
{
SrsUdpMuxSocket* cp = new SrsUdpMuxSocket(lfd);

cp->nb_buf = nb_buf;
if (nread) {
memcpy(cp->buf, buf, nread);
}
cp->nread = nread;
cp->lfd = lfd;
cp->from = from;
cp->fromlen = fromlen;
cp->peer_ip = peer_ip;
cp->peer_port = peer_port;

// Copy the fast id.
cp->peer_id_ = peer_id_;
cp->fast_id_ = fast_id_;
cp->address_changed_ = address_changed_;

return cp;
}

SrsUdpMuxListener::SrsUdpMuxListener(ISrsUdpMuxHandler* h, std::string i, int p)
{
handler = h;
Expand Down Expand Up @@ -601,6 +644,18 @@ srs_error_t SrsUdpMuxListener::cycle()

set_socket_buffer();

// Sleep infinite if use async_recv.
if (_srs_config->get_threads_async_recv()) {
SrsThreadUdpListener* listener = new SrsThreadUdpListener(lfd);

_srs_async_recv->add_listener(listener);
_srs_async_recv->set_handler(handler);

srs_usleep(SRS_UTIME_NO_TIMEOUT);

return trd->pull();
}

// Because we have to decrypt the cipher of received packet payload,
// and the size is not determined, so we think there is at least one copy,
// and we can reuse the plaintext h264/opus with players when got plaintext.
Expand Down
6 changes: 6 additions & 0 deletions trunk/src/app/srs_app_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ class SrsUdpMuxSocket
virtual ~SrsUdpMuxSocket();
public:
int recvfrom(srs_utime_t timeout);
int raw_recvfrom();
private:
int on_recvfrom();
public:
srs_error_t sendto(void* data, int size, srs_utime_t timeout);
srs_netfd_t stfd();
sockaddr_in* peer_addr();
Expand All @@ -176,6 +180,8 @@ class SrsUdpMuxSocket
uint64_t fast_id();
SrsBuffer* buffer();
SrsUdpMuxSocket* copy_sendonly();
public:
SrsUdpMuxSocket* copy();
};

class SrsUdpMuxListener : public ISrsCoroutineHandler
Expand Down
112 changes: 112 additions & 0 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ void SrsAsyncSRTPManager::remove_task(SrsAsyncSRTPTask* task)
}
}

// TODO: FIXME: We could use a coroutine queue, then cook all packet in RTC server timer.
void SrsAsyncSRTPManager::add_packet(SrsAsyncSRTPPacket* pkt)
{
packets_->push_back(pkt);
Expand All @@ -661,7 +662,9 @@ srs_error_t SrsAsyncSRTPManager::do_start()
{
srs_error_t err = srs_success;

// TODO: FIXME: Config it?
srs_utime_t interval = 10 * SRS_UTIME_MILLISECONDS;

while (true) {
vector<SrsAsyncSRTPPacket*> flying;
packets_->swap(flying);
Expand Down Expand Up @@ -719,3 +722,112 @@ srs_error_t SrsAsyncSRTPManager::consume()
}

SrsAsyncSRTPManager* _srs_async_srtp = new SrsAsyncSRTPManager();

SrsThreadUdpListener::SrsThreadUdpListener(srs_netfd_t fd)
{
skt_ = new SrsUdpMuxSocket(fd);
}

SrsThreadUdpListener::~SrsThreadUdpListener()
{
}

SrsAsyncRecvManager::SrsAsyncRecvManager()
{
lock_ = new SrsThreadMutex();
packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
handler_ = NULL;
}

// TODO: FIXME: We should stop the thread first, then free the manager.
SrsAsyncRecvManager::~SrsAsyncRecvManager()
{
srs_freep(lock_);
srs_freep(packets_);

vector<SrsThreadUdpListener*>::iterator it;
for (it = listeners_.begin(); it != listeners_.end(); ++it) {
SrsThreadUdpListener* listener = *it;
srs_freep(listener);
}
}

void SrsAsyncRecvManager::set_handler(ISrsUdpMuxHandler* v)
{
handler_ = v;
}

void SrsAsyncRecvManager::add_listener(SrsThreadUdpListener* listener)
{
SrsThreadLocker(lock_);
listeners_.push_back(listener);
}

srs_error_t SrsAsyncRecvManager::start(void* arg)
{
SrsAsyncRecvManager* recv = (SrsAsyncRecvManager*)arg;
return recv->do_start();
}

srs_error_t SrsAsyncRecvManager::do_start()
{
srs_error_t err = srs_success;

// TODO: FIXME: Config it?
srs_utime_t interval = 10 * SRS_UTIME_MILLISECONDS;

while (true) {
vector<SrsThreadUdpListener*> listeners;
if (true) {
SrsThreadLocker(lock_);
listeners = listeners_;
}

bool got_packet = false;
for (int i = 0; i < (int)listeners.size(); i++) {
SrsThreadUdpListener* listener = listeners.at(i);

// TODO: FIXME: Use st_recvfrom to recv if thread-safe ST is ok.
int nread = listener->skt_->raw_recvfrom();
if (nread > 0) {
got_packet = true;
packets_->push_back(listener->skt_->copy());
}
}

// If got packets, maybe more packets in queue.
if (got_packet) {
continue;
}

// TODO: FIXME: Maybe we should use cond wait?
timespec tv = {0};
tv.tv_sec = interval / SRS_UTIME_SECONDS;
tv.tv_nsec = (interval % SRS_UTIME_SECONDS) * 1000;
nanosleep(&tv, NULL);
}

return err;
}

srs_error_t SrsAsyncRecvManager::consume()
{
srs_error_t err = srs_success;

vector<SrsUdpMuxSocket*> flying;
packets_->swap(flying);

for (int i = 0; i < (int)flying.size(); i++) {
SrsUdpMuxSocket* pkt = flying.at(i);

if (handler_ && (err = handler_->on_udp_packet(pkt)) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

srs_freep(pkt);
}

return err;
}

SrsAsyncRecvManager* _srs_async_recv = new SrsAsyncRecvManager();
39 changes: 39 additions & 0 deletions trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <srs_kernel_flv.hpp>
#include <srs_app_rtc_dtls.hpp>
#include <srs_app_rtc_conn.hpp>
#include <srs_app_listener.hpp>

#include <pthread.h>

Expand Down Expand Up @@ -336,4 +337,42 @@ class SrsAsyncSRTPManager
// The global async SRTP manager.
extern SrsAsyncSRTPManager* _srs_async_srtp;

// A thread-safe UDP listener.
// TODO: FIXME: Use st_recvfrom to recv if thread-safe ST is ok.
class SrsThreadUdpListener
{
public:
SrsUdpMuxSocket* skt_;
public:
SrsThreadUdpListener(srs_netfd_t fd);
virtual ~SrsThreadUdpListener();
};

// The async RECV manager, to recv UDP packets.
class SrsAsyncRecvManager
{
private:
ISrsUdpMuxHandler* handler_;
private:
SrsThreadQueue<SrsUdpMuxSocket>* packets_;
private:
std::vector<SrsThreadUdpListener*> listeners_;
SrsThreadMutex* lock_;
public:
SrsAsyncRecvManager();
virtual ~SrsAsyncRecvManager();
public:
void set_handler(ISrsUdpMuxHandler* v);
void add_listener(SrsThreadUdpListener* listener);
static srs_error_t start(void* arg);
private:
srs_error_t do_start();
public:
// Consume received UDP packets. Must call in worker/service thread.
virtual srs_error_t consume();
};

// The global async RECV manager.
extern SrsAsyncRecvManager* _srs_async_recv;

#endif
5 changes: 5 additions & 0 deletions trunk/src/main/srs_main_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ srs_error_t run_in_thread_pool()
return srs_error_wrap(err, "start async srtp thread");
}

// Start the async RECV worker thread, to recv UDP packets.
if ((err = _srs_thread_pool->execute("recv", SrsAsyncRecvManager::start, _srs_async_recv)) != srs_success) {
return srs_error_wrap(err, "start async recv thread");
}

// Start the service worker thread, for RTMP and RTC server, etc.
if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) {
return srs_error_wrap(err, "start hybrid server thread");
Expand Down

0 comments on commit 85ea39d

Please sign in to comment.