Skip to content

Commit

Permalink
For #2188, Support dual queue cache for async logs.
Browse files Browse the repository at this point in the history
1. Create dual queue, the coroutine queue and thread queue.
2. The coroutine queue cache logs does not require lock.
3. When need to flush, flush the logs from coroutine-queue to thread-queue.
4. Finally, flush thread-queue to disk.
  • Loading branch information
winlinvip committed Mar 14, 2021
1 parent 8d42384 commit 3aa57f7
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 15 deletions.
6 changes: 4 additions & 2 deletions trunk/conf/full.conf
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ srs_log_level trace;
# when srs_log_tank is file, specifies the log file.
# default: ./objs/srs.log
srs_log_file ./objs/srs.log;
# The interval in ms, to flush async log.
# The interval in ms, to flush async log. Generally, we flush from
# coroutine-queue to thread-queue, then from thread-queue to disk.
# So the delay of logs might be 2*srs_log_flush_interval.
# Default: 1300
srs_log_async_interval 1300;
srs_log_flush_interval 1300;
# the max connections.
# if exceed the max connections, server will drop the new connection.
# default: 1000
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ class SrsConfig
virtual std::string get_log_level();
// Get the log file path.
virtual std::string get_log_file();
// Get the interval in ms to flush asyn log.
// Get the interval in ms to flush async log.
virtual srs_utime_t srs_log_flush_interval();
// Whether ffmpeg log enabled
virtual bool get_ff_log_enabled();
Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ SrsFileLog::~SrsFileLog()
srs_freepa(log_data);
}

// @remark Note that we should never write logs, because log is not ready not.
srs_error_t SrsFileLog::initialize()
{
srs_error_t err = srs_success;
Expand Down
58 changes: 47 additions & 11 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ SrsThreadPool::~SrsThreadPool()
srs_freep(lock_);
}

// @remark Note that we should never write logs, because log is not ready not.
srs_error_t SrsThreadPool::initialize()
{
srs_error_t err = srs_success;
Expand All @@ -119,12 +120,7 @@ srs_error_t SrsThreadPool::initialize()
return srs_error_wrap(err, "initialize st failed");
}

if ((err = _srs_async_log->initialize()) != srs_success) {
return srs_error_wrap(err, "init async log");
}

interval_ = _srs_config->get_threads_interval();
srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_));

return err;
}
Expand Down Expand Up @@ -169,12 +165,15 @@ srs_error_t SrsThreadPool::run()
{
srs_error_t err = srs_success;

// Write the init log here.
srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_));

while (true) {
sleep(interval_ / SRS_UTIME_SECONDS);

string async_logs = _srs_async_log->description();
srs_trace("Thread #%d(%s): cycle threads=%d%s", entry_->num, entry_->label.c_str(), (int)threads_.size(),
async_logs.c_str());

sleep(interval_ / SRS_UTIME_SECONDS);
}

return err;
Expand Down Expand Up @@ -202,11 +201,14 @@ void* SrsThreadPool::start(void* arg)

SrsThreadPool* _srs_thread_pool = new SrsThreadPool();

SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p)
SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p, srs_utime_t interval)
{
filename_ = p;
writer_ = new SrsFileWriter();
queue_ = new SrsThreadQueue<SrsSharedPtrMessage>();
co_queue_ = new SrsCoroutineQueue<SrsSharedPtrMessage>();
interval_ = interval;
last_flush_time_ = srs_get_system_time();
}

// TODO: FIXME: Before free the writer, we must remove it from the manager.
Expand All @@ -215,6 +217,7 @@ SrsAsyncFileWriter::~SrsAsyncFileWriter()
// TODO: FIXME: Should we flush dirty logs?
srs_freep(writer_);
srs_freep(queue_);
srs_freep(co_queue_);
}

srs_error_t SrsAsyncFileWriter::open()
Expand Down Expand Up @@ -246,7 +249,16 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite)
SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
msg->wrap(cp, count);

queue_->push_back(msg);
co_queue_->push_back(msg);

// Whether flush to thread-queue.
if (srs_get_system_time() - last_flush_time_ >= interval_) {
last_flush_time_ = srs_get_system_time();

vector<SrsSharedPtrMessage*> flying;
co_queue_->swap(flying);
queue_->push_back(flying);
}

if (pnwrite) {
*pnwrite = count;
Expand Down Expand Up @@ -303,6 +315,7 @@ srs_error_t SrsAsyncFileWriter::flush()
SrsAsyncLogManager::SrsAsyncLogManager()
{
interval_ = 0;

reopen_ = false;
lock_ = new SrsThreadMutex();
}
Expand All @@ -318,6 +331,7 @@ SrsAsyncLogManager::~SrsAsyncLogManager()
}
}

// @remark Note that we should never write logs, because log is not ready not.
srs_error_t SrsAsyncLogManager::initialize()
{
srs_error_t err = srs_success;
Expand All @@ -327,18 +341,28 @@ srs_error_t SrsAsyncLogManager::initialize()
return srs_error_new(ERROR_SYSTEM_LOGFILE, "invalid interval=%dms", srsu2msi(interval_));
}

return err;
}

// @remark Now, log is ready, and we can print logs.
srs_error_t SrsAsyncLogManager::run()
{
srs_error_t err = srs_success;

if ((err = _srs_thread_pool->execute("log", SrsAsyncLogManager::start, this)) != srs_success) {
return srs_error_wrap(err, "run async log");
}

srs_trace("AsyncLogs: Init flush_interval=%dms", srsu2msi(interval_));

return err;
}

srs_error_t SrsAsyncLogManager::create_writer(std::string filename, SrsAsyncFileWriter** ppwriter)
{
srs_error_t err = srs_success;

SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename);
SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename, interval_);
writers_.push_back(writer);

if ((err = writer->open()) != srs_success) {
Expand Down Expand Up @@ -369,8 +393,20 @@ std::string SrsAsyncLogManager::description()
max_logs = srs_max(max_logs, nn);
}

int nn_co_logs = 0;
int max_co_logs = 0;
for (int i = 0; i < (int)writers_.size(); i++) {
SrsAsyncFileWriter* writer = writers_.at(i);

int nn = (int)writer->co_queue_->size();
nn_co_logs += nn;
max_co_logs = srs_max(max_co_logs, nn);
}

static char buf[128];
snprintf(buf, sizeof(buf), ", files=%d, queue=%d/%d", (int)writers_.size(), nn_logs, max_logs);
snprintf(buf, sizeof(buf), ", files=%d, queue=%d/%d, coq=%d/%d",
(int)writers_.size(), nn_logs, max_logs, nn_co_logs, max_co_logs);

return buf;
}

Expand Down
51 changes: 50 additions & 1 deletion trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,37 @@ class SrsThreadPool
// The global thread pool.
extern SrsThreadPool* _srs_thread_pool;

// We use coroutine queue to collect messages from different coroutines,
// then swap to the SrsThreadQueue and process by another thread.
template<typename T>
class SrsCoroutineQueue
{
private:
std::vector<T*> dirty_;
public:
SrsCoroutineQueue() {
}
virtual ~SrsCoroutineQueue() {
for (int i = 0; i < (int)dirty_.size(); i++) {
T* msg = dirty_.at(i);
srs_freep(msg);
}
}
public:
// SrsCoroutineQueue::push_back
void push_back(T* msg) {
dirty_.push_back(msg);
}
// SrsCoroutineQueue::swap
void swap(std::vector<T*>& flying) {
dirty_.swap(flying);
}
// SrsCoroutineQueue::size
size_t size() {
return dirty_.size();
}
};

// Thread-safe queue.
template<typename T>
class SrsThreadQueue
Expand All @@ -142,6 +173,11 @@ class SrsThreadQueue
SrsThreadLocker(lock_);
dirty_.push_back(msg);
}
// SrsThreadQueue::push_back
void push_back(std::vector<T*>& flying) {
SrsThreadLocker(lock_);
dirty_.insert(dirty_.end(), flying.begin(), flying.end());
}
// SrsThreadQueue::swap
void swap(std::vector<T*>& flying) {
SrsThreadLocker(lock_);
Expand All @@ -161,9 +197,18 @@ class SrsAsyncFileWriter : public ISrsWriter
private:
std::string filename_;
SrsFileWriter* writer_;
private:
// The thread-queue, to flush to disk by dedicated thread.
SrsThreadQueue<SrsSharedPtrMessage>* queue_;
private:
SrsAsyncFileWriter(std::string p);
// The interval to flush from coroutine-queue to thread-queue.
srs_utime_t interval_;
// Last flush coroutine-queue time, to calculate the timeout.
srs_utime_t last_flush_time_;
// The coroutine-queue, to avoid requires lock for each log.
SrsCoroutineQueue<SrsSharedPtrMessage>* co_queue_;
private:
SrsAsyncFileWriter(std::string p, srs_utime_t interval);
virtual ~SrsAsyncFileWriter();
public:
// Open file writer, in truncate mode.
Expand All @@ -188,6 +233,8 @@ class SrsAsyncLogManager
private:
// The async flush interval.
srs_utime_t interval_;
// The number of logs to flush from coroutine-queue to thread-queue.
int flush_co_queue_;
private:
// The async reopen event.
bool reopen_;
Expand All @@ -200,6 +247,8 @@ class SrsAsyncLogManager
public:
// Initialize the async log manager.
srs_error_t initialize();
// Run the async log manager thread.
srs_error_t run();
// Create a managed writer, user should never free it.
srs_error_t create_writer(std::string filename, SrsAsyncFileWriter** ppwriter);
// Reopen all log files, asynchronously.
Expand Down
12 changes: 12 additions & 0 deletions trunk/src/main/srs_main_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ srs_error_t do_main(int argc, char** argv)
if ((err = _srs_config->initialize_cwd()) != srs_success) {
return srs_error_wrap(err, "config cwd");
}

// We must initialize the async log manager before log init.
if ((err = _srs_async_log->initialize()) != srs_success) {
return srs_error_wrap(err, "init async log");
}

// config parsed, initialize log.
if ((err = _srs_log->initialize()) != srs_success) {
Expand Down Expand Up @@ -469,6 +474,13 @@ srs_error_t run_in_thread_pool()
return srs_error_wrap(err, "init thread pool");
}

// After all init(log, async log manager, thread pool), now we can start to
// run the log manager thread.
if ((err = _srs_async_log->run()) != srs_success) {
return srs_error_wrap(err, "run async log");
}

// Start the service worker thread, for RTMP and RTC server, etc.
if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) {
return srs_error_wrap(err, "run hybrid server");
}
Expand Down

0 comments on commit 3aa57f7

Please sign in to comment.