From fb0b784500c145c357f8a5b45c6074fb89619869 Mon Sep 17 00:00:00 2001 From: winlin Date: Sun, 14 Mar 2021 18:43:54 +0800 Subject: [PATCH] For #2188, Remove dual queue for sys logs. 1. It exists delay for multiple threads. 2. There is overlay for cache of coroutine queues. 3. Risk when other threads write logs. --- trunk/src/app/srs_app_log.cpp | 31 +++++++++++------ trunk/src/app/srs_app_log.hpp | 5 --- trunk/src/app/srs_app_threads.cpp | 57 ++++--------------------------- trunk/src/app/srs_app_threads.hpp | 16 +-------- 4 files changed, 29 insertions(+), 80 deletions(-) diff --git a/trunk/src/app/srs_app_log.cpp b/trunk/src/app/srs_app_log.cpp index c96e31603d7..c90ef8eda7e 100644 --- a/trunk/src/app/srs_app_log.cpp +++ b/trunk/src/app/srs_app_log.cpp @@ -38,6 +38,13 @@ #include #include +#include + +SrsPps* _srs_thread_sync_10us = new SrsPps(); +SrsPps* _srs_thread_sync_100us = new SrsPps(); +SrsPps* _srs_thread_sync_1000us = new SrsPps(); +SrsPps* _srs_thread_sync_plus = new SrsPps(); + // the max size of a line of log. #define LOG_MAX_SIZE 8192 @@ -54,9 +61,6 @@ SrsFileLog::SrsFileLog() log_data = new char[LOG_MAX_SIZE]; writer_ = NULL; - - last_flush_time_ = srs_get_system_time(); - interval_ = 0; } SrsFileLog::~SrsFileLog() @@ -74,7 +78,6 @@ srs_error_t SrsFileLog::initialize() filename_ = _srs_config->get_log_file(); level = srs_get_log_level(_srs_config->get_log_level()); utc = _srs_config->get_utc_time(); - interval_ = _srs_config->srs_log_flush_interval(); } if (!log_to_file_tank) { @@ -231,17 +234,25 @@ void SrsFileLog::write_log(char *str_log, int size, int level) return; } - + + // It's ok to use cache, because it has been updated in generating log header. + srs_utime_t now = srs_get_system_time(); + // write log to file. if ((err = writer_->write(str_log, size, NULL)) != srs_success) { srs_error_reset(err); // Ignore any error for log writing. } - // Whether flush to thread-queue. - srs_utime_t diff = srs_get_system_time() - last_flush_time_; - if (diff >= interval_) { - last_flush_time_ = srs_get_system_time(); - writer_->flush_co_queue(); + // Stat the sync wait of locks. + srs_utime_t elapsed = srs_update_system_time() - now; + if (elapsed <= 10) { + ++_srs_thread_sync_10us->sugar; + } else if (elapsed <= 100) { + ++_srs_thread_sync_100us->sugar; + } else if (elapsed <= 1000) { + ++_srs_thread_sync_1000us->sugar; + } else { + ++_srs_thread_sync_plus->sugar; } } diff --git a/trunk/src/app/srs_app_log.hpp b/trunk/src/app/srs_app_log.hpp index f1a3a5c7b84..ba0c9518330 100644 --- a/trunk/src/app/srs_app_log.hpp +++ b/trunk/src/app/srs_app_log.hpp @@ -52,11 +52,6 @@ class SrsFileLog : public ISrsLog, public ISrsReloadHandler char* log_data; // Async file writer. SrsAsyncFileWriter* writer_; -private: - // The interval to flush from coroutine-queue to thread-queue. - srs_utime_t interval_; - // Last flush coroutine-queue time, to calculate the timeout. - srs_utime_t last_flush_time_; private: // Defined in SrsLogLevel. SrsLogLevel level; diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index 2ea6ea61de6..2a160fe16bf 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -35,10 +35,10 @@ using namespace std; #include -SrsPps* _srs_thread_sync_10us = new SrsPps(); -SrsPps* _srs_thread_sync_100us = new SrsPps(); -SrsPps* _srs_thread_sync_1000us = new SrsPps(); -SrsPps* _srs_thread_sync_plus = new SrsPps(); +extern SrsPps* _srs_thread_sync_10us; +extern SrsPps* _srs_thread_sync_100us; +extern SrsPps* _srs_thread_sync_1000us; +extern SrsPps* _srs_thread_sync_plus; SrsThreadMutex::SrsThreadMutex() { @@ -128,6 +128,7 @@ srs_error_t SrsThreadPool::initialize() } interval_ = _srs_config->get_threads_interval(); + srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_)); return err; } @@ -172,8 +173,6 @@ srs_error_t SrsThreadPool::run() { srs_error_t err = srs_success; - bool print_init_log = true; - while (true) { // Check the threads status fastly. int loops = (int)(interval_ / SRS_UTIME_SECONDS); @@ -190,12 +189,6 @@ srs_error_t SrsThreadPool::run() } sleep(1); - - // Flush the system previous logs by this log. - if (i > 0 && print_init_log) { - print_init_log = false; - srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_)); - } } // In normal state, gather status and log it. @@ -245,7 +238,6 @@ SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p) filename_ = p; writer_ = new SrsFileWriter(); queue_ = new SrsThreadQueue(); - co_queue_ = new SrsCoroutineQueue(); } // TODO: FIXME: Before free the writer, we must remove it from the manager. @@ -254,7 +246,6 @@ SrsAsyncFileWriter::~SrsAsyncFileWriter() // TODO: FIXME: Should we flush dirty logs? srs_freep(writer_); srs_freep(queue_); - srs_freep(co_queue_); } srs_error_t SrsAsyncFileWriter::open() @@ -286,7 +277,7 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite) SrsSharedPtrMessage* msg = new SrsSharedPtrMessage(); msg->wrap(cp, count); - co_queue_->push_back(msg); + queue_->push_back(msg); if (pnwrite) { *pnwrite = count; @@ -315,29 +306,6 @@ srs_error_t SrsAsyncFileWriter::writev(const iovec* iov, int iovcnt, ssize_t* pn return err; } -void SrsAsyncFileWriter::flush_co_queue() -{ - srs_utime_t now = srs_update_system_time(); - - // The thread queue is thread-safe, so we do not need a lock. - if (true) { - vector flying; - co_queue_->swap(flying); - queue_->push_back(flying); - } - - srs_utime_t elapsed = srs_update_system_time() - now; - if (elapsed <= 10) { - ++_srs_thread_sync_10us->sugar; - } else if (elapsed <= 100) { - ++_srs_thread_sync_100us->sugar; - } else if (elapsed <= 1000) { - ++_srs_thread_sync_1000us->sugar; - } else { - ++_srs_thread_sync_plus->sugar; - } -} - srs_error_t SrsAsyncFileWriter::flush() { srs_error_t err = srs_success; @@ -441,19 +409,8 @@ std::string SrsAsyncLogManager::description() max_logs = srs_max(max_logs, nn); } - int nn_co_logs = 0; - int max_co_logs = 0; - for (int i = 0; i < (int)writers_.size(); i++) { - SrsAsyncFileWriter* writer = writers_.at(i); - - int nn = (int)writer->co_queue_->size(); - nn_co_logs += nn; - max_co_logs = srs_max(max_co_logs, nn); - } - static char buf[128]; - snprintf(buf, sizeof(buf), ", logs=%d/%d/%d/%d/%d", - (int)writers_.size(), nn_logs, max_logs, nn_co_logs, max_co_logs); + snprintf(buf, sizeof(buf), ", logs=%d/%d/%d", (int)writers_.size(), nn_logs, max_logs); return buf; } diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp index d6d78599e68..7b44e6316f0 100644 --- a/trunk/src/app/srs_app_threads.hpp +++ b/trunk/src/app/srs_app_threads.hpp @@ -190,8 +190,7 @@ class SrsThreadQueue } }; -// Async file writer, it's not thread safe. It assume that each thread has its dedicate object, -// for example, the log object should be thread_local. +// Async file writer, it's thread safe. class SrsAsyncFileWriter : public ISrsWriter { friend class SrsAsyncLogManager; @@ -201,17 +200,6 @@ class SrsAsyncFileWriter : public ISrsWriter private: // The thread-queue, to flush to disk by dedicated thread. SrsThreadQueue* queue_; -private: - // The coroutine-queue, to avoid requires lock for each log. - // @remark Note that if multiple thread write to the same log file, the log is nor ordered - // by time, because each thread has this coroutine-queue and flush as a batch of logs to - // thread-queue: - // thread #1, flush 10 logs to thread-queue, [10:10:00 ~ 10:11:00] - // thread #2, flush 100 logs to thread-queue, [10:09:00 ~ 10:12:00] - // Finally, the logs flush to disk as: - // [10:10:00 ~ 10:11:00], 10 logs. - // [10:09:00 ~ 10:12:00], 100 logs. - SrsCoroutineQueue* co_queue_; private: SrsAsyncFileWriter(std::string p); virtual ~SrsAsyncFileWriter(); @@ -227,8 +215,6 @@ class SrsAsyncFileWriter : public ISrsWriter virtual srs_error_t write(void* buf, size_t count, ssize_t* pnwrite); virtual srs_error_t writev(const iovec* iov, int iovcnt, ssize_t* pnwrite); public: - // Flush coroutine-queue to thread-queue, avoid requiring lock for each message. - void flush_co_queue(); // Flush thread-queue to disk, generally by dedicated thread. srs_error_t flush(); };