diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index ab0f42e3d4..db76d04162 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -498,30 +498,6 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* skt) return srs_error_new(ERROR_RTC_UDP, "unknown packet"); } -srs_error_t SrsRtcServer::listen_api() -{ - srs_error_t err = srs_success; - - // TODO: FIXME: Fetch api from hybrid manager, not from SRS. - SrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server(); - - if ((err = http_api_mux->handle("/rtc/v1/play/", new SrsGoApiRtcPlay(this))) != srs_success) { - return srs_error_wrap(err, "handle play"); - } - - if ((err = http_api_mux->handle("/rtc/v1/publish/", new SrsGoApiRtcPublish(this))) != srs_success) { - return srs_error_wrap(err, "handle publish"); - } - -#ifdef SRS_SIMULATOR - if ((err = http_api_mux->handle("/rtc/v1/nack/", new SrsGoApiRtcNACK(this))) != srs_success) { - return srs_error_wrap(err, "handle nack"); - } -#endif - - return err; -} - srs_error_t SrsRtcServer::create_session( SrsRequest* req, const SrsSdp& remote_sdp, SrsSdp& local_sdp, const std::string& mock_eip, bool publish, bool dtls, bool srtp, @@ -776,10 +752,6 @@ srs_error_t RtcServerAdapter::run() return srs_error_wrap(err, "listen udp"); } - if ((err = rtc->listen_api()) != srs_success) { - return srs_error_wrap(err, "listen api"); - } - if ((err = _srs_rtc_manager->start()) != srs_success) { return srs_error_wrap(err, "start manager"); } diff --git a/trunk/src/app/srs_app_rtc_server.hpp b/trunk/src/app/srs_app_rtc_server.hpp index 693c6e2e00..324f384b4a 100644 --- a/trunk/src/app/srs_app_rtc_server.hpp +++ b/trunk/src/app/srs_app_rtc_server.hpp @@ -110,7 +110,6 @@ class SrsRtcServer : public ISrsUdpMuxHandler, public ISrsFastTimer, public ISrs // TODO: FIXME: Support reload. srs_error_t listen_udp(); virtual srs_error_t on_udp_packet(SrsUdpMuxSocket* skt); - srs_error_t listen_api(); public: // Peer start offering, we answer it. srs_error_t create_session( diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index b046bab442..bbfdd22ede 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -84,7 +84,15 @@ std::string srs_listener_type2string(SrsListenerType type) } } -SrsListener::SrsListener(SrsServer* svr, SrsListenerType t) +ISrsTcpMuxHandler::ISrsTcpMuxHandler() +{ +} + +ISrsTcpMuxHandler::~ISrsTcpMuxHandler() +{ +} + +SrsListener::SrsListener(ISrsTcpMuxHandler* svr, SrsListenerType t) { port = 0; server = svr; @@ -100,7 +108,7 @@ SrsListenerType SrsListener::listen_type() return type; } -SrsBufferListener::SrsBufferListener(SrsServer* svr, SrsListenerType t) : SrsListener(svr, t) +SrsBufferListener::SrsBufferListener(ISrsTcpMuxHandler* svr, SrsListenerType t) : SrsListener(svr, t) { listener = NULL; } @@ -132,7 +140,7 @@ srs_error_t SrsBufferListener::listen(string i, int p) srs_error_t SrsBufferListener::on_tcp_client(srs_netfd_t stfd) { - srs_error_t err = server->accept_client(type, stfd); + srs_error_t err = server->accept_tcp_client(type, stfd); if (err != srs_success) { srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str()); srs_freep(err); @@ -683,7 +691,6 @@ SrsServer::SrsServer() // donot new object in constructor, // for some global instance is not ready now, // new these objects in initialize instead. - http_api_mux = new SrsHttpServeMux(); http_server = new SrsHttpServer(this); http_heartbeat = new SrsHttpHeartbeat(); ingester = new SrsIngester(); @@ -704,8 +711,7 @@ void SrsServer::destroy() srs_freep(timer_); dispose(); - - srs_freep(http_api_mux); + srs_freep(http_server); srs_freep(http_heartbeat); srs_freep(ingester); @@ -805,10 +811,6 @@ srs_error_t SrsServer::initialize(ISrsServerCycle* ch) return srs_error_wrap(err, "handler initialize"); } - if ((err = http_api_mux->initialize()) != srs_success) { - return srs_error_wrap(err, "http api initialize"); - } - if ((err = http_server->initialize()) != srs_success) { return srs_error_wrap(err, "http server initialize"); } @@ -903,14 +905,6 @@ srs_error_t SrsServer::listen() return srs_error_wrap(err, "rtmp listen"); } - if ((err = listen_http_api()) != srs_success) { - return srs_error_wrap(err, "http api listen"); - } - - if ((err = listen_https_api()) != srs_success) { - return srs_error_wrap(err, "https api listen"); - } - if ((err = listen_http_stream()) != srs_success) { return srs_error_wrap(err, "http stream listen"); } @@ -941,103 +935,6 @@ srs_error_t SrsServer::register_signal() return err; } -srs_error_t SrsServer::http_handle() -{ - srs_error_t err = srs_success; - - if ((err = http_api_mux->handle("/", new SrsGoApiRoot())) != srs_success) { - return srs_error_wrap(err, "handle /"); - } - if ((err = http_api_mux->handle("/api/", new SrsGoApiApi())) != srs_success) { - return srs_error_wrap(err, "handle api"); - } - if ((err = http_api_mux->handle("/api/v1/", new SrsGoApiV1())) != srs_success) { - return srs_error_wrap(err, "handle v1"); - } - if ((err = http_api_mux->handle("/api/v1/versions", new SrsGoApiVersion())) != srs_success) { - return srs_error_wrap(err, "handle versions"); - } - if ((err = http_api_mux->handle("/api/v1/summaries", new SrsGoApiSummaries())) != srs_success) { - return srs_error_wrap(err, "handle summaries"); - } - if ((err = http_api_mux->handle("/api/v1/rusages", new SrsGoApiRusages())) != srs_success) { - return srs_error_wrap(err, "handle rusages"); - } - if ((err = http_api_mux->handle("/api/v1/self_proc_stats", new SrsGoApiSelfProcStats())) != srs_success) { - return srs_error_wrap(err, "handle self proc stats"); - } - if ((err = http_api_mux->handle("/api/v1/system_proc_stats", new SrsGoApiSystemProcStats())) != srs_success) { - return srs_error_wrap(err, "handle system proc stats"); - } - if ((err = http_api_mux->handle("/api/v1/meminfos", new SrsGoApiMemInfos())) != srs_success) { - return srs_error_wrap(err, "handle meminfos"); - } - if ((err = http_api_mux->handle("/api/v1/authors", new SrsGoApiAuthors())) != srs_success) { - return srs_error_wrap(err, "handle authors"); - } - if ((err = http_api_mux->handle("/api/v1/features", new SrsGoApiFeatures())) != srs_success) { - return srs_error_wrap(err, "handle features"); - } - if ((err = http_api_mux->handle("/api/v1/vhosts/", new SrsGoApiVhosts())) != srs_success) { - return srs_error_wrap(err, "handle vhosts"); - } - if ((err = http_api_mux->handle("/api/v1/streams/", new SrsGoApiStreams())) != srs_success) { - return srs_error_wrap(err, "handle streams"); - } - if ((err = http_api_mux->handle("/api/v1/clients/", new SrsGoApiClients())) != srs_success) { - return srs_error_wrap(err, "handle clients"); - } - if ((err = http_api_mux->handle("/api/v1/raw", new SrsGoApiRaw(this))) != srs_success) { - return srs_error_wrap(err, "handle raw"); - } - if ((err = http_api_mux->handle("/api/v1/clusters", new SrsGoApiClusters())) != srs_success) { - return srs_error_wrap(err, "handle clusters"); - } - if ((err = http_api_mux->handle("/api/v1/perf", new SrsGoApiPerf())) != srs_success) { - return srs_error_wrap(err, "handle perf"); - } -#ifdef SRS_GB28181 - if ((err = http_api_mux->handle("/api/v1/gb28181", new SrsGoApiGb28181())) != srs_success) { - return srs_error_wrap(err, "handle raw"); - } -#endif - - // test the request info. - if ((err = http_api_mux->handle("/api/v1/tests/requests", new SrsGoApiRequests())) != srs_success) { - return srs_error_wrap(err, "handle tests requests"); - } - // test the error code response. - if ((err = http_api_mux->handle("/api/v1/tests/errors", new SrsGoApiError())) != srs_success) { - return srs_error_wrap(err, "handle tests errors"); - } - // test the redirect mechenism. - if ((err = http_api_mux->handle("/api/v1/tests/redirects", new SrsHttpRedirectHandler("/api/v1/tests/errors", SRS_CONSTS_HTTP_MovedPermanently))) != srs_success) { - return srs_error_wrap(err, "handle tests redirects"); - } - // test the http vhost. - if ((err = http_api_mux->handle("error.srs.com/api/v1/tests/errors", new SrsGoApiError())) != srs_success) { - return srs_error_wrap(err, "handle tests errors for error.srs.com"); - } - -#ifdef SRS_GPERF - // The test api for get tcmalloc stats. - // @see Memory Introspection in https://gperftools.github.io/gperftools/tcmalloc.html - if ((err = http_api_mux->handle("/api/v1/tcmalloc", new SrsGoApiTcmalloc())) != srs_success) { - return srs_error_wrap(err, "handle tests errors"); - } -#endif - - // TODO: FIXME: for console. - // TODO: FIXME: support reload. - std::string dir = _srs_config->get_http_stream_dir() + "/console"; - if ((err = http_api_mux->handle("/console/", new SrsHttpFileServer(dir))) != srs_success) { - return srs_error_wrap(err, "handle console at %s", dir.c_str()); - } - srs_trace("http: api mount /console to %s", dir.c_str()); - - return err; -} - srs_error_t SrsServer::ingest() { srs_error_t err = srs_success; @@ -1338,52 +1235,6 @@ srs_error_t SrsServer::listen_rtmp() return err; } -srs_error_t SrsServer::listen_http_api() -{ - srs_error_t err = srs_success; - - close_listeners(SrsListenerHttpApi); - if (_srs_config->get_http_api_enabled()) { - SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpApi); - listeners.push_back(listener); - - std::string ep = _srs_config->get_http_api_listen(); - - std::string ip; - int port; - srs_parse_endpoint(ep, ip, port); - - if ((err = listener->listen(ip, port)) != srs_success) { - return srs_error_wrap(err, "http api listen %s:%d", ip.c_str(), port); - } - } - - return err; -} - -srs_error_t SrsServer::listen_https_api() -{ - srs_error_t err = srs_success; - - close_listeners(SrsListenerHttpsApi); - if (_srs_config->get_https_api_enabled()) { - SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpsApi); - listeners.push_back(listener); - - std::string ep = _srs_config->get_https_api_listen(); - - std::string ip; - int port; - srs_parse_endpoint(ep, ip, port); - - if ((err = listener->listen(ip, port)) != srs_success) { - return srs_error_wrap(err, "https api listen %s:%d", ip.c_str(), port); - } - } - - return err; -} - srs_error_t SrsServer::listen_http_stream() { srs_error_t err = srs_success; @@ -1565,7 +1416,7 @@ void SrsServer::resample_kbps() srs_update_rtmp_server((int)conn_manager->size(), kbps); } -srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) +srs_error_t SrsServer::accept_tcp_client(SrsListenerType type, srs_netfd_t stfd) { srs_error_t err = srs_success; @@ -1590,11 +1441,6 @@ srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) return err; } -SrsHttpServeMux* SrsServer::api_server() -{ - return http_api_mux; -} - srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsStartableConneciton** pr) { srs_error_t err = srs_success; @@ -1639,10 +1485,6 @@ srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t stfd, IS if (type == SrsListenerRtmpStream) { *pr = new SrsRtmpConn(this, stfd, ip, port); - } else if (type == SrsListenerHttpApi) { - *pr = new SrsHttpApi(false, this, stfd, http_api_mux, ip, port); - } else if (type == SrsListenerHttpsApi) { - *pr = new SrsHttpApi(true, this, stfd, http_api_mux, ip, port); } else if (type == SrsListenerHttpStream) { *pr = new SrsResponseOnlyHttpConn(false, this, stfd, http_server, ip, port); } else if (type == SrsListenerHttpsStream) { @@ -1726,15 +1568,7 @@ srs_error_t SrsServer::on_reload_vhost_removed(std::string /*vhost*/) srs_error_t SrsServer::on_reload_http_api_enabled() { srs_error_t err = srs_success; - - if ((err = listen_http_api()) != srs_success) { - return srs_error_wrap(err, "reload http_api"); - } - - if ((err = listen_https_api()) != srs_success) { - return srs_error_wrap(err, "reload https_api"); - } - + // TODO: FIXME: Remove support for reloading HTTP API. return err; } @@ -1853,10 +1687,6 @@ srs_error_t SrsServerAdapter::run() return srs_error_wrap(err, "register signal"); } - if ((err = srs->http_handle()) != srs_success) { - return srs_error_wrap(err, "http handle"); - } - if ((err = srs->ingest()) != srs_success) { return srs_error_wrap(err, "ingest"); } @@ -1877,3 +1707,302 @@ SrsServer* SrsServerAdapter::instance() return srs; } +SrsApiServer::SrsApiServer() +{ + http_api_mux_ = new SrsHttpServeMux(); + http_ = new SrsBufferListener(this, SrsListenerHttpApi); + https_ = new SrsBufferListener(this, SrsListenerHttpApi); + conn_manager_ = new SrsResourceManager("api"); +} + +SrsApiServer::~SrsApiServer() +{ + srs_freep(http_api_mux_); + srs_freep(http_); + srs_freep(https_); + srs_freep(conn_manager_); +} + +srs_error_t SrsApiServer::initialize() +{ + srs_error_t err = srs_success; + + if ((err = http_api_mux_->initialize()) != srs_success) { + return srs_error_wrap(err, "http api initialize"); + } + + if ((err = http_handle()) != srs_success) { + return srs_error_wrap(err, "http handle"); + } + + if ((err = listen_api()) != srs_success) { + return srs_error_wrap(err, "listen api"); + } + + return err; +} + +srs_error_t SrsApiServer::listen_http_api() +{ + srs_error_t err = srs_success; + + if (!_srs_config->get_http_api_enabled()) { + return err; + } + + std::string ep = _srs_config->get_http_api_listen(); + + std::string ip; + int port; + srs_parse_endpoint(ep, ip, port); + + if ((err = http_->listen(ip, port)) != srs_success) { + return srs_error_wrap(err, "http api listen %s:%d", ip.c_str(), port); + } + + return err; +} + +srs_error_t SrsApiServer::listen_https_api() +{ + srs_error_t err = srs_success; + + if (!_srs_config->get_https_api_enabled()) { + return err; + } + + std::string ep = _srs_config->get_https_api_listen(); + + std::string ip; + int port; + srs_parse_endpoint(ep, ip, port); + + if ((err = https_->listen(ip, port)) != srs_success) { + return srs_error_wrap(err, "https api listen %s:%d", ip.c_str(), port); + } + + return err; +} + +srs_error_t SrsApiServer::accept_tcp_client(SrsListenerType type, srs_netfd_t stfd) +{ + srs_error_t err = srs_success; + + ISrsStartableConneciton* conn = NULL; + + if ((err = fd_to_resource(type, stfd, &conn)) != srs_success) { + if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok()) { + srs_close_stfd(stfd); srs_error_reset(err); + return srs_success; + } + return srs_error_wrap(err, "fd to resource"); + } + srs_assert(conn); + + // directly enqueue, the cycle thread will remove the client. + conn_manager_->add(conn); + + if ((err = conn->start()) != srs_success) { + return srs_error_wrap(err, "start conn coroutine"); + } + + return err; +} + +srs_error_t SrsApiServer::fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsStartableConneciton** pr) +{ + srs_error_t err = srs_success; + + int fd = srs_netfd_fileno(stfd); + string ip = srs_get_peer_ip(fd); + int port = srs_get_peer_port(fd); + + // for some keep alive application, for example, the keepalived, + // will send some tcp packet which we cann't got the ip, + // we just ignore it. + if (ip.empty()) { + return srs_error_new(ERROR_SOCKET_GET_PEER_IP, "ignore empty ip, fd=%d", fd); + } + + // avoid fd leak when fork. + // @see https://github.com/ossrs/srs/issues/518 + if (true) { + int val; + if ((val = fcntl(fd, F_GETFD, 0)) < 0) { + return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fnctl F_GETFD error! fd=%d", fd); + } + val |= FD_CLOEXEC; + if (fcntl(fd, F_SETFD, val) < 0) { + return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "fcntl F_SETFD error! fd=%d", fd); + } + } + + // The context id may change during creating the bellow objects. + SrsContextRestore(_srs_context->get_id()); + + if (type == SrsListenerHttpApi) { + *pr = new SrsHttpApi(false, this, stfd, http_api_mux_, ip, port); + } else if (type == SrsListenerHttpsApi) { + *pr = new SrsHttpApi(true, this, stfd, http_api_mux_, ip, port); + } else { + srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port); + srs_close_stfd(stfd); + return err; + } + + return err; +} + +void SrsApiServer::remove(ISrsResource* c) +{ + conn_manager_->remove(c); +} + +srs_error_t SrsApiServer::http_handle() +{ + srs_error_t err = srs_success; + + if ((err = http_api_mux_->handle("/", new SrsGoApiRoot())) != srs_success) { + return srs_error_wrap(err, "handle /"); + } + if ((err = http_api_mux_->handle("/api/", new SrsGoApiApi())) != srs_success) { + return srs_error_wrap(err, "handle api"); + } + if ((err = http_api_mux_->handle("/api/v1/", new SrsGoApiV1())) != srs_success) { + return srs_error_wrap(err, "handle v1"); + } + if ((err = http_api_mux_->handle("/api/v1/versions", new SrsGoApiVersion())) != srs_success) { + return srs_error_wrap(err, "handle versions"); + } + if ((err = http_api_mux_->handle("/api/v1/summaries", new SrsGoApiSummaries())) != srs_success) { + return srs_error_wrap(err, "handle summaries"); + } + if ((err = http_api_mux_->handle("/api/v1/rusages", new SrsGoApiRusages())) != srs_success) { + return srs_error_wrap(err, "handle rusages"); + } + if ((err = http_api_mux_->handle("/api/v1/self_proc_stats", new SrsGoApiSelfProcStats())) != srs_success) { + return srs_error_wrap(err, "handle self proc stats"); + } + if ((err = http_api_mux_->handle("/api/v1/system_proc_stats", new SrsGoApiSystemProcStats())) != srs_success) { + return srs_error_wrap(err, "handle system proc stats"); + } + if ((err = http_api_mux_->handle("/api/v1/meminfos", new SrsGoApiMemInfos())) != srs_success) { + return srs_error_wrap(err, "handle meminfos"); + } + if ((err = http_api_mux_->handle("/api/v1/authors", new SrsGoApiAuthors())) != srs_success) { + return srs_error_wrap(err, "handle authors"); + } + if ((err = http_api_mux_->handle("/api/v1/features", new SrsGoApiFeatures())) != srs_success) { + return srs_error_wrap(err, "handle features"); + } + if ((err = http_api_mux_->handle("/api/v1/vhosts/", new SrsGoApiVhosts())) != srs_success) { + return srs_error_wrap(err, "handle vhosts"); + } + if ((err = http_api_mux_->handle("/api/v1/streams/", new SrsGoApiStreams())) != srs_success) { + return srs_error_wrap(err, "handle streams"); + } + if ((err = http_api_mux_->handle("/api/v1/clients/", new SrsGoApiClients())) != srs_success) { + return srs_error_wrap(err, "handle clients"); + } + // TODO: FIXME: Implements it. + //if ((err = http_api_mux_->handle("/api/v1/raw", new SrsGoApiRaw(this))) != srs_success) { + // return srs_error_wrap(err, "handle raw"); + //} + if ((err = http_api_mux_->handle("/api/v1/clusters", new SrsGoApiClusters())) != srs_success) { + return srs_error_wrap(err, "handle clusters"); + } + if ((err = http_api_mux_->handle("/api/v1/perf", new SrsGoApiPerf())) != srs_success) { + return srs_error_wrap(err, "handle perf"); + } +#ifdef SRS_GB28181 + if ((err = http_api_mux_->handle("/api/v1/gb28181", new SrsGoApiGb28181())) != srs_success) { + return srs_error_wrap(err, "handle raw"); + } +#endif + + // test the request info. + if ((err = http_api_mux_->handle("/api/v1/tests/requests", new SrsGoApiRequests())) != srs_success) { + return srs_error_wrap(err, "handle tests requests"); + } + // test the error code response. + if ((err = http_api_mux_->handle("/api/v1/tests/errors", new SrsGoApiError())) != srs_success) { + return srs_error_wrap(err, "handle tests errors"); + } + // test the redirect mechenism. + if ((err = http_api_mux_->handle("/api/v1/tests/redirects", new SrsHttpRedirectHandler("/api/v1/tests/errors", SRS_CONSTS_HTTP_MovedPermanently))) != srs_success) { + return srs_error_wrap(err, "handle tests redirects"); + } + // test the http vhost. + if ((err = http_api_mux_->handle("error.srs.com/api/v1/tests/errors", new SrsGoApiError())) != srs_success) { + return srs_error_wrap(err, "handle tests errors for error.srs.com"); + } + +#ifdef SRS_GPERF + // The test api for get tcmalloc stats. + // @see Memory Introspection in https://gperftools.github.io/gperftools/tcmalloc.html + if ((err = http_api_mux_->handle("/api/v1/tcmalloc", new SrsGoApiTcmalloc())) != srs_success) { + return srs_error_wrap(err, "handle tests errors"); + } +#endif + + // TODO: FIXME: for console. + // TODO: FIXME: support reload. + std::string dir = _srs_config->get_http_stream_dir() + "/console"; + if ((err = http_api_mux_->handle("/console/", new SrsHttpFileServer(dir))) != srs_success) { + return srs_error_wrap(err, "handle console at %s", dir.c_str()); + } + srs_trace("http: api mount /console to %s", dir.c_str()); + + return err; +} + +srs_error_t SrsApiServer::listen_api() +{ + srs_error_t err = srs_success; + + // TODO: FIXME: Implements it. + //if ((err = http_api_mux_->handle("/rtc/v1/play/", new SrsGoApiRtcPlay(this))) != srs_success) { + // return srs_error_wrap(err, "handle play"); + //} + // + //if ((err = http_api_mux_->handle("/rtc/v1/publish/", new SrsGoApiRtcPublish(this))) != srs_success) { + // return srs_error_wrap(err, "handle publish"); + //} + +#ifdef SRS_SIMULATOR + // TODO: FIXME: Implements it. + //if ((err = http_api_mux_->handle("/rtc/v1/nack/", new SrsGoApiRtcNACK(this))) != srs_success) { + // return srs_error_wrap(err, "handle nack"); + //} +#endif + + return err; +} + +srs_error_t SrsApiServer::start(void* arg) +{ + SrsApiServer* api = (SrsApiServer*)arg; + return api->do_start(); +} + +srs_error_t SrsApiServer::do_start() +{ + srs_error_t err = srs_success; + + if ((err = listen_http_api()) != srs_success) { + return srs_error_wrap(err, "http api listen"); + } + + if ((err = listen_https_api()) != srs_success) { + return srs_error_wrap(err, "https api listen"); + } + + if ((err = conn_manager_->start()) != srs_success) { + return srs_error_wrap(err, "connection manager"); + } + + srs_usleep(SRS_UTIME_NO_TIMEOUT); + + return err; +} + diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index 4c3986a4e7..54a1411181 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -84,6 +84,17 @@ enum SrsListenerType SrsListenerHttpsStream = 9, }; +// To mux the tcp handler. +class ISrsTcpMuxHandler +{ +public: + ISrsTcpMuxHandler(); + virtual ~ISrsTcpMuxHandler(); +public: + // Accept the TCP client, which is identified by type. + virtual srs_error_t accept_tcp_client(SrsListenerType type, srs_netfd_t stfd) = 0; +}; + // A common tcp listener, for RTMP/HTTP server. class SrsListener { @@ -92,9 +103,9 @@ class SrsListener protected: std::string ip; int port; - SrsServer* server; + ISrsTcpMuxHandler* server; public: - SrsListener(SrsServer* svr, SrsListenerType t); + SrsListener(ISrsTcpMuxHandler* svr, SrsListenerType t); virtual ~SrsListener(); public: virtual SrsListenerType listen_type(); @@ -107,7 +118,7 @@ class SrsBufferListener : virtual public SrsListener, virtual public ISrsTcpHand private: SrsTcpListener* listener; public: - SrsBufferListener(SrsServer* server, SrsListenerType type); + SrsBufferListener(ISrsTcpMuxHandler* server, SrsListenerType type); virtual ~SrsBufferListener(); public: virtual srs_error_t listen(std::string ip, int port); @@ -264,11 +275,9 @@ class ISrsServerCycle // SRS RTMP server, initialize and listen, start connection service thread, destroy client. class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHandler , virtual public ISrsResourceManager, virtual public ISrsCoroutineHandler - , virtual public ISrsHourGlass + , virtual public ISrsHourGlass, public ISrsTcpMuxHandler { private: - // TODO: FIXME: Extract an HttpApiServer. - SrsHttpServeMux* http_api_mux; SrsHttpServer* http_server; SrsHttpHeartbeat* http_heartbeat; SrsIngester* ingester; @@ -319,7 +328,6 @@ class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHan virtual srs_error_t acquire_pid_file(); virtual srs_error_t listen(); virtual srs_error_t register_signal(); - virtual srs_error_t http_handle(); virtual srs_error_t ingest(); virtual srs_error_t start(); // interface ISrsCoroutineHandler @@ -353,8 +361,6 @@ class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHan private: // listen at specified protocol. virtual srs_error_t listen_rtmp(); - virtual srs_error_t listen_http_api(); - virtual srs_error_t listen_https_api(); virtual srs_error_t listen_http_stream(); virtual srs_error_t listen_https_stream(); virtual srs_error_t listen_stream_caster(); @@ -366,19 +372,11 @@ class SrsServer : virtual public ISrsReloadHandler, virtual public ISrsSourceHan virtual void close_listeners(SrsListenerType type); // Resample the server kbs. virtual void resample_kbps(); -// For internal only -public: - // When listener got a fd, notice server to accept it. - // @param type, the client type, used to create concrete connection, - // for instance RTMP connection to serve client. - // @param stfd, the client fd in st boxed, the underlayer fd. - virtual srs_error_t accept_client(SrsListenerType type, srs_netfd_t stfd); - // TODO: FIXME: Fetch from hybrid server manager. - virtual SrsHttpServeMux* api_server(); private: + virtual srs_error_t accept_tcp_client(SrsListenerType type, srs_netfd_t stfd); virtual srs_error_t fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsStartableConneciton** pr); // Interface ISrsResourceManager -public: +private: // A callback for connection to remove itself. // When connection thread cycle terminated, callback this to delete connection. // @see SrsTcpConnection.on_thread_stop(). @@ -416,5 +414,34 @@ class SrsServerAdapter : public ISrsHybridServer virtual SrsServer* instance(); }; +// The HTTP API server. +class SrsApiServer : public ISrsTcpMuxHandler, public ISrsResourceManager +{ +private: + SrsBufferListener* http_; + SrsBufferListener* https_; + SrsHttpServeMux* http_api_mux_; + SrsResourceManager* conn_manager_; +public: + SrsApiServer(); + virtual ~SrsApiServer(); +public: + virtual srs_error_t initialize(); +private: + virtual srs_error_t listen_http_api(); + virtual srs_error_t listen_https_api(); +private: + virtual srs_error_t accept_tcp_client(SrsListenerType type, srs_netfd_t stfd); + virtual srs_error_t fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsStartableConneciton** pr); + virtual void remove(ISrsResource* c); +private: + virtual srs_error_t http_handle(); + srs_error_t listen_api(); +public: + static srs_error_t start(void* arg); +private: + srs_error_t do_start(); +}; + #endif diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index 769da53e21..ce7c346c7b 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -1493,8 +1493,8 @@ void SrsAsyncSendManager::add_packet(SrsAsyncUdpPacket* pkt) srs_error_t SrsAsyncSendManager::start(void* arg) { - SrsAsyncSendManager* srtp = (SrsAsyncSendManager*)arg; - return srtp->do_start(); + SrsAsyncSendManager* send = (SrsAsyncSendManager*)arg; + return send->do_start(); } srs_error_t SrsAsyncSendManager::do_start() diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index dbeb0a375e..a9dad9653a 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -484,6 +484,14 @@ srs_error_t run_in_thread_pool() return srs_error_wrap(err, "init thread pool"); } + // Create and initialize the API server. + SrsApiServer* api = new SrsApiServer(); + SrsAutoFree(SrsApiServer, api); + + if ((err = api->initialize()) != srs_success) { + return srs_error_wrap(err, "init api server"); + } + // After all init(log, async log manager, thread pool), now we can start to // run the log manager thread. if ((err = _srs_thread_pool->execute("log", SrsAsyncLogManager::start, _srs_async_log)) != srs_success) { @@ -514,7 +522,12 @@ srs_error_t run_in_thread_pool() } } - // Start the service worker thread, for RTMP and RTC server, etc. + // Start the api server thread, for server stat and RTC api, etc. + if ((err = _srs_thread_pool->execute("api", SrsApiServer::start, api)) != srs_success) { + return srs_error_wrap(err, "start api server thread"); + } + + // Start the hybrid service worker thread, for RTMP and RTC server, etc. if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) { return srs_error_wrap(err, "start hybrid server thread"); }