diff --git a/rtc/src/handler/ice.rs b/rtc/src/handler/ice.rs index d3135eb..e228bba 100644 --- a/rtc/src/handler/ice.rs +++ b/rtc/src/handler/ice.rs @@ -5,7 +5,7 @@ use crate::transport::ice_transport::{IceTransportEvent, RTCIceTransport}; use bytes::BytesMut; use ice::Event; use log::{debug, error, warn}; -use shared::error::{Error, Result}; +use shared::error::Result; use shared::Transmit; use std::time::Instant; @@ -18,15 +18,7 @@ impl RTCHandler for RTCIceTransport { message, }; - let try_read = || -> Result<()> { - let ice_agent = self - .gatherer - .agent - .as_mut() - .ok_or(Error::ErrICEAgentNotExist)?; - - ice_agent.handle_read(stun_transmit) - }; + let try_read = || -> Result<()> { self.gatherer.agent.handle_read(stun_transmit) }; if let Err(err) = try_read() { warn!("try_read got error {}", err); @@ -63,22 +55,18 @@ impl RTCHandler for RTCIceTransport { } fn poll_event(&mut self) -> Option { - if let Some(ice_agent) = self.gatherer.agent.as_mut() { - if let Some(event) = ice_agent.poll_event() { - match event { - Event::ConnectionStateChange(state) => Some(RTCEvent::IceTransportEvent( - IceTransportEvent::OnConnectionStateChange(state.into()), - )), - Event::SelectedCandidatePairChange(local, remote) => { - Some(RTCEvent::IceTransportEvent( - IceTransportEvent::OnSelectedCandidatePairChange(Box::new( - RTCIceCandidatePair::new((&*local).into(), (&*remote).into()), - )), - )) - } + if let Some(event) = self.gatherer.agent.poll_event() { + match event { + Event::ConnectionStateChange(state) => Some(RTCEvent::IceTransportEvent( + IceTransportEvent::OnConnectionStateChange(state.into()), + )), + Event::SelectedCandidatePairChange(local, remote) => { + Some(RTCEvent::IceTransportEvent( + IceTransportEvent::OnSelectedCandidatePairChange(Box::new( + RTCIceCandidatePair::new((&*local).into(), (&*remote).into()), + )), + )) } - } else { - None } } else { None @@ -88,14 +76,8 @@ impl RTCHandler for RTCIceTransport { /// Handles a timeout event fn handle_timeout(&mut self, now: Instant) { let mut try_timeout = || -> Result<()> { - let ice_agent = self - .gatherer - .agent - .as_mut() - .ok_or(Error::ErrICEAgentNotExist)?; - - ice_agent.handle_timeout(now); - while let Some(transmit) = ice_agent.poll_transmit() { + self.gatherer.agent.handle_timeout(now); + while let Some(transmit) = self.gatherer.agent.poll_transmit() { self.transmits.push_back(Transmit { now: transmit.now, transport: transmit.transport, @@ -116,11 +98,9 @@ impl RTCHandler for RTCIceTransport { /// Polls a timeout event fn poll_timeout(&mut self, eto: &mut Instant) { - if let Some(ice_agent) = self.gatherer.agent.as_mut() { - if let Some(timeout) = ice_agent.poll_timeout() { - if timeout < *eto { - *eto = timeout; - } + if let Some(timeout) = self.gatherer.agent.poll_timeout() { + if timeout < *eto { + *eto = timeout; } } } diff --git a/rtc/src/peer_connection/mod.rs b/rtc/src/peer_connection/mod.rs index dc134b8..5345985 100644 --- a/rtc/src/peer_connection/mod.rs +++ b/rtc/src/peer_connection/mod.rs @@ -73,6 +73,10 @@ use crate::peer_connection::sdp::session_description::RTCSessionDescription; use crate::peer_connection::signaling_state::{ /*check_next_signaling_state,*/ RTCSignalingState, //StateChangeOp, }; +//use crate::transport::dtls_transport::RTCDtlsTransport; +use crate::transport::ice_transport::ice_gatherer::{RTCIceGatherOptions, RTCIceGatherer}; +use crate::transport::ice_transport::RTCIceTransport; +//use crate::transport::sctp_transport::RTCSctpTransport; /*use crate::rtp_transceiver::rtp_codec::{RTCRtpHeaderExtensionCapability, RTPCodecType}; use crate::rtp_transceiver::rtp_receiver::RTCRtpReceiver; use crate::rtp_transceiver::rtp_sender::RTCRtpSender; @@ -177,8 +181,9 @@ pub struct RTCPeerConnection { pub(super) pending_local_description: Option, pub(super) pending_remote_description: Option, - pub(super) ice_agent: ice::Agent, - + pub(super) ice_transport: RTCIceTransport, + //pub(super) dtls_transport: RTCDtlsTransport, + //pub(super) sctp_transport: RTCSctpTransport, /// ops is an operations queue which will ensure the enqueued actions are /// executed in order. It is used for asynchronously, but serially processing /// remote and local descriptions @@ -228,36 +233,15 @@ impl RTCPeerConnection { pub(crate) fn new(api: &API, mut configuration: RTCConfiguration) -> Result { RTCPeerConnection::init_configuration(&mut configuration)?; - let mut candidate_types = vec![]; - if api.setting_engine.candidates.ice_lite { - candidate_types.push(ice::candidate::CandidateType::Host); - } else if configuration.ice_transport_policy == RTCIceTransportPolicy::Relay { - candidate_types.push(ice::candidate::CandidateType::Relay); - } - - let mut validated_servers = vec![]; - for server in configuration.get_ice_servers() { - let url = server.urls()?; - validated_servers.extend(url); - } + // Create the ICE transport + let ice_transport = Self::create_ice_transport(api, &configuration)?; - let ice_agent_config = ice::AgentConfig { - lite: api.setting_engine.candidates.ice_lite, - urls: validated_servers, - disconnected_timeout: api.setting_engine.timeout.ice_disconnected_timeout, - failed_timeout: api.setting_engine.timeout.ice_failed_timeout, - keepalive_interval: api.setting_engine.timeout.ice_keepalive_interval, - candidate_types, - host_acceptance_min_wait: api.setting_engine.timeout.ice_host_acceptance_min_wait, - srflx_acceptance_min_wait: api.setting_engine.timeout.ice_srflx_acceptance_min_wait, - prflx_acceptance_min_wait: api.setting_engine.timeout.ice_prflx_acceptance_min_wait, - relay_acceptance_min_wait: api.setting_engine.timeout.ice_relay_acceptance_min_wait, - local_ufrag: api.setting_engine.candidates.username_fragment.clone(), - local_pwd: api.setting_engine.candidates.password.clone(), - ..Default::default() - }; + // Create the DTLS transport + //let certificates = configuration.certificates.drain(..).collect(); + //let dtls_transport = api.new_dtls_transport(Arc::clone(&pc.ice_transport), certificates)?); - let ice_agent = ice::Agent::new(Arc::new(ice_agent_config))?; + // Create the SCTP transport + //let sctp_transport = Arc::new(api.new_sctp_transport(Arc::clone(&pc.dtls_transport))?); // (Step #2) // Some variables defined explicitly despite their implicit zero values to @@ -291,7 +275,9 @@ impl RTCPeerConnection { setting_engine: api.setting_engine.clone(), media_engine: api.media_engine.clone(), is_negotiation_needed: false, - ice_agent, + + ice_transport, + events: Default::default(), }) } @@ -690,6 +676,74 @@ impl RTCPeerConnection { self.stats_id.as_str() } + fn create_ice_gatherer( + ice_agent: ice::Agent, + opts: RTCIceGatherOptions, + setting_engine: &Arc, + ) -> Result { + let mut validated_servers = vec![]; + if !opts.ice_servers.is_empty() { + for server in &opts.ice_servers { + let url = server.urls()?; + validated_servers.extend(url); + } + } + + Ok(RTCIceGatherer::new( + ice_agent, + validated_servers, + opts.ice_gather_policy, + Arc::clone(setting_engine), + )) + } + + fn create_ice_transport( + api: &API, + configuration: &RTCConfiguration, + ) -> Result { + let mut candidate_types = vec![]; + if api.setting_engine.candidates.ice_lite { + candidate_types.push(ice::candidate::CandidateType::Host); + } else if configuration.ice_transport_policy == RTCIceTransportPolicy::Relay { + candidate_types.push(ice::candidate::CandidateType::Relay); + } + + let mut validated_servers = vec![]; + for server in configuration.get_ice_servers() { + let url = server.urls()?; + validated_servers.extend(url); + } + + let ice_agent_config = ice::AgentConfig { + lite: api.setting_engine.candidates.ice_lite, + urls: validated_servers, + disconnected_timeout: api.setting_engine.timeout.ice_disconnected_timeout, + failed_timeout: api.setting_engine.timeout.ice_failed_timeout, + keepalive_interval: api.setting_engine.timeout.ice_keepalive_interval, + candidate_types, + host_acceptance_min_wait: api.setting_engine.timeout.ice_host_acceptance_min_wait, + srflx_acceptance_min_wait: api.setting_engine.timeout.ice_srflx_acceptance_min_wait, + prflx_acceptance_min_wait: api.setting_engine.timeout.ice_prflx_acceptance_min_wait, + relay_acceptance_min_wait: api.setting_engine.timeout.ice_relay_acceptance_min_wait, + local_ufrag: api.setting_engine.candidates.username_fragment.clone(), + local_pwd: api.setting_engine.candidates.password.clone(), + ..Default::default() + }; + + // Create the ICE transport + let ice_agent = ice::Agent::new(Arc::new(ice_agent_config))?; + let ice_gatherer = Self::create_ice_gatherer( + ice_agent, + RTCIceGatherOptions { + ice_servers: configuration.get_ice_servers(), + ice_gather_policy: configuration.ice_transport_policy, + }, + &api.setting_engine, + )?; + + Ok(RTCIceTransport::new(ice_gatherer)) + } + /* /// create_offer starts the PeerConnection and generates the localDescription /// diff --git a/rtc/src/transport/ice_transport/ice_gatherer.rs b/rtc/src/transport/ice_transport/ice_gatherer.rs index 5d98b48..5265a74 100644 --- a/rtc/src/transport/ice_transport/ice_gatherer.rs +++ b/rtc/src/transport/ice_transport/ice_gatherer.rs @@ -14,7 +14,7 @@ use crate::transport::ice_transport::ice_candidate::*; use crate::transport::ice_transport::ice_gatherer_state::RTCIceGathererState; use crate::transport::ice_transport::ice_parameters::RTCIceParameters; use crate::transport::ice_transport::ice_server::RTCIceServer; -use shared::error::{Error, Result}; +use shared::error::Result; /// ICEGatherOptions provides options relating to the gathering of ICE candidates. #[derive(Default, Debug, Clone)] @@ -33,7 +33,6 @@ pub enum IceGathererEvent { /// candidates, as well as enabling the retrieval of local Interactive /// Connectivity Establishment (ICE) parameters which can be /// exchanged in signaling. -#[derive(Default)] pub struct RTCIceGatherer { pub(crate) validated_servers: Vec, pub(crate) gather_policy: RTCIceTransportPolicy, @@ -42,87 +41,25 @@ pub struct RTCIceGatherer { pub(crate) state: RTCIceGathererState, pub(crate) events: VecDeque, - pub(crate) agent: Option, + pub(crate) agent: Agent, } impl RTCIceGatherer { pub(crate) fn new( + agent: Agent, validated_servers: Vec, gather_policy: RTCIceTransportPolicy, setting_engine: Arc, ) -> Self { RTCIceGatherer { + agent, gather_policy, validated_servers, setting_engine, state: RTCIceGathererState::New, events: VecDeque::new(), - - agent: None, - } - } - - pub(crate) fn create_agent(&mut self) -> Result<()> { - if self.agent.is_some() || self.state() != RTCIceGathererState::New { - return Ok(()); } - - let mut candidate_types = vec![]; - if self.setting_engine.candidates.ice_lite { - candidate_types.push(ice::candidate::CandidateType::Host); - } else if self.gather_policy == RTCIceTransportPolicy::Relay { - candidate_types.push(ice::candidate::CandidateType::Relay); - } - - /*let nat_1to1_cand_type = match self.setting_engine.candidates.nat_1to1_ip_candidate_type { - RTCIceCandidateType::Host => CandidateType::Host, - RTCIceCandidateType::Srflx => CandidateType::ServerReflexive, - _ => CandidateType::Unspecified, - };*/ - - //TOOD: let mdns_mode = self.setting_engine.candidates.multicast_dns_mode; - - let config = ice::agent::agent_config::AgentConfig { - //TODO: udp_network: self.setting_engine.udp_network.clone(), - lite: self.setting_engine.candidates.ice_lite, - urls: self.validated_servers.clone(), - disconnected_timeout: self.setting_engine.timeout.ice_disconnected_timeout, - failed_timeout: self.setting_engine.timeout.ice_failed_timeout, - keepalive_interval: self.setting_engine.timeout.ice_keepalive_interval, - candidate_types, - host_acceptance_min_wait: self.setting_engine.timeout.ice_host_acceptance_min_wait, - srflx_acceptance_min_wait: self.setting_engine.timeout.ice_srflx_acceptance_min_wait, - prflx_acceptance_min_wait: self.setting_engine.timeout.ice_prflx_acceptance_min_wait, - relay_acceptance_min_wait: self.setting_engine.timeout.ice_relay_acceptance_min_wait, - /*TODO: interface_filter: self.setting_engine.candidates.interface_filter.clone(), - ip_filter: self.setting_engine.candidates.ip_filter.clone(), - nat_1to1_ips: self.setting_engine.candidates.nat_1to1_ips.clone(), - nat_1to1_ip_candidate_type: nat_1to1_cand_type, - net: self.setting_engine.vnet.clone(), - multicast_dns_mode: mdns_mode, - multicast_dns_host_name: self - .setting_engine - .candidates - .multicast_dns_host_name - .clone(),*/ - local_ufrag: self.setting_engine.candidates.username_fragment.clone(), - local_pwd: self.setting_engine.candidates.password.clone(), - ..Default::default() - }; - - /*TODO: let requested_network_types = if self.setting_engine.candidates.ice_network_types.is_empty() - { - ice::network_type::supported_network_types() - } else { - self.setting_engine.candidates.ice_network_types.clone() - }; - - config.network_types.extend(requested_network_types);*/ - - self.agent = Some(Agent::new(Arc::new(config))?); - - Ok(()) } /*TODO:/// Gather ICE candidates. @@ -181,21 +118,13 @@ impl RTCIceGatherer { /// Close prunes all local candidates, and closes the ports. pub fn close(&mut self) -> Result<()> { self.set_state(RTCIceGathererState::Closed); - if let Some(mut agent) = self.agent.take() { - agent.close()?; - } + self.agent.close()?; Ok(()) } /// get_local_parameters returns the ICE parameters of the ICEGatherer. pub fn get_local_parameters(&mut self) -> Result { - self.create_agent()?; - - let Credentials { ufrag, pwd } = if let Some(agent) = self.get_agent() { - agent.get_local_credentials() - } else { - return Err(Error::ErrICEAgentNotExist); - }; + let Credentials { ufrag, pwd } = self.agent.get_local_credentials(); Ok(RTCIceParameters { username_fragment: ufrag.to_string(), @@ -205,16 +134,9 @@ impl RTCIceGatherer { } /// get_local_candidates returns the sequence of valid local candidates associated with the ICEGatherer. - pub fn get_local_candidates(&mut self) -> Result> { - self.create_agent()?; - - let ice_candidates = if let Some(agent) = self.get_agent() { - agent.get_local_candidates() - } else { - return Err(Error::ErrICEAgentNotExist); - }; - - Ok(rtc_ice_candidates_from_ice_candidates(ice_candidates)) + pub fn get_local_candidates(&mut self) -> Vec { + let ice_candidates = self.agent.get_local_candidates(); + rtc_ice_candidates_from_ice_candidates(ice_candidates) } /// State indicates the current state of the ICE gatherer. @@ -228,39 +150,29 @@ impl RTCIceGatherer { .push_back(IceGathererEvent::OnICEGathererState(s)); } - pub(crate) fn get_agent(&self) -> Option<&Agent> { - self.agent.as_ref() - } + pub(crate) fn collect_stats(&self, collector: &mut StatsCollector) { + let mut reports = HashMap::new(); - pub(crate) fn get_mut_agent(&mut self) -> Option<&mut Agent> { - self.agent.as_mut() - } + for stats in self.agent.get_candidate_pairs_stats() { + let stats: ICECandidatePairStats = stats.into(); + reports.insert(stats.id.clone(), StatsReportType::CandidatePair(stats)); + } - pub(crate) fn collect_stats(&self, collector: &mut StatsCollector) { - if let Some(agent) = self.get_agent() { - let mut reports = HashMap::new(); - - for stats in agent.get_candidate_pairs_stats() { - let stats: ICECandidatePairStats = stats.into(); - reports.insert(stats.id.clone(), StatsReportType::CandidatePair(stats)); - } - - for stats in agent.get_local_candidates_stats() { - reports.insert( - stats.id.clone(), - StatsReportType::from(LocalCandidate(stats)), - ); - } - - for stats in agent.get_remote_candidates_stats() { - reports.insert( - stats.id.clone(), - StatsReportType::from(RemoteCandidate(stats)), - ); - } - - collector.merge(reports); + for stats in self.agent.get_local_candidates_stats() { + reports.insert( + stats.id.clone(), + StatsReportType::from(LocalCandidate(stats)), + ); } + + for stats in self.agent.get_remote_candidates_stats() { + reports.insert( + stats.id.clone(), + StatsReportType::from(RemoteCandidate(stats)), + ); + } + + collector.merge(reports); } } diff --git a/rtc/src/transport/ice_transport/mod.rs b/rtc/src/transport/ice_transport/mod.rs index 5952e15..1cd383c 100644 --- a/rtc/src/transport/ice_transport/mod.rs +++ b/rtc/src/transport/ice_transport/mod.rs @@ -13,7 +13,7 @@ use crate::stats::stats_collector::StatsCollector; use crate::stats::ICETransportStats; use crate::stats::StatsReportType::Transport; use crate::transport::ice_transport::ice_transport_state::RTCIceTransportState; -use shared::error::{Error, Result}; +use shared::error::Result; use shared::Transmit; /*TODO:#[cfg(test)] @@ -41,7 +41,6 @@ pub enum IceTransportEvent { /// ICETransport allows an application access to information about the ICE /// transport over which packets are sent and received. -#[derive(Default)] pub struct RTCIceTransport { pub(crate) gatherer: RTCIceGatherer, state: RTCIceTransportState, @@ -56,21 +55,23 @@ impl RTCIceTransport { RTCIceTransport { gatherer, state: RTCIceTransportState::New, - ..Default::default() + role: Default::default(), + transmits: Default::default(), } } /// get_selected_candidate_pair returns the selected candidate pair on which packets are sent /// if there is no selected pair nil is returned pub fn get_selected_candidate_pair(&self) -> Option { - if let Some(agent) = self.gatherer.get_agent() { - if let Some((ice_pair_local, ice_pair_remote)) = agent.get_selected_candidate_pair() { - let local = RTCIceCandidate::from(&ice_pair_local); - let remote = RTCIceCandidate::from(&ice_pair_remote); - return Some(RTCIceCandidatePair::new(local, remote)); - } + if let Some((ice_pair_local, ice_pair_remote)) = + self.gatherer.agent.get_selected_candidate_pair() + { + let local = RTCIceCandidate::from(&ice_pair_local); + let remote = RTCIceCandidate::from(&ice_pair_remote); + Some(RTCIceCandidatePair::new(local, remote)) + } else { + None } - None } /*TODO: /// Start incoming connectivity checks based on its configured role. @@ -174,14 +175,6 @@ impl RTCIceTransport { } */ - pub(crate) fn ensure_gatherer(&mut self) -> Result<()> { - if self.gatherer.get_agent().is_none() { - self.gatherer.create_agent() - } else { - Ok(()) - } - } - /// restart is not exposed currently because ORTC has users create a whole new ICETransport /// so for now lets keep it private so we don't cause ORTC users to depend on non-standard APIs pub(crate) fn restart(&mut self) -> Result<()> { @@ -193,11 +186,8 @@ impl RTCIceTransport { .clone(), self.gatherer.setting_engine.candidates.password.clone(), ); - if let Some(agent) = self.gatherer.get_mut_agent() { - agent.restart(ufrag, pwd, false)?; - } else { - return Err(Error::ErrICEAgentNotExist); - } + self.gatherer.agent.restart(ufrag, pwd, false)?; + //TODO: self.gatherer.gather() Ok(()) } @@ -215,45 +205,27 @@ impl RTCIceTransport { /// add_local_candidates sets the sequence of candidates associated with the local ICETransport. pub fn add_local_candidates(&mut self, local_candidates: &[RTCIceCandidate]) -> Result<()> { - self.ensure_gatherer()?; - - if let Some(agent) = self.gatherer.get_mut_agent() { - for rc in local_candidates { - agent.add_local_candidate(rc.to_ice()?)?; - } - Ok(()) - } else { - Err(Error::ErrICEAgentNotExist) + for rc in local_candidates { + self.gatherer.agent.add_local_candidate(rc.to_ice()?)?; } + Ok(()) } /// adds a candidate associated with the local ICETransport. pub fn add_local_candidate(&mut self, local_candidate: Option) -> Result<()> { - self.ensure_gatherer()?; - - if let Some(agent) = self.gatherer.get_mut_agent() { - if let Some(r) = local_candidate { - agent.add_local_candidate(r.to_ice()?)?; - } - - Ok(()) - } else { - Err(Error::ErrICEAgentNotExist) + if let Some(r) = local_candidate { + self.gatherer.agent.add_local_candidate(r.to_ice()?)?; } + + Ok(()) } /// add_remote_candidates sets the sequence of candidates associated with the remote ICETransport. pub fn add_remote_candidates(&mut self, remote_candidates: &[RTCIceCandidate]) -> Result<()> { - self.ensure_gatherer()?; - - if let Some(agent) = self.gatherer.get_mut_agent() { - for rc in remote_candidates { - agent.add_remote_candidate(rc.to_ice()?)?; - } - Ok(()) - } else { - Err(Error::ErrICEAgentNotExist) + for rc in remote_candidates { + self.gatherer.agent.add_remote_candidate(rc.to_ice()?)?; } + Ok(()) } /// adds a candidate associated with the remote ICETransport. @@ -261,17 +233,11 @@ impl RTCIceTransport { &mut self, remote_candidate: Option, ) -> Result<()> { - self.ensure_gatherer()?; - - if let Some(agent) = self.gatherer.get_mut_agent() { - if let Some(r) = remote_candidate { - agent.add_remote_candidate(r.to_ice()?)?; - } - - Ok(()) - } else { - Err(Error::ErrICEAgentNotExist) + if let Some(r) = remote_candidate { + self.gatherer.agent.add_remote_candidate(r.to_ice()?)?; } + + Ok(()) } /// State returns the current ice transport state. @@ -284,11 +250,9 @@ impl RTCIceTransport { } pub(crate) fn collect_stats(&self, collector: &mut StatsCollector) { - if let Some(agent) = self.gatherer.get_agent() { - let stats = ICETransportStats::new("ice_transport".to_string(), agent); + let stats = ICETransportStats::new("ice_transport".to_string(), &self.gatherer.agent); - collector.insert("ice_transport".to_string(), Transport(stats)); - } + collector.insert("ice_transport".to_string(), Transport(stats)); } pub(crate) fn have_remote_credentials_change( @@ -296,12 +260,8 @@ impl RTCIceTransport { new_ufrag: &str, new_pwd: &str, ) -> bool { - if let Some(agent) = self.gatherer.get_mut_agent() { - if let Some(Credentials { ufrag, pwd }) = agent.get_remote_credentials() { - ufrag != new_ufrag || pwd != new_pwd - } else { - false - } + if let Some(Credentials { ufrag, pwd }) = self.gatherer.agent.get_remote_credentials() { + ufrag != new_ufrag || pwd != new_pwd } else { false } @@ -312,10 +272,8 @@ impl RTCIceTransport { new_ufrag: String, new_pwd: String, ) -> Result<()> { - if let Some(agent) = self.gatherer.get_mut_agent() { - Ok(agent.set_remote_credentials(new_ufrag, new_pwd)?) - } else { - Err(Error::ErrICEAgentNotExist) - } + self.gatherer + .agent + .set_remote_credentials(new_ufrag, new_pwd) } }