Skip to content

Commit

Permalink
Threads-Hybrid: Support communicate between threads by chan and slot
Browse files Browse the repository at this point in the history
1. Hybrid thread is responder, API thread is initiator.
2. Responder read message from initiator-slot, write message to responder-slot.
3. Initiator write message to initiator-slot, read message from responder-slot.
4. Responder start a coroutine to consume requests and response it.
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent 281350d commit 6e2de90
Show file tree
Hide file tree
Showing 9 changed files with 356 additions and 12 deletions.
57 changes: 57 additions & 0 deletions trunk/src/app/srs_app_hybrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <srs_service_st.hpp>
#include <srs_app_utility.hpp>
#include <srs_app_threads.hpp>
#include <srs_app_rtc_server.hpp>

using namespace std;

Expand Down Expand Up @@ -192,6 +193,19 @@ srs_error_t SrsHybridServer::initialize()
}
}

// Create slots for other threads to communicate with us.
SrsThreadEntry* self = _srs_thread_pool->self();

self->slot_ = new SrsThreadPipeSlot(1);

if ((err = self->slot_->initialize()) != srs_success) {
return srs_error_wrap(err, "init slot");
}

if ((err = self->slot_->open_responder(this)) != srs_success) {
return srs_error_wrap(err, "init slot");
}

return err;
}

Expand Down Expand Up @@ -387,5 +401,48 @@ srs_error_t SrsHybridServer::on_timer(srs_utime_t interval, srs_utime_t tick)
return err;
}

srs_error_t SrsHybridServer::on_thread_message(SrsThreadMessage* msg, SrsThreadPipeChannel* channel)
{
srs_error_t err = srs_success;

RtcServerAdapter* adapter = NULL;
if (true) {
vector<ISrsHybridServer*> servers = _srs_hybrid->servers;
for (vector<ISrsHybridServer*>::iterator it = servers.begin(); it != servers.end(); ++it) {
RtcServerAdapter* server = dynamic_cast<RtcServerAdapter*>(*it);
if (server) {
adapter = server;
break;
}
}
}

// TODO: FIXME: Response error?
if (!adapter) {
return err;
}

if (msg->id == (uint64_t)SrsThreadMessageIDRtcCreateSession) {
SrsThreadMessageRtcCreateSession* s = (SrsThreadMessageRtcCreateSession*)msg->ptr;
err = adapter->rtc->create_session(s->req, s->remote_sdp, s->local_sdp, s->mock_eip,
s->publish, s->dtls, s->srtp, &s->session);

// TODO: FIXME: Response error?
if (err != srs_success) {
return srs_error_wrap(err, "create session");
}

// TODO: FIXME: Response timeout if error?
// TODO: FIXME: Response a different message? With trace ID?
// We're responder, write response to responder.
srs_error_t r0 = channel->responder()->write(msg, sizeof(SrsThreadMessage), NULL);
if (r0 != srs_success) {
srs_freep(r0); // Ignore any error.
}
}

return err;
}

SrsHybridServer* _srs_hybrid = new SrsHybridServer();

6 changes: 4 additions & 2 deletions trunk/src/app/srs_app_hybrid.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <vector>

#include <srs_app_hourglass.hpp>
#include <srs_app_threads.hpp>

class SrsServer;
class SrsServerAdapter;
Expand All @@ -49,9 +50,8 @@ class ISrsHybridServer
};

// The hybrid server manager.
class SrsHybridServer : public ISrsFastTimer
class SrsHybridServer : public ISrsFastTimer, public ISrsThreadResponder
{
friend class SrsApiServer;
private:
std::vector<ISrsHybridServer*> servers;
SrsFastTimer* timer_;
Expand All @@ -70,6 +70,8 @@ class SrsHybridServer : public ISrsFastTimer
// interface ISrsFastTimer
private:
srs_error_t on_timer(srs_utime_t interval, srs_utime_t tick);
private:
srs_error_t on_thread_message(SrsThreadMessage* msg, SrsThreadPipeChannel* channel);
};

extern SrsHybridServer* _srs_hybrid;
Expand Down
1 change: 0 additions & 1 deletion trunk/src/app/srs_app_rtc_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include <srs_service_st.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_rtmp_stack.hpp>
#include <srs_app_hybrid.hpp>
#include <srs_app_hourglass.hpp>
#include <srs_app_rtc_sdp.hpp>
#include <srs_app_reload.hpp>
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_rtc_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class SrsRtcServer : public ISrsUdpMuxHandler, public ISrsFastTimer, public ISrs
// The RTC server adapter.
class RtcServerAdapter : public ISrsHybridServer
{
friend class SrsApiServer;
friend class SrsHybridServer;
private:
SrsRtcServer* rtc;
public:
Expand Down
58 changes: 50 additions & 8 deletions trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1993,18 +1993,60 @@ srs_error_t SrsApiServer::create_session(
) {
srs_error_t err = srs_success;

vector<ISrsHybridServer*> servers = _srs_hybrid->servers;
for (vector<ISrsHybridServer*>::iterator it = servers.begin(); it != servers.end(); ++it) {
RtcServerAdapter* adapter = dynamic_cast<RtcServerAdapter*>(*it);
if (!adapter) {
continue;
// Allocate slot to communicate with hybrid thread.
SrsThreadEntry* self = _srs_thread_pool->self();
SrsThreadEntry* hybrid = _srs_thread_pool->hybrid();
srs_assert(self && hybrid);

SrsThreadPipeChannel* channel = NULL;
if (true) {
map<pthread_t, SrsThreadPipeChannel*>::iterator it = self->channels_.find(hybrid->trd);
if (it == self->channels_.end()) {
self->channels_[hybrid->trd] = channel = hybrid->slot_->allocate();
} else {
channel = it->second;
}
}
srs_assert(channel);

// TODO: FIXME: Should notify thread by thread-slot.
return adapter->rtc->create_session(req, remote_sdp, local_sdp, mock_eip,
publish, dtls, srtp, psession);
// We're initiator, write to initiator, read from responder.
if ((err = channel->initiator()->open_write()) != srs_success) {
return srs_error_wrap(err, "open write");
}
if ((err = channel->responder()->open_read()) != srs_success) {
return srs_error_wrap(err, "open read");
}

SrsThreadMessageRtcCreateSession s;
s.req = req;
s.remote_sdp = remote_sdp;
s.local_sdp = local_sdp;
s.mock_eip = mock_eip;
s.publish = publish;
s.dtls = dtls;
s.srtp = srtp;
s.session = NULL;

SrsThreadMessage m;
m.id = (uint64_t)SrsThreadMessageIDRtcCreateSession;
m.ptr = (uint64_t)&s;

// We're initiator, write to initiator, read from responder.
// TODO: FIXME: Write important logs, and error response, and timeout?
if ((err = channel->initiator()->write(&m, sizeof(m), NULL)) != srs_success) {
return srs_error_wrap(err, "write");
}

// TODO: FIXME: Write important logs, and error response, and timeout?
if ((err = channel->responder()->read(&m, sizeof(m), NULL)) != srs_success) {
return srs_error_wrap(err, "read");
}

// Covert to output params.
local_sdp = s.local_sdp;
// TODO: FIMXE: Should never return it, for it's not thread-safe.
*psession = s.session;

return err;
}

Expand Down
18 changes: 18 additions & 0 deletions trunk/src/app/srs_app_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <srs_app_hourglass.hpp>
#include <srs_app_hybrid.hpp>
#include <srs_app_rtc_api.hpp>
#include <srs_app_rtc_sdp.hpp>

class SrsServer;
class SrsHttpServeMux;
Expand Down Expand Up @@ -449,5 +450,22 @@ class SrsApiServer : public ISrsTcpMuxHandler, public ISrsResourceManager, publi
srs_error_t do_start();
};

// The RTC create session information.
struct SrsThreadMessageRtcCreateSession
{
// Input.
SrsRequest* req;
SrsSdp remote_sdp;
std::string mock_eip;
bool publish;
bool dtls;
bool srtp;

// Output.
SrsSdp local_sdp;
// TODO: FIXME: It's not thread-safe.
SrsRtcConnection* session;
};

#endif

140 changes: 140 additions & 0 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ srs_error_t SrsPipe::initialize()
{
srs_error_t err = srs_success;

if (pipes_[0] > 0) {
return err;
}

if (pipe(pipes_) < 0) {
return srs_error_new(ERROR_SYSTEM_CREATE_PIPE, "create pipe");
}
Expand Down Expand Up @@ -134,6 +138,10 @@ srs_error_t SrsThreadPipe::initialize(int fd)
{
srs_error_t err = srs_success;

if (stfd_) {
return err;
}

if ((stfd_ = srs_netfd_open(fd)) == NULL) {
return srs_error_new(ERROR_PIPE_OPEN, "open pipe");
}
Expand Down Expand Up @@ -220,6 +228,134 @@ srs_error_t SrsThreadPipePair::write(void* buf, size_t size, ssize_t* nwrite)
return wpipe_->write(buf, size, nwrite);
}

SrsThreadPipeChannel::SrsThreadPipeChannel()
{
initiator_ = new SrsThreadPipePair();
responder_ = new SrsThreadPipePair();

trd_ = new SrsFastCoroutine("chan", this);
handler_ = NULL;
}

SrsThreadPipeChannel::~SrsThreadPipeChannel()
{
srs_freep(trd_);
srs_freep(initiator_);
srs_freep(responder_);
}

SrsThreadPipePair* SrsThreadPipeChannel::initiator()
{
return initiator_;
}

SrsThreadPipePair* SrsThreadPipeChannel::responder()
{
return responder_;
}

srs_error_t SrsThreadPipeChannel::start(ISrsThreadResponder* h)
{
handler_ = h;
return trd_->start();
}

srs_error_t SrsThreadPipeChannel::cycle()
{
srs_error_t err = srs_success;

while (true) {
if ((err = trd_->pull()) != srs_success) {
return srs_error_wrap(err, "pull");
}

// Here we're responder, read from initiator.
SrsThreadMessage m;
if ((err = initiator_->read(&m, sizeof(m), NULL)) != srs_success) {
return srs_error_wrap(err, "read");
}

// Consume the message, the responder can write response to responder.
if (handler_ && (err = handler_->on_thread_message(&m, this)) != srs_success) {
return srs_error_wrap(err, "consume");
}
}

return err;
}

SrsThreadPipeSlot::SrsThreadPipeSlot(int slots)
{
nn_channels_ = slots;
channels_ = new SrsThreadPipeChannel[slots];

index_ = 0;
lock_ = new SrsThreadMutex();
}

SrsThreadPipeSlot::~SrsThreadPipeSlot()
{
srs_freepa(channels_);
srs_freep(lock_);
}

srs_error_t SrsThreadPipeSlot::initialize()
{
srs_error_t err = srs_success;

for (int i = 0; i < nn_channels_; i++) {
SrsThreadPipeChannel* channel = &channels_[i];

// Here we're responder, but it's ok to initialize the initiator.
if ((err = channel->initiator()->initialize()) != srs_success) {
return srs_error_wrap(err, "init %d initiator", i);
}
if ((err = channel->responder()->initialize()) != srs_success) {
return srs_error_wrap(err, "init %d responder", i);
}
}

return err;
}

srs_error_t SrsThreadPipeSlot::open_responder(ISrsThreadResponder* h)
{
srs_error_t err = srs_success;

for (int i = 0; i < nn_channels_; i++) {
SrsThreadPipeChannel* channel = &channels_[i];

// We're responder, read from initiator, write to responder.
if ((err = channel->initiator()->open_read()) != srs_success) {
return srs_error_wrap(err, "open read");
}
if ((err = channel->responder()->open_write()) != srs_success) {
return srs_error_wrap(err, "open write");
}

// OK, we start the cycle coroutine for responder.
if ((err = channel->start(h)) != srs_success) {
return srs_error_wrap(err, "start %d consume coroutine", i);
}
}

return err;
}

SrsThreadPipeChannel* SrsThreadPipeSlot::allocate()
{
SrsThreadLocker(lock_);
return index_ < nn_channels_? &channels_[index_++] : NULL;
}

ISrsThreadResponder::ISrsThreadResponder()
{
}

ISrsThreadResponder::~ISrsThreadResponder()
{
}

SrsThreadMutex::SrsThreadMutex()
{
// https://man7.org/linux/man-pages/man3/pthread_mutexattr_init.3.html
Expand Down Expand Up @@ -276,6 +412,7 @@ SrsThreadEntry::SrsThreadEntry()
cpuset_ok = false;

stat = new SrsProcSelfStat();
slot_ = NULL;

received_packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
cooked_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
Expand All @@ -286,6 +423,9 @@ SrsThreadEntry::~SrsThreadEntry()
srs_freep(stat);
srs_freep(err);

// TODO: FIXME: Before free slot, we MUST close pipes in threads that open them.
srs_freep(slot_);

srs_freep(received_packets_);
srs_freep(cooked_packets_);

Expand Down
Loading

0 comments on commit 6e2de90

Please sign in to comment.