Skip to content

Commit

Permalink
Threads-SEND: Support tunnel for srtp-send.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent d282ccd commit d6a92cb
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 7 deletions.
3 changes: 2 additions & 1 deletion trunk/src/app/srs_app_rtc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,8 @@ srs_error_t SrsRtcServer::on_timer(srs_utime_t interval, srs_utime_t tick)

// TODO: FIXME: Should move to Hybrid server stat.
string loss_desc;
if (!snk_desc.empty() || _srs_pps_rloss->r1s() || _srs_pps_rloss->r10s() || _srs_pps_sloss->r10s() || _srs_pps_aloss->r10s()) {
_srs_pps_aloss->update();
if (_srs_pps_rloss->r1s() || _srs_pps_rloss->r10s() || _srs_pps_sloss->r10s() || _srs_pps_aloss->r10s()) {
snprintf(buf, sizeof(buf), ", loss=(r:%d/%d,s:%d,a:%d)", _srs_pps_rloss->r1s(), _srs_pps_rloss->r10s(), _srs_pps_sloss->r10s(), _srs_pps_aloss->r10s());
loss_desc = buf;
}
Expand Down
64 changes: 58 additions & 6 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ SrsPps* _srs_thread_sync_plus = new SrsPps();

SrsPps* _srs_tunnel_recv_raw = new SrsPps();
SrsPps* _srs_tunnel_recv_hit = new SrsPps();
SrsPps* _srs_tunnel_send_raw = new SrsPps();
SrsPps* _srs_tunnel_send_hit = new SrsPps();

extern bool srs_is_rtp_or_rtcp(const uint8_t* data, size_t len);
extern bool srs_is_rtcp(const uint8_t* data, size_t len);
Expand Down Expand Up @@ -251,12 +253,13 @@ srs_error_t SrsThreadPool::initialize()

bool async_tunnel = _srs_config->get_threads_async_tunnel();
_srs_async_recv->set_tunnel_enabled(async_tunnel);
_srs_async_srtp->set_tunnel_enabled(async_tunnel);

srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", water_level=%dx%d,%dx%d, recvQ=%d, aSend=%d, tunnel=%d",
entry->num, entry->label.c_str(), entry->name.c_str(), srsu2msi(interval_), async_srtp,
entry->cpuset_ok, r0, srs_covert_cpuset(entry->cpuset), r1, srs_covert_cpuset(entry->cpuset2),
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, recv_queue, async_send,
async_tunnel);
high_pulse_, high_threshold_, critical_pulse_, critical_threshold_,
recv_queue, async_send, async_tunnel);

return err;
}
Expand Down Expand Up @@ -339,7 +342,6 @@ srs_error_t SrsThreadPool::run()

// For Circuit-Breaker to update the SNMP, ASAP.
srs_update_udp_snmp_statistic();
_srs_pps_aloss->update();

// Update thread CPUs per 1s.
for (int i = 0; i < (int)threads.size(); i++) {
Expand Down Expand Up @@ -377,7 +379,7 @@ srs_error_t SrsThreadPool::run()

string queue_desc;
if (true) {
snprintf(buf, sizeof(buf), ", queue=%d,%d,%d", _srs_async_recv->size(), _srs_async_srtp->size(), _srs_async_srtp->cooked_size());
snprintf(buf, sizeof(buf), ", queue=%d,%d,%d,%d", _srs_async_recv->size(), _srs_async_srtp->size(), _srs_async_srtp->cooked_size(), _srs_async_send->size());
queue_desc = buf;
}

Expand All @@ -391,8 +393,9 @@ srs_error_t SrsThreadPool::run()

string tunnel_desc;
_srs_tunnel_recv_raw->update(); _srs_tunnel_recv_hit->update();
if (_srs_tunnel_recv_raw->r10s() || _srs_tunnel_recv_hit->r10s()) {
snprintf(buf, sizeof(buf), ", tunnel=%d,%d", _srs_tunnel_recv_raw->r10s(), _srs_tunnel_recv_hit->r10s());
_srs_tunnel_send_raw->update(); _srs_tunnel_send_hit->update();
if (_srs_tunnel_recv_raw->r10s() || _srs_tunnel_recv_hit->r10s() || _srs_tunnel_send_raw->r10s() || _srs_tunnel_send_hit->r10s()) {
snprintf(buf, sizeof(buf), ", tunnel=(r:%d/%d,s:%d/%d)", _srs_tunnel_recv_raw->r10s(), _srs_tunnel_recv_hit->r10s(), _srs_tunnel_send_raw->r10s(), _srs_tunnel_send_hit->r10s());
tunnel_desc = buf;
}

Expand Down Expand Up @@ -906,18 +909,23 @@ void SrsAsyncSRTP::dig_tunnel(SrsUdpMuxSocket* skt)
}

_srs_async_recv->tunnels()->dig_tunnel(fast_id, task_);
task_->dig_tunnel(skt);
}

SrsAsyncSRTPTask::SrsAsyncSRTPTask(SrsAsyncSRTP* codec)
{
codec_ = codec;
impl_ = new SrsSRTP();
disposing_ = false;
sendonly_skt_ = NULL;
lock_ = new SrsThreadMutex();
}

SrsAsyncSRTPTask::~SrsAsyncSRTPTask()
{
srs_freep(impl_);
srs_freep(sendonly_skt_);
srs_freep(lock_);
}

srs_error_t SrsAsyncSRTPTask::initialize(std::string recv_key, std::string send_key)
Expand Down Expand Up @@ -999,6 +1007,42 @@ srs_error_t SrsAsyncSRTPTask::consume(SrsAsyncSRTPPacket* pkt)
return err;
}

void SrsAsyncSRTPTask::dig_tunnel(SrsUdpMuxSocket* skt)
{
if (!skt) {
return;
}

SrsThreadLocker(lock_);
srs_freep(sendonly_skt_);
sendonly_skt_ = skt->copy_sendonly();
}

bool SrsAsyncSRTPTask::consume_by_tunnel(SrsAsyncSRTPPacket* src)
{
// If decrypt, we should consume by hybrid.
if (src->do_decrypt_) {
return false;
}

// No tunnel established, ignore.
if (!sendonly_skt_) {
return false;
}

SrsAsyncUdpPacket* pkt = new SrsAsyncUdpPacket();
if (true) {
SrsThreadLocker(lock_);
pkt->from(sendonly_skt_, (char*)src->msg_->payload, src->nb_consumed_);
}

_srs_async_send->add_packet(pkt);
srs_freep(src);

++_srs_tunnel_send_hit->sugar;
return true;
}

SrsAsyncSRTPPacket::SrsAsyncSRTPPacket(SrsAsyncSRTPTask* task)
{
srs_assert(task);
Expand All @@ -1020,6 +1064,7 @@ SrsAsyncSRTPManager::SrsAsyncSRTPManager()
lock_ = new SrsThreadMutex();
srtp_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
cooked_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
tunnel_enabled_ = false;
}

// TODO: FIXME: We should stop the thread first, then free the manager.
Expand Down Expand Up @@ -1101,7 +1146,14 @@ srs_error_t SrsAsyncSRTPManager::do_start()
srs_error_reset(err); // Ignore any error.
}

// Try to consume the packet by tunnel.
if (tunnel_enabled_ && pkt->task_->consume_by_tunnel(pkt)) {
continue;
}

cooked_packets_->push_back(pkt);

++_srs_tunnel_send_raw->sugar;
}

// If got packets, maybe more packets in queue.
Expand Down
17 changes: 17 additions & 0 deletions trunk/src/app/srs_app_threads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@ class SrsAsyncSRTPTask
SrsSRTP* impl_;
// For disposing, only set a flag, free it in future.
int disposing_;
private:
// For tunnel, srtp-send.
SrsUdpMuxSocket* sendonly_skt_;
SrsThreadMutex* lock_;
public:
SrsAsyncSRTPTask(SrsAsyncSRTP* codec);
virtual ~SrsAsyncSRTPTask();
Expand All @@ -341,6 +345,10 @@ class SrsAsyncSRTPTask
public:
srs_error_t cook(SrsAsyncSRTPPacket* pkt);
srs_error_t consume(SrsAsyncSRTPPacket* pkt);
public:
void dig_tunnel(SrsUdpMuxSocket* skt);
// Try to consume by tunnel.
bool consume_by_tunnel(SrsAsyncSRTPPacket* pkt);
};

// The async SRTP packet, handle by task.
Expand Down Expand Up @@ -369,10 +377,16 @@ class SrsAsyncSRTPManager
private:
// The packets cooked by async SRTP manager.
SrsThreadQueue<SrsAsyncSRTPPacket>* cooked_packets_;
private:
// Whether enabled tunnel.
bool tunnel_enabled_;
public:
SrsAsyncSRTPManager();
virtual ~SrsAsyncSRTPManager();
public:
// Enable or disable the tunnel.
// SrsAsyncSRTPManager::set_tunnel_enabled()
void set_tunnel_enabled(bool v) { tunnel_enabled_ = v; }
void register_task(SrsAsyncSRTPTask* task);
void on_srtp_codec_destroy(SrsAsyncSRTPTask* task);
void add_packet(SrsAsyncSRTPPacket* pkt);
Expand Down Expand Up @@ -498,6 +512,9 @@ class SrsAsyncSendManager
void set_enabled(bool v) { enabled_ = v; }
// Send the packet.
void add_packet(SrsAsyncUdpPacket* pkt);
// Get the size of packets queue.
// SrsAsyncSendManager::size()
int size() { return sending_packets_->size(); }
// Start the thread.
static srs_error_t start(void* arg);
private:
Expand Down

0 comments on commit d6a92cb

Please sign in to comment.