Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tentative first pass at making simulcast egest possible #312

Draft
wants to merge 38 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
2742632
Tentative first pass at making simulcast egest possible
robashton Oct 7, 2022
d4160e1
Fix the tests (for now)
robashton Oct 7, 2022
9fdf70b
clippy warnings (I guess I need to update/downgrade my rust)
robashton Oct 7, 2022
ec670b4
Re-instate changes from #217
robashton Oct 10, 2022
4974cf4
Replace the mutex on track_encodings with a RwLock
robashton Oct 10, 2022
379f403
Initial feedback from PR
robashton Oct 10, 2022
21607ac
Make Rid an Option<String> and see how that falls out
robashton Oct 11, 2022
a3c2a20
revert change to log (that I did when I was debugging my own stuff\!)
robashton Oct 11, 2022
d6ff71d
unique track ids in stats
robashton Oct 11, 2022
d4c27fc
debug impl for TrackLocalContext
robashton Oct 11, 2022
19fc8e9
ensure we only do simulcast send directives if we're doing a send on …
robashton Oct 11, 2022
acef0fd
apparently format! is a thing
robashton Oct 11, 2022
ac59df1
a few things in rtp_sender that we didn't ike
robashton Oct 11, 2022
4b04180
unnecessary clone now it's an option
robashton Oct 11, 2022
246044e
typo (got vimmed)
robashton Oct 11, 2022
850fcb5
Tentative first pass at making simulcast egest possible
robashton Oct 7, 2022
e20c15d
Fix the tests (for now)
robashton Oct 7, 2022
866965e
clippy warnings (I guess I need to update/downgrade my rust)
robashton Oct 7, 2022
25cedc0
Re-instate changes from #217
robashton Oct 10, 2022
2972dd5
Replace the mutex on track_encodings with a RwLock
robashton Oct 10, 2022
55bc75c
Initial feedback from PR
robashton Oct 10, 2022
8a59116
Make Rid an Option<String> and see how that falls out
robashton Oct 11, 2022
f780884
revert change to log (that I did when I was debugging my own stuff\!)
robashton Oct 11, 2022
9707863
unique track ids in stats
robashton Oct 11, 2022
ffad0b5
debug impl for TrackLocalContext
robashton Oct 11, 2022
6b23661
ensure we only do simulcast send directives if we're doing a send on …
robashton Oct 11, 2022
7853a8d
apparently format! is a thing
robashton Oct 11, 2022
afe4dcb
a few things in rtp_sender that we didn't ike
robashton Oct 11, 2022
b64f163
unnecessary clone now it's an option
robashton Oct 11, 2022
6a24250
typo (got vimmed)
robashton Oct 11, 2022
fd84deb
don't hold onto that lock, or we hang
robashton Oct 13, 2022
f5857dd
some more tweaks
robashton Oct 13, 2022
5aa6864
merge from upstream
robashton Oct 13, 2022
6996968
Merge branch 'simulcast-egest' of github.com:robashton/webrtc into si…
robashton Oct 13, 2022
7da6936
removed comments
robashton Oct 13, 2022
5007093
revert method order
robashton Oct 13, 2022
7114d5e
remove read_simulcast_rtcp as we can't see a reason why it needs to e…
robashton Oct 13, 2022
7c03a11
Option<String> on structs, Option<&str> on traits (for rid anyway)
robashton Oct 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/examples/simulcast/simulcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ async fn main() -> Result<()> {
if let Some(track) = track {
println!("Track has started");

let rid = track.rid().to_owned();
let rid = track.rid().unwrap_or("".to_owned());
let output_track = if let Some(output_track) = output_tracks.get(&rid) {
Arc::clone(output_track)
} else {
Expand Down Expand Up @@ -199,7 +199,7 @@ async fn main() -> Result<()> {

tokio::spawn(async move {
// Read RTP packets being sent to webrtc-rs
println!("enter track loop {}", track.rid());
println!("enter track loop {:?}", track.rid());
while let Ok((rtp, _)) = track.read_rtp().await {
if let Err(err) = output_track.write_rtp(&rtp).await {
if Error::ErrClosedPipe != err {
Expand All @@ -210,7 +210,7 @@ async fn main() -> Result<()> {
}
}
}
println!("exit track loop {}", track.rid());
println!("exit track loop {:?}", track.rid());
});
}
Box::pin(async {})
Expand Down
1 change: 1 addition & 0 deletions interceptor/src/stream_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub struct RTPHeaderExtension {
#[derive(Default, Debug, Clone)]
pub struct StreamInfo {
pub id: String,
pub rid: Option<String>,
pub attributes: Attributes,
pub ssrc: u32,
pub payload_type: u8,
Expand Down
23 changes: 23 additions & 0 deletions webrtc/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,29 @@ pub enum Error {
#[error("new track must be of the same kind as previous")]
ErrRTPSenderNewTrackHasIncorrectKind,

#[error("Cannot call replace on a sender with multiple tracks")]
ErrRTPSenderCannotReplaceSimulcast,

#[error("Sender does not have track for RID")]
ErrRTPSenderNoTrackForRID,

#[error("Sender cannot add encoding due to RID collision")]
ErrRTPSenderRIDCollision,

#[error("Sender cannot add encoding as provided track does not match base track")]
ErrRTPSenderBaseEncodingMismatch,

/// ErrRTPSenderStopped indicates the sender was already stopped
#[error("Sender has already been stopped")]
ErrRTPSenderStopped,

/// ErrRTPSenderRidNil indicates that the track RID was empty
#[error("Sender cannot add encoding as rid is empty")]
ErrRTPSenderRidNil,

#[error("Sender cannot add encoding as there is no base track")]
ErrRTPSenderNoBaseEncoding,

/// ErrUnbindFailed indicates that a TrackLocal was not able to be unbind
#[error("failed to unbind TrackLocal from PeerConnection")]
ErrUnbindFailed,
Expand Down
3 changes: 1 addition & 2 deletions webrtc/src/peer_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ impl RTCPeerConnection {
};

let sender = match t.sender().await {
Some(s) => s.clone(),
Some(s) => s,
None => {
log::warn!(
"RtpSender missing for transeceiver with sending direction {} for mid {}",
Expand All @@ -495,7 +495,6 @@ impl RTCPeerConnection {
if stream_ids.is_empty() {
return true;
}

// different stream id
if dmsid.split_whitespace().next() != Some(&stream_ids[0]) {
return true;
Expand Down
28 changes: 16 additions & 12 deletions webrtc/src/peer_connection/peer_connection_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,8 @@ impl PeerConnectionInternal {
let mut receiver_needs_stopped = false;

for t in tracks {
if !t.rid().is_empty() {
if let Some(details) =
track_details_for_rid(&track_details, t.rid().to_owned())
{
if let Some(rid) = t.rid().clone() {
if let Some(details) = track_details_for_rid(&track_details, rid) {
t.set_id(details.id.clone()).await;
t.set_stream_id(details.stream_id.clone()).await;
continue;
Expand Down Expand Up @@ -1141,6 +1139,7 @@ impl PeerConnectionInternal {
let stream_info = create_stream_info(
"".to_owned(),
ssrc,
None,
params.codecs[0].payload_type,
params.codecs[0].capability.clone(),
&params.header_extensions,
Expand Down Expand Up @@ -1535,20 +1534,25 @@ impl PeerConnectionInternal {
None => continue,
};

let track_id = track.id().to_string();
let kind = match track.kind() {
RTPCodecType::Unspecified => continue,
RTPCodecType::Audio => "audio",
RTPCodecType::Video => "video",
};

track_infos.push(TrackInfo {
track_id,
ssrc: sender.ssrc,
mid: mid.clone(),
rid: None,
kind,
});
let encodings = sender.track_encodings.read().await;
for e in encodings.iter() {
let track_id = track.rid().map_or(track.id().to_string(), |rid| {
format!("{}-{}", track.id(), rid)
});
track_infos.push(TrackInfo {
track_id,
ssrc: e.ssrc,
mid: mid.clone(),
rid: track.rid().clone(),
kind,
});
}
}

let stream_stats = self
Expand Down
39 changes: 31 additions & 8 deletions webrtc/src/peer_connection/sdp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,19 +540,44 @@ pub(crate) async fn add_transceiver_sdp(
);
}

// This is equivalent to addSenderSDP in Pion (now)
robashton marked this conversation as resolved.
Show resolved Hide resolved
for mt in transceivers {
if let Some(sender) = mt.sender().await {
if let Some(track) = sender.track().await {
media = media.with_media_source(
sender.ssrc,
track.stream_id().to_owned(), /* cname */
track.stream_id().to_owned(), /* streamLabel */
track.id().to_owned(),
);
if mt.direction().has_send() {
let send_parameters = sender.get_parameters().await;
// Get the different encodings expressed first
for encoding in &send_parameters.encodings {
media = media.with_media_source(
encoding.ssrc,
track.stream_id().to_owned(), /* cname */
track.stream_id().to_owned(), /* streamLabel */
track.id().to_owned(),
);
}

// Then tell the world about simulcast
if send_parameters.encodings.len() > 1 {
let mut send_rids: Vec<String> = vec![];

for e in &send_parameters.encodings {
if let Some(rid) = e.rid.clone() {
let mut s: String = rid.clone();
send_rids.push(rid);
s.push_str(" send");
media = media.with_value_attribute("rid".into(), s);
}
}
// Simulcast)
let mut s: String = "send ".to_owned();
s.push_str(send_rids.join(";").as_ref());
media = media.with_value_attribute("simulcast".into(), s);
}
}
// Send msid based on the configured track if we haven't already
// sent on this sender. If we have sent we must keep the msid line consistent, this
// is handled below.
// And now when we 'break', the above will have been printed properly still?
robashton marked this conversation as resolved.
Show resolved Hide resolved
if !is_plan_b && sender.initial_track_id().is_none() {
for stream_id in sender.associated_media_stream_ids() {
media = media.with_property_attribute(format!(
Expand All @@ -561,12 +586,10 @@ pub(crate) async fn add_transceiver_sdp(
track.id()
));
}

sender.set_initial_track_id(track.id().to_string())?;
break;
}
}

if !is_plan_b {
if let Some(track_id) = sender.initial_track_id() {
// After we have include an msid attribute in an offer it must stay the same for
Expand Down
4 changes: 3 additions & 1 deletion webrtc/src/rtp_transceiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub struct RTCRtpRtxParameters {
/// <http://draft.ortc.org/#dom-rtcrtpcodingparameters>
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct RTCRtpCodingParameters {
pub rid: String,
pub rid: Option<String>,
pub ssrc: SSRC,
pub payload_type: PayloadType,
pub rtx: RTCRtpRtxParameters,
Expand Down Expand Up @@ -132,6 +132,7 @@ pub struct RTCRtpTransceiverInit {
pub(crate) fn create_stream_info(
id: String,
ssrc: SSRC,
rid: Option<String>,
payload_type: PayloadType,
codec: RTCRtpCodecCapability,
webrtc_header_extensions: &[RTCRtpHeaderExtensionParameters],
Expand All @@ -154,6 +155,7 @@ pub(crate) fn create_stream_info(

StreamInfo {
id,
rid,
attributes: Attributes::new(),
ssrc,
payload_type,
Expand Down
10 changes: 6 additions & 4 deletions webrtc/src/rtp_transceiver/rtp_receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl RTPReceiverInternal {

let tracks = self.tracks.read().await;
for t in &*tracks {
if t.track.rid() == rid {
if t.track.rid().map_or(false, |r| r == rid) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think when comparing to an option option = Some(value) might be easier to read than map_or

Copy link
Member Author

@robashton robashton Oct 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, you can't just do a Some(value) because the types don't line up, track.rid() is Option<String> (I toyed with having it be something else, I don't know what the rust convention is and it seemed to match the other types across the codebase) and rid is a &str, so I'd have to make it owned in order to do the comparison which seems even more of a stretch than doing a map_or, which is fundamentally equivalent to the oft-used fromMaybe in a functional language like haskell/purescript.

If the left hand type can be changed then this would go away. If you have a suggestion for this then please do tell (I did start with &Option<String> to avoid the clone but that just pushed the clone to the consumers which was 🤮 )

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can do track.rid().as_deref() == Some(rid) but I guess that's not much better than this. By using as_deref the Deref coercion mechanism kicks in, resulting in a left hand side type of Option<&str>

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a neat little feature - would there be benefit in changing the type of track.rid()? It's mostly used in checks just like this anyway

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point actually, it should be Option<&str> to avoid the allocation each time it's called. The caller can still clone it if they want

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah okay, Option<String> on the struct and Option<&str> on the trait is much nicer (and analogous to how you'd do a String anyway), I note that there are plenty of inconsistencies in how strings are passed around in the codebase in general and I've left most of them alone in this change so there are a few unnecessary String::from's here, but on the whole it's much nicer

if let Some(rtcp_interceptor) = &t.stream.rtcp_interceptor {
let a = Attributes::new();

Expand Down Expand Up @@ -529,6 +529,7 @@ impl RTCRtpReceiver {
let stream_info = create_stream_info(
"".to_owned(),
encoding.ssrc,
None,
0,
codec.clone(),
&global_params.header_extensions,
Expand Down Expand Up @@ -586,6 +587,7 @@ impl RTCRtpReceiver {
let stream_info = create_stream_info(
"".to_owned(),
rtx_ssrc,
None,
0,
codec.clone(),
&global_params.header_extensions,
Expand Down Expand Up @@ -654,7 +656,7 @@ impl RTCRtpReceiver {
let mut encodings = vec![RTCRtpDecodingParameters::default(); encoding_size];
for (i, encoding) in encodings.iter_mut().enumerate() {
if incoming.rids.len() > i {
encoding.rid = incoming.rids[i].clone();
encoding.rid = Some(incoming.rids[i].clone());
}
if incoming.ssrcs.len() > i {
encoding.ssrc = incoming.ssrcs[i];
Expand Down Expand Up @@ -749,7 +751,7 @@ impl RTCRtpReceiver {
) -> Result<Arc<TrackRemote>> {
let mut tracks = self.internal.tracks.write().await;
for t in &mut *tracks {
if t.track.rid() == rid {
if t.track.rid().map_or(false, |r| r == rid) {
t.track.set_kind(self.kind);
if let Some(codec) = params.codecs.first() {
t.track.set_codec(codec.clone()).await;
Expand Down Expand Up @@ -777,7 +779,7 @@ impl RTCRtpReceiver {
let mut tracks = self.internal.tracks.write().await;
let l = tracks.len();
for t in &mut *tracks {
if (ssrc != 0 && l == 1) || t.track.rid() == rsid {
if (ssrc != 0 && l == 1) || t.track.rid().map_or(false, |r| r == rsid) {
t.repair_stream = repair_stream;

let receive_mtu = self.receive_mtu;
Expand Down
Loading