Skip to content

Commit

Permalink
for #742, refine the object live cycle. 3.0.15
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jan 17, 2017
1 parent f4c0af8 commit dca9749
Show file tree
Hide file tree
Showing 19 changed files with 282 additions and 222 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ Please select your language:

### V3 changes

* v3.0, 2017-01-17, for [#742][bug #742] refine source, timeout, live cycle. 3.0.15
* v3.0, 2017-01-11, fix [#735][bug #735] config transform refer_publish invalid. 3.0.14
* v3.0, 2017-01-06, for [#730][bug #730] support config in/out ack size. 3.0.13
* v3.0, 2017-01-06, for [#711][bug #711] support perfile for transcode. 3.0.12
Expand Down Expand Up @@ -1365,7 +1366,8 @@ Winlin
[bug #xxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxx

[bug #735]: https://github.com/ossrs/srs/issues/735
[bug #xxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxx
[bug #742]: https://github.com/ossrs/srs/issues/742
[bug #xxxxxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxxxxx

[exo #828]: https://github.com/google/ExoPlayer/pull/828

Expand Down
8 changes: 6 additions & 2 deletions trunk/src/app/srs_app_caster_flv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ int SrsAppCasterFlv::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
SrsDynamicHttpConn::SrsDynamicHttpConn(IConnectionManager* cm, st_netfd_t fd, SrsHttpServeMux* m, string cip)
: SrsHttpConn(cm, fd, m, cip)
{
sdk = new SrsSimpleRtmpClient();
sdk = NULL;
pprint = SrsPithyPrint::create_caster();
}

Expand Down Expand Up @@ -181,9 +181,13 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec)
{
int ret = ERROR_SUCCESS;

srs_freep(sdk);

int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US;
int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;
if ((ret = sdk->connect(output, cto, sto)) != ERROR_SUCCESS) {
sdk = new SrsSimpleRtmpClient(output, cto / 1000, sto / 1000);

if ((ret = sdk->connect()) != ERROR_SUCCESS) {
srs_error("flv: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", output.c_str(), cto, sto, ret);
return ret;
}
Expand Down
22 changes: 13 additions & 9 deletions trunk/src/app/srs_app_edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,12 @@ SrsEdgeUpstream::~SrsEdgeUpstream()
SrsEdgeRtmpUpstream::SrsEdgeRtmpUpstream(string r)
{
redirect = r;
sdk = new SrsSimpleRtmpClient();
sdk = NULL;
}

SrsEdgeRtmpUpstream::~SrsEdgeRtmpUpstream()
{
close();

srs_freep(sdk);
}

int SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
Expand Down Expand Up @@ -126,9 +124,12 @@ int SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
url = srs_generate_rtmp_url(server, port, vhost, req->app, req->stream);
}

srs_freep(sdk);
int64_t cto = SRS_EDGE_INGESTER_TIMEOUT_US;
int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;
if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) {
sdk = new SrsSimpleRtmpClient(url, cto/1000, sto/1000);

if ((ret = sdk->connect()) != ERROR_SUCCESS) {
srs_error("edge pull %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret);
return ret;
}
Expand All @@ -153,7 +154,7 @@ int SrsEdgeRtmpUpstream::decode_message(SrsCommonMessage* msg, SrsPacket** ppack

void SrsEdgeRtmpUpstream::close()
{
sdk->close();
srs_freep(sdk);
}

void SrsEdgeRtmpUpstream::set_recv_timeout(int64_t timeout)
Expand Down Expand Up @@ -406,7 +407,7 @@ SrsEdgeForwarder::SrsEdgeForwarder()
req = NULL;
send_error_code = ERROR_SUCCESS;

sdk = new SrsSimpleRtmpClient();
sdk = NULL;
lb = new SrsLbRoundRobin();
pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US);
queue = new SrsMessageQueue();
Expand All @@ -416,7 +417,6 @@ SrsEdgeForwarder::~SrsEdgeForwarder()
{
stop();

srs_freep(sdk);
srs_freep(lb);
srs_freep(pthread);
srs_freep(queue);
Expand Down Expand Up @@ -464,9 +464,12 @@ int SrsEdgeForwarder::start()
}

// open socket.
srs_freep(sdk);
int64_t cto = SRS_EDGE_FORWARDER_TIMEOUT_US;
int64_t sto = SRS_CONSTS_RTMP_TIMEOUT_US;
if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) {
sdk = new SrsSimpleRtmpClient(url, cto/1000, sto/1000);

if ((ret = sdk->connect()) != ERROR_SUCCESS) {
srs_warn("edge push %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret);
return ret;
}
Expand All @@ -482,8 +485,9 @@ int SrsEdgeForwarder::start()
void SrsEdgeForwarder::stop()
{
pthread->stop();
sdk->close();
queue->clear();

srs_freep(sdk);
}

#define SYS_MAX_EDGE_SEND_MSGS 128
Expand Down
7 changes: 5 additions & 2 deletions trunk/src/app/srs_app_forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ SrsForwarder::SrsForwarder(SrsSource* s)
req = NULL;
sh_video = sh_audio = NULL;

sdk = new SrsSimpleRtmpClient();
sdk = NULL;
pthread = new SrsReusableThread2("forward", this, SRS_FORWARDER_SLEEP_US);
queue = new SrsMessageQueue();
jitter = new SrsRtmpJitter();
Expand Down Expand Up @@ -236,9 +236,12 @@ int SrsForwarder::cycle()
url = srs_generate_rtmp_url(server, port, req->vhost, req->app, req->stream);
}

srs_freep(sdk);
int64_t cto = SRS_FORWARDER_SLEEP_US;
int64_t sto = SRS_CONSTS_RTMP_TIMEOUT_US;
if ((ret = sdk->connect(url, cto, sto)) != ERROR_SUCCESS) {
sdk = new SrsSimpleRtmpClient(url, cto, sto);

if ((ret = sdk->connect()) != ERROR_SUCCESS) {
srs_warn("forward failed, url=%s, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret);
return ret;
}
Expand Down
48 changes: 26 additions & 22 deletions trunk/src/app/srs_app_http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ using namespace std;

SrsHttpClient::SrsHttpClient()
{
transport = new SrsTcpClient();
transport = NULL;
kbps = new SrsKbps();
parser = NULL;
timeout_us = 0;
Expand All @@ -52,19 +52,14 @@ SrsHttpClient::~SrsHttpClient()
disconnect();

srs_freep(kbps);
srs_freep(transport);
srs_freep(parser);
}

// TODO: FIXME: use ms for timeout.
int SrsHttpClient::initialize(string h, int p, int64_t t_us)
{
int ret = ERROR_SUCCESS;

// disconnect first when h:p changed.
if ((!host.empty() && host != h) || (port != 0 && port != p)) {
disconnect();
}

srs_freep(parser);
parser = new SrsHttpParser();

Expand All @@ -73,17 +68,19 @@ int SrsHttpClient::initialize(string h, int p, int64_t t_us)
return ret;
}

// Always disconnect the transport.
host = h;
port = p;
timeout_us = t_us;
disconnect();

// ep used for host in header.
string ep = host;
if (port > 0 && port != SRS_CONSTS_HTTP_DEFAULT_PORT) {
ep += ":" + srs_int2str(port);
}

// set default value for headers.
// Set default value for headers.
headers["Host"] = ep;
headers["Connection"] = "Keep-Alive";
headers["User-Agent"] = RTMP_SIG_SRS_SERVER;
Expand Down Expand Up @@ -126,9 +123,8 @@ int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg)

std::string data = ss.str();
if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) {
// disconnect when error.
// Disconnect the transport when channel error, reconnect for next operation.
disconnect();

srs_error("write http post failed. ret=%d", ret);
return ret;
}
Expand All @@ -138,9 +134,13 @@ int SrsHttpClient::post(string path, string req, ISrsHttpMessage** ppmsg)
srs_error("parse http post response failed. ret=%d", ret);
return ret;
}

srs_assert(msg);
*ppmsg = msg;

if (ppmsg) {
*ppmsg = msg;
} else {
srs_freep(msg);
}
srs_info("parse http post response success.");

return ret;
Expand Down Expand Up @@ -173,9 +173,8 @@ int SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg)

std::string data = ss.str();
if ((ret = transport->write((void*)data.c_str(), data.length(), NULL)) != ERROR_SUCCESS) {
// disconnect when error.
// Disconnect the transport when channel error, reconnect for next operation.
disconnect();

srs_error("write http get failed. ret=%d", ret);
return ret;
}
Expand All @@ -187,7 +186,11 @@ int SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg)
}
srs_assert(msg);

*ppmsg = msg;
if (ppmsg) {
*ppmsg = msg;
} else {
srs_freep(msg);
}
srs_info("parse http get response success.");

return ret;
Expand Down Expand Up @@ -215,23 +218,24 @@ void SrsHttpClient::kbps_sample(const char* label, int64_t age)
void SrsHttpClient::disconnect()
{
kbps->set_io(NULL, NULL);

transport->close();
srs_freep(transport);
}

int SrsHttpClient::connect()
{
int ret = ERROR_SUCCESS;

if (transport->connected()) {
// When transport connected, ignore.
if (transport) {
return ret;
}

disconnect();

// open socket.
if ((ret = transport->connect(host, port, timeout_us)) != ERROR_SUCCESS) {
srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d",
host.c_str(), port, timeout_us, ret);
transport = new SrsTcpClient(host, port, timeout_us / 1000);
if ((ret = transport->connect()) != ERROR_SUCCESS) {
disconnect();
srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d", host.c_str(), port, timeout_us, ret);
return ret;
}
srs_info("connect to server success. server=%s, port=%d", host.c_str(), port);
Expand Down
17 changes: 13 additions & 4 deletions trunk/src/app/srs_app_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,19 @@ class SrsKbps;
#define SRS_HTTP_CLIENT_TIMEOUT_US (int64_t)(30*1000*1000LL)

/**
* http client to GET/POST/PUT/DELETE uri
*/
* The client to GET/POST/PUT/DELETE over HTTP.
* @remark We will reuse the TCP transport until initialize or channel error,
* such as send/recv failed.
* Usage:
* SrsHttpClient hc;
* hc.initialize("127.0.0.1", 80, 9000);
* hc.post("/api/v1/version", "Hello world!", NULL);
*/
class SrsHttpClient
{
private:
// The underlayer TCP transport, set to NULL when disconnect, or never not NULL when connected.
// We will disconnect transport when initialize or channel error, such as send/recv error.
SrsTcpClient* transport;
SrsHttpParser* parser;
std::map<std::string, std::string> headers;
Expand All @@ -65,12 +73,13 @@ class SrsHttpClient
virtual ~SrsHttpClient();
public:
/**
* initialize the client, connect to host and port.
* Initliaze the client, disconnect the transport, renew the HTTP parser.
* @remark we will set default values in headers, which can be override by set_header.
*/
virtual int initialize(std::string h, int p, int64_t t_us = SRS_HTTP_CLIENT_TIMEOUT_US);
/**
* set the header[k]=v and return the client itself.
* Set HTTP request header in header[k]=v.
* @return the HTTP client itself.
*/
virtual SrsHttpClient* set_header(std::string k, std::string v);
public:
Expand Down
37 changes: 22 additions & 15 deletions trunk/src/app/srs_app_kafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,13 @@ SrsKafkaPartition::SrsKafkaPartition()
id = broker = 0;
port = SRS_CONSTS_KAFKA_DEFAULT_PORT;

transport = new SrsTcpClient();
kafka = new SrsKafkaClient(transport);
transport = NULL;
kafka = NULL;
}

SrsKafkaPartition::~SrsKafkaPartition()
{
srs_freep(kafka);
srs_freep(transport);
disconnect();
}

string SrsKafkaPartition::hostport()
Expand All @@ -158,13 +157,15 @@ int SrsKafkaPartition::connect()
{
int ret = ERROR_SUCCESS;

if (transport->connected()) {
if (transport) {
return ret;
}
transport = new SrsTcpClient(host, port, SRS_KAFKA_PRODUCER_TIMEOUT);
kafka = new SrsKafkaClient(transport);

int64_t timeout = SRS_KAFKA_PRODUCER_TIMEOUT * 1000;
if ((ret = transport->connect(host, port, timeout)) != ERROR_SUCCESS) {
srs_error("connect to %s partition=%d failed, timeout=%"PRId64". ret=%d", hostport().c_str(), id, timeout, ret);
if ((ret = transport->connect()) != ERROR_SUCCESS) {
disconnect();
srs_error("connect to %s partition=%d failed. ret=%d", hostport().c_str(), id, ret);
return ret;
}

Expand All @@ -178,6 +179,12 @@ int SrsKafkaPartition::flush(SrsKafkaPartitionCache* pc)
return kafka->write_messages(topic, id, *pc);
}

void SrsKafkaPartition::disconnect()
{
srs_freep(kafka);
srs_freep(transport);
}

SrsKafkaMessage::SrsKafkaMessage(SrsKafkaProducer* p, int k, SrsJsonObject* j)
{
producer = p;
Expand Down Expand Up @@ -562,12 +569,6 @@ int SrsKafkaProducer::request_metadata()
return ret;
}

SrsTcpClient* transport = new SrsTcpClient();
SrsAutoFree(SrsTcpClient, transport);

SrsKafkaClient* kafka = new SrsKafkaClient(transport);
SrsAutoFree(SrsKafkaClient, kafka);

std::string server;
int port = SRS_CONSTS_KAFKA_DEFAULT_PORT;
if (true) {
Expand All @@ -584,8 +585,14 @@ int SrsKafkaProducer::request_metadata()
senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str());
}

SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_CONSTS_KAFKA_TIMEOUT_US / 1000);
SrsAutoFree(SrsTcpClient, transport);

SrsKafkaClient* kafka = new SrsKafkaClient(transport);
SrsAutoFree(SrsKafkaClient, kafka);

// reconnect to kafka server.
if ((ret = transport->connect(server, port, SRS_CONSTS_KAFKA_TIMEOUT_US)) != ERROR_SUCCESS) {
if ((ret = transport->connect()) != ERROR_SUCCESS) {
srs_error("kafka connect %s:%d failed. ret=%d", server.c_str(), port, ret);
return ret;
}
Expand Down
Loading

0 comments on commit dca9749

Please sign in to comment.