Skip to content

Commit

Permalink
Threads-SRTP: Support decrypt RTP by async SRTP.
Browse files Browse the repository at this point in the history
1. Create dedicate thread for async srtp.
2. SrsSecurityTransport use async srtp if config on.
3. Cook SRTP packets in async srtp.
4. Consume cooked SRTP packets in RTC server timer.
  • Loading branch information
winlinvip committed Apr 26, 2021
1 parent f068540 commit 88165ba
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 10 deletions.
6 changes: 6 additions & 0 deletions trunk/src/app/srs_app_hourglass.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ using namespace std;
#include <srs_kernel_error.hpp>
#include <srs_kernel_log.hpp>
#include <srs_kernel_utility.hpp>
#include <srs_app_threads.hpp>

#include <srs_protocol_kbps.hpp>

Expand Down Expand Up @@ -260,6 +261,11 @@ srs_error_t SrsClockWallMonitor::on_timer(srs_utime_t interval, srs_utime_t tick
++_srs_pps_timer_s->sugar;
}

// Consume the cooked async SRTP packets.
if ((err = _srs_async_srtp->consume()) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

return err;
}

1 change: 1 addition & 0 deletions trunk/src/app/srs_app_hybrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <srs_kernel_error.hpp>
#include <srs_service_st.hpp>
#include <srs_app_utility.hpp>
#include <srs_app_threads.hpp>

using namespace std;

Expand Down
26 changes: 25 additions & 1 deletion trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ SrsSecurityTransport::SrsSecurityTransport(SrsRtcConnection* s)
if (!async_srtp) {
srtp_ = new SrsSRTP();
} else {
srtp_ = new SrsAsyncSRTP();
srtp_ = new SrsAsyncSRTP(this);
}

handshake_done = false;
Expand Down Expand Up @@ -198,6 +198,11 @@ srs_error_t SrsSecurityTransport::srtp_initialize()
return err;
}

srs_error_t SrsSecurityTransport::on_rtp_plaintext(char* plaintext, int size)
{
return session_->on_rtp_plaintext(plaintext, size);
}

srs_error_t SrsSecurityTransport::protect_rtp(void* packet, int* nb_cipher)
{
return srtp_->protect_rtp(packet, nb_cipher);
Expand Down Expand Up @@ -1190,6 +1195,12 @@ srs_error_t SrsRtcPublishStream::on_rtp(char* data, int nb_data)
return err;
}

// For async SRTP, the nb_plaintext might be zero, which means we do not got the plaintext
// right now, and it will callback if get one.
if (nb_plaintext == 0) {
return err;
}

// Handle the plaintext RTP packet.
if ((err = on_rtp_plaintext(plaintext, nb_plaintext)) != srs_success) {
// We try to decode the RTP header for more detail error informations.
Expand Down Expand Up @@ -2119,6 +2130,19 @@ srs_error_t SrsRtcConnection::on_rtp(char* data, int nb_data)
return publisher->on_rtp(data, nb_data);
}

srs_error_t SrsRtcConnection::on_rtp_plaintext(char* plaintext, int nb_plaintext)
{
srs_error_t err = srs_success;

SrsRtcPublishStream* publisher = NULL;
if ((err = find_publisher(plaintext, nb_plaintext, &publisher)) != srs_success) {
return srs_error_wrap(err, "find");
}
srs_assert(publisher);

return publisher->on_rtp_plaintext(plaintext, nb_plaintext);
}

srs_error_t SrsRtcConnection::find_publisher(char* buf, int size, SrsRtcPublishStream** ppublisher)
{
srs_error_t err = srs_success;
Expand Down
5 changes: 4 additions & 1 deletion trunk/src/app/srs_app_rtc_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ class SrsSecurityTransport : public ISrsRtcTransport
virtual srs_error_t write_dtls_data(void* data, int size);
private:
srs_error_t srtp_initialize();
public:
srs_error_t on_rtp_plaintext(char* plaintext, int size);
};

// Semi security transport, setup DTLS and SRTP, with SRTP decrypt, without SRTP encrypt.
Expand Down Expand Up @@ -326,7 +328,7 @@ class SrsRtcPublishStream : virtual public ISrsHourGlass, virtual public ISrsRtp
srs_error_t send_rtcp_xr_rrtr();
public:
srs_error_t on_rtp(char* buf, int nb_buf);
private:
public:
// @remark We copy the plaintext, user should free it.
srs_error_t on_rtp_plaintext(char* plaintext, int nb_plaintext);
private:
Expand Down Expand Up @@ -486,6 +488,7 @@ class SrsRtcConnection : public ISrsResource
srs_error_t on_dtls(char* data, int nb_data);
srs_error_t on_rtp(char* data, int nb_data);
private:
srs_error_t on_rtp_plaintext(char* plaintext, int size);
// Decode the RTP header from buf, find the publisher by SSRC.
srs_error_t find_publisher(char* buf, int size, SrsRtcPublishStream** ppublisher);
public:
Expand Down
10 changes: 5 additions & 5 deletions trunk/src/app/srs_app_rtc_dtls.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,12 @@ class SrsSRTP
virtual ~SrsSRTP();
public:
// Initialize srtp context with recv_key and send_key.
srs_error_t initialize(std::string recv_key, std::string send_key);
virtual srs_error_t initialize(std::string recv_key, std::string send_key);
public:
srs_error_t protect_rtp(void* packet, int* nb_cipher);
srs_error_t protect_rtcp(void* packet, int* nb_cipher);
srs_error_t unprotect_rtp(void* packet, int* nb_plaintext);
srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext);
virtual srs_error_t protect_rtp(void* packet, int* nb_cipher);
virtual srs_error_t protect_rtcp(void* packet, int* nb_cipher);
virtual srs_error_t unprotect_rtp(void* packet, int* nb_plaintext);
virtual srs_error_t unprotect_rtcp(void* packet, int* nb_plaintext);
};

#endif
212 changes: 210 additions & 2 deletions trunk/src/app/srs_app_threads.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,30 +484,238 @@ srs_error_t SrsAsyncLogManager::do_start()
// TODO: FIXME: It should be thread-local or thread-safe.
SrsAsyncLogManager* _srs_async_log = new SrsAsyncLogManager();

SrsAsyncSRTP::SrsAsyncSRTP()
SrsAsyncSRTP::SrsAsyncSRTP(SrsSecurityTransport* transport)
{
task_ = NULL;
transport_ = transport;
}

SrsAsyncSRTP::~SrsAsyncSRTP()
{
// TODO: FIXME: Check it carefully.
_srs_async_srtp->remove_task(task_);
}

srs_error_t SrsAsyncSRTP::initialize(std::string recv_key, std::string send_key)
{
srs_error_t err = srs_success;

srs_assert(!task_);
task_ = new SrsAsyncSRTPTask(this);
_srs_async_srtp->register_task(task_);

if ((err = task_->initialize(recv_key, send_key)) != srs_success) {
return srs_error_wrap(err, "init async srtp");
}

// TODO: FIMXE: Remove it.
return SrsSRTP::initialize(recv_key, send_key);
}

srs_error_t SrsAsyncSRTP::protect_rtp(void* packet, int* nb_cipher)
{
// TODO: FIMXE: Remove it.
return SrsSRTP::protect_rtp(packet, nb_cipher);
}

srs_error_t SrsAsyncSRTP::protect_rtcp(void* packet, int* nb_cipher)
{
// TODO: FIMXE: Remove it.
return SrsSRTP::protect_rtcp(packet, nb_cipher);
}

srs_error_t SrsAsyncSRTP::unprotect_rtp(void* packet, int* nb_plaintext)
{
return SrsSRTP::unprotect_rtp(packet, nb_plaintext);
int nb_cipher = *nb_plaintext;
char* buf = new char[nb_cipher];
memcpy(buf, packet, nb_cipher);

SrsAsyncSRTPPacket* pkt = new SrsAsyncSRTPPacket(task_);
pkt->msg_->wrap(buf, nb_cipher);
pkt->is_rtp_ = true;
pkt->do_decrypt_ = true;
_srs_async_srtp->add_packet(pkt);

// Do the job asynchronously.
*nb_plaintext = 0;

return srs_success;
}

srs_error_t SrsAsyncSRTP::unprotect_rtcp(void* packet, int* nb_plaintext)
{
// TODO: FIMXE: Remove it.
return SrsSRTP::unprotect_rtcp(packet, nb_plaintext);
}

SrsAsyncSRTPTask::SrsAsyncSRTPTask(SrsAsyncSRTP* codec)
{
codec_ = codec;
impl_ = new SrsSRTP();
}

SrsAsyncSRTPTask::~SrsAsyncSRTPTask()
{
srs_freep(impl_);
}

srs_error_t SrsAsyncSRTPTask::initialize(std::string recv_key, std::string send_key)
{
srs_error_t err = srs_success;

if ((err = impl_->initialize(recv_key, send_key)) != srs_success) {
return srs_error_wrap(err, "init srtp impl");
}

return err;
}

srs_error_t SrsAsyncSRTPTask::cook(SrsAsyncSRTPPacket* pkt)
{
srs_error_t err = srs_success;

if (pkt->do_decrypt_) {
if (pkt->is_rtp_) {
pkt->nb_consumed_ = pkt->msg_->size;
err = impl_->unprotect_rtp(pkt->msg_->payload, &pkt->nb_consumed_);
}
}
if (err != srs_success) {
return err;
}

return err;
}

SrsAsyncSRTPPacket::SrsAsyncSRTPPacket(SrsAsyncSRTPTask* task)
{
task_ = task;
msg_ = new SrsSharedPtrMessage();
is_rtp_ = false;
do_decrypt_ = false;
nb_consumed_ = 0;
}

SrsAsyncSRTPPacket::~SrsAsyncSRTPPacket()
{
srs_freep(msg_);
}

SrsAsyncSRTPManager::SrsAsyncSRTPManager()
{
lock_ = new SrsThreadMutex();
packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
cooked_packets_ = new SrsThreadQueue<SrsAsyncSRTPPacket>();
}

// TODO: FIXME: We should stop the thread first, then free the manager.
SrsAsyncSRTPManager::~SrsAsyncSRTPManager()
{
srs_freep(lock_);
srs_freep(packets_);
srs_freep(cooked_packets_);

vector<SrsAsyncSRTPTask*>::iterator it;
for (it = tasks_.begin(); it != tasks_.end(); ++it) {
SrsAsyncSRTPTask* task = *it;
srs_freep(task);
}
}

void SrsAsyncSRTPManager::register_task(SrsAsyncSRTPTask* task)
{
if (!task) {
return;
}

SrsThreadLocker(lock_);
tasks_.push_back(task);
}

void SrsAsyncSRTPManager::remove_task(SrsAsyncSRTPTask* task)
{
if (!task) {
return;
}

SrsThreadLocker(lock_);
vector<SrsAsyncSRTPTask*>::iterator it;
if ((it = std::find(tasks_.begin(), tasks_.end(), task)) != tasks_.end()) {
tasks_.erase(it);
srs_freep(task);
}
}

void SrsAsyncSRTPManager::add_packet(SrsAsyncSRTPPacket* pkt)
{
packets_->push_back(pkt);
}

srs_error_t SrsAsyncSRTPManager::start(void* arg)
{
SrsAsyncSRTPManager* srtp = (SrsAsyncSRTPManager*)arg;
return srtp->do_start();
}

srs_error_t SrsAsyncSRTPManager::do_start()
{
srs_error_t err = srs_success;

srs_utime_t interval = 10 * SRS_UTIME_MILLISECONDS;
while (true) {
vector<SrsAsyncSRTPPacket*> flying;
packets_->swap(flying);

for (int i = 0; i < (int)flying.size(); i++) {
SrsAsyncSRTPPacket* pkt = flying.at(i);

if ((err = pkt->task_->cook(pkt)) != srs_success) {
srs_error_reset(err); // Ignore any error.
}

cooked_packets_->push_back(pkt);
}

// If got packets, maybe more packets in queue.
if (!flying.empty()) {
continue;
}

// TODO: FIXME: Maybe we should use cond wait?
timespec tv = {0};
tv.tv_sec = interval / SRS_UTIME_SECONDS;
tv.tv_nsec = (interval % SRS_UTIME_MILLISECONDS) * 1000;
nanosleep(&tv, NULL);
}

return err;
}

srs_error_t SrsAsyncSRTPManager::consume()
{
srs_error_t err = srs_success;

vector<SrsAsyncSRTPPacket*> flying;
cooked_packets_->swap(flying);

for (int i = 0; i < (int)flying.size(); i++) {
SrsAsyncSRTPPacket* pkt = flying.at(i);
SrsSecurityTransport* transport = pkt->task_->codec_->transport_;
char* payload = pkt->msg_->payload;

if (pkt->do_decrypt_) {
if (pkt->is_rtp_) {
err = transport->on_rtp_plaintext(payload, pkt->nb_consumed_);
}
}
if (err != srs_success) {
srs_error_reset(err); // Ignore any error.
}

srs_freep(pkt);
}

return err;
}

SrsAsyncSRTPManager* _srs_async_srtp = new SrsAsyncSRTPManager();
Loading

0 comments on commit 88165ba

Please sign in to comment.