diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index e1b64f10db..341a5381c1 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -41,6 +41,9 @@ srs_log_level trace; # when srs_log_tank is file, specifies the log file. # default: ./objs/srs.log srs_log_file ./objs/srs.log; +# The interval in ms, to flush async log. +# Default: 1300 +srs_log_async_interval 1300; # the max connections. # if exceed the max connections, server will drop the new connection. # default: 1000 @@ -111,6 +114,13 @@ auto_reload_for_docker on; # default: 0.8 tcmalloc_release_rate 0.8; +# For thread pool. +threads { + # The thread pool manager cycle interval, in seconds. + # Default: 60 + interval 60; +} + ############################################################################################# # heartbeat/stats sections ############################################################################################# diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 7e538389b9..a366ec895a 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -1492,27 +1492,6 @@ srs_error_t SrsConfig::reload_conf(SrsConfig* conf) } } - // merge config: srs_log_tank - if (!srs_directive_equals(root->get("srs_log_tank"), old_root->get("srs_log_tank"))) { - if ((err = do_reload_srs_log_tank()) != srs_success) { - return srs_error_wrap(err, "log tank");; - } - } - - // merge config: srs_log_level - if (!srs_directive_equals(root->get("srs_log_level"), old_root->get("srs_log_level"))) { - if ((err = do_reload_srs_log_level()) != srs_success) { - return srs_error_wrap(err, "log level");; - } - } - - // merge config: srs_log_file - if (!srs_directive_equals(root->get("srs_log_file"), old_root->get("srs_log_file"))) { - if ((err = do_reload_srs_log_file()) != srs_success) { - return srs_error_wrap(err, "log file");; - } - } - // merge config: max_connections if (!srs_directive_equals(root->get("max_connections"), old_root->get("max_connections"))) { if ((err = do_reload_max_connections()) != srs_success) { @@ -1520,13 +1499,6 @@ srs_error_t SrsConfig::reload_conf(SrsConfig* conf) } } - // merge config: utc_time - if (!srs_directive_equals(root->get("utc_time"), old_root->get("utc_time"))) { - if ((err = do_reload_utc_time()) != srs_success) { - return srs_error_wrap(err, "utc time");; - } - } - // merge config: pithy_print_ms if (!srs_directive_equals(root->get("pithy_print_ms"), old_root->get("pithy_print_ms"))) { if ((err = do_reload_pithy_print_ms()) != srs_success) { @@ -2946,78 +2918,6 @@ srs_error_t SrsConfig::raw_set_ff_log_dir(string ff_log_dir, bool& applied) return err; } -srs_error_t SrsConfig::raw_set_srs_log_tank(string srs_log_tank, bool& applied) -{ - srs_error_t err = srs_success; - - applied = false; - - SrsConfDirective* conf = root->get_or_create("srs_log_tank"); - - if (conf->arg0() == srs_log_tank) { - return err; - } - - conf->args.clear(); - conf->args.push_back(srs_log_tank); - - if ((err = do_reload_srs_log_tank()) != srs_success) { - return srs_error_wrap(err, "reload log tank"); - } - - applied = true; - - return err; -} - -srs_error_t SrsConfig::raw_set_srs_log_level(string srs_log_level, bool& applied) -{ - srs_error_t err = srs_success; - - applied = false; - - SrsConfDirective* conf = root->get_or_create("srs_log_level"); - - if (conf->arg0() == srs_log_level) { - return err; - } - - conf->args.clear(); - conf->args.push_back(srs_log_level); - - if ((err = do_reload_srs_log_level()) != srs_success) { - return srs_error_wrap(err, "reload log level"); - } - - applied = true; - - return err; -} - -srs_error_t SrsConfig::raw_set_srs_log_file(string srs_log_file, bool& applied) -{ - srs_error_t err = srs_success; - - applied = false; - - SrsConfDirective* conf = root->get_or_create("srs_log_file"); - - if (conf->arg0() == srs_log_file) { - return err; - } - - conf->args.clear(); - conf->args.push_back(srs_log_file); - - if ((err = do_reload_srs_log_file()) != srs_success) { - return srs_error_wrap(err, "reload log file"); - } - - applied = true; - - return err; -} - srs_error_t SrsConfig::raw_set_max_connections(string max_connections, bool& applied) { srs_error_t err = srs_success; @@ -3042,30 +2942,6 @@ srs_error_t SrsConfig::raw_set_max_connections(string max_connections, bool& app return err; } -srs_error_t SrsConfig::raw_set_utc_time(string utc_time, bool& applied) -{ - srs_error_t err = srs_success; - - applied = false; - - SrsConfDirective* conf = root->get_or_create("utc_time"); - - if (conf->arg0() == utc_time) { - return err; - } - - conf->args.clear(); - conf->args.push_back(utc_time); - - if ((err = do_reload_utc_time()) != srs_success) { - return srs_error_wrap(err, "reload"); - } - - applied = true; - - return err; -} - srs_error_t SrsConfig::raw_set_pithy_print_ms(string pithy_print_ms, bool& applied) { srs_error_t err = srs_success; @@ -3265,54 +3141,6 @@ srs_error_t SrsConfig::do_reload_pid() return err; } -srs_error_t SrsConfig::do_reload_srs_log_tank() -{ - srs_error_t err = srs_success; - - vector::iterator it; - for (it = subscribes.begin(); it != subscribes.end(); ++it) { - ISrsReloadHandler* subscribe = *it; - if ((err = subscribe->on_reload_log_tank()) != srs_success) { - return srs_error_wrap(err, "notify subscribes reload srs_log_tank failed"); - } - } - srs_trace("reload srs_log_tank success."); - - return err; -} - -srs_error_t SrsConfig::do_reload_srs_log_level() -{ - srs_error_t err = srs_success; - - vector::iterator it; - for (it = subscribes.begin(); it != subscribes.end(); ++it) { - ISrsReloadHandler* subscribe = *it; - if ((err = subscribe->on_reload_log_level()) != srs_success) { - return srs_error_wrap(err, "notify subscribes reload srs_log_level failed"); - } - } - srs_trace("reload srs_log_level success."); - - return err; -} - -srs_error_t SrsConfig::do_reload_srs_log_file() -{ - srs_error_t err = srs_success; - - vector::iterator it; - for (it = subscribes.begin(); it != subscribes.end(); ++it) { - ISrsReloadHandler* subscribe = *it; - if ((err = subscribe->on_reload_log_file()) != srs_success) { - return srs_error_wrap(err, "notify subscribes reload srs_log_file failed"); - } - } - srs_trace("reload srs_log_file success."); - - return err; -} - srs_error_t SrsConfig::do_reload_max_connections() { srs_error_t err = srs_success; @@ -3329,22 +3157,6 @@ srs_error_t SrsConfig::do_reload_max_connections() return err; } -srs_error_t SrsConfig::do_reload_utc_time() -{ - srs_error_t err = srs_success; - - vector::iterator it; - for (it = subscribes.begin(); it != subscribes.end(); ++it) { - ISrsReloadHandler* subscribe = *it; - if ((err = subscribe->on_reload_utc_time()) != srs_success) { - return srs_error_wrap(err, "utc_time"); - } - } - srs_trace("reload utc_time success."); - - return err; -} - srs_error_t SrsConfig::do_reload_pithy_print_ms() { srs_error_t err = srs_success; @@ -3581,7 +3393,7 @@ srs_error_t SrsConfig::check_normal_config() && n != "ff_log_level" && n != "grace_final_wait" && n != "force_grace_quit" && n != "grace_start_wait" && n != "empty_ip_ok" && n != "disable_daemon_for_docker" && n != "inotify_auto_reload" && n != "auto_reload_for_docker" && n != "tcmalloc_release_rate" - ) { + && n != "srs_log_flush_interval" && n != "threads") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal directive %s", n.c_str()); } } @@ -4293,6 +4105,28 @@ double SrsConfig::tcmalloc_release_rate() return trr; } +srs_utime_t SrsConfig::get_threads_interval() +{ + static srs_utime_t DEFAULT = 60 * SRS_UTIME_SECONDS; + + SrsConfDirective* conf = root->get("threads"); + if (!conf) { + return DEFAULT; + } + + conf = conf->get("interval"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + int v = ::atoi(conf->arg0().c_str()); + if (v <= 0) { + return DEFAULT; + } + + return v * SRS_UTIME_SECONDS; +} + vector SrsConfig::get_stream_casters() { srs_assert(root); @@ -6967,6 +6801,23 @@ string SrsConfig::get_log_file() return conf->arg0(); } +srs_utime_t SrsConfig::srs_log_flush_interval() +{ + srs_utime_t DEFAULT = 1300 * SRS_UTIME_MILLISECONDS; + + SrsConfDirective* conf = root->get("srs_log_flush_interval"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + srs_utime_t v = ::atoi(conf->arg0().c_str()) * SRS_UTIME_MILLISECONDS; + if (v <= 0) { + return DEFAULT; + } + + return v; +} + bool SrsConfig::get_ff_log_enabled() { string log = get_ff_log_dir(); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 0ace96eb69..de7a1a7eca 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -367,16 +367,8 @@ class SrsConfig virtual srs_error_t raw_set_chunk_size(std::string chunk_size, bool& applied); // RAW set the global ffmpeg log dir. virtual srs_error_t raw_set_ff_log_dir(std::string ff_log_dir, bool& applied); - // RAW set the global log tank. - virtual srs_error_t raw_set_srs_log_tank(std::string srs_log_tank, bool& applied); - // RAW set the global log level. - virtual srs_error_t raw_set_srs_log_level(std::string srs_log_level, bool& applied); - // RAW set the global log file path for file tank. - virtual srs_error_t raw_set_srs_log_file(std::string srs_log_file, bool& applied); // RAW set the global max connections of srs. virtual srs_error_t raw_set_max_connections(std::string max_connections, bool& applied); - // RAW set the global whether use utc time. - virtual srs_error_t raw_set_utc_time(std::string utc_time, bool& applied); // RAW set the global pithy print interval in ms. virtual srs_error_t raw_set_pithy_print_ms(std::string pithy_print_ms, bool& applied); // RAW create the new vhost. @@ -396,11 +388,7 @@ class SrsConfig private: virtual srs_error_t do_reload_listen(); virtual srs_error_t do_reload_pid(); - virtual srs_error_t do_reload_srs_log_tank(); - virtual srs_error_t do_reload_srs_log_level(); - virtual srs_error_t do_reload_srs_log_file(); virtual srs_error_t do_reload_max_connections(); - virtual srs_error_t do_reload_utc_time(); virtual srs_error_t do_reload_pithy_print_ms(); virtual srs_error_t do_reload_vhost_added(std::string vhost); virtual srs_error_t do_reload_vhost_removed(std::string vhost); @@ -487,6 +475,9 @@ class SrsConfig virtual bool auto_reload_for_docker(); // For tcmalloc, get the release rate. virtual double tcmalloc_release_rate(); +// Thread pool section. +public: + virtual srs_utime_t get_threads_interval(); // stream_caster section public: // Get all stream_caster in config file. @@ -899,6 +890,8 @@ class SrsConfig virtual std::string get_log_level(); // Get the log file path. virtual std::string get_log_file(); + // Get the interval in ms to flush asyn log. + virtual srs_utime_t srs_log_flush_interval(); // Whether ffmpeg log enabled virtual bool get_ff_log_enabled(); // The ffmpeg log dir. diff --git a/trunk/src/app/srs_app_http_api.cpp b/trunk/src/app/srs_app_http_api.cpp index 449146aac6..a6de116b83 100644 --- a/trunk/src/app/srs_app_http_api.cpp +++ b/trunk/src/app/srs_app_http_api.cpp @@ -1083,36 +1083,6 @@ srs_error_t SrsGoApiRaw::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* srs_error_reset(err); return srs_api_response_code(w, r, code); } - } else if (scope == "srs_log_tank") { - if (value.empty() || (value != "file" && value != "console")) { - return srs_api_response_code(w, r, ERROR_SYSTEM_CONFIG_RAW_PARAMS); - } - - if ((err = _srs_config->raw_set_srs_log_tank(value, applied)) != srs_success) { - int code = srs_error_code(err); - srs_error_reset(err); - return srs_api_response_code(w, r, code); - } - } else if (scope == "srs_log_level") { - if (value != "verbose" && value != "info" && value != "trace" && value != "warn" && value != "error") { - return srs_api_response_code(w, r, ERROR_SYSTEM_CONFIG_RAW_PARAMS); - } - - if ((err = _srs_config->raw_set_srs_log_level(value, applied)) != srs_success) { - int code = srs_error_code(err); - srs_error_reset(err); - return srs_api_response_code(w, r, code); - } - } else if (scope == "srs_log_file") { - if (value.empty() || !srs_string_starts_with(value, "./", "/tmp/", "/var/") || !srs_string_ends_with(value, ".log")) { - return srs_api_response_code(w, r, ERROR_SYSTEM_CONFIG_RAW_PARAMS); - } - - if ((err = _srs_config->raw_set_srs_log_file(value, applied)) != srs_success) { - int code = srs_error_code(err); - srs_error_reset(err); - return srs_api_response_code(w, r, code); - } } else if (scope == "max_connections") { int mcv = ::atoi(value.c_str()); if (mcv < 10 || mcv > 65535 || !srs_is_digit_number(value)) { @@ -1124,14 +1094,6 @@ srs_error_t SrsGoApiRaw::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* srs_error_reset(err); return srs_api_response_code(w, r, code); } - } else if (scope == "utc_time") { - if (!srs_is_boolean(value)) { - return srs_api_response_code(w, r, ERROR_SYSTEM_CONFIG_RAW_PARAMS); - } - - if ((err = _srs_config->raw_set_utc_time(srs_config_bool2switch(value), applied)) != srs_success) { - return srs_api_response_code(w, r, srs_error_wrap(err, "raw api update utc_time=%s", value.c_str())); - } } else if (scope == "pithy_print_ms") { int ppmv = ::atoi(value.c_str()); if (ppmv < 100 || ppmv > 300000 || !srs_is_digit_number(value)) { diff --git a/trunk/src/app/srs_app_log.cpp b/trunk/src/app/srs_app_log.cpp index 9ef9f55714..a869b7015f 100644 --- a/trunk/src/app/srs_app_log.cpp +++ b/trunk/src/app/srs_app_log.cpp @@ -30,11 +30,13 @@ #include #include #include +#include #include #include #include #include +#include // the max size of a line of log. #define LOG_MAX_SIZE 8192 @@ -47,51 +49,43 @@ SrsFileLog::SrsFileLog() { level = SrsLogLevelTrace; - log_data = new char[LOG_MAX_SIZE]; - - fd = -1; log_to_file_tank = false; utc = false; + + log_data = new char[LOG_MAX_SIZE]; + writer_ = NULL; } SrsFileLog::~SrsFileLog() { srs_freepa(log_data); - - if (fd > 0) { - ::close(fd); - fd = -1; - } - - if (_srs_config) { - _srs_config->unsubscribe(this); - } } srs_error_t SrsFileLog::initialize() { + srs_error_t err = srs_success; + if (_srs_config) { - _srs_config->subscribe(this); - log_to_file_tank = _srs_config->get_log_tank_file(); + filename_ = _srs_config->get_log_file(); level = srs_get_log_level(_srs_config->get_log_level()); utc = _srs_config->get_utc_time(); } - - return srs_success; -} -void SrsFileLog::reopen() -{ - if (fd > 0) { - ::close(fd); - } - if (!log_to_file_tank) { - return; + return err; + } + + if (filename_.empty()) { + return srs_error_new(ERROR_SYSTEM_LOGFILE, "no log filename"); + } + + // We only use the log writer, which is managed by another thread. + if ((err = _srs_async_log->create_writer(filename_, &writer_)) != srs_success) { + return srs_error_wrap(err, "create async writer for %s", filename_.c_str()); } - open_log_file(); + return err; } void SrsFileLog::verbose(const char* tag, SrsContextId context_id, const char* fmt, ...) @@ -111,7 +105,7 @@ void SrsFileLog::verbose(const char* tag, SrsContextId context_id, const char* f size += vsnprintf(log_data + size, LOG_MAX_SIZE - size, fmt, ap); va_end(ap); - write_log(fd, log_data, size, SrsLogLevelVerbose); + write_log(log_data, size, SrsLogLevelVerbose); } void SrsFileLog::info(const char* tag, SrsContextId context_id, const char* fmt, ...) @@ -131,7 +125,7 @@ void SrsFileLog::info(const char* tag, SrsContextId context_id, const char* fmt, size += vsnprintf(log_data + size, LOG_MAX_SIZE - size, fmt, ap); va_end(ap); - write_log(fd, log_data, size, SrsLogLevelInfo); + write_log(log_data, size, SrsLogLevelInfo); } void SrsFileLog::trace(const char* tag, SrsContextId context_id, const char* fmt, ...) @@ -151,7 +145,7 @@ void SrsFileLog::trace(const char* tag, SrsContextId context_id, const char* fmt size += vsnprintf(log_data + size, LOG_MAX_SIZE - size, fmt, ap); va_end(ap); - write_log(fd, log_data, size, SrsLogLevelTrace); + write_log(log_data, size, SrsLogLevelTrace); } void SrsFileLog::warn(const char* tag, SrsContextId context_id, const char* fmt, ...) @@ -171,7 +165,7 @@ void SrsFileLog::warn(const char* tag, SrsContextId context_id, const char* fmt, size += vsnprintf(log_data + size, LOG_MAX_SIZE - size, fmt, ap); va_end(ap); - write_log(fd, log_data, size, SrsLogLevelWarn); + write_log(log_data, size, SrsLogLevelWarn); } void SrsFileLog::error(const char* tag, SrsContextId context_id, const char* fmt, ...) @@ -197,78 +191,13 @@ void SrsFileLog::error(const char* tag, SrsContextId context_id, const char* fmt size += snprintf(log_data + size, LOG_MAX_SIZE - size, "(%s)", strerror(errno)); } - write_log(fd, log_data, size, SrsLogLevelError); -} - -srs_error_t SrsFileLog::on_reload_utc_time() -{ - utc = _srs_config->get_utc_time(); - - return srs_success; -} - -srs_error_t SrsFileLog::on_reload_log_tank() -{ - srs_error_t err = srs_success; - - if (!_srs_config) { - return err; - } - - bool tank = log_to_file_tank; - log_to_file_tank = _srs_config->get_log_tank_file(); - - if (tank) { - return err; - } - - if (!log_to_file_tank) { - return err; - } - - if (fd > 0) { - ::close(fd); - } - open_log_file(); - - return err; + write_log(log_data, size, SrsLogLevelError); } -srs_error_t SrsFileLog::on_reload_log_level() +void SrsFileLog::write_log(char *str_log, int size, int level) { srs_error_t err = srs_success; - - if (!_srs_config) { - return err; - } - - level = srs_get_log_level(_srs_config->get_log_level()); - - return err; -} -srs_error_t SrsFileLog::on_reload_log_file() -{ - srs_error_t err = srs_success; - - if (!_srs_config) { - return err; - } - - if (!log_to_file_tank) { - return err; - } - - if (fd > 0) { - ::close(fd); - } - open_log_file(); - - return err; -} - -void SrsFileLog::write_log(int& fd, char *str_log, int size, int level) -{ // ensure the tail and EOF of string // LOG_TAIL_SIZE for the TAIL char. // 1 for the last char(0). @@ -296,32 +225,9 @@ void SrsFileLog::write_log(int& fd, char *str_log, int size, int level) return; } - // open log file. if specified - if (fd < 0) { - open_log_file(); - } - // write log to file. - if (fd > 0) { - ::write(fd, str_log, size); + if ((err = writer_->write(str_log, size, NULL)) != srs_success) { + srs_error_reset(err); // Ignore any error for log writing. } } -void SrsFileLog::open_log_file() -{ - if (!_srs_config) { - return; - } - - std::string filename = _srs_config->get_log_file(); - - if (filename.empty()) { - return; - } - - fd = ::open(filename.c_str(), - O_RDWR | O_CREAT | O_APPEND, - S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH - ); -} - diff --git a/trunk/src/app/srs_app_log.hpp b/trunk/src/app/srs_app_log.hpp index e6b82c4eb3..ba0c951833 100644 --- a/trunk/src/app/srs_app_log.hpp +++ b/trunk/src/app/srs_app_log.hpp @@ -32,6 +32,8 @@ #include #include +class SrsAsyncFileWriter; + // For log TAGs. #define TAG_MAIN "MAIN" #define TAG_MAYBE "MAYBE" @@ -45,15 +47,18 @@ // when you want to use different level, override this classs, set the protected _level. class SrsFileLog : public ISrsLog, public ISrsReloadHandler { +private: + // Shared cache for each line of log. + char* log_data; + // Async file writer. + SrsAsyncFileWriter* writer_; private: // Defined in SrsLogLevel. SrsLogLevel level; -private: - char* log_data; - // Log to file if specified srs_log_file - int fd; // Whether log to file tank bool log_to_file_tank; + // If log to file, the log filename. + std::string filename_; // Whether use utc time. bool utc; public: @@ -62,21 +67,13 @@ class SrsFileLog : public ISrsLog, public ISrsReloadHandler // Interface ISrsLog public: virtual srs_error_t initialize(); - virtual void reopen(); virtual void verbose(const char* tag, SrsContextId context_id, const char* fmt, ...); virtual void info(const char* tag, SrsContextId context_id, const char* fmt, ...); virtual void trace(const char* tag, SrsContextId context_id, const char* fmt, ...); virtual void warn(const char* tag, SrsContextId context_id, const char* fmt, ...); virtual void error(const char* tag, SrsContextId context_id, const char* fmt, ...); -// Interface ISrsReloadHandler. -public: - virtual srs_error_t on_reload_utc_time(); - virtual srs_error_t on_reload_log_tank(); - virtual srs_error_t on_reload_log_level(); - virtual srs_error_t on_reload_log_file(); private: - virtual void write_log(int& fd, char* str_log, int size, int level); - virtual void open_log_file(); + virtual void write_log(char* str_log, int size, int level); }; #endif diff --git a/trunk/src/app/srs_app_reload.cpp b/trunk/src/app/srs_app_reload.cpp index 5203617fbe..83cb72fb50 100644 --- a/trunk/src/app/srs_app_reload.cpp +++ b/trunk/src/app/srs_app_reload.cpp @@ -40,11 +40,6 @@ srs_error_t ISrsReloadHandler::on_reload_listen() return srs_success; } -srs_error_t ISrsReloadHandler::on_reload_utc_time() -{ - return srs_success; -} - srs_error_t ISrsReloadHandler::on_reload_max_conns() { return srs_success; @@ -55,21 +50,6 @@ srs_error_t ISrsReloadHandler::on_reload_pid() return srs_success; } -srs_error_t ISrsReloadHandler::on_reload_log_tank() -{ - return srs_success; -} - -srs_error_t ISrsReloadHandler::on_reload_log_level() -{ - return srs_success; -} - -srs_error_t ISrsReloadHandler::on_reload_log_file() -{ - return srs_success; -} - srs_error_t ISrsReloadHandler::on_reload_pithy_print() { return srs_success; diff --git a/trunk/src/app/srs_app_reload.hpp b/trunk/src/app/srs_app_reload.hpp index bf277f9ddd..0ccf6e7b81 100644 --- a/trunk/src/app/srs_app_reload.hpp +++ b/trunk/src/app/srs_app_reload.hpp @@ -39,13 +39,9 @@ class ISrsReloadHandler ISrsReloadHandler(); virtual ~ISrsReloadHandler(); public: - virtual srs_error_t on_reload_utc_time(); virtual srs_error_t on_reload_max_conns(); virtual srs_error_t on_reload_listen(); virtual srs_error_t on_reload_pid(); - virtual srs_error_t on_reload_log_tank(); - virtual srs_error_t on_reload_log_level(); - virtual srs_error_t on_reload_log_file(); virtual srs_error_t on_reload_pithy_print(); virtual srs_error_t on_reload_http_api_enabled(); virtual srs_error_t on_reload_http_api_disabled(); diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 99d76df1e2..c014b51b29 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -40,6 +40,7 @@ using namespace std; #include #include #include +#include #include #include #include @@ -1132,7 +1133,7 @@ void SrsServer::on_signal(int signo) #ifndef SRS_GPERF_MC if (signo == SRS_SIGNAL_REOPEN_LOG) { - _srs_log->reopen(); + _srs_async_log->reopen(); if (handler) { handler->on_logrotate(); diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index 9cb06301d9..b28916a0bc 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include @@ -90,8 +91,20 @@ SrsThreadPool::SrsThreadPool() { entry_ = NULL; lock_ = new SrsThreadMutex(); + + // Add primordial thread, current thread itself. + SrsThreadEntry* entry = new SrsThreadEntry(); + threads_.push_back(entry); + entry_ = entry; + + entry->pool = this; + entry->label = "primordial"; + entry->start = NULL; + entry->arg = NULL; + entry->num = 1; } +// TODO: FIMXE: If free the pool, we should stop all threads. SrsThreadPool::~SrsThreadPool() { srs_freep(lock_); @@ -106,18 +119,12 @@ srs_error_t SrsThreadPool::initialize() return srs_error_wrap(err, "initialize st failed"); } - // Add primordial thread, current thread itself. - SrsThreadEntry* entry = new SrsThreadEntry(); - threads_.push_back(entry); - entry_ = entry; - - entry->pool = this; - entry->label = "primordial"; - entry->start = NULL; - entry->arg = NULL; - entry->num = 1; + if ((err = _srs_async_log->initialize()) != srs_success) { + return srs_error_wrap(err, "init async log"); + } - srs_trace("Thread #%d: %s init", entry_->num, entry_->label.c_str()); + interval_ = _srs_config->get_threads_interval(); + srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_)); return err; } @@ -163,9 +170,11 @@ srs_error_t SrsThreadPool::run() srs_error_t err = srs_success; while (true) { - srs_trace("Thread #%d: %s run, threads=%d", entry_->num, entry_->label.c_str(), - (int)threads_.size()); - sleep(60); + string async_logs = _srs_async_log->desc(); + srs_trace("Thread #%d(%s): cycle threads=%d%s", entry_->num, entry_->label.c_str(), (int)threads_.size(), + async_logs.c_str()); + + sleep(interval_ / SRS_UTIME_SECONDS); } return err; @@ -181,7 +190,7 @@ void* SrsThreadPool::start(void* arg) srs_error_t err = srs_success; SrsThreadEntry* entry = (SrsThreadEntry*)arg; - srs_trace("Thread #%d: %s run", entry->num, entry->label.c_str()); + srs_trace("Thread #%d(%s): run", entry->num, entry->label.c_str()); if ((err = entry->start(entry->arg)) != srs_success) { entry->err = err; @@ -192,3 +201,238 @@ void* SrsThreadPool::start(void* arg) } SrsThreadPool* _srs_thread_pool = new SrsThreadPool(); + +SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p) +{ + filename_ = p; + lock_ = new SrsThreadMutex(); + writer_ = new SrsFileWriter(); +} + +// TODO: FIXME: Before free the writer, we must remove it from the manager. +SrsAsyncFileWriter::~SrsAsyncFileWriter() +{ + srs_freep(writer_); + srs_freep(lock_); + + // TODO: FIXME: Should we flush these logs? + for (int i = 0; i < (int)dirty_.size(); i++) { + SrsSharedPtrMessage* msg = dirty_.at(i); + srs_freep(msg); + } +} + +srs_error_t SrsAsyncFileWriter::open() +{ + return writer_->open(filename_); +} + +srs_error_t SrsAsyncFileWriter::open_append() +{ + return writer_->open_append(filename_); +} + +void SrsAsyncFileWriter::close() +{ + writer_->close(); +} + +srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite) +{ + srs_error_t err = srs_success; + + if (count <= 0) { + return err; + } + + char* cp = new char[count]; + memcpy(cp, buf, count); + + SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); + msg->wrap(cp, count); + + if (true) { + SrsThreadLocker(lock_); + dirty_.push_back(msg); + } + + if (pnwrite) { + *pnwrite = count; + } + + return err; +} + +srs_error_t SrsAsyncFileWriter::writev(const iovec* iov, int iovcnt, ssize_t* pnwrite) +{ + srs_error_t err = srs_success; + + for (int i = 0; i < iovcnt; i++) { + const iovec* p = iov + i; + + ssize_t nn = 0; + if ((err = write(p->iov_base, p->iov_len, &nn)) != srs_success) { + return srs_error_wrap(err, "write %d iov %d bytes", i, p->iov_len); + } + + if (pnwrite) { + *pnwrite += nn; + } + } + + return err; +} + +srs_error_t SrsAsyncFileWriter::flush() +{ + srs_error_t err = srs_success; + + vector flying; + if (true) { + SrsThreadLocker(lock_); + dirty_.swap(flying); + } + + for (int i = 0; i < (int)flying.size(); i++) { + SrsSharedPtrMessage* msg = flying.at(i); + + srs_error_t r0 = writer_->write(msg->payload, msg->size, NULL); + + // Choose a random error to return. + if (err == srs_success) { + err = r0; + } else { + srs_freep(r0); + } + + srs_freep(msg); + } + + return err; +} + +SrsAsyncLogManager::SrsAsyncLogManager() +{ + interval_ = 0; + reopen_ = false; + lock_ = new SrsThreadMutex(); +} + +// TODO: FIXME: We should stop the thread first, then free the manager. +SrsAsyncLogManager::~SrsAsyncLogManager() +{ + srs_freep(lock_); + + for (int i = 0; i < (int)writers_.size(); i++) { + SrsAsyncFileWriter* writer = writers_.at(i); + srs_freep(writer); + } +} + +srs_error_t SrsAsyncLogManager::initialize() +{ + srs_error_t err = srs_success; + + interval_ = _srs_config->srs_log_flush_interval(); + if (interval_ <= 0) { + return srs_error_new(ERROR_SYSTEM_LOGFILE, "invalid interval=%dms", srsu2msi(interval_)); + } + + if ((err = _srs_thread_pool->execute("log", SrsAsyncLogManager::start, this)) != srs_success) { + return srs_error_wrap(err, "run async log"); + } + + return err; +} + +srs_error_t SrsAsyncLogManager::create_writer(std::string filename, SrsAsyncFileWriter** ppwriter) +{ + srs_error_t err = srs_success; + + SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename); + writers_.push_back(writer); + + if ((err = writer->open()) != srs_success) { + return srs_error_wrap(err, "open file %s fail", filename.c_str()); + } + + *ppwriter = writer; + return err; +} + +void SrsAsyncLogManager::reopen() +{ + SrsThreadLocker(lock_); + reopen_ = true; +} + +std::string SrsAsyncLogManager::desc() +{ + SrsThreadLocker(lock_); + + int nn_logs = 0; + int max_logs = 0; + for (int i = 0; i < (int)writers_.size(); i++) { + SrsAsyncFileWriter* writer = writers_.at(i); + + nn_logs += (int)writer->dirty_.size(); + max_logs = srs_max(max_logs, (int)writer->dirty_.size()); + } + + static char buf[128]; + snprintf(buf, sizeof(buf), ", files=%d, queue=%d, max=%d", (int)writers_.size(), nn_logs, max_logs); + return buf; +} + +srs_error_t SrsAsyncLogManager::start(void* arg) +{ + SrsAsyncLogManager* log = (SrsAsyncLogManager*)arg; + return log->do_start(); +} + +srs_error_t SrsAsyncLogManager::do_start() +{ + srs_error_t err = srs_success; + + // Never quit for this thread. + while (true) { + // Reopen all log files. + if (reopen_) { + SrsThreadLocker(lock_); + reopen_ = false; + + for (int i = 0; i < (int)writers_.size(); i++) { + SrsAsyncFileWriter* writer = writers_.at(i); + + writer->close(); + if ((err = writer->open()) != srs_success) { + srs_error_reset(err); // Ignore any error for reopen logs. + } + } + } + + // Flush all logs from cache to disk. + if (true) { + SrsThreadLocker(lock_); + + for (int i = 0; i < (int)writers_.size(); i++) { + SrsAsyncFileWriter* writer = writers_.at(i); + + if ((err = writer->flush()) != srs_success) { + srs_error_reset(err); // Ignore any error for flushing logs. + } + } + } + + // We use the system primordial sleep, not the ST sleep, because + // this is a system thread, not a coroutine. + timespec tv = {0}; + tv.tv_sec = interval_ / SRS_UTIME_SECONDS; + tv.tv_nsec = (interval_ % SRS_UTIME_MILLISECONDS) * 1000; + nanosleep(&tv, NULL); + } + + return err; +} + +SrsAsyncLogManager* _srs_async_log = new SrsAsyncLogManager(); diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp index 6458e01f03..f53ea0bd11 100644 --- a/trunk/src/app/srs_app_threads.hpp +++ b/trunk/src/app/srs_app_threads.hpp @@ -26,6 +26,9 @@ #include +#include +#include + #include #include @@ -90,6 +93,7 @@ class SrsThreadPool { private: SrsThreadEntry* entry_; + srs_utime_t interval_; private: SrsThreadMutex* lock_; std::vector threads_; @@ -112,4 +116,65 @@ class SrsThreadPool // The global thread pool. extern SrsThreadPool* _srs_thread_pool; +// Async file writer. +class SrsAsyncFileWriter : public ISrsWriter +{ + friend class SrsAsyncLogManager; +private: + std::string filename_; + SrsFileWriter* writer_; + std::vector dirty_; + SrsThreadMutex* lock_; +private: + SrsAsyncFileWriter(std::string p); + virtual ~SrsAsyncFileWriter(); +public: + // Open file writer, in truncate mode. + virtual srs_error_t open(); + // Open file writer, in append mode. + virtual srs_error_t open_append(); + // Close current writer. + virtual void close(); +// Interface ISrsWriteSeeker +public: + virtual srs_error_t write(void* buf, size_t count, ssize_t* pnwrite); + virtual srs_error_t writev(const iovec* iov, int iovcnt, ssize_t* pnwrite); +public: + // Flush by other thread. + srs_error_t flush(); +}; + +// The async log file writer manager, use a thread to flush multiple writers, +// and reopen all log files when got LOGROTATE signal. +class SrsAsyncLogManager +{ +private: + // The async flush interval. + srs_utime_t interval_; +private: + // The async reopen event. + bool reopen_; +private: + SrsThreadMutex* lock_; + std::vector writers_; +public: + SrsAsyncLogManager(); + virtual ~SrsAsyncLogManager(); +public: + // Initialize the async log manager. + srs_error_t initialize(); + // Create a managed writer, user should never free it. + srs_error_t create_writer(std::string filename, SrsAsyncFileWriter** ppwriter); + // Reopen all log files, asynchronously. + virtual void reopen(); +public: + std::string desc(); +private: + static srs_error_t start(void* arg); + srs_error_t do_start(); +}; + +// The global async log manager. +extern SrsAsyncLogManager* _srs_async_log; + #endif diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp index 225d414097..15a427b971 100644 --- a/trunk/src/kernel/srs_kernel_error.hpp +++ b/trunk/src/kernel/srs_kernel_error.hpp @@ -119,6 +119,7 @@ #define ERROR_SOCKET_SETCLOSEEXEC 1080 #define ERROR_SOCKET_ACCEPT 1081 #define ERROR_THREAD_CREATE 1082 +#define ERROR_SYSTEM_LOGFILE 1083 /////////////////////////////////////////////////////// // RTMP protocol error. diff --git a/trunk/src/kernel/srs_kernel_log.hpp b/trunk/src/kernel/srs_kernel_log.hpp index ca1cd9cd44..fe18f19ea0 100644 --- a/trunk/src/kernel/srs_kernel_log.hpp +++ b/trunk/src/kernel/srs_kernel_log.hpp @@ -61,8 +61,6 @@ class ISrsLog public: // Initialize log utilities. virtual srs_error_t initialize() = 0; - // Reopen the log file for log rotate. - virtual void reopen() = 0; public: // The log for verbose, very verbose information. virtual void verbose(const char* tag, SrsContextId context_id, const char* fmt, ...) = 0; diff --git a/trunk/src/protocol/srs_service_log.cpp b/trunk/src/protocol/srs_service_log.cpp index 4bf559f355..39356b202d 100644 --- a/trunk/src/protocol/srs_service_log.cpp +++ b/trunk/src/protocol/srs_service_log.cpp @@ -134,10 +134,6 @@ srs_error_t SrsConsoleLog::initialize() return srs_success; } -void SrsConsoleLog::reopen() -{ -} - void SrsConsoleLog::verbose(const char* tag, SrsContextId context_id, const char* fmt, ...) { if (level > SrsLogLevelVerbose) { diff --git a/trunk/src/protocol/srs_service_log.hpp b/trunk/src/protocol/srs_service_log.hpp index 6a4e145687..fdb9852b14 100644 --- a/trunk/src/protocol/srs_service_log.hpp +++ b/trunk/src/protocol/srs_service_log.hpp @@ -76,7 +76,6 @@ class SrsConsoleLog : public ISrsLog // Interface ISrsLog public: virtual srs_error_t initialize(); - virtual void reopen(); virtual void verbose(const char* tag, SrsContextId context_id, const char* fmt, ...); virtual void info(const char* tag, SrsContextId context_id, const char* fmt, ...); virtual void trace(const char* tag, SrsContextId context_id, const char* fmt, ...);