Skip to content

Commit

Permalink
Threads: Refine variables and do dispose
Browse files Browse the repository at this point in the history
1. Rename packets to srtp or received packets.
2. Add task dispose API, cleanup in future.
3. If got packets before init AsyncSRTP, return error.
4. Never free the SRTPTask, dispose it instead.
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent 9a05d24 commit 57b771a
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 52 deletions.
140 changes: 94 additions & 46 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,15 +453,15 @@ SrsAsyncFileWriter::SrsAsyncFileWriter(std::string p)
{
filename_ = p;
writer_ = new SrsFileWriter();
queue_ = new SrsThreadQueue<SrsSharedPtrMessage>();
chunks_ = new SrsThreadQueue<SrsSharedPtrMessage>();
}

// TODO: FIXME: Before free the writer, we must remove it from the manager.
SrsAsyncFileWriter::~SrsAsyncFileWriter()
{
// TODO: FIXME: Should we flush dirty logs?
srs_freep(writer_);
srs_freep(queue_);
srs_freep(chunks_);
}

srs_error_t SrsAsyncFileWriter::open()
Expand Down Expand Up @@ -493,7 +493,7 @@ srs_error_t SrsAsyncFileWriter::write(void* buf, size_t count, ssize_t* pnwrite)
SrsSharedPtrMessage* msg = new SrsSharedPtrMessage();
msg->wrap(cp, count);

queue_->push_back(msg);
chunks_->push_back(msg);

if (pnwrite) {
*pnwrite = count;
Expand Down Expand Up @@ -530,9 +530,9 @@ srs_error_t SrsAsyncFileWriter::flush()
// at queue to push_back or swap all messages.
srs_utime_t now = srs_update_system_time();

vector<SrsSharedPtrMessage*> flying;
vector<SrsSharedPtrMessage*> flying_chunks;
if (true) {
queue_->swap(flying);
chunks_->swap(flying_chunks);
}

// Stat the sync wait of locks.
Expand All @@ -547,9 +547,9 @@ srs_error_t SrsAsyncFileWriter::flush()
++_srs_thread_sync_plus->sugar;
}

// Flush the flying messages to disk.
for (int i = 0; i < (int)flying.size(); i++) {
SrsSharedPtrMessage* msg = flying.at(i);
// Flush the chunks to disk.
for (int i = 0; i < (int)flying_chunks.size(); i++) {
SrsSharedPtrMessage* msg = flying_chunks.at(i);

srs_error_t r0 = writer_->write(msg->payload, msg->size, NULL);

Expand Down Expand Up @@ -639,7 +639,7 @@ std::string SrsAsyncLogManager::description()
for (int i = 0; i < (int)writers_.size(); i++) {
SrsAsyncFileWriter* writer = writers_.at(i);

int nn = (int)writer->queue_->size();
int nn = (int)writer->chunks_->size();
nn_logs += nn;
max_logs = srs_max(max_logs, nn);
}
Expand Down Expand Up @@ -706,8 +706,7 @@ SrsAsyncSRTP::SrsAsyncSRTP(SrsSecurityTransport* transport)

SrsAsyncSRTP::~SrsAsyncSRTP()
{
// TODO: FIXME: Check it carefully.
_srs_async_srtp->remove_task(task_);
_srs_async_srtp->on_srtp_codec_destroy(task_);
}

srs_error_t SrsAsyncSRTP::initialize(std::string recv_key, std::string send_key)
Expand All @@ -728,18 +727,30 @@ srs_error_t SrsAsyncSRTP::initialize(std::string recv_key, std::string send_key)

srs_error_t SrsAsyncSRTP::protect_rtp(void* packet, int* nb_cipher)
{
if (!task_) {
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "not ready");
}

// TODO: FIMXE: Remove it.
return SrsSRTP::protect_rtp(packet, nb_cipher);
}

srs_error_t SrsAsyncSRTP::protect_rtcp(void* packet, int* nb_cipher)
{
if (!task_) {
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "not ready");
}

// TODO: FIMXE: Remove it.
return SrsSRTP::protect_rtcp(packet, nb_cipher);
}

srs_error_t SrsAsyncSRTP::unprotect_rtp(void* packet, int* nb_plaintext)
{
if (!task_) {
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "not ready");
}

int nb_cipher = *nb_plaintext;
char* buf = new char[nb_cipher];
memcpy(buf, packet, nb_cipher);
Expand All @@ -758,6 +769,10 @@ srs_error_t SrsAsyncSRTP::unprotect_rtp(void* packet, int* nb_plaintext)

srs_error_t SrsAsyncSRTP::unprotect_rtcp(void* packet, int* nb_plaintext)
{
if (!task_) {
return srs_error_new(ERROR_RTC_SRTP_UNPROTECT, "not ready");
}

// TODO: FIMXE: Remove it.
return SrsSRTP::unprotect_rtcp(packet, nb_plaintext);
}
Expand All @@ -766,6 +781,7 @@ SrsAsyncSRTPTask::SrsAsyncSRTPTask(SrsAsyncSRTP* codec)
{
codec_ = codec;
impl_ = new SrsSRTP();
disposing_ = false;
}

SrsAsyncSRTPTask::~SrsAsyncSRTPTask()
Expand All @@ -784,10 +800,25 @@ srs_error_t SrsAsyncSRTPTask::initialize(std::string recv_key, std::string send_
return err;
}

void SrsAsyncSRTPTask::dispose()
{
// TODO: FIXME: Do cleanup in future.
// TODO: FIXME: Memory leak here, use lazy free to avoid lock for each packet.
disposing_ = true;

// It's safe to set the codec to NULl, because it has been freed.
codec_ = NULL;
}

srs_error_t SrsAsyncSRTPTask::cook(SrsAsyncSRTPPacket* pkt)
{
srs_error_t err = srs_success;

// It's safe, because here we do not use the codec.
if (disposing_) {
return err;
}

if (pkt->do_decrypt_) {
if (pkt->is_rtp_) {
pkt->nb_consumed_ = pkt->msg_->size;
Expand All @@ -801,8 +832,30 @@ srs_error_t SrsAsyncSRTPTask::cook(SrsAsyncSRTPPacket* pkt)
return err;
}

srs_error_t SrsAsyncSRTPTask::consume(SrsAsyncSRTPPacket* pkt)
{
srs_error_t err = srs_success;

// It's safe, because the dispose and consume are in the same thread hybrid.
if (disposing_) {
return err;
}

char* payload = pkt->msg_->payload;

if (pkt->do_decrypt_) {
if (pkt->is_rtp_) {
err = codec_->transport_->on_rtp_plaintext(payload, pkt->nb_consumed_);
}
}

return err;
}

SrsAsyncSRTPPacket::SrsAsyncSRTPPacket(SrsAsyncSRTPTask* task)
{
srs_assert(task);

task_ = task;
msg_ = new SrsSharedPtrMessage();
is_rtp_ = false;
Expand All @@ -818,7 +871,7 @@ SrsAsyncSRTPPacket::~SrsAsyncSRTPPacket()
SrsAsyncSRTPManager::SrsAsyncSRTPManager()
{
lock_ = new SrsThreadMutex();
packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
srtp_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
cooked_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
trd_ = new SrsFastCoroutine("srtp", this);
}
Expand All @@ -829,7 +882,7 @@ SrsAsyncSRTPManager::~SrsAsyncSRTPManager()
srs_freep(trd_);

srs_freep(lock_);
srs_freep(packets_);
srs_freep(srtp_packets_);
srs_freep(cooked_packets_);

vector<SrsAsyncSRTPTask*>::iterator it;
Expand All @@ -849,7 +902,7 @@ void SrsAsyncSRTPManager::register_task(SrsAsyncSRTPTask* task)
tasks_.push_back(task);
}

void SrsAsyncSRTPManager::remove_task(SrsAsyncSRTPTask* task)
void SrsAsyncSRTPManager::on_srtp_codec_destroy(SrsAsyncSRTPTask* task)
{
if (!task) {
return;
Expand All @@ -859,19 +912,21 @@ void SrsAsyncSRTPManager::remove_task(SrsAsyncSRTPTask* task)
vector<SrsAsyncSRTPTask*>::iterator it;
if ((it = std::find(tasks_.begin(), tasks_.end(), task)) != tasks_.end()) {
tasks_.erase(it);
srs_freep(task);

// TODO: FIXME: Do cleanup in future.
task->dispose();
}
}

// TODO: FIXME: We could use a coroutine queue, then cook all packet in RTC server timer.
void SrsAsyncSRTPManager::add_packet(SrsAsyncSRTPPacket* pkt)
{
packets_->push_back(pkt);
srtp_packets_->push_back(pkt);
}

int SrsAsyncSRTPManager::size()
{
return packets_->size();
return srtp_packets_->size();
}
int SrsAsyncSRTPManager::cooked_size()
{
Expand All @@ -892,11 +947,11 @@ srs_error_t SrsAsyncSRTPManager::do_start()
srs_utime_t interval = 10 * SRS_UTIME_MILLISECONDS;

while (true) {
vector<SrsAsyncSRTPPacket*> flying;
packets_->swap(flying);
vector<SrsAsyncSRTPPacket*> flying_srtp_packets;
srtp_packets_->swap(flying_srtp_packets);

for (int i = 0; i < (int)flying.size(); i++) {
SrsAsyncSRTPPacket* pkt = flying.at(i);
for (int i = 0; i < (int)flying_srtp_packets.size(); i++) {
SrsAsyncSRTPPacket* pkt = flying_srtp_packets.at(i);

if ((err = pkt->task_->cook(pkt)) != srs_success) {
srs_error_reset(err); // Ignore any error.
Expand All @@ -906,7 +961,7 @@ srs_error_t SrsAsyncSRTPManager::do_start()
}

// If got packets, maybe more packets in queue.
if (!flying.empty()) {
if (!flying_srtp_packets.empty()) {
continue;
}

Expand Down Expand Up @@ -943,26 +998,19 @@ srs_error_t SrsAsyncSRTPManager::cycle()
return srs_error_wrap(err, "pull");
}

vector<SrsAsyncSRTPPacket*> flying;
cooked_packets_->swap(flying);
vector<SrsAsyncSRTPPacket*> flying_cooked_packets;
cooked_packets_->swap(flying_cooked_packets);

if (flying.empty()) {
if (flying_cooked_packets.empty()) {
srs_usleep(20 * SRS_UTIME_MILLISECONDS);
continue;
}

for (int i = 0; i < (int)flying.size(); i++) {
SrsAsyncSRTPPacket* pkt = flying.at(i);
SrsSecurityTransport* transport = pkt->task_->codec_->transport_;
char* payload = pkt->msg_->payload;
for (int i = 0; i < (int)flying_cooked_packets.size(); i++) {
SrsAsyncSRTPPacket* pkt = flying_cooked_packets.at(i);

if (pkt->do_decrypt_) {
if (pkt->is_rtp_) {
err = transport->on_rtp_plaintext(payload, pkt->nb_consumed_);
}
}
if (err != srs_success) {
srs_error_reset(err); // Ignore any error.
if ((err = pkt->task_->consume(pkt)) != srs_success) {
srs_error_reset(err);
}

srs_freep(pkt);
Expand Down Expand Up @@ -993,7 +1041,7 @@ SrsThreadUdpListener::~SrsThreadUdpListener()
SrsAsyncRecvManager::SrsAsyncRecvManager()
{
lock_ = new SrsThreadMutex();
packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
received_packets_ = new SrsThreadQueue<SrsUdpMuxSocket>();
handler_ = NULL;
max_recv_queue_ = 0;
trd_ = new SrsFastCoroutine("recv", this);
Expand All @@ -1005,7 +1053,7 @@ SrsAsyncRecvManager::~SrsAsyncRecvManager()
srs_freep(trd_);

srs_freep(lock_);
srs_freep(packets_);
srs_freep(received_packets_);

vector<SrsThreadUdpListener*>::iterator it;
for (it = listeners_.begin(); it != listeners_.end(); ++it) {
Expand All @@ -1027,7 +1075,7 @@ void SrsAsyncRecvManager::add_listener(SrsThreadUdpListener* listener)

int SrsAsyncRecvManager::size()
{
return packets_->size();
return received_packets_->size();
}

srs_error_t SrsAsyncRecvManager::start(void* arg)
Expand Down Expand Up @@ -1062,15 +1110,15 @@ srs_error_t SrsAsyncRecvManager::do_start()
}

// Drop packet if queue is critical full.
int nb_packets = (int)packets_->size();
int nb_packets = (int)received_packets_->size();
if (nb_packets >= max_recv_queue_) {
++_srs_pps_aloss->sugar;
continue;
}

// If got packet, copy to the queue.
got_packets = true;
packets_->push_back(listener->skt_->copy());
received_packets_->push_back(listener->skt_->copy());
}
}

Expand Down Expand Up @@ -1115,16 +1163,16 @@ srs_error_t SrsAsyncRecvManager::cycle()
return srs_error_wrap(err, "pull");
}

vector<SrsUdpMuxSocket*> flying;
packets_->swap(flying);
vector<SrsUdpMuxSocket*> flying_received_packets;
received_packets_->swap(flying_received_packets);

if (flying.empty()) {
if (flying_received_packets.empty()) {
srs_usleep(20 * SRS_UTIME_MILLISECONDS);
continue;
}

for (int i = 0; i < (int)flying.size(); i++) {
SrsUdpMuxSocket* pkt = flying.at(i);
for (int i = 0; i < (int)flying_received_packets.size(); i++) {
SrsUdpMuxSocket* pkt = flying_received_packets.at(i);

if (handler_ && (err = handler_->on_udp_packet(pkt)) != srs_success) {
srs_error_reset(err); // Ignore any error.
Expand Down
Loading

0 comments on commit 57b771a

Please sign in to comment.