Skip to content

Commit

Permalink
Fix #1031, Always use vhost in stream query, the unify uri. 3.0.35
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Aug 2, 2018
1 parent 8cf5df5 commit 68a1656
Show file tree
Hide file tree
Showing 18 changed files with 102 additions and 159 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ Please select according to languages:

### V3 changes

* v3.0, 2018-08-02, Always use vhost in stream query, the unify uri. 3.0.35
* v3.0, 2018-08-02, For [#1031][bug #1031], SRS edge support douyu.com. 3.0.34
* v3.0, 2018-07-22, Replace hex to string to match MIT license. 3.0.33
* v3.0, 2018-07-22, Replace base64 to match MIT license. 3.0.32
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_caster_flv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ srs_error_t SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecod
return srs_error_wrap(err, "connect %s failed, cto=%" PRId64 ", sto=%" PRId64, output.c_str(), cto, sto);
}

if ((err = sdk->publish()) != srs_success) {
if ((err = sdk->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) {
return srs_error_wrap(err, "publish");
}

Expand Down
16 changes: 4 additions & 12 deletions trunk/src/app/srs_app_edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ srs_error_t SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
vhost = srs_string_replace(vhost, "[vhost]", req->vhost);

url = srs_generate_rtmp_url(server, port, vhost, req->app, req->stream);
url = srs_generate_rtmp_url(server, port, req->host, vhost, req->app, req->stream, req->param);
}

srs_freep(sdk);
Expand All @@ -122,7 +122,7 @@ srs_error_t SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
return srs_error_wrap(err, "edge pull %s failed, cto=%" PRId64 ", sto=%" PRId64, url.c_str(), cto, sto);
}

if ((err = sdk->play()) != srs_success) {
if ((err = sdk->play(_srs_config->get_chunk_size(req->vhost))) != srs_success) {
return srs_error_wrap(err, "edge pull %s stream failed", url.c_str());
}

Expand Down Expand Up @@ -469,15 +469,7 @@ srs_error_t SrsEdgeForwarder::start()
std::string vhost = _srs_config->get_vhost_edge_transform_vhost(req->vhost);
vhost = srs_string_replace(vhost, "[vhost]", req->vhost);

url = srs_generate_rtmp_url(server, port, vhost, req->app, req->stream);
}

// Pass params in stream, @see https://github.com/ossrs/srs/issues/1031#issuecomment-409745733
if (!req->param.empty()) {
if (req->param.find("?") != 0) {
url += "?";
}
url += req->param;
url = srs_generate_rtmp_url(server, port, req->host, vhost, req->app, req->stream, req->param);
}

// open socket.
Expand All @@ -490,7 +482,7 @@ srs_error_t SrsEdgeForwarder::start()
return srs_error_wrap(err, "sdk connect %s failed, cto=%" PRId64 ", sto=%" PRId64, url.c_str(), cto, sto);
}

if ((err = sdk->publish()) != srs_success) {
if ((err = sdk->publish(_srs_config->get_chunk_size(req->vhost))) != srs_success) {
return srs_error_wrap(err, "sdk publish");
}

Expand Down
40 changes: 2 additions & 38 deletions trunk/src/app/srs_app_forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,42 +94,6 @@ srs_error_t SrsForwarder::on_publish()
{
srs_error_t err = srs_success;

// discovery the server port and tcUrl from req and ep_forward.
std::string server;
std::string tcUrl;
int port = SRS_CONSTS_RTMP_DEFAULT_PORT;
if (true) {
// parse host:port from hostport.
srs_parse_hostport(ep_forward, server, port);

// generate tcUrl
tcUrl = srs_generate_tc_url(server, req->vhost, req->app, port, req->param);
}

// dead loop check
std::string source_ep = "rtmp://";
source_ep += req->host;
source_ep += ":";
source_ep += req->port;
source_ep += "?vhost=";
source_ep += req->vhost;

std::string dest_ep = "rtmp://";
if (ep_forward == SRS_CONSTS_LOCALHOST) {
dest_ep += req->host;
} else {
dest_ep += server;
}
dest_ep += ":";
dest_ep += port;
dest_ep += "?vhost=";
dest_ep += req->vhost;

if (source_ep == dest_ep) {
return srs_error_new(ERROR_SYSTEM_FORWARD_LOOP, "forward loop detected. src=%s, dest=%s", source_ep.c_str(), dest_ep.c_str());
}
srs_trace("start forward %s to %s, tcUrl=%s, stream=%s", source_ep.c_str(), dest_ep.c_str(), tcUrl.c_str(), req->stream.c_str());

srs_freep(trd);
trd = new SrsSTCoroutine("forward", this);
if ((err = trd->start()) != srs_success) {
Expand Down Expand Up @@ -245,7 +209,7 @@ srs_error_t SrsForwarder::do_cycle()
srs_parse_hostport(ep_forward, server, port);

// generate url
url = srs_generate_rtmp_url(server, port, req->vhost, req->app, req->stream);
url = srs_generate_rtmp_url(server, port, req->host, req->vhost, req->app, req->stream, req->param);
}

srs_freep(sdk);
Expand All @@ -257,7 +221,7 @@ srs_error_t SrsForwarder::do_cycle()
return srs_error_wrap(err, "sdk connect url=%s, cto=%" PRId64 ", sto=%" PRId64, url.c_str(), cto, sto);
}

if ((err = sdk->publish()) != srs_success) {
if ((err = sdk->publish(_srs_config->get_chunk_size(req->vhost))) != srs_success) {
return srs_error_wrap(err, "sdk publish");
}

Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_mpegts_udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ srs_error_t SrsMpegtsOverUdp::connect()
return srs_error_wrap(err, "connect %s failed, cto=%" PRId64 ", sto=%" PRId64, output.c_str(), cto, sto);
}

if ((err = sdk->publish()) != srs_success) {
if ((err = sdk->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) {
close();
return srs_error_wrap(err, "publish");
}
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_ng_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ string SrsNgExec::parse(SrsRequest* req, string tmpl)
output = srs_string_replace(output, "[pageUrl]", req->pageUrl);

if (output.find("[url]") != string::npos) {
string url = srs_generate_rtmp_url(req->host, req->port, req->vhost, req->app, req->stream);
string url = srs_generate_rtmp_url(req->host, req->port, req->host, req->vhost, req->app, req->stream, req->param);
output = srs_string_replace(output, "[url]", url);
}

Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_rtsp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ srs_error_t SrsRtspConn::connect()
}

// publish.
if ((err = sdk->publish()) != srs_success) {
if ((err = sdk->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) {
close();
return srs_error_wrap(err, "publish %s failed", url.c_str());
}
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1482,8 +1482,8 @@ srs_error_t SrsOriginHub::create_forwarders()
}

// TODO: FIXME: support queue size.
//double queue_size = _srs_config->get_queue_length(req->vhost);
//forwarder->set_queue_size(queue_size);
double queue_size = _srs_config->get_queue_length(req->vhost);
forwarder->set_queue_size(queue_size);

if ((err = forwarder->on_publish()) != srs_success) {
return srs_error_wrap(err, "start forwarder failed, vhost=%s, app=%s, stream=%s, forward-to=%s",
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/core/srs_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
// current release version
#define VERSION_MAJOR 3
#define VERSION_MINOR 0
#define VERSION_REVISION 34
#define VERSION_REVISION 35

// generated by configure, only macros.
#include <srs_auto_headers.hpp>
Expand Down
18 changes: 10 additions & 8 deletions trunk/src/libs/srs_librtmp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -750,16 +750,12 @@ extern "C"{

string tcUrl;
switch(context->schema) {
// For SRS3, only use one format url.
case srs_url_schema_normal:
tcUrl=srs_generate_normal_tc_url(context->ip, context->vhost, context->app, context->port, context->param);
break;
case srs_url_schema_via:
tcUrl=srs_generate_via_tc_url(context->ip, context->vhost, context->app, context->port, context->param);
break;
case srs_url_schema_vis:
case srs_url_schema_vis2:
tcUrl=srs_generate_vis_tc_url(context->ip, context->vhost, context->app, context->port, context->param);
break;
tcUrl = srs_generate_tc_url(context->ip, context->vhost, context->app, context->port);
default:
break;
}
Expand Down Expand Up @@ -823,7 +819,10 @@ extern "C"{
return ret;
}

if ((err = context->rtmp->play(context->stream, context->stream_id)) != srs_success) {
// Pass params in stream, @see https://github.com/ossrs/srs/issues/1031#issuecomment-409745733
string stream = srs_generate_stream_with_query(context->host, context->vhost, context->stream, context->param);

if ((err = context->rtmp->play(stream, context->stream_id, SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) {
ret = srs_error_code(err);
srs_freep(err);
return ret;
Expand All @@ -840,7 +839,10 @@ extern "C"{
srs_assert(rtmp != NULL);
Context* context = (Context*)rtmp;

if ((err = context->rtmp->fmle_publish(context->stream, context->stream_id)) != srs_success) {
// Pass params in stream, @see https://github.com/ossrs/srs/issues/1031#issuecomment-409745733
string stream = srs_generate_stream_with_query(context->host, context->vhost, context->stream, context->param);

if ((err = context->rtmp->fmle_publish(stream, context->stream_id)) != srs_success) {
ret = srs_error_code(err);
srs_freep(err);
return ret;
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/main/srs_main_ingest_hls.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1289,7 +1289,7 @@ int SrsIngestHlsOutput::connect()
}

// publish.
if ((err = sdk->publish()) != srs_success) {
if ((err = sdk->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) {
// TODO: FIXME: Use error
ret = srs_error_code(err);
srs_freep(err);
Expand Down
78 changes: 42 additions & 36 deletions trunk/src/protocol/srs_protocol_utility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,43 +151,59 @@ void srs_random_generate(char* bytes, int size)
}
}

string srs_generate_tc_url(string ip, string vhost, string app, int port, string param)
string srs_generate_tc_url(string host, string vhost, string app, int port)
{
string tcUrl = "rtmp://";

if (vhost == SRS_CONSTS_RTMP_DEFAULT_VHOST) {
tcUrl += ip;
tcUrl += host;
} else {
tcUrl += vhost;
}

if (port != SRS_CONSTS_RTMP_DEFAULT_PORT) {
tcUrl += ":";
tcUrl += srs_int2str(port);
tcUrl += ":" + srs_int2str(port);
}

tcUrl += "/";
tcUrl += app;
if (!param.empty()) {
tcUrl += "?" + param;
}
tcUrl += "/" + app;

return tcUrl;
}

string srs_generate_normal_tc_url(string ip, string vhost, string app, int port, string param)
{
return "rtmp://" + vhost + ":" + srs_int2str(port) + "/" + app + (param.empty() ? "" : "?" + param);
}

string srs_generate_via_tc_url(string ip, string vhost, string app, int port, string param)
{
return "rtmp://" + ip + ":" + srs_int2str(port) + "/" + vhost + "/" + app + (param.empty() ? "" : "?" + param);
}

string srs_generate_vis_tc_url(string ip, string vhost, string app, int port, string param)
string srs_generate_stream_with_query(string host, string vhost, string stream, string param)
{
return "rtmp://" + ip + ":" + srs_int2str(port) + "/" + app + (param.empty() ? "" : "?" + param);
string url = stream;
string query = param;

// If no vhost in param, try to append one.
string guessVhost;
if (query.find("vhost=") == string::npos) {
if (vhost != SRS_CONSTS_RTMP_DEFAULT_VHOST) {
guessVhost = vhost;
} else if (!srs_is_ipv4(host)) {
guessVhost = host;
}
}

// Well, if vhost exists, always append in query string.
if (!guessVhost.empty()) {
query += "&vhost=" + guessVhost;
}

// Remove the start & when param is empty.
srs_string_trim_start(query, "&");

// Prefix query with ?.
if (!srs_string_starts_with(query, "?")) {
url += "?";
}

// Append query to url.
if (!query.empty()) {
url += query;
}

return url;
}

template<typename T>
Expand Down Expand Up @@ -287,22 +303,12 @@ void srs_parse_rtmp_url(string url, string& tcUrl, string& stream)
}
}

string srs_generate_rtmp_url(string server, int port, string vhost, string app, string stream)
string srs_generate_rtmp_url(string server, int port, string host, string vhost, string app, string stream, string param)
{
std::stringstream ss;

ss << "rtmp://" << server << ":" << std::dec << port << "/" << app;

// when default or server is vhost, donot specifies the vhost in params.
if (SRS_CONSTS_RTMP_DEFAULT_VHOST != vhost && server != vhost) {
ss << "...vhost..." << vhost;
}

if (!stream.empty()) {
ss << "/" << stream;
}

return ss.str();
string tcUrl = "rtmp://" + server + ":" + srs_int2str(port) + "/" + app;
string streamWithQuery = srs_generate_stream_with_query(host, vhost, stream, param);
string url = tcUrl + "/" + streamWithQuery;
return url;
}

srs_error_t srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, ssize_t* pnwrite)
Expand Down
32 changes: 12 additions & 20 deletions trunk/src/protocol/srs_protocol_utility.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,28 +71,16 @@ extern void srs_parse_query_string(std::string q, std::map<std::string, std::str
extern void srs_random_generate(char* bytes, int size);

/**
* generate the tcUrl.
* @param param, the app parameters in tcUrl. for example, ?key=xxx,vhost=xxx
* @return the tcUrl generated from ip/vhost/app/port.
* @remark when vhost equals to __defaultVhost__, use ip as vhost.
* @remark ignore port if port equals to default port 1935.
* generate the tcUrl without param.
* @remark Use host as tcUrl.vhost if vhost is default vhost.
*/
extern std::string srs_generate_tc_url(std::string ip, std::string vhost, std::string app, int port, std::string param);
extern std::string srs_generate_tc_url(std::string host, std::string vhost, std::string app, int port);

/**
* srs_detect_tools generate the normal tcUrl
* Generate the stream with param.
* @remark Append vhost in query string if not default vhost.
*/
extern std::string srs_generate_normal_tc_url(std::string ip, std::string vhost, std::string app, int port, std::string param);

/**
* srs_detect_tools generate the normal tcUrl
*/
extern std::string srs_generate_via_tc_url(std::string ip, std::string vhost, std::string app, int port, std::string param);

/**
* srs_detect_tools generate the vis/vis2 tcUrl
*/
extern std::string srs_generate_vis_tc_url(std::string ip, std::string vhost, std::string app, int port, std::string param);
extern std::string srs_generate_stream_with_query(std::string host, std::string vhost, std::string stream, std::string param);

/**
* create shared ptr message from bytes.
Expand All @@ -111,14 +99,18 @@ extern std::string srs_generate_stream_url(std::string vhost, std::string app, s
// stream: livestream
extern void srs_parse_rtmp_url(std::string url, std::string& tcUrl, std::string& stream);

// genereate the rtmp url, for instance, rtmp://server:port/app...vhost...vhost/stream
extern std::string srs_generate_rtmp_url(std::string server, int port, std::string vhost, std::string app, std::string stream);
// Genereate the rtmp url, for instance, rtmp://server:port/app/stream?param
// @remark We always put vhost in param, in the query of url.
extern std::string srs_generate_rtmp_url(std::string server, int port, std::string host, std::string vhost, std::string app, std::string stream, std::string param);

// write large numbers of iovs.
extern srs_error_t srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, ssize_t* pnwrite = NULL);

// join string in vector with indicated separator
extern std::string srs_join_vector_string(std::vector<std::string>& vs, std::string separator);

// Whether domain is an IPv4 address.
extern bool srs_is_ipv4(std::string domain);

#endif

Loading

0 comments on commit 68a1656

Please sign in to comment.