Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tentative first pass at making simulcast egest possible #312

Draft
wants to merge 38 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
2742632
Tentative first pass at making simulcast egest possible
robashton Oct 7, 2022
d4160e1
Fix the tests (for now)
robashton Oct 7, 2022
9fdf70b
clippy warnings (I guess I need to update/downgrade my rust)
robashton Oct 7, 2022
ec670b4
Re-instate changes from #217
robashton Oct 10, 2022
4974cf4
Replace the mutex on track_encodings with a RwLock
robashton Oct 10, 2022
379f403
Initial feedback from PR
robashton Oct 10, 2022
21607ac
Make Rid an Option<String> and see how that falls out
robashton Oct 11, 2022
a3c2a20
revert change to log (that I did when I was debugging my own stuff\!)
robashton Oct 11, 2022
d6ff71d
unique track ids in stats
robashton Oct 11, 2022
d4c27fc
debug impl for TrackLocalContext
robashton Oct 11, 2022
19fc8e9
ensure we only do simulcast send directives if we're doing a send on …
robashton Oct 11, 2022
acef0fd
apparently format! is a thing
robashton Oct 11, 2022
ac59df1
a few things in rtp_sender that we didn't ike
robashton Oct 11, 2022
4b04180
unnecessary clone now it's an option
robashton Oct 11, 2022
246044e
typo (got vimmed)
robashton Oct 11, 2022
850fcb5
Tentative first pass at making simulcast egest possible
robashton Oct 7, 2022
e20c15d
Fix the tests (for now)
robashton Oct 7, 2022
866965e
clippy warnings (I guess I need to update/downgrade my rust)
robashton Oct 7, 2022
25cedc0
Re-instate changes from #217
robashton Oct 10, 2022
2972dd5
Replace the mutex on track_encodings with a RwLock
robashton Oct 10, 2022
55bc75c
Initial feedback from PR
robashton Oct 10, 2022
8a59116
Make Rid an Option<String> and see how that falls out
robashton Oct 11, 2022
f780884
revert change to log (that I did when I was debugging my own stuff\!)
robashton Oct 11, 2022
9707863
unique track ids in stats
robashton Oct 11, 2022
ffad0b5
debug impl for TrackLocalContext
robashton Oct 11, 2022
6b23661
ensure we only do simulcast send directives if we're doing a send on …
robashton Oct 11, 2022
7853a8d
apparently format! is a thing
robashton Oct 11, 2022
afe4dcb
a few things in rtp_sender that we didn't ike
robashton Oct 11, 2022
b64f163
unnecessary clone now it's an option
robashton Oct 11, 2022
6a24250
typo (got vimmed)
robashton Oct 11, 2022
fd84deb
don't hold onto that lock, or we hang
robashton Oct 13, 2022
f5857dd
some more tweaks
robashton Oct 13, 2022
5aa6864
merge from upstream
robashton Oct 13, 2022
6996968
Merge branch 'simulcast-egest' of github.com:robashton/webrtc into si…
robashton Oct 13, 2022
7da6936
removed comments
robashton Oct 13, 2022
5007093
revert method order
robashton Oct 13, 2022
7114d5e
remove read_simulcast_rtcp as we can't see a reason why it needs to e…
robashton Oct 13, 2022
7c03a11
Option<String> on structs, Option<&str> on traits (for rid anyway)
robashton Oct 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/examples/simulcast/simulcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ async fn main() -> Result<()> {
if let Some(track) = track {
println!("Track has started");

let rid = track.rid().to_owned();
let rid = track.rid().map_or(String::from(""), String::from);
let output_track = if let Some(output_track) = output_tracks.get(&rid) {
Arc::clone(output_track)
} else {
Expand Down Expand Up @@ -199,7 +199,7 @@ async fn main() -> Result<()> {

tokio::spawn(async move {
// Read RTP packets being sent to webrtc-rs
println!("enter track loop {}", track.rid());
println!("enter track loop {:?}", track.rid());
while let Ok((rtp, _)) = track.read_rtp().await {
if let Err(err) = output_track.write_rtp(&rtp).await {
if Error::ErrClosedPipe != err {
Expand All @@ -210,7 +210,7 @@ async fn main() -> Result<()> {
}
}
}
println!("exit track loop {}", track.rid());
println!("exit track loop {:?}", track.rid());
});
}
Box::pin(async {})
Expand Down
1 change: 1 addition & 0 deletions interceptor/src/stream_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub struct RTPHeaderExtension {
#[derive(Default, Debug, Clone)]
pub struct StreamInfo {
pub id: String,
pub rid: Option<String>,
pub attributes: Attributes,
pub ssrc: u32,
pub payload_type: u8,
Expand Down
23 changes: 23 additions & 0 deletions webrtc/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,29 @@ pub enum Error {
#[error("new track must be of the same kind as previous")]
ErrRTPSenderNewTrackHasIncorrectKind,

#[error("Cannot call replace on a sender with multiple tracks")]
ErrRTPSenderCannotReplaceSimulcast,

#[error("Sender does not have track for RID")]
ErrRTPSenderNoTrackForRID,

#[error("Sender cannot add encoding due to RID collision")]
ErrRTPSenderRIDCollision,

#[error("Sender cannot add encoding as provided track does not match base track")]
ErrRTPSenderBaseEncodingMismatch,

/// ErrRTPSenderStopped indicates the sender was already stopped
#[error("Sender has already been stopped")]
ErrRTPSenderStopped,

/// ErrRTPSenderRidNil indicates that the track RID was empty
#[error("Sender cannot add encoding as rid is empty")]
ErrRTPSenderRidNil,

#[error("Sender cannot add encoding as there is no base track")]
ErrRTPSenderNoBaseEncoding,

/// ErrUnbindFailed indicates that a TrackLocal was not able to be unbind
#[error("failed to unbind TrackLocal from PeerConnection")]
ErrUnbindFailed,
Expand Down
3 changes: 1 addition & 2 deletions webrtc/src/peer_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ impl RTCPeerConnection {
};

let sender = match t.sender().await {
Some(s) => s.clone(),
Some(s) => s,
None => {
log::warn!(
"RtpSender missing for transeceiver with sending direction {} for mid {}",
Expand All @@ -495,7 +495,6 @@ impl RTCPeerConnection {
if stream_ids.is_empty() {
return true;
}

// different stream id
if dmsid.split_whitespace().next() != Some(&stream_ids[0]) {
return true;
Expand Down
32 changes: 19 additions & 13 deletions webrtc/src/peer_connection/peer_connection_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ impl PeerConnectionInternal {
let mut receiver_needs_stopped = false;

for t in tracks {
if !t.rid().is_empty() {
if let Some(rid) = t.rid() {
if let Some(details) =
track_details_for_rid(&track_details, t.rid().to_owned())
track_details_for_rid(&track_details, String::from(rid))
{
t.set_id(details.id.clone()).await;
t.set_stream_id(details.stream_id.clone()).await;
Expand Down Expand Up @@ -1141,6 +1141,7 @@ impl PeerConnectionInternal {
let stream_info = create_stream_info(
"".to_owned(),
ssrc,
None,
params.codecs[0].payload_type,
params.codecs[0].capability.clone(),
&params.header_extensions,
Expand Down Expand Up @@ -1182,7 +1183,7 @@ impl PeerConnectionInternal {
return receiver
.receive_for_rtx(
0,
rsid,
&rsid,
TrackStream {
stream_info: Some(stream_info.clone()),
rtp_read_stream,
Expand All @@ -1196,7 +1197,7 @@ impl PeerConnectionInternal {

let track = receiver
.receive_for_rid(
rid,
&rid,
params,
TrackStream {
stream_info: Some(stream_info.clone()),
Expand Down Expand Up @@ -1535,20 +1536,25 @@ impl PeerConnectionInternal {
None => continue,
};

let track_id = track.id().to_string();
let kind = match track.kind() {
RTPCodecType::Unspecified => continue,
RTPCodecType::Audio => "audio",
RTPCodecType::Video => "video",
};

track_infos.push(TrackInfo {
track_id,
ssrc: sender.ssrc,
mid: mid.clone(),
rid: None,
kind,
});
let encodings = sender.track_encodings.read().await;
for e in encodings.iter() {
let track_id = track.rid().map_or(track.id().to_string(), |rid| {
format!("{}-{}", track.id(), rid)
});
track_infos.push(TrackInfo {
track_id,
ssrc: e.ssrc,
mid: mid.clone(),
rid: track.rid().map(String::from),
kind,
});
}
}

let stream_stats = self
Expand Down Expand Up @@ -1610,7 +1616,7 @@ impl PeerConnectionInternal {
kind,
packets_sent,
mid,
rid,
rid: rid.map(|a| a.to_owned()),
header_bytes_sent,
bytes_sent,
nack_count,
Expand Down
37 changes: 29 additions & 8 deletions webrtc/src/peer_connection/sdp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,13 +535,36 @@ pub(crate) async fn add_transceiver_sdp(
for mt in transceivers {
if let Some(sender) = mt.sender().await {
if let Some(track) = sender.track().await {
media = media.with_media_source(
sender.ssrc,
track.stream_id().to_owned(), /* cname */
track.stream_id().to_owned(), /* streamLabel */
track.id().to_owned(),
);
if mt.direction().has_send() {
let send_parameters = sender.get_parameters().await;
// Get the different encodings expressed first
for encoding in &send_parameters.encodings {
media = media.with_media_source(
encoding.ssrc,
track.stream_id().to_owned(), /* cname */
track.stream_id().to_owned(), /* streamLabel */
track.id().to_owned(),
);
}

// Then tell the world about simulcast
if send_parameters.encodings.len() > 1 {
let mut send_rids: Vec<String> = vec![];

for e in &send_parameters.encodings {
if let Some(rid) = &e.rid {
let mut s: String = rid.clone();
send_rids.push(rid.clone());
s.push_str(" send");
media = media.with_value_attribute("rid".into(), s);
}
}
// Simulcast)
let mut s: String = "send ".to_owned();
s.push_str(send_rids.join(";").as_ref());
media = media.with_value_attribute("simulcast".into(), s);
}
}
// Send msid based on the configured track if we haven't already
// sent on this sender. If we have sent we must keep the msid line consistent, this
// is handled below.
Expand All @@ -553,12 +576,10 @@ pub(crate) async fn add_transceiver_sdp(
track.id()
));
}

sender.set_initial_track_id(track.id().to_string())?;
break;
}
}

if !is_plan_b {
if let Some(track_id) = sender.initial_track_id() {
// After we have include an msid attribute in an offer it must stay the same for
Expand Down
4 changes: 3 additions & 1 deletion webrtc/src/rtp_transceiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub struct RTCRtpRtxParameters {
/// <http://draft.ortc.org/#dom-rtcrtpcodingparameters>
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct RTCRtpCodingParameters {
pub rid: String,
pub rid: Option<String>,
pub ssrc: SSRC,
pub payload_type: PayloadType,
pub rtx: RTCRtpRtxParameters,
Expand Down Expand Up @@ -132,6 +132,7 @@ pub struct RTCRtpTransceiverInit {
pub(crate) fn create_stream_info(
id: String,
ssrc: SSRC,
rid: Option<&str>,
payload_type: PayloadType,
codec: RTCRtpCodecCapability,
webrtc_header_extensions: &[RTCRtpHeaderExtensionParameters],
Expand All @@ -154,6 +155,7 @@ pub(crate) fn create_stream_info(

StreamInfo {
id,
rid: rid.map(String::from),
attributes: Attributes::new(),
ssrc,
payload_type,
Expand Down
16 changes: 9 additions & 7 deletions webrtc/src/rtp_transceiver/rtp_receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl RTPReceiverInternal {

let tracks = self.tracks.read().await;
for t in &*tracks {
if t.track.rid() == rid {
if Some(rid) == t.track.rid() {
if let Some(rtcp_interceptor) = &t.stream.rtcp_interceptor {
let a = Attributes::new();

Expand Down Expand Up @@ -529,6 +529,7 @@ impl RTCRtpReceiver {
let stream_info = create_stream_info(
"".to_owned(),
encoding.ssrc,
None,
0,
codec.clone(),
&global_params.header_extensions,
Expand Down Expand Up @@ -586,6 +587,7 @@ impl RTCRtpReceiver {
let stream_info = create_stream_info(
"".to_owned(),
rtx_ssrc,
None,
0,
codec.clone(),
&global_params.header_extensions,
Expand All @@ -597,7 +599,7 @@ impl RTCRtpReceiver {

self.receive_for_rtx(
rtx_ssrc,
"".to_owned(),
"",
TrackStream {
stream_info: Some(stream_info),
rtp_read_stream,
Expand Down Expand Up @@ -654,7 +656,7 @@ impl RTCRtpReceiver {
let mut encodings = vec![RTCRtpDecodingParameters::default(); encoding_size];
for (i, encoding) in encodings.iter_mut().enumerate() {
if incoming.rids.len() > i {
encoding.rid = incoming.rids[i].clone();
encoding.rid = Some(incoming.rids[i].clone());
}
if incoming.ssrcs.len() > i {
encoding.ssrc = incoming.ssrcs[i];
Expand Down Expand Up @@ -743,13 +745,13 @@ impl RTCRtpReceiver {
/// It populates all the internal state for the given RID
pub(crate) async fn receive_for_rid(
&self,
rid: String,
rid: &str,
params: RTCRtpParameters,
stream: TrackStream,
) -> Result<Arc<TrackRemote>> {
let mut tracks = self.internal.tracks.write().await;
for t in &mut *tracks {
if t.track.rid() == rid {
if Some(rid) == t.track.rid() {
t.track.set_kind(self.kind);
if let Some(codec) = params.codecs.first() {
t.track.set_codec(codec.clone()).await;
Expand All @@ -771,13 +773,13 @@ impl RTCRtpReceiver {
pub(crate) async fn receive_for_rtx(
&self,
ssrc: SSRC,
rsid: String,
rsid: &str,
repair_stream: TrackStream,
) -> Result<()> {
let mut tracks = self.internal.tracks.write().await;
let l = tracks.len();
for t in &mut *tracks {
if (ssrc != 0 && l == 1) || t.track.rid() == rsid {
if (ssrc != 0 && l == 1) || Some(rsid) == t.track.rid() {
t.repair_stream = repair_stream;

let receive_mtu = self.receive_mtu;
Expand Down
Loading