diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf index 7290d08703..419c5bb802 100644 --- a/trunk/conf/full.conf +++ b/trunk/conf/full.conf @@ -41,9 +41,11 @@ srs_log_level trace; # when srs_log_tank is file, specifies the log file. # default: ./objs/srs.log srs_log_file ./objs/srs.log; -# The interval in ms, to flush async log. +# The interval in ms, to flush async log. Generally, we flush from +# coroutine-queue to thread-queue, then from thread-queue to disk. +# So the delay of logs might be 2*srs_log_flush_interval. # Default: 1300 -srs_log_async_interval 1300; +srs_log_flush_interval 1300; # the max connections. # if exceed the max connections, server will drop the new connection. # default: 1000 diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 36572a2742..741d0c556b 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -892,7 +892,7 @@ class SrsConfig virtual std::string get_log_level(); // Get the log file path. virtual std::string get_log_file(); - // Get the interval in ms to flush asyn log. + // Get the interval in ms to flush async log. virtual srs_utime_t srs_log_flush_interval(); // Whether ffmpeg log enabled virtual bool get_ff_log_enabled(); diff --git a/trunk/src/app/srs_app_log.cpp b/trunk/src/app/srs_app_log.cpp index af57b706ba..620833a270 100644 --- a/trunk/src/app/srs_app_log.cpp +++ b/trunk/src/app/srs_app_log.cpp @@ -61,6 +61,7 @@ SrsFileLog::~SrsFileLog() srs_freepa(log_data); } +// @remark Note that we should never write logs, because log is not ready not. srs_error_t SrsFileLog::initialize() { srs_error_t err = srs_success; diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index 789bf5c8b4..fa278b24e1 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -110,6 +110,7 @@ SrsThreadPool::~SrsThreadPool() srs_freep(lock_); } +// @remark Note that we should never write logs, because log is not ready not. srs_error_t SrsThreadPool::initialize() { srs_error_t err = srs_success; @@ -119,12 +120,7 @@ srs_error_t SrsThreadPool::initialize() return srs_error_wrap(err, "initialize st failed"); } - if ((err = _srs_async_log->initialize()) != srs_success) { - return srs_error_wrap(err, "init async log"); - } - interval_ = _srs_config->get_threads_interval(); - srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_)); return err; } @@ -169,12 +165,15 @@ srs_error_t SrsThreadPool::run() { srs_error_t err = srs_success; + // Write the init log here. + srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_)); + while (true) { + sleep(interval_ / SRS_UTIME_SECONDS); + string async_logs = _srs_async_log->description(); srs_trace("Thread #%d(%s): cycle threads=%d%s", entry_->num, entry_->label.c_str(), (int)threads_.size(), async_logs.c_str()); - - sleep(interval_ / SRS_UTIME_SECONDS); } return err; @@ -202,11 +201,14 @@ void* SrsThreadPool::start(void* arg) SrsThreadPool* _srs_thread_pool = new SrsThreadPool(); -SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p) +SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p, srs_utime_t interval) { filename_ = p; writer_ = new SrsFileWriter(); queue_ = new SrsThreadQueue(); + co_queue_ = new SrsCoroutineQueue(); + interval_ = interval; + last_flush_time_ = srs_get_system_time(); } // TODO: FIXME: Before free the writer, we must remove it from the manager. @@ -215,6 +217,7 @@ SrsAsyncFileWriter::~SrsAsyncFileWriter() // TODO: FIXME: Should we flush dirty logs? srs_freep(writer_); srs_freep(queue_); + srs_freep(co_queue_); } srs_error_t SrsAsyncFileWriter::open() @@ -246,7 +249,16 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite) SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); msg->wrap(cp, count); - queue_->push_back(msg); + co_queue_->push_back(msg); + + // Whether flush to thread-queue. + if (srs_get_system_time() - last_flush_time_ >= interval_) { + last_flush_time_ = srs_get_system_time(); + + vector flying; + co_queue_->swap(flying); + queue_->push_back(flying); + } if (pnwrite) { *pnwrite = count; @@ -303,6 +315,7 @@ srs_error_t SrsAsyncFileWriter::flush() SrsAsyncLogManager::SrsAsyncLogManager() { interval_ = 0; + reopen_ = false; lock_ = new SrsThreadMutex(); } @@ -318,6 +331,7 @@ SrsAsyncLogManager::~SrsAsyncLogManager() } } +// @remark Note that we should never write logs, because log is not ready not. srs_error_t SrsAsyncLogManager::initialize() { srs_error_t err = srs_success; @@ -327,10 +341,20 @@ srs_error_t SrsAsyncLogManager::initialize() return srs_error_new(ERROR_SYSTEM_LOGFILE, "invalid interval=%dms", srsu2msi(interval_)); } + return err; +} + +// @remark Now, log is ready, and we can print logs. +srs_error_t SrsAsyncLogManager::run() +{ + srs_error_t err = srs_success; + if ((err = _srs_thread_pool->execute("log", SrsAsyncLogManager::start, this)) != srs_success) { return srs_error_wrap(err, "run async log"); } + srs_trace("AsyncLogs: Init flush_interval=%dms", srsu2msi(interval_)); + return err; } @@ -338,7 +362,7 @@ srs_error_t SrsAsyncLogManager::create_writer(std::string filename, SrsAsyncFile { srs_error_t err = srs_success; - SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename); + SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename, interval_); writers_.push_back(writer); if ((err = writer->open()) != srs_success) { @@ -369,8 +393,20 @@ std::string SrsAsyncLogManager::description() max_logs = srs_max(max_logs, nn); } + int nn_co_logs = 0; + int max_co_logs = 0; + for (int i = 0; i < (int)writers_.size(); i++) { + SrsAsyncFileWriter* writer = writers_.at(i); + + int nn = (int)writer->co_queue_->size(); + nn_co_logs += nn; + max_co_logs = srs_max(max_co_logs, nn); + } + static char buf[128]; - snprintf(buf, sizeof(buf), ", files=%d, queue=%d/%d", (int)writers_.size(), nn_logs, max_logs); + snprintf(buf, sizeof(buf), ", files=%d, queue=%d/%d, coq=%d/%d", + (int)writers_.size(), nn_logs, max_logs, nn_co_logs, max_co_logs); + return buf; } diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp index ff0c757692..624d94e95e 100644 --- a/trunk/src/app/srs_app_threads.hpp +++ b/trunk/src/app/srs_app_threads.hpp @@ -116,6 +116,37 @@ class SrsThreadPool // The global thread pool. extern SrsThreadPool* _srs_thread_pool; +// We use coroutine queue to collect messages from different coroutines, +// then swap to the SrsThreadQueue and process by another thread. +template +class SrsCoroutineQueue +{ +private: + std::vector dirty_; +public: + SrsCoroutineQueue() { + } + virtual ~SrsCoroutineQueue() { + for (int i = 0; i < (int)dirty_.size(); i++) { + T* msg = dirty_.at(i); + srs_freep(msg); + } + } +public: + // SrsCoroutineQueue::push_back + void push_back(T* msg) { + dirty_.push_back(msg); + } + // SrsCoroutineQueue::swap + void swap(std::vector& flying) { + dirty_.swap(flying); + } + // SrsCoroutineQueue::size + size_t size() { + return dirty_.size(); + } +}; + // Thread-safe queue. template class SrsThreadQueue @@ -142,6 +173,11 @@ class SrsThreadQueue SrsThreadLocker(lock_); dirty_.push_back(msg); } + // SrsThreadQueue::push_back + void push_back(std::vector& flying) { + SrsThreadLocker(lock_); + dirty_.insert(dirty_.end(), flying.begin(), flying.end()); + } // SrsThreadQueue::swap void swap(std::vector& flying) { SrsThreadLocker(lock_); @@ -161,9 +197,18 @@ class SrsAsyncFileWriter : public ISrsWriter private: std::string filename_; SrsFileWriter* writer_; +private: + // The thread-queue, to flush to disk by dedicated thread. SrsThreadQueue* queue_; private: - SrsAsyncFileWriter(std::string p); + // The interval to flush from coroutine-queue to thread-queue. + srs_utime_t interval_; + // Last flush coroutine-queue time, to calculate the timeout. + srs_utime_t last_flush_time_; + // The coroutine-queue, to avoid requires lock for each log. + SrsCoroutineQueue* co_queue_; +private: + SrsAsyncFileWriter(std::string p, srs_utime_t interval); virtual ~SrsAsyncFileWriter(); public: // Open file writer, in truncate mode. @@ -188,6 +233,8 @@ class SrsAsyncLogManager private: // The async flush interval. srs_utime_t interval_; + // The number of logs to flush from coroutine-queue to thread-queue. + int flush_co_queue_; private: // The async reopen event. bool reopen_; @@ -200,6 +247,8 @@ class SrsAsyncLogManager public: // Initialize the async log manager. srs_error_t initialize(); + // Run the async log manager thread. + srs_error_t run(); // Create a managed writer, user should never free it. srs_error_t create_writer(std::string filename, SrsAsyncFileWriter** ppwriter); // Reopen all log files, asynchronously. diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp index 2cbb846f0b..5a88584360 100644 --- a/trunk/src/main/srs_main_server.cpp +++ b/trunk/src/main/srs_main_server.cpp @@ -131,6 +131,11 @@ srs_error_t do_main(int argc, char** argv) if ((err = _srs_config->initialize_cwd()) != srs_success) { return srs_error_wrap(err, "config cwd"); } + + // We must initialize the async log manager before log init. + if ((err = _srs_async_log->initialize()) != srs_success) { + return srs_error_wrap(err, "init async log"); + } // config parsed, initialize log. if ((err = _srs_log->initialize()) != srs_success) { @@ -469,6 +474,13 @@ srs_error_t run_in_thread_pool() return srs_error_wrap(err, "init thread pool"); } + // After all init(log, async log manager, thread pool), now we can start to + // run the log manager thread. + if ((err = _srs_async_log->run()) != srs_success) { + return srs_error_wrap(err, "run async log"); + } + + // Start the service worker thread, for RTMP and RTC server, etc. if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) { return srs_error_wrap(err, "run hybrid server"); }