Skip to content

Commit

Permalink
Threads-Hybrid: Schedule connection to the sample hybrid by url
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent eab4f89 commit 97f6684
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 17 deletions.
6 changes: 3 additions & 3 deletions trunk/src/app/srs_app_hybrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,9 @@ srs_error_t SrsHybridServer::on_thread_message(SrsThreadMessage* msg, SrsThreadP
// TODO: FIXME: Response timeout if error?
// TODO: FIXME: Response a different message? With trace ID?
// We're responder, write response to responder.
srs_error_t r0 = channel->responder()->write(msg, sizeof(SrsThreadMessage), NULL);
if (r0 != srs_success) {
srs_freep(r0); // Ignore any error.
err = channel->responder()->write(msg, sizeof(SrsThreadMessage), NULL);
if (err != srs_success) {
return srs_error_wrap(err, "response");
}
}

Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_rtc_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ srs_error_t SrsGoApiRtcPlay::exchange_sdp(SrsRequest* req, const SrsSdp& remote_

uint32_t SrsGoApiRtcPublish::ssrc_num = 0;

SrsGoApiRtcPublish::SrsGoApiRtcPublish(SrsRtcServer* server)
SrsGoApiRtcPublish::SrsGoApiRtcPublish(ISrsRtcServer* server)
{
server_ = server;
}
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_rtc_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class SrsGoApiRtcPublish : public ISrsHttpHandler
public:
static uint32_t ssrc_num;
private:
SrsRtcServer* server_;
ISrsRtcServer* server_;
public:
SrsGoApiRtcPublish(SrsRtcServer* server);
SrsGoApiRtcPublish(ISrsRtcServer* server);
virtual ~SrsGoApiRtcPublish();
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
Expand Down
6 changes: 3 additions & 3 deletions trunk/src/app/srs_app_rtc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ srs_error_t SrsRtcServer::listen_udp()
return err;
}

int port = _srs_config->get_rtc_server_listen();
int port = _srs_config->get_rtc_server_listen(_srs_hybrid->stream_index());
if (port <= 0) {
return srs_error_new(ERROR_RTC_PORT, "invalid port=%d", port);
}
Expand Down Expand Up @@ -571,15 +571,15 @@ srs_error_t SrsRtcServer::do_create_session(
// We allows to mock the eip of server.
if (!mock_eip.empty()) {
string host;
int port = _srs_config->get_rtc_server_listen();
int port = _srs_config->get_rtc_server_listen(_srs_hybrid->stream_index());
srs_parse_hostport(mock_eip, host, port);

local_sdp.add_candidate(host, port, "host");
srs_trace("RTC: Use candidate mock_eip %s as %s:%d", mock_eip.c_str(), host.c_str(), port);
} else {
std::vector<string> candidate_ips = get_candidate_ips();
for (int i = 0; i < (int)candidate_ips.size(); ++i) {
local_sdp.add_candidate(candidate_ips[i], _srs_config->get_rtc_server_listen(), "host");
local_sdp.add_candidate(candidate_ips[i], _srs_config->get_rtc_server_listen(_srs_hybrid->stream_index()), "host");
}
srs_trace("RTC: Use candidates %s", srs_join_vector_string(candidate_ips, ", ").c_str());
}
Expand Down
27 changes: 22 additions & 5 deletions trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1975,14 +1975,13 @@ srs_error_t SrsApiServer::listen_api()
{
srs_error_t err = srs_success;

// TODO: FIXME: Implements it.
if ((err = http_api_mux_->handle("/rtc/v1/play/", new SrsGoApiRtcPlay(this))) != srs_success) {
return srs_error_wrap(err, "handle play");
}

//if ((err = http_api_mux_->handle("/rtc/v1/publish/", new SrsGoApiRtcPublish(this))) != srs_success) {
// return srs_error_wrap(err, "handle publish");
//}
if ((err = http_api_mux_->handle("/rtc/v1/publish/", new SrsGoApiRtcPublish(this))) != srs_success) {
return srs_error_wrap(err, "handle publish");
}

#ifdef SRS_SIMULATOR
// TODO: FIXME: Implements it.
Expand All @@ -2001,9 +2000,22 @@ srs_error_t SrsApiServer::create_session(
) {
srs_error_t err = srs_success;

// Serve all connections of a stream, which identified by url, by the same hybrid thread.
string url = req->get_stream_url();
SrsThreadEntry* hybrid = NULL;
if (true) {
map<string, SrsThreadEntry*>::iterator it = hybrids_.find(url);
if (it == hybrids_.end()) {
static int index = 0;
vector<SrsThreadEntry*> hybrids = _srs_thread_pool->hybrids();
hybrids_[url] = hybrid = hybrids[(index++) % (int)hybrids.size()];
} else {
hybrid = it->second;
}
}

// Allocate slot to communicate with hybrid thread.
SrsThreadEntry* self = _srs_thread_pool->self();
SrsThreadEntry* hybrid = _srs_thread_pool->hybrid();
srs_assert(self && hybrid);

SrsThreadPipeChannel* channel = NULL;
Expand Down Expand Up @@ -2055,6 +2067,11 @@ srs_error_t SrsApiServer::create_session(
// TODO: FIMXE: Should never return it, for it's not thread-safe.
*psession = s.session;

// TODO: FIXME: Shoule return detail error by channel.
if (!s.session) {
return srs_error_new(ERROR_PIPE_READ, "no session");
}

return err;
}

Expand Down
4 changes: 4 additions & 0 deletions trunk/src/app/srs_app_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include <vector>
#include <string>
#include <map>

#include <srs_app_st.hpp>
#include <srs_app_reload.hpp>
Expand Down Expand Up @@ -423,6 +424,9 @@ class SrsApiServer : public ISrsTcpMuxHandler, public ISrsResourceManager, publi
SrsBufferListener* https_;
SrsHttpServeMux* http_api_mux_;
SrsResourceManager* conn_manager_;
private:
// Key is stream url, value is hybrid thread entry.
std::map<std::string, SrsThreadEntry*> hybrids_;
public:
SrsApiServer();
virtual ~SrsApiServer();
Expand Down
16 changes: 13 additions & 3 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,16 @@ srs_error_t SrsThreadPipeChannel::cycle()
// Here we're responder, read from initiator.
SrsThreadMessage m;
if ((err = initiator_->read(&m, sizeof(m), NULL)) != srs_success) {
return srs_error_wrap(err, "read");
srs_warn("read err %s", srs_error_desc(err).c_str());
srs_freep(err); // Ignore any error.
continue;
}

// Consume the message, the responder can write response to responder.
if (handler_ && (err = handler_->on_thread_message(&m, this)) != srs_success) {
return srs_error_wrap(err, "consume");
srs_warn("consume err %s", srs_error_desc(err).c_str());
srs_freep(err); // Ignore any error.
continue;
}
}

Expand Down Expand Up @@ -604,8 +608,9 @@ srs_error_t SrsThreadPool::execute(string label, srs_error_t (*start)(void* arg)
SrsThreadEntry* entry = new SrsThreadEntry();

// Update the hybrid thread entry for circuit breaker.
if (label == "hybrid" && !hybrid_) {
if (label == "hybrid") {
hybrid_ = entry;
hybrids_.push_back(entry);
}

// To protect the threads_ for executing thread-safe.
Expand Down Expand Up @@ -808,6 +813,11 @@ SrsThreadEntry* SrsThreadPool::hybrid()
return hybrid_;
}

vector<SrsThreadEntry*> SrsThreadPool::hybrids()
{
return hybrids_;
}

void* SrsThreadPool::start(void* arg)
{
srs_error_t err = srs_success;
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ class SrsThreadPool
private:
// The hybrid server entry, the cpu percent used for circuit breaker.
SrsThreadEntry* hybrid_;
std::vector<SrsThreadEntry*> hybrids_;
// Reset the water-level when CPU is low for N times.
// @note To avoid the CPU change rapidly.
int hybrid_high_water_level_;
Expand Down Expand Up @@ -358,6 +359,7 @@ class SrsThreadPool
public:
SrsThreadEntry* self();
SrsThreadEntry* hybrid();
std::vector<SrsThreadEntry*> hybrids();
private:
static void* start(void* arg);
};
Expand Down
1 change: 1 addition & 0 deletions trunk/src/main/srs_main_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ srs_error_t run_hybrid_server(void* arg)
// The config index for hybrid/stream server.
int stream_index = (int)(uint64_t)arg;
_srs_hybrid->set_stream_index(stream_index);
srs_assert(_srs_hybrid->stream_index() >= 0);

// Create servers and register them.
_srs_hybrid->register_server(new SrsServerAdapter());
Expand Down

0 comments on commit 97f6684

Please sign in to comment.