diff --git a/trunk/src/app/srs_app_rtc_server.cpp b/trunk/src/app/srs_app_rtc_server.cpp index b7ac976c5d..5f16d1a85e 100644 --- a/trunk/src/app/srs_app_rtc_server.cpp +++ b/trunk/src/app/srs_app_rtc_server.cpp @@ -721,7 +721,8 @@ srs_error_t SrsRtcServer::on_timer(srs_utime_t interval, srs_utime_t tick) // TODO: FIXME: Should move to Hybrid server stat. string loss_desc; - if (!snk_desc.empty() || _srs_pps_rloss->r1s() || _srs_pps_rloss->r10s() || _srs_pps_sloss->r10s() || _srs_pps_aloss->r10s()) { + _srs_pps_aloss->update(); + if (_srs_pps_rloss->r1s() || _srs_pps_rloss->r10s() || _srs_pps_sloss->r10s() || _srs_pps_aloss->r10s()) { snprintf(buf, sizeof(buf), ", loss=(r:%d/%d,s:%d,a:%d)", _srs_pps_rloss->r1s(), _srs_pps_rloss->r10s(), _srs_pps_sloss->r10s(), _srs_pps_aloss->r10s()); loss_desc = buf; } diff --git a/trunk/src/app/srs_app_threads.cpp b/trunk/src/app/srs_app_threads.cpp index 8cd2791b01..bcc4fcbd65 100644 --- a/trunk/src/app/srs_app_threads.cpp +++ b/trunk/src/app/srs_app_threads.cpp @@ -61,6 +61,8 @@ SrsPps* _srs_thread_sync_plus = new SrsPps(); SrsPps* _srs_tunnel_recv_raw = new SrsPps(); SrsPps* _srs_tunnel_recv_hit = new SrsPps(); +SrsPps* _srs_tunnel_send_raw = new SrsPps(); +SrsPps* _srs_tunnel_send_hit = new SrsPps(); extern bool srs_is_rtp_or_rtcp(const uint8_t* data, size_t len); extern bool srs_is_rtcp(const uint8_t* data, size_t len); @@ -251,12 +253,13 @@ srs_error_t SrsThreadPool::initialize() bool async_tunnel = _srs_config->get_threads_async_tunnel(); _srs_async_recv->set_tunnel_enabled(async_tunnel); + _srs_async_srtp->set_tunnel_enabled(async_tunnel); srs_trace("Thread #%d(%s): init name=%s, interval=%dms, async_srtp=%d, cpuset=%d/%d-0x%" PRIx64 "/%d-0x%" PRIx64 ", water_level=%dx%d,%dx%d, recvQ=%d, aSend=%d, tunnel=%d", entry->num, entry->label.c_str(), entry->name.c_str(), srsu2msi(interval_), async_srtp, entry->cpuset_ok, r0, srs_covert_cpuset(entry->cpuset), r1, srs_covert_cpuset(entry->cpuset2), - high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, recv_queue, async_send, - async_tunnel); + high_pulse_, high_threshold_, critical_pulse_, critical_threshold_, + recv_queue, async_send, async_tunnel); return err; } @@ -339,7 +342,6 @@ srs_error_t SrsThreadPool::run() // For Circuit-Breaker to update the SNMP, ASAP. srs_update_udp_snmp_statistic(); - _srs_pps_aloss->update(); // Update thread CPUs per 1s. for (int i = 0; i < (int)threads.size(); i++) { @@ -377,7 +379,7 @@ srs_error_t SrsThreadPool::run() string queue_desc; if (true) { - snprintf(buf, sizeof(buf), ", queue=%d,%d,%d", _srs_async_recv->size(), _srs_async_srtp->size(), _srs_async_srtp->cooked_size()); + snprintf(buf, sizeof(buf), ", queue=%d,%d,%d,%d", _srs_async_recv->size(), _srs_async_srtp->size(), _srs_async_srtp->cooked_size(), _srs_async_send->size()); queue_desc = buf; } @@ -391,8 +393,9 @@ srs_error_t SrsThreadPool::run() string tunnel_desc; _srs_tunnel_recv_raw->update(); _srs_tunnel_recv_hit->update(); - if (_srs_tunnel_recv_raw->r10s() || _srs_tunnel_recv_hit->r10s()) { - snprintf(buf, sizeof(buf), ", tunnel=%d,%d", _srs_tunnel_recv_raw->r10s(), _srs_tunnel_recv_hit->r10s()); + _srs_tunnel_send_raw->update(); _srs_tunnel_send_hit->update(); + if (_srs_tunnel_recv_raw->r10s() || _srs_tunnel_recv_hit->r10s() || _srs_tunnel_send_raw->r10s() || _srs_tunnel_send_hit->r10s()) { + snprintf(buf, sizeof(buf), ", tunnel=(r:%d/%d,s:%d/%d)", _srs_tunnel_recv_raw->r10s(), _srs_tunnel_recv_hit->r10s(), _srs_tunnel_send_raw->r10s(), _srs_tunnel_send_hit->r10s()); tunnel_desc = buf; } @@ -906,6 +909,7 @@ void SrsAsyncSRTP::dig_tunnel(SrsUdpMuxSocket* skt) } _srs_async_recv->tunnels()->dig_tunnel(fast_id, task_); + task_->dig_tunnel(skt); } SrsAsyncSRTPTask::SrsAsyncSRTPTask(SrsAsyncSRTP* codec) @@ -913,11 +917,15 @@ SrsAsyncSRTPTask::SrsAsyncSRTPTask(SrsAsyncSRTP* codec) codec_ = codec; impl_ = new SrsSRTP(); disposing_ = false; + sendonly_skt_ = NULL; + lock_ = new SrsThreadMutex(); } SrsAsyncSRTPTask::~SrsAsyncSRTPTask() { srs_freep(impl_); + srs_freep(sendonly_skt_); + srs_freep(lock_); } srs_error_t SrsAsyncSRTPTask::initialize(std::string recv_key, std::string send_key) @@ -999,6 +1007,42 @@ srs_error_t SrsAsyncSRTPTask::consume(SrsAsyncSRTPPacket* pkt) return err; } +void SrsAsyncSRTPTask::dig_tunnel(SrsUdpMuxSocket* skt) +{ + if (!skt) { + return; + } + + SrsThreadLocker(lock_); + srs_freep(sendonly_skt_); + sendonly_skt_ = skt->copy_sendonly(); +} + +bool SrsAsyncSRTPTask::consume_by_tunnel(SrsAsyncSRTPPacket* src) +{ + // If decrypt, we should consume by hybrid. + if (src->do_decrypt_) { + return false; + } + + // No tunnel established, ignore. + if (!sendonly_skt_) { + return false; + } + + SrsAsyncUdpPacket* pkt = new SrsAsyncUdpPacket(); + if (true) { + SrsThreadLocker(lock_); + pkt->from(sendonly_skt_, (char*)src->msg_->payload, src->nb_consumed_); + } + + _srs_async_send->add_packet(pkt); + srs_freep(src); + + ++_srs_tunnel_send_hit->sugar; + return true; +} + SrsAsyncSRTPPacket::SrsAsyncSRTPPacket(SrsAsyncSRTPTask* task) { srs_assert(task); @@ -1020,6 +1064,7 @@ SrsAsyncSRTPManager::SrsAsyncSRTPManager() lock_ = new SrsThreadMutex(); srtp_packets_ = new SrsThreadQueue(); cooked_packets_ = new SrsThreadQueue(); + tunnel_enabled_ = false; } // TODO: FIXME: We should stop the thread first, then free the manager. @@ -1101,7 +1146,14 @@ srs_error_t SrsAsyncSRTPManager::do_start() srs_error_reset(err); // Ignore any error. } + // Try to consume the packet by tunnel. + if (tunnel_enabled_ && pkt->task_->consume_by_tunnel(pkt)) { + continue; + } + cooked_packets_->push_back(pkt); + + ++_srs_tunnel_send_raw->sugar; } // If got packets, maybe more packets in queue. diff --git a/trunk/src/app/srs_app_threads.hpp b/trunk/src/app/srs_app_threads.hpp index 19c9900c26..c84a0e12b7 100644 --- a/trunk/src/app/srs_app_threads.hpp +++ b/trunk/src/app/srs_app_threads.hpp @@ -332,6 +332,10 @@ class SrsAsyncSRTPTask SrsSRTP* impl_; // For disposing, only set a flag, free it in future. int disposing_; +private: + // For tunnel, srtp-send. + SrsUdpMuxSocket* sendonly_skt_; + SrsThreadMutex* lock_; public: SrsAsyncSRTPTask(SrsAsyncSRTP* codec); virtual ~SrsAsyncSRTPTask(); @@ -341,6 +345,10 @@ class SrsAsyncSRTPTask public: srs_error_t cook(SrsAsyncSRTPPacket* pkt); srs_error_t consume(SrsAsyncSRTPPacket* pkt); +public: + void dig_tunnel(SrsUdpMuxSocket* skt); + // Try to consume by tunnel. + bool consume_by_tunnel(SrsAsyncSRTPPacket* pkt); }; // The async SRTP packet, handle by task. @@ -369,10 +377,16 @@ class SrsAsyncSRTPManager private: // The packets cooked by async SRTP manager. SrsThreadQueue* cooked_packets_; +private: + // Whether enabled tunnel. + bool tunnel_enabled_; public: SrsAsyncSRTPManager(); virtual ~SrsAsyncSRTPManager(); public: + // Enable or disable the tunnel. + // SrsAsyncSRTPManager::set_tunnel_enabled() + void set_tunnel_enabled(bool v) { tunnel_enabled_ = v; } void register_task(SrsAsyncSRTPTask* task); void on_srtp_codec_destroy(SrsAsyncSRTPTask* task); void add_packet(SrsAsyncSRTPPacket* pkt); @@ -498,6 +512,9 @@ class SrsAsyncSendManager void set_enabled(bool v) { enabled_ = v; } // Send the packet. void add_packet(SrsAsyncUdpPacket* pkt); + // Get the size of packets queue. + // SrsAsyncSendManager::size() + int size() { return sending_packets_->size(); } // Start the thread. static srs_error_t start(void* arg); private: