Skip to content

Commit

Permalink
For #2188: Support thread-safe queue SrsThreadQueue.
Browse files Browse the repository at this point in the history
1. Wrap std::vector to thread-safe queue.
2. Keep API compatible with std::vector.
3. SrsAsyncFileWriter use thread-safe queue instead.
  • Loading branch information
winlinvip committed Mar 12, 2021
1 parent 34da013 commit 8d42384
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 23 deletions.
30 changes: 10 additions & 20 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ srs_error_t SrsThreadPool::run()
srs_error_t err = srs_success;

while (true) {
string async_logs = _srs_async_log->desc();
string async_logs = _srs_async_log->description();
srs_trace("Thread #%d(%s): cycle threads=%d%s", entry_->num, entry_->label.c_str(), (int)threads_.size(),
async_logs.c_str());

Expand Down Expand Up @@ -205,21 +205,16 @@ SrsThreadPool* _srs_thread_pool = new SrsThreadPool();
SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p)
{
filename_ = p;
lock_ = new SrsThreadMutex();
writer_ = new SrsFileWriter();
queue_ = new SrsThreadQueue<SrsSharedPtrMessage>();
}

// TODO: FIXME: Before free the writer, we must remove it from the manager.
SrsAsyncFileWriter::~SrsAsyncFileWriter()
{
// TODO: FIXME: Should we flush dirty logs?
srs_freep(writer_);
srs_freep(lock_);

// TODO: FIXME: Should we flush these logs?
for (int i = 0; i < (int)dirty_.size(); i++) {
SrsSharedPtrMessage* msg = dirty_.at(i);
srs_freep(msg);
}
srs_freep(queue_);
}

srs_error_t SrsAsyncFileWriter::open()
Expand Down Expand Up @@ -251,10 +246,7 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite)
SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
msg->wrap(cp, count);

if (true) {
SrsThreadLocker(lock_);
dirty_.push_back(msg);
}
queue_->push_back(msg);

if (pnwrite) {
*pnwrite = count;
Expand Down Expand Up @@ -288,10 +280,7 @@ srs_error_t SrsAsyncFileWriter::flush()
srs_error_t err = srs_success;

vector<SrsSharedPtrMessage*> flying;
if (true) {
SrsThreadLocker(lock_);
dirty_.swap(flying);
}
queue_->swap(flying);

for (int i = 0; i < (int)flying.size(); i++) {
SrsSharedPtrMessage* msg = flying.at(i);
Expand Down Expand Up @@ -366,7 +355,7 @@ void SrsAsyncLogManager::reopen()
reopen_ = true;
}

std::string SrsAsyncLogManager::desc()
std::string SrsAsyncLogManager::description()
{
SrsThreadLocker(lock_);

Expand All @@ -375,8 +364,9 @@ std::string SrsAsyncLogManager::desc()
for (int i = 0; i < (int)writers_.size(); i++) {
SrsAsyncFileWriter* writer = writers_.at(i);

nn_logs += (int)writer->dirty_.size();
max_logs = srs_max(max_logs, (int)writer->dirty_.size());
int nn = (int)writer->queue_->size();
nn_logs += nn;
max_logs = srs_max(max_logs, nn);
}

static char buf[128];
Expand Down
44 changes: 41 additions & 3 deletions trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,52 @@ class SrsThreadPool
// The global thread pool.
extern SrsThreadPool* _srs_thread_pool;

// Thread-safe queue.
template<typename T>
class SrsThreadQueue
{
private:
std::vector<T*> dirty_;
SrsThreadMutex* lock_;
public:
// SrsThreadQueue::SrsThreadQueue
SrsThreadQueue() {
lock_ = new SrsThreadMutex();
}
// SrsThreadQueue::~SrsThreadQueue
virtual ~SrsThreadQueue() {
srs_freep(lock_);
for (int i = 0; i < (int)dirty_.size(); i++) {
T* msg = dirty_.at(i);
srs_freep(msg);
}
}
public:
// SrsThreadQueue::push_back
void push_back(T* msg) {
SrsThreadLocker(lock_);
dirty_.push_back(msg);
}
// SrsThreadQueue::swap
void swap(std::vector<T*>& flying) {
SrsThreadLocker(lock_);
dirty_.swap(flying);
}
// SrsThreadQueue::size
size_t size() {
SrsThreadLocker(lock_);
return dirty_.size();
}
};

// Async file writer.
class SrsAsyncFileWriter : public ISrsWriter
{
friend class SrsAsyncLogManager;
private:
std::string filename_;
SrsFileWriter* writer_;
std::vector<SrsSharedPtrMessage*> dirty_;
SrsThreadMutex* lock_;
SrsThreadQueue<SrsSharedPtrMessage>* queue_;
private:
SrsAsyncFileWriter(std::string p);
virtual ~SrsAsyncFileWriter();
Expand Down Expand Up @@ -168,7 +205,8 @@ class SrsAsyncLogManager
// Reopen all log files, asynchronously.
virtual void reopen();
public:
std::string desc();
// Get the summary of this manager.
std::string description();
private:
static srs_error_t start(void* arg);
srs_error_t do_start();
Expand Down

0 comments on commit 8d42384

Please sign in to comment.