Skip to content

Commit

Permalink
imple RTCHandler for demuxerHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Mar 24, 2024
1 parent 388b6b3 commit a33a4ea
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 70 deletions.
107 changes: 60 additions & 47 deletions rtc/src/handlers/demuxer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::messages::{
DTLSMessageEvent, MessageEvent, RTPMessageEvent, STUNMessageEvent, TaggedMessageEvent,
};
use crate::handlers::RTCHandler;
use crate::messages::{DTLSMessageEvent, RTCMessageEvent, RTPMessageEvent, STUNMessageEvent};
use log::{debug, error};
use retty::channel::{Context, Handler};
use retty::transport::TaggedBytesMut;
use shared::Transmit;
use std::collections::VecDeque;

/// match_range is a MatchFunc that accepts packets with the first byte in [lower..upper]
fn match_range(lower: u8, upper: u8, buf: &[u8]) -> bool {
Expand Down Expand Up @@ -41,64 +40,78 @@ fn match_srtp(b: &[u8]) -> bool {

/// DemuxerHandler implements demuxing of STUN/DTLS/RTP/RTCP Protocol packets
#[derive(Default)]
pub struct DemuxerHandler;
pub struct DemuxerHandler {
next: Option<Box<dyn RTCHandler>>,

transmits: VecDeque<Transmit<RTCMessageEvent>>,
}

impl DemuxerHandler {
pub fn new() -> Self {
DemuxerHandler
DemuxerHandler::default()
}
}

impl Handler for DemuxerHandler {
type Rin = TaggedBytesMut;
type Rout = TaggedMessageEvent;
type Win = TaggedMessageEvent;
type Wout = TaggedBytesMut;
impl RTCHandler for DemuxerHandler {
fn chain(mut self: Box<Self>, next: Box<dyn RTCHandler>) -> Box<dyn RTCHandler> {
self.next = Some(next);
self
}

fn name(&self) -> &str {
"DemuxerHandler"
fn next(&mut self) -> Option<&mut Box<dyn RTCHandler>> {
self.next.as_mut()
}

fn handle_read(
&mut self,
ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
msg: Self::Rin,
) {
if msg.message.is_empty() {
error!("drop invalid packet due to zero length");
} else if match_dtls(&msg.message) {
ctx.fire_read(TaggedMessageEvent {
now: msg.now,
transport: msg.transport,
message: MessageEvent::Dtls(DTLSMessageEvent::Raw(msg.message)),
});
} else if match_srtp(&msg.message) {
ctx.fire_read(TaggedMessageEvent {
now: msg.now,
transport: msg.transport,
message: MessageEvent::Rtp(RTPMessageEvent::Raw(msg.message)),
});
fn handle_transmit(&mut self, msg: Transmit<RTCMessageEvent>) {
if let RTCMessageEvent::Raw(message) = msg.message {
if message.is_empty() {
error!("drop invalid packet due to zero length");
return;
}

let next_msg = if match_dtls(&message) {
Transmit {
now: msg.now,
transport: msg.transport,
message: RTCMessageEvent::Dtls(DTLSMessageEvent::Raw(message)),
}
} else if match_srtp(&message) {
Transmit {
now: msg.now,
transport: msg.transport,
message: RTCMessageEvent::Rtp(RTPMessageEvent::Raw(message)),
}
} else {
Transmit {
now: msg.now,
transport: msg.transport,
message: RTCMessageEvent::Stun(STUNMessageEvent::Raw(message)),
}
};

if let Some(next) = self.next() {
next.handle_transmit(next_msg);
}
} else {
ctx.fire_read(TaggedMessageEvent {
now: msg.now,
transport: msg.transport,
message: MessageEvent::Stun(STUNMessageEvent::Raw(msg.message)),
});
debug!("drop non-RAW packet {:?}", msg.message);
}
}

fn poll_write(
&mut self,
ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
) -> Option<Self::Wout> {
if let Some(msg) = ctx.fire_poll_write() {
fn poll_transmit(&mut self) -> Option<Transmit<RTCMessageEvent>> {
let transmit = if let Some(next) = self.next() {
next.poll_transmit()
} else {
None
};

if let Some(msg) = transmit {
match msg.message {
MessageEvent::Stun(STUNMessageEvent::Raw(message))
| MessageEvent::Dtls(DTLSMessageEvent::Raw(message))
| MessageEvent::Rtp(RTPMessageEvent::Raw(message)) => Some(TaggedBytesMut {
RTCMessageEvent::Stun(STUNMessageEvent::Raw(message))
| RTCMessageEvent::Dtls(DTLSMessageEvent::Raw(message))
| RTCMessageEvent::Rtp(RTPMessageEvent::Raw(message)) => Some(Transmit {
now: msg.now,
transport: msg.transport,
message,
message: RTCMessageEvent::Raw(message),
}),
_ => {
debug!("drop non-RAW packet {:?}", msg.message);
Expand Down
53 changes: 30 additions & 23 deletions rtc/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,60 @@ use shared::error::Error;
use shared::Transmit;
use std::time::Instant;

//pub mod demuxer;
pub mod demuxer;
//pub mod dtls;
//pub mod stun;

pub enum HandlerEvent {
Inbound(Transmit<RTCMessageEvent>),
Outbound(Transmit<RTCMessageEvent>),
Error(Error),
}
pub trait RTCHandler {
/// Chains next handler
fn chain(self: Box<Self>, next: Box<dyn RTCHandler>) -> Box<dyn RTCHandler>;

pub trait Handler {
fn chain(self: Box<Self>, next: Box<dyn Handler>) -> Box<dyn Handler>;
/// Returns next handler
fn next(&mut self) -> Option<&mut Box<dyn RTCHandler>>;

fn next(&mut self) -> Option<&mut Box<dyn Handler>>;
/// Handles input message
fn handle_transmit(&mut self, msg: Transmit<RTCMessageEvent>) {
if let Some(next) = self.next() {
next.handle_transmit(msg);
}
}

fn read(&mut self, msg: &mut Transmit<RTCMessageEvent>) -> Vec<HandlerEvent> {
/// Polls output message from internal transmit queue
fn poll_transmit(&mut self) -> Option<Transmit<RTCMessageEvent>> {
if let Some(next) = self.next() {
next.read(msg)
next.poll_transmit()
} else {
vec![]
None
}
}
fn write(&mut self, msg: &mut Transmit<RTCMessageEvent>) -> Vec<HandlerEvent> {

/// Handles a timeout event
fn handle_timeout(&mut self, now: Instant) {
if let Some(next) = self.next() {
next.write(msg)
} else {
vec![]
next.handle_timeout(now);
}
}

fn handle_timeout(&mut self, now: Instant) -> Vec<HandlerEvent> {
/// Polls a timeout event
fn poll_timeout(&mut self) -> Option<Instant> {
if let Some(next) = self.next() {
next.handle_timeout(now)
next.poll_timeout()
} else {
vec![]
None
}
}

fn poll_timeout(&mut self, eto: &mut Instant) {
/// Handle an error event
fn handle_error(&mut self, err: Error) {
if let Some(next) = self.next() {
next.poll_timeout(eto);
next.handle_error(err)
}
}

fn close(&mut self) {
/// Handle a close event
fn handle_close(&mut self) {
if let Some(next) = self.next() {
next.close();
next.handle_close();
}
}
}
1 change: 1 addition & 0 deletions rtc/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub enum RTPMessageEvent {

#[derive(Debug)]
pub enum RTCMessageEvent {
Raw(BytesMut),
Stun(STUNMessageEvent),
Dtls(DTLSMessageEvent),
Rtp(RTPMessageEvent),
Expand Down

0 comments on commit a33a4ea

Please sign in to comment.