Skip to content

Commit

Permalink
For #2188, Refine comments for global variable.
Browse files Browse the repository at this point in the history
1. Dual queue for async logs, exists risk.
2. Flush the logs is too slow, because it depends on logs and interval.
  • Loading branch information
winlinvip committed Mar 14, 2021
1 parent 1da1b33 commit e46a4b0
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 30 deletions.
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_hybrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ srs_error_t SrsHybridServer::run()
}
}

// TODO: FIXME: Should run the signal manager and directly quit.
// Wait for all server to quit.
srs_usleep(SRS_UTIME_NO_TIMEOUT);

Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ void SrsFileLog::write_log(char *str_log, int size, int level)
str_log[size] = 0;

// if not to file, to console and return.
// @remark Its value changes, because there is some log before config loaded.
if (!log_to_file_tank) {
// if is error msg, then print color msg.
// \033[31m : red text code in shell
Expand Down
1 change: 1 addition & 0 deletions trunk/src/app/srs_app_rtc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -843,5 +843,6 @@ void RtcServerAdapter::stop()
{
}

// TODO: FIXME: It should be thread-local or thread-safe.
SrsResourceManager* _srs_rtc_manager = new SrsResourceManager("RTC", true);

1 change: 1 addition & 0 deletions trunk/src/app/srs_app_rtc_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ SrsRtcStream* SrsRtcStreamManager::fetch(SrsRequest* r)
return source;
}

// TODO: FIXME: It should be thread-local or thread-safe.
SrsRtcStreamManager* _srs_rtc_sources = new SrsRtcStreamManager();

ISrsRtcPublishStream::ISrsRtcPublishStream()
Expand Down
3 changes: 2 additions & 1 deletion trunk/src/app/srs_app_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1113,11 +1113,12 @@ srs_error_t SrsServer::cycle()
}

srs_trace("srs terminated");

// for valgrind to detect.
srs_freep(_srs_config);
srs_freep(_srs_log);

// TODO: FIXME: Should return to exit the thread, and quit by thread pool manager.
exit(0);

return err;
Expand Down
59 changes: 37 additions & 22 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ SrsThreadEntry::SrsThreadEntry()

SrsThreadEntry::~SrsThreadEntry()
{
// TODO: FIXME: Should dispose err and trd.
}

SrsThreadPool::SrsThreadPool()
Expand All @@ -117,7 +118,6 @@ SrsThreadPool::~SrsThreadPool()
srs_freep(lock_);
}

// @remark Note that we should never write logs, because log is not ready not.
srs_error_t SrsThreadPool::initialize()
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -172,12 +172,33 @@ srs_error_t SrsThreadPool::run()
{
srs_error_t err = srs_success;

// Write the init log here.
srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_));
bool print_init_log = true;

while (true) {
sleep(interval_ / SRS_UTIME_SECONDS);
// Check the threads status fastly.
int loops = (int)(interval_ / SRS_UTIME_SECONDS);
for (int i = 0; i < loops; i++) {
if (true) {
SrsThreadLocker(lock_);
for (int i = 0; i < (int)threads_.size(); i++) {
SrsThreadEntry* entry = threads_.at(i);
if (entry->err != srs_success) {
err = srs_error_wrap(entry->err, "thread #%d(%s)", entry->num, entry->label.c_str());
return srs_error_copy(err);
}
}
}

sleep(1);

// Flush the system previous logs by this log.
if (i > 0 && print_init_log) {
print_init_log = false;
srs_trace("Thread #%d(%s): init interval=%dms", entry_->num, entry_->label.c_str(), srsu2msi(interval_));
}
}

// In normal state, gather status and log it.
static char buf[128];
string async_logs = _srs_async_log->description();

Expand All @@ -198,7 +219,7 @@ srs_error_t SrsThreadPool::run()

void SrsThreadPool::stop()
{
// TODO: FIXME: Implements it.
// TODO: FIXME: Should notify other threads to do cleanup and quit.
}

void* SrsThreadPool::start(void* arg)
Expand All @@ -216,6 +237,7 @@ void* SrsThreadPool::start(void* arg)
return NULL;
}

// TODO: FIXME: It should be thread-local or thread-safe.
SrsThreadPool* _srs_thread_pool = new SrsThreadPool();

SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p)
Expand Down Expand Up @@ -297,6 +319,7 @@ void SrsAsyncFileWriter::flush_co_queue()
{
srs_utime_t now = srs_update_system_time();

// The thread queue is thread-safe, so we do not need a lock.
if (true) {
vector<SrsSharedPtrMessage*> flying;
co_queue_->swap(flying);
Expand Down Expand Up @@ -373,25 +396,22 @@ srs_error_t SrsAsyncLogManager::initialize()
}

// @remark Now, log is ready, and we can print logs.
srs_error_t SrsAsyncLogManager::run()
srs_error_t SrsAsyncLogManager::start(void* arg)
{
srs_error_t err = srs_success;

if ((err = _srs_thread_pool->execute("log", SrsAsyncLogManager::start, this)) != srs_success) {
return srs_error_wrap(err, "run async log");
}

srs_trace("AsyncLogs: Init flush_interval=%dms", srsu2msi(interval_));

return err;
SrsAsyncLogManager* log = (SrsAsyncLogManager*)arg;
return log->do_start();
}

srs_error_t SrsAsyncLogManager::create_writer(std::string filename, SrsAsyncFileWriter** ppwriter)
{
srs_error_t err = srs_success;

SrsAsyncFileWriter* writer = new SrsAsyncFileWriter(filename);
writers_.push_back(writer);

if (true) {
SrsThreadLocker(lock_);
writers_.push_back(writer);
}

if ((err = writer->open()) != srs_success) {
return srs_error_wrap(err, "open file %s fail", filename.c_str());
Expand Down Expand Up @@ -438,12 +458,6 @@ std::string SrsAsyncLogManager::description()
return buf;
}

srs_error_t SrsAsyncLogManager::start(void* arg)
{
SrsAsyncLogManager* log = (SrsAsyncLogManager*)arg;
return log->do_start();
}

srs_error_t SrsAsyncLogManager::do_start()
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -489,4 +503,5 @@ srs_error_t SrsAsyncLogManager::do_start()
return err;
}

// TODO: FIXME: It should be thread-local or thread-safe.
SrsAsyncLogManager* _srs_async_log = new SrsAsyncLogManager();
14 changes: 11 additions & 3 deletions trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ class SrsThreadQueue
}
};

// Async file writer.
// Async file writer, it's not thread safe. It assume that each thread has its dedicate object,
// for example, the log object should be thread_local.
class SrsAsyncFileWriter : public ISrsWriter
{
friend class SrsAsyncLogManager;
Expand All @@ -202,6 +203,14 @@ class SrsAsyncFileWriter : public ISrsWriter
SrsThreadQueue<SrsSharedPtrMessage>* queue_;
private:
// The coroutine-queue, to avoid requires lock for each log.
// @remark Note that if multiple thread write to the same log file, the log is nor ordered
// by time, because each thread has this coroutine-queue and flush as a batch of logs to
// thread-queue:
// thread #1, flush 10 logs to thread-queue, [10:10:00 ~ 10:11:00]
// thread #2, flush 100 logs to thread-queue, [10:09:00 ~ 10:12:00]
// Finally, the logs flush to disk as:
// [10:10:00 ~ 10:11:00], 10 logs.
// [10:09:00 ~ 10:12:00], 100 logs.
SrsCoroutineQueue<SrsSharedPtrMessage>* co_queue_;
private:
SrsAsyncFileWriter(std::string p);
Expand Down Expand Up @@ -244,7 +253,7 @@ class SrsAsyncLogManager
// Initialize the async log manager.
srs_error_t initialize();
// Run the async log manager thread.
srs_error_t run();
static srs_error_t start(void* arg);
// Create a managed writer, user should never free it.
srs_error_t create_writer(std::string filename, SrsAsyncFileWriter** ppwriter);
// Reopen all log files, asynchronously.
Expand All @@ -253,7 +262,6 @@ class SrsAsyncLogManager
// Get the summary of this manager.
std::string description();
private:
static srs_error_t start(void* arg);
srs_error_t do_start();
};

Expand Down
1 change: 1 addition & 0 deletions trunk/src/kernel/srs_kernel_kbps.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,6 @@ srs_utime_t SrsWallClock::now()
return srs_get_system_time();
}

// TODO: FIXME: It should be thread-local or thread-safe.
SrsWallClock* _srs_clock = new SrsWallClock();

2 changes: 2 additions & 0 deletions trunk/src/kernel/srs_kernel_rtc_rtp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1051,10 +1051,12 @@ srs_error_t SrsRtpPacket2::decode(SrsBuffer* buf)
return err;
}

// TODO: FIXME: It should be thread-local or thread-safe.
SrsRtpObjectCacheManager<SrsRtpPacket2>* _srs_rtp_cache = new SrsRtpObjectCacheManager<SrsRtpPacket2>(sizeof(SrsRtpPacket2));
SrsRtpObjectCacheManager<SrsRtpRawPayload>* _srs_rtp_raw_cache = new SrsRtpObjectCacheManager<SrsRtpRawPayload>(sizeof(SrsRtpRawPayload));
SrsRtpObjectCacheManager<SrsRtpFUAPayload2>* _srs_rtp_fua_cache = new SrsRtpObjectCacheManager<SrsRtpFUAPayload2>(sizeof(SrsRtpFUAPayload2));

// TODO: FIXME: It should be thread-local or thread-safe.
SrsRtpObjectCacheManager<SrsSharedPtrMessage>* _srs_rtp_msg_cache_buffers = new SrsRtpObjectCacheManager<SrsSharedPtrMessage>(sizeof(SrsSharedPtrMessage) + kRtpPacketSize);
SrsRtpObjectCacheManager<SrsSharedPtrMessage>* _srs_rtp_msg_cache_objs = new SrsRtpObjectCacheManager<SrsSharedPtrMessage>(sizeof(SrsSharedPtrMessage));

Expand Down
17 changes: 13 additions & 4 deletions trunk/src/main/srs_main_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,12 @@ srs_error_t run_in_thread_pool();
void show_macro_features();

// @global log and context.
// TODO: FIXME: It should be thread-local or thread-safe.
ISrsLog* _srs_log = new SrsFileLog();
// TODO: FIXME: It should be thread-local or thread-safe.
ISrsContext* _srs_context = new SrsThreadContext();
// @global config object for app module.
// TODO: FIXME: It should be thread-local or thread-safe.
SrsConfig* _srs_config = new SrsConfig();

// @global version of srs, which can grep keyword "XCORE"
Expand Down Expand Up @@ -226,8 +229,12 @@ int main(int argc, char** argv) {

srs_error_t err = do_main(argc, argv);

// Because we are exiting, and it's impossible to notify the async log thread
// to write the error log, so we print to stderr instead.
// TODO: FIXME: Should we flush the async log cache?
if (err != srs_success) {
srs_error("Failed, %s", srs_error_desc(err).c_str());
fprintf(stderr, "Failed, ts=%" PRId64 ", err is %s\n", srs_update_system_time(),
srs_error_desc(err).c_str());
}

int ret = srs_error_code(err);
Expand Down Expand Up @@ -476,18 +483,19 @@ srs_error_t run_in_thread_pool()

// After all init(log, async log manager, thread pool), now we can start to
// run the log manager thread.
if ((err = _srs_async_log->run()) != srs_success) {
return srs_error_wrap(err, "run async log");
if ((err = _srs_thread_pool->execute("log", SrsAsyncLogManager::start, _srs_async_log)) != srs_success) {
return srs_error_wrap(err, "start async log thread");
}

// Start the service worker thread, for RTMP and RTC server, etc.
if ((err = _srs_thread_pool->execute("hybrid", run_hybrid_server, NULL)) != srs_success) {
return srs_error_wrap(err, "run hybrid server");
return srs_error_wrap(err, "start hybrid server thread");
}

return _srs_thread_pool->run();
}

// TODO: FIXME: Extract to hybrid server.
srs_error_t run_hybrid_server(void* arg)
{
srs_error_t err = srs_success;
Expand All @@ -504,6 +512,7 @@ srs_error_t run_hybrid_server(void* arg)
#endif

// Do some system initialize.
// TODO: FIXME: If fail, for example, acquire pid fail, should exit.
if ((err = _srs_hybrid->initialize()) != srs_success) {
return srs_error_wrap(err, "hybrid initialize");
}
Expand Down

0 comments on commit e46a4b0

Please sign in to comment.