Skip to content

Commit

Permalink
For #2188, Remove dual queue for sys logs.
Browse files Browse the repository at this point in the history
1. It exists delay for multiple threads.
2. There is overlay for cache of coroutine queues.
3. Risk when other threads write logs.
  • Loading branch information
winlinvip committed Mar 14, 2021
1 parent e46a4b0 commit fb0b784
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 80 deletions.
31 changes: 21 additions & 10 deletions trunk/src/app/srs_app_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@
#include <srs_kernel_utility.hpp>
#include <srs_app_threads.hpp>

#include <srs_protocol_kbps.hpp>

SrsPps* _srs_thread_sync_10us = new SrsPps();
SrsPps* _srs_thread_sync_100us = new SrsPps();
SrsPps* _srs_thread_sync_1000us = new SrsPps();
SrsPps* _srs_thread_sync_plus = new SrsPps();

// the max size of a line of log.
#define LOG_MAX_SIZE 8192

Expand All @@ -54,9 +61,6 @@ SrsFileLog::SrsFileLog()

log_data = new char[LOG_MAX_SIZE];
writer_ = NULL;

last_flush_time_ = srs_get_system_time();
interval_ = 0;
}

SrsFileLog::~SrsFileLog()
Expand All @@ -74,7 +78,6 @@ srs_error_t SrsFileLog::initialize()
filename_ = _srs_config->get_log_file();
level = srs_get_log_level(_srs_config->get_log_level());
utc = _srs_config->get_utc_time();
interval_ = _srs_config->srs_log_flush_interval();
}

if (!log_to_file_tank) {
Expand Down Expand Up @@ -231,17 +234,25 @@ void SrsFileLog::write_log(char *str_log, int size, int level)

return;
}


// It's ok to use cache, because it has been updated in generating log header.
srs_utime_t now = srs_get_system_time();

// write log to file.
if ((err = writer_->write(str_log, size, NULL)) != srs_success) {
srs_error_reset(err); // Ignore any error for log writing.
}

// Whether flush to thread-queue.
srs_utime_t diff = srs_get_system_time() - last_flush_time_;
if (diff >= interval_) {
last_flush_time_ = srs_get_system_time();
writer_->flush_co_queue();
// Stat the sync wait of locks.
srs_utime_t elapsed = srs_update_system_time() - now;
if (elapsed <= 10) {
++_srs_thread_sync_10us->sugar;
} else if (elapsed <= 100) {
++_srs_thread_sync_100us->sugar;
} else if (elapsed <= 1000) {
++_srs_thread_sync_1000us->sugar;
} else {
++_srs_thread_sync_plus->sugar;
}
}

5 changes: 0 additions & 5 deletions trunk/src/app/srs_app_log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ class SrsFileLog : public ISrsLog, public ISrsReloadHandler
char* log_data;
// Async file writer.
SrsAsyncFileWriter* writer_;
private:
// The interval to flush from coroutine-queue to thread-queue.
srs_utime_t interval_;
// Last flush coroutine-queue time, to calculate the timeout.
srs_utime_t last_flush_time_;
private:
// Defined in SrsLogLevel.
SrsLogLevel level;
Expand Down
57 changes: 7 additions & 50 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ using namespace std;

#include <srs_protocol_kbps.hpp>

SrsPps* _srs_thread_sync_10us = new SrsPps();
SrsPps* _srs_thread_sync_100us = new SrsPps();
SrsPps* _srs_thread_sync_1000us = new SrsPps();
SrsPps* _srs_thread_sync_plus = new SrsPps();
extern SrsPps* _srs_thread_sync_10us;
extern SrsPps* _srs_thread_sync_100us;
extern SrsPps* _srs_thread_sync_1000us;
extern SrsPps* _srs_thread_sync_plus;

SrsThreadMutex::SrsThreadMutex()
{
Expand Down Expand Up @@ -128,6 +128,7 @@ srs_error_t SrsThreadPool::initialize()
}

interval_ = _srs_config->get_threads_interval();
srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_));

return err;
}
Expand Down Expand Up @@ -172,8 +173,6 @@ srs_error_t SrsThreadPool::run()
{
srs_error_t err = srs_success;

bool print_init_log = true;

while (true) {
// Check the threads status fastly.
int loops = (int)(interval_ / SRS_UTIME_SECONDS);
Expand All @@ -190,12 +189,6 @@ srs_error_t SrsThreadPool::run()
}

sleep(1);

// Flush the system previous logs by this log.
if (i > 0 && print_init_log) {
print_init_log = false;
srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_));
}
}

// In normal state, gather status and log it.
Expand Down Expand Up @@ -245,7 +238,6 @@ SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p)
filename_ = p;
writer_ = new SrsFileWriter();
queue_ = new SrsThreadQueue<SrsSharedPtrMessage>();
co_queue_ = new SrsCoroutineQueue<SrsSharedPtrMessage>();
}

// TODO: FIXME: Before free the writer, we must remove it from the manager.
Expand All @@ -254,7 +246,6 @@ SrsAsyncFileWriter::~SrsAsyncFileWriter()
// TODO: FIXME: Should we flush dirty logs?
srs_freep(writer_);
srs_freep(queue_);
srs_freep(co_queue_);
}

srs_error_t SrsAsyncFileWriter::open()
Expand Down Expand Up @@ -286,7 +277,7 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite)
SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
msg->wrap(cp, count);

co_queue_->push_back(msg);
queue_->push_back(msg);

if (pnwrite) {
*pnwrite = count;
Expand Down Expand Up @@ -315,29 +306,6 @@ srs_error_t SrsAsyncFileWriter::writev(const iovec* iov, int iovcnt, ssize_t* pn
return err;
}

void SrsAsyncFileWriter::flush_co_queue()
{
srs_utime_t now = srs_update_system_time();

// The thread queue is thread-safe, so we do not need a lock.
if (true) {
vector<SrsSharedPtrMessage*> flying;
co_queue_->swap(flying);
queue_->push_back(flying);
}

srs_utime_t elapsed = srs_update_system_time() - now;
if (elapsed <= 10) {
++_srs_thread_sync_10us->sugar;
} else if (elapsed <= 100) {
++_srs_thread_sync_100us->sugar;
} else if (elapsed <= 1000) {
++_srs_thread_sync_1000us->sugar;
} else {
++_srs_thread_sync_plus->sugar;
}
}

srs_error_t SrsAsyncFileWriter::flush()
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -441,19 +409,8 @@ std::string SrsAsyncLogManager::description()
max_logs = srs_max(max_logs, nn);
}

int nn_co_logs = 0;
int max_co_logs = 0;
for (int i = 0; i < (int)writers_.size(); i++) {
SrsAsyncFileWriter* writer = writers_.at(i);

int nn = (int)writer->co_queue_->size();
nn_co_logs += nn;
max_co_logs = srs_max(max_co_logs, nn);
}

static char buf[128];
snprintf(buf, sizeof(buf), ", logs=%d/%d/%d/%d/%d",
(int)writers_.size(), nn_logs, max_logs, nn_co_logs, max_co_logs);
snprintf(buf, sizeof(buf), ", logs=%d/%d/%d", (int)writers_.size(), nn_logs, max_logs);

return buf;
}
Expand Down
16 changes: 1 addition & 15 deletions trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,7 @@ class SrsThreadQueue
}
};

// Async file writer, it's not thread safe. It assume that each thread has its dedicate object,
// for example, the log object should be thread_local.
// Async file writer, it's thread safe.
class SrsAsyncFileWriter : public ISrsWriter
{
friend class SrsAsyncLogManager;
Expand All @@ -201,17 +200,6 @@ class SrsAsyncFileWriter : public ISrsWriter
private:
// The thread-queue, to flush to disk by dedicated thread.
SrsThreadQueue<SrsSharedPtrMessage>* queue_;
private:
// The coroutine-queue, to avoid requires lock for each log.
// @remark Note that if multiple thread write to the same log file, the log is nor ordered
// by time, because each thread has this coroutine-queue and flush as a batch of logs to
// thread-queue:
// thread #1, flush 10 logs to thread-queue, [10:10:00 ~ 10:11:00]
// thread #2, flush 100 logs to thread-queue, [10:09:00 ~ 10:12:00]
// Finally, the logs flush to disk as:
// [10:10:00 ~ 10:11:00], 10 logs.
// [10:09:00 ~ 10:12:00], 100 logs.
SrsCoroutineQueue<SrsSharedPtrMessage>* co_queue_;
private:
SrsAsyncFileWriter(std::string p);
virtual ~SrsAsyncFileWriter();
Expand All @@ -227,8 +215,6 @@ class SrsAsyncFileWriter : public ISrsWriter
virtual srs_error_t write(void* buf, size_t count, ssize_t* pnwrite);
virtual srs_error_t writev(const iovec* iov, int iovcnt, ssize_t* pnwrite);
public:
// Flush coroutine-queue to thread-queue, avoid requiring lock for each message.
void flush_co_queue();
// Flush thread-queue to disk, generally by dedicated thread.
srs_error_t flush();
};
Expand Down

0 comments on commit fb0b784

Please sign in to comment.