diff --git a/rtc/src/handlers/ice.rs b/rtc/src/handlers/ice.rs new file mode 100644 index 0000000..192315e --- /dev/null +++ b/rtc/src/handlers/ice.rs @@ -0,0 +1,127 @@ +use crate::handlers::RTCHandler; +use crate::messages::{RTCEvent, RTCMessage, STUNMessage}; +use crate::transport::ice_transport::ice_candidate_pair::RTCIceCandidatePair; +use crate::transport::ice_transport::{IceTransportEvent, RTCIceTransport}; +use bytes::BytesMut; +use ice::Event; +use log::{debug, error, warn}; +use shared::error::{Error, Result}; +use shared::Transmit; +use std::time::Instant; + +impl RTCHandler for RTCIceTransport { + fn handle_transmit(&mut self, msg: Transmit) -> Vec> { + if let RTCMessage::Stun(STUNMessage::Raw(message)) = msg.message { + let stun_transmit = Transmit { + now: msg.now, + transport: msg.transport, + message, + }; + + let try_read = || -> Result<()> { + let ice_agent = self + .gatherer + .agent + .as_mut() + .ok_or(Error::ErrICEAgentNotExist)?; + + ice_agent.handle_read(stun_transmit) + }; + + if let Err(err) = try_read() { + warn!("try_read got error {}", err); + self.handle_error(err); + } + vec![] + } else { + debug!("bypass StunHandler read for {}", msg.transport.peer_addr); + vec![msg] + } + } + + fn poll_transmit(&mut self, msg: Option>) -> Option> { + if let Some(msg) = msg { + if let RTCMessage::Stun(STUNMessage::Stun(mut stun_message)) = msg.message { + debug!( + "StunMessage type {} sent to {}", + stun_message.typ, msg.transport.peer_addr + ); + stun_message.encode(); + let message = BytesMut::from(&stun_message.raw[..]); + self.transmits.push_back(Transmit { + now: msg.now, + transport: msg.transport, + message: RTCMessage::Stun(STUNMessage::Raw(message)), + }); + } else { + debug!("bypass StunHandler write for {}", msg.transport.peer_addr); + self.transmits.push_back(msg); + } + } + + self.transmits.pop_front() + } + + fn poll_event(&mut self) -> Option { + if let Some(ice_agent) = self.gatherer.agent.as_mut() { + if let Some(event) = ice_agent.poll_event() { + match event { + Event::ConnectionStateChange(state) => Some(RTCEvent::IceTransportEvent( + IceTransportEvent::OnConnectionStateChange(state.into()), + )), + Event::SelectedCandidatePairChange(local, remote) => { + Some(RTCEvent::IceTransportEvent( + IceTransportEvent::OnSelectedCandidatePairChange(Box::new( + RTCIceCandidatePair::new((&*local).into(), (&*remote).into()), + )), + )) + } + } + } else { + None + } + } else { + None + } + } + + /// Handles a timeout event + fn handle_timeout(&mut self, now: Instant) { + let mut try_timeout = || -> Result<()> { + let ice_agent = self + .gatherer + .agent + .as_mut() + .ok_or(Error::ErrICEAgentNotExist)?; + + ice_agent.handle_timeout(now); + while let Some(transmit) = ice_agent.poll_transmit() { + self.transmits.push_back(Transmit { + now: transmit.now, + transport: transmit.transport, + message: RTCMessage::Stun(STUNMessage::Raw(transmit.message)), + }); + } + + Ok(()) + }; + match try_timeout() { + Ok(_) => {} + Err(err) => { + error!("try_timeout with error {}", err); + self.handle_error(err); + } + } + } + + /// Polls a timeout event + fn poll_timeout(&mut self, eto: &mut Instant) { + if let Some(ice_agent) = self.gatherer.agent.as_mut() { + if let Some(timeout) = ice_agent.poll_timeout() { + if timeout < *eto { + *eto = timeout; + } + } + } + } +} diff --git a/rtc/src/handlers/mod.rs b/rtc/src/handlers/mod.rs index 1506f9e..7fc8696 100644 --- a/rtc/src/handlers/mod.rs +++ b/rtc/src/handlers/mod.rs @@ -5,8 +5,8 @@ use std::time::Instant; pub mod demuxer; pub mod dtls; +pub mod ice; mod sctp; -pub mod stun; pub trait RTCHandler { /// Handles input message diff --git a/rtc/src/handlers/stun.rs b/rtc/src/handlers/stun.rs deleted file mode 100644 index 77154b0..0000000 --- a/rtc/src/handlers/stun.rs +++ /dev/null @@ -1,75 +0,0 @@ -use crate::handlers::RTCHandler; -use crate::messages::{RTCMessage, STUNMessage}; -use bytes::BytesMut; -use log::{debug, warn}; -use shared::error::Result; -use shared::Transmit; -use stun::message::Message; - -/// StunHandler implements STUN Protocol handling -#[derive(Default)] -pub struct StunCodec; - -impl StunCodec { - pub fn new() -> Self { - StunCodec - } -} - -impl RTCHandler for StunCodec { - fn handle_transmit(&mut self, msg: Transmit) -> Vec> { - if let RTCMessage::Stun(STUNMessage::Raw(message)) = msg.message { - let try_read = || -> Result { - let mut stun_message = Message { - raw: message.to_vec(), - ..Default::default() - }; - stun_message.decode()?; - debug!( - "StunMessage type {} received from {}", - stun_message.typ, msg.transport.peer_addr - ); - Ok(stun_message) - }; - - match try_read() { - Ok(stun_message) => vec![Transmit { - now: msg.now, - transport: msg.transport, - message: RTCMessage::Stun(STUNMessage::Stun(stun_message)), - }], - Err(err) => { - warn!("try_read got error {}", err); - self.handle_error(err); - vec![] - } - } - } else { - debug!("bypass StunHandler read for {}", msg.transport.peer_addr); - vec![msg] - } - } - - fn poll_transmit(&mut self, msg: Option>) -> Option> { - if let Some(msg) = msg { - if let RTCMessage::Stun(STUNMessage::Stun(mut stun_message)) = msg.message { - debug!( - "StunMessage type {} sent to {}", - stun_message.typ, msg.transport.peer_addr - ); - stun_message.encode(); - let message = BytesMut::from(&stun_message.raw[..]); - Some(Transmit { - now: msg.now, - transport: msg.transport, - message: RTCMessage::Stun(STUNMessage::Raw(message)), - }) - } else { - debug!("bypass StunHandler write for {}", msg.transport.peer_addr); - Some(msg) - } - } else { - None - } - } -} diff --git a/rtc/src/transport/ice_transport/mod.rs b/rtc/src/transport/ice_transport/mod.rs index c5c519e..5952e15 100644 --- a/rtc/src/transport/ice_transport/mod.rs +++ b/rtc/src/transport/ice_transport/mod.rs @@ -8,11 +8,13 @@ use ice_role::RTCIceRole; use std::collections::VecDeque; //use crate::transports::ice_transport::ice_parameters::RTCIceParameters; +use crate::messages::RTCMessage; use crate::stats::stats_collector::StatsCollector; use crate::stats::ICETransportStats; use crate::stats::StatsReportType::Transport; use crate::transport::ice_transport::ice_transport_state::RTCIceTransportState; use shared::error::{Error, Result}; +use shared::Transmit; /*TODO:#[cfg(test)] mod ice_transport_test; @@ -37,26 +39,23 @@ pub enum IceTransportEvent { OnSelectedCandidatePairChange(Box), } -#[derive(Default)] -struct ICETransportInternal {} - /// ICETransport allows an application access to information about the ICE /// transport over which packets are sent and received. #[derive(Default)] pub struct RTCIceTransport { pub(crate) gatherer: RTCIceGatherer, state: RTCIceTransportState, - events: VecDeque, role: RTCIceRole, + + pub(crate) transmits: VecDeque>, } impl RTCIceTransport { - /// creates a new new_icetransport. + /// creates a new new_ice_transport. pub(crate) fn new(gatherer: RTCIceGatherer) -> Self { RTCIceTransport { gatherer, state: RTCIceTransportState::New, - events: VecDeque::new(), ..Default::default() } }