Skip to content

Commit

Permalink
for #742, use ms for application clock tbn.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Jan 17, 2017
1 parent dca9749 commit 3fe338d
Show file tree
Hide file tree
Showing 43 changed files with 437 additions and 435 deletions.
6 changes: 3 additions & 3 deletions trunk/src/app/srs_app_async_call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>

// the sleep interval for http async callback.
#define SRS_AUTO_ASYNC_CALLBACL_SLEEP_US 300000
// the sleep interval in ms for http async callback.
#define SRS_AUTO_ASYNC_CALLBACL_CIMS 30

ISrsAsyncCallTask::ISrsAsyncCallTask()
{
Expand All @@ -41,7 +41,7 @@ ISrsAsyncCallTask::~ISrsAsyncCallTask()

SrsAsyncCallWorker::SrsAsyncCallWorker()
{
pthread = new SrsReusableThread("async", this, SRS_AUTO_ASYNC_CALLBACL_SLEEP_US);
pthread = new SrsReusableThread("async", this, SRS_AUTO_ASYNC_CALLBACL_CIMS);
wait = st_cond_new();
}

Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_bandwidth.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ int SrsBandwidth::do_bandwidth_check(SrsKbpsLimit* limit)
SrsBandwidthSample publish_sample;

// timeout for a packet.
_rtmp->set_send_timeout(play_sample.duration_ms * 1000 * 2);
_rtmp->set_recv_timeout(publish_sample.duration_ms * 1000 * 2);
_rtmp->set_send_timeout(play_sample.duration_ms * 2);
_rtmp->set_recv_timeout(publish_sample.duration_ms * 2);

// start test.
srs_update_system_time_ms();
Expand Down
6 changes: 3 additions & 3 deletions trunk/src/app/srs_app_caster_flv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ int SrsDynamicHttpConn::do_proxy(ISrsHttpResponseReader* rr, SrsFlvDecoder* dec)

srs_freep(sdk);

int64_t cto = SRS_CONSTS_RTMP_TIMEOUT_US;
int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;
sdk = new SrsSimpleRtmpClient(output, cto / 1000, sto / 1000);
int64_t cto = SRS_CONSTS_RTMP_TMMS;
int64_t sto = SRS_CONSTS_RTMP_PULSE_TMMS;
sdk = new SrsSimpleRtmpClient(output, cto, sto);

if ((ret = sdk->connect()) != ERROR_SUCCESS) {
srs_error("flv: connect %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", output.c_str(), cto, sto, ret);
Expand Down
37 changes: 17 additions & 20 deletions trunk/src/app/srs_app_edge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,16 @@ using namespace std;
#include <srs_app_rtmp_conn.hpp>

// when error, edge ingester sleep for a while and retry.
#define SRS_EDGE_INGESTER_SLEEP_US (int64_t)(3*1000*1000LL)
#define SRS_EDGE_INGESTER_CIMS (3*1000)

// when edge timeout, retry next.
#define SRS_EDGE_INGESTER_TIMEOUT_US (int64_t)(5*1000*1000LL)
#define SRS_EDGE_INGESTER_TMMS (5*1000)

// when error, edge ingester sleep for a while and retry.
#define SRS_EDGE_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)

// when edge timeout, retry next.
#define SRS_EDGE_FORWARDER_TIMEOUT_US (int64_t)(5*1000*1000LL)
#define SRS_EDGE_FORWARDER_CIMS (3*1000)

// when edge error, wait for quit
#define SRS_EDGE_FORWARDER_ERROR_US (int64_t)(50*1000LL)
#define SRS_EDGE_FORWARDER_TMMS (150)

SrsEdgeUpstream::SrsEdgeUpstream()
{
Expand Down Expand Up @@ -125,9 +122,9 @@ int SrsEdgeRtmpUpstream::connect(SrsRequest* r, SrsLbRoundRobin* lb)
}

srs_freep(sdk);
int64_t cto = SRS_EDGE_INGESTER_TIMEOUT_US;
int64_t sto = SRS_CONSTS_RTMP_PULSE_TIMEOUT_US;
sdk = new SrsSimpleRtmpClient(url, cto/1000, sto/1000);
int64_t cto = SRS_EDGE_INGESTER_TMMS;
int64_t sto = SRS_CONSTS_RTMP_PULSE_TMMS;
sdk = new SrsSimpleRtmpClient(url, cto, sto);

if ((ret = sdk->connect()) != ERROR_SUCCESS) {
srs_error("edge pull %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret);
Expand Down Expand Up @@ -157,9 +154,9 @@ void SrsEdgeRtmpUpstream::close()
srs_freep(sdk);
}

void SrsEdgeRtmpUpstream::set_recv_timeout(int64_t timeout)
void SrsEdgeRtmpUpstream::set_recv_timeout(int64_t tm)
{
sdk->set_recv_timeout(timeout);
sdk->set_recv_timeout(tm);
}

void SrsEdgeRtmpUpstream::kbps_sample(const char* label, int64_t age)
Expand All @@ -175,7 +172,7 @@ SrsEdgeIngester::SrsEdgeIngester()

upstream = new SrsEdgeRtmpUpstream(redirect);
lb = new SrsLbRoundRobin();
pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_SLEEP_US);
pthread = new SrsReusableThread2("edge-igs", this, SRS_EDGE_INGESTER_CIMS);
}

SrsEdgeIngester::~SrsEdgeIngester()
Expand Down Expand Up @@ -274,7 +271,7 @@ int SrsEdgeIngester::ingest()
SrsAutoFree(SrsPithyPrint, pprint);

// set to larger timeout to read av data from origin.
upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT_US);
upstream->set_recv_timeout(SRS_EDGE_INGESTER_TMMS);

while (!pthread->interrupted()) {
pprint->elapse();
Expand Down Expand Up @@ -409,7 +406,7 @@ SrsEdgeForwarder::SrsEdgeForwarder()

sdk = NULL;
lb = new SrsLbRoundRobin();
pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_SLEEP_US);
pthread = new SrsReusableThread2("edge-fwr", this, SRS_EDGE_FORWARDER_CIMS);
queue = new SrsMessageQueue();
}

Expand Down Expand Up @@ -465,9 +462,9 @@ int SrsEdgeForwarder::start()

// open socket.
srs_freep(sdk);
int64_t cto = SRS_EDGE_FORWARDER_TIMEOUT_US;
int64_t sto = SRS_CONSTS_RTMP_TIMEOUT_US;
sdk = new SrsSimpleRtmpClient(url, cto/1000, sto/1000);
int64_t cto = SRS_EDGE_FORWARDER_TMMS;
int64_t sto = SRS_CONSTS_RTMP_TMMS;
sdk = new SrsSimpleRtmpClient(url, cto, sto);

if ((ret = sdk->connect()) != ERROR_SUCCESS) {
srs_warn("edge push %s failed, cto=%"PRId64", sto=%"PRId64". ret=%d", url.c_str(), cto, sto, ret);
Expand Down Expand Up @@ -496,7 +493,7 @@ int SrsEdgeForwarder::cycle()
{
int ret = ERROR_SUCCESS;

sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TMMS);

SrsPithyPrint* pprint = SrsPithyPrint::create_edge();
SrsAutoFree(SrsPithyPrint, pprint);
Expand All @@ -505,7 +502,7 @@ int SrsEdgeForwarder::cycle()

while (!pthread->interrupted()) {
if (send_error_code != ERROR_SUCCESS) {
st_usleep(SRS_EDGE_FORWARDER_ERROR_US);
st_usleep(SRS_EDGE_FORWARDER_TMMS * 1000);
continue;
}

Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_edge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class SrsEdgeUpstream
virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket) = 0;
virtual void close() = 0;
public:
virtual void set_recv_timeout(int64_t timeout) = 0;
virtual void set_recv_timeout(int64_t tm) = 0;
virtual void kbps_sample(const char* label, int64_t age) = 0;
};

Expand All @@ -111,7 +111,7 @@ class SrsEdgeRtmpUpstream : public SrsEdgeUpstream
virtual int decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
virtual void close();
public:
virtual void set_recv_timeout(int64_t timeout);
virtual void set_recv_timeout(int64_t tm);
virtual void kbps_sample(const char* label, int64_t age);
};

Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_encoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ using namespace std;
#ifdef SRS_AUTO_TRANSCODE

// when error, encoder sleep for a while and retry.
#define SRS_RTMP_ENCODER_SLEEP_US (int64_t)(3*1000*1000LL)
#define SRS_RTMP_ENCODER_CIMS (3000)

// for encoder to detect the dead loop
static std::vector<std::string> _transcoded_url;

SrsEncoder::SrsEncoder()
{
pthread = new SrsReusableThread("encoder", this, SRS_RTMP_ENCODER_SLEEP_US);
pthread = new SrsReusableThread("encoder", this, SRS_RTMP_ENCODER_CIMS);
pprint = SrsPithyPrint::create_encoder();
}

Expand Down
10 changes: 5 additions & 5 deletions trunk/src/app/srs_app_forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ using namespace std;
#include <srs_app_rtmp_conn.hpp>

// when error, forwarder sleep for a while and retry.
#define SRS_FORWARDER_SLEEP_US (int64_t)(3*1000*1000LL)
#define SRS_FORWARDER_CIMS (3000)

SrsForwarder::SrsForwarder(SrsSource* s)
{
Expand All @@ -58,7 +58,7 @@ SrsForwarder::SrsForwarder(SrsSource* s)
sh_video = sh_audio = NULL;

sdk = NULL;
pthread = new SrsReusableThread2("forward", this, SRS_FORWARDER_SLEEP_US);
pthread = new SrsReusableThread2("forward", this, SRS_FORWARDER_CIMS);
queue = new SrsMessageQueue();
jitter = new SrsRtmpJitter();
}
Expand Down Expand Up @@ -237,8 +237,8 @@ int SrsForwarder::cycle()
}

srs_freep(sdk);
int64_t cto = SRS_FORWARDER_SLEEP_US;
int64_t sto = SRS_CONSTS_RTMP_TIMEOUT_US;
int64_t cto = SRS_FORWARDER_CIMS;
int64_t sto = SRS_CONSTS_RTMP_TMMS;
sdk = new SrsSimpleRtmpClient(url, cto, sto);

if ((ret = sdk->connect()) != ERROR_SUCCESS) {
Expand Down Expand Up @@ -267,7 +267,7 @@ int SrsForwarder::forward()
{
int ret = ERROR_SUCCESS;

sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TIMEOUT_US);
sdk->set_recv_timeout(SRS_CONSTS_RTMP_PULSE_TMMS);

SrsPithyPrint* pprint = SrsPithyPrint::create_forwarder();
SrsAutoFree(SrsPithyPrint, pprint);
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_http_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1365,7 +1365,7 @@ int SrsHttpApi::do_cycle()

// set the recv timeout, for some clients never disconnect the connection.
// @see https://github.com/ossrs/srs/issues/398
skt.set_recv_timeout(SRS_HTTP_RECV_TIMEOUT_US);
skt.set_recv_timeout(SRS_HTTP_RECV_TMMS);

// initialize the cors, which will proxy to mux.
bool crossdomain_enabled = _srs_config->get_http_api_crossdomain();
Expand Down
20 changes: 10 additions & 10 deletions trunk/src/app/srs_app_http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ SrsHttpClient::SrsHttpClient()
transport = NULL;
kbps = new SrsKbps();
parser = NULL;
timeout_us = 0;
timeout = SRS_CONSTS_NO_TMMS;
port = 0;
}

Expand All @@ -56,7 +56,7 @@ SrsHttpClient::~SrsHttpClient()
}

// TODO: FIXME: use ms for timeout.
int SrsHttpClient::initialize(string h, int p, int64_t t_us)
int SrsHttpClient::initialize(string h, int p, int64_t tm)
{
int ret = ERROR_SUCCESS;

Expand All @@ -71,7 +71,7 @@ int SrsHttpClient::initialize(string h, int p, int64_t t_us)
// Always disconnect the transport.
host = h;
port = p;
timeout_us = t_us;
timeout = tm;
disconnect();

// ep used for host in header.
Expand Down Expand Up @@ -196,9 +196,9 @@ int SrsHttpClient::get(string path, string req, ISrsHttpMessage** ppmsg)
return ret;
}

void SrsHttpClient::set_recv_timeout(int64_t timeout)
void SrsHttpClient::set_recv_timeout(int64_t tm)
{
transport->set_recv_timeout(timeout);
transport->set_recv_timeout(tm);
}

void SrsHttpClient::kbps_sample(const char* label, int64_t age)
Expand Down Expand Up @@ -232,17 +232,17 @@ int SrsHttpClient::connect()
return ret;
}

transport = new SrsTcpClient(host, port, timeout_us / 1000);
transport = new SrsTcpClient(host, port, timeout);
if ((ret = transport->connect()) != ERROR_SUCCESS) {
disconnect();
srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d", host.c_str(), port, timeout_us, ret);
srs_warn("http client failed, server=%s, port=%d, timeout=%"PRId64", ret=%d", host.c_str(), port, timeout, ret);
return ret;
}
srs_info("connect to server success. server=%s, port=%d", host.c_str(), port);

// set the recv/send timeout in us.
transport->set_recv_timeout(timeout_us);
transport->set_send_timeout(timeout_us);
// Set the recv/send timeout in ms.
transport->set_recv_timeout(timeout);
transport->set_send_timeout(timeout);

kbps->set_io(transport, transport);

Expand Down
13 changes: 8 additions & 5 deletions trunk/src/app/srs_app_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class SrsStSocket;
class SrsKbps;

// the default timeout for http client.
#define SRS_HTTP_CLIENT_TIMEOUT_US (int64_t)(30*1000*1000LL)
#define SRS_HTTP_CLIENT_TMMS (30*1000)

/**
* The client to GET/POST/PUT/DELETE over HTTP.
Expand All @@ -64,8 +64,9 @@ class SrsHttpClient
std::map<std::string, std::string> headers;
SrsKbps* kbps;
private:
int64_t timeout_us;
// host name or ip.
// The timeout in ms.
int64_t timeout;
// The host name or ip.
std::string host;
int port;
public:
Expand All @@ -74,9 +75,10 @@ class SrsHttpClient
public:
/**
* Initliaze the client, disconnect the transport, renew the HTTP parser.
* @param tm The underlayer TCP transport timeout in ms.
* @remark we will set default values in headers, which can be override by set_header.
*/
virtual int initialize(std::string h, int p, int64_t t_us = SRS_HTTP_CLIENT_TIMEOUT_US);
virtual int initialize(std::string h, int p, int64_t tm = SRS_HTTP_CLIENT_TMMS);
/**
* Set HTTP request header in header[k]=v.
* @return the HTTP client itself.
Expand All @@ -99,8 +101,9 @@ class SrsHttpClient
* @remark user must free the ppmsg if not NULL.
*/
virtual int get(std::string path, std::string req, ISrsHttpMessage** ppmsg);
private:
virtual void set_recv_timeout(int64_t tm);
public:
virtual void set_recv_timeout(int64_t timeout);
virtual void kbps_sample(const char* label, int64_t age);
private:
virtual void disconnect();
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_http_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,7 @@ int SrsHttpConn::do_cycle()

// set the recv timeout, for some clients never disconnect the connection.
// @see https://github.com/ossrs/srs/issues/398
skt->set_recv_timeout(SRS_HTTP_RECV_TIMEOUT_US);
skt->set_recv_timeout(SRS_HTTP_RECV_TMMS);

SrsRequest* last_req = NULL;
SrsAutoFree(SrsRequest, last_req);
Expand Down
12 changes: 6 additions & 6 deletions trunk/src/app/srs_app_http_hooks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ using namespace std;

#define SRS_HTTP_RESPONSE_OK SRS_XSTR(ERROR_SUCCESS)

#define SRS_HTTP_HEADER_BUFFER 1024
#define SRS_HTTP_READ_BUFFER 4096
#define SRS_HTTP_BODY_BUFFER 32 * 1024
#define SRS_HTTP_HEADER_BUFFER 1024
#define SRS_HTTP_READ_BUFFER 4096
#define SRS_HTTP_BODY_BUFFER (32 * 1024)

// the timeout for hls notify, in us.
#define SRS_HLS_NOTIFY_TIMEOUT_US (int64_t)(10*1000*1000LL)
// the timeout for hls notify, in ms.
#define SRS_HLS_NOTIFY_TMMS (10 * 1000)

SrsHttpHooks::SrsHttpHooks()
{
Expand Down Expand Up @@ -383,7 +383,7 @@ int SrsHttpHooks::on_hls_notify(int cid, std::string url, SrsRequest* req, std::
}

SrsHttpClient http;
if ((ret = http.initialize(uri.get_host(), uri.get_port(), SRS_HLS_NOTIFY_TIMEOUT_US)) != ERROR_SUCCESS) {
if ((ret = http.initialize(uri.get_host(), uri.get_port(), SRS_HLS_NOTIFY_TMMS)) != ERROR_SUCCESS) {
return ret;
}

Expand Down
Loading

0 comments on commit 3fe338d

Please sign in to comment.