From 979d0c55c442b24c83850f64890056ea2baefcba Mon Sep 17 00:00:00 2001 From: zesun96 Date: Wed, 27 Nov 2024 00:27:36 +0800 Subject: [PATCH] add adaptive streaming; (#488) --- livekit-ffi/generate_proto.sh | 1 + livekit-ffi/protocol/ffi.proto | 8 ++++ livekit-ffi/protocol/track_publication.proto | 38 +++++++++++++++++ livekit-ffi/src/livekit.proto.rs | 42 ++++++++++++++++++- livekit-ffi/src/server/requests.rs | 40 ++++++++++++++++++ .../room/participant/remote_participant.rs | 27 +++++++++++- livekit/src/room/publication/remote.rs | 27 ++++++++++++ 7 files changed, 180 insertions(+), 3 deletions(-) create mode 100644 livekit-ffi/protocol/track_publication.proto diff --git a/livekit-ffi/generate_proto.sh b/livekit-ffi/generate_proto.sh index b1d5e9d6..029ccd3d 100755 --- a/livekit-ffi/generate_proto.sh +++ b/livekit-ffi/generate_proto.sh @@ -24,6 +24,7 @@ protoc \ $PROTOCOL/handle.proto \ $PROTOCOL/room.proto \ $PROTOCOL/track.proto \ + $PROTOCOL/track_publication.proto \ $PROTOCOL/participant.proto \ $PROTOCOL/video_frame.proto \ $PROTOCOL/audio_frame.proto \ diff --git a/livekit-ffi/protocol/ffi.proto b/livekit-ffi/protocol/ffi.proto index e8c1fda4..4ff6df53 100644 --- a/livekit-ffi/protocol/ffi.proto +++ b/livekit-ffi/protocol/ffi.proto @@ -20,6 +20,7 @@ option csharp_namespace = "LiveKit.Proto"; // import "handle.proto"; import "e2ee.proto"; import "track.proto"; +import "track_publication.proto"; import "room.proto"; import "video_frame.proto"; import "audio_frame.proto"; @@ -106,6 +107,10 @@ message FfiRequest { RegisterRpcMethodRequest register_rpc_method = 39; UnregisterRpcMethodRequest unregister_rpc_method = 40; RpcMethodInvocationResponseRequest rpc_method_invocation_response = 41; + + // Track Publication + EnableRemoteTrackPublicationRequest enable_remote_track_publication = 42; + UpdateRemoteTrackPublicationDimensionRequest update_remote_track_publication_dimension = 43; } } @@ -160,6 +165,9 @@ message FfiResponse { RegisterRpcMethodResponse register_rpc_method = 38; UnregisterRpcMethodResponse unregister_rpc_method = 39; RpcMethodInvocationResponseResponse rpc_method_invocation_response = 40; + // Track Publication + EnableRemoteTrackPublicationResponse enable_remote_track_publication = 41; + UpdateRemoteTrackPublicationDimensionResponse update_remote_track_publication_dimension = 42; } } diff --git a/livekit-ffi/protocol/track_publication.proto b/livekit-ffi/protocol/track_publication.proto new file mode 100644 index 00000000..44f0b681 --- /dev/null +++ b/livekit-ffi/protocol/track_publication.proto @@ -0,0 +1,38 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto2"; + +package livekit.proto; +option csharp_namespace = "LiveKit.Proto"; + +// Enable/Disable a remote track publication +message EnableRemoteTrackPublicationRequest { + required uint64 track_publication_handle = 1; + required bool enabled = 2; +} + +message EnableRemoteTrackPublicationResponse {} + +// update a remote track publication dimension +message UpdateRemoteTrackPublicationDimensionRequest { + required uint64 track_publication_handle = 1; + required uint32 width = 2; + required uint32 height = 3; +} + +message UpdateRemoteTrackPublicationDimensionResponse {} + + + diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index 620588fc..d5b75fed 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -1529,6 +1529,34 @@ impl StreamState { } } } +/// Enable/Disable a remote track publication +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct EnableRemoteTrackPublicationRequest { + #[prost(uint64, required, tag="1")] + pub track_publication_handle: u64, + #[prost(bool, required, tag="2")] + pub enabled: bool, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct EnableRemoteTrackPublicationResponse { +} +/// update a remote track publication dimension +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct UpdateRemoteTrackPublicationDimensionRequest { + #[prost(uint64, required, tag="1")] + pub track_publication_handle: u64, + #[prost(uint32, required, tag="2")] + pub width: u32, + #[prost(uint32, required, tag="3")] + pub height: u32, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct UpdateRemoteTrackPublicationDimensionResponse { +} #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ParticipantInfo { @@ -3686,7 +3714,7 @@ pub struct RpcMethodInvocationEvent { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiRequest { - #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41")] + #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiRequest`. @@ -3779,13 +3807,18 @@ pub mod ffi_request { UnregisterRpcMethod(super::UnregisterRpcMethodRequest), #[prost(message, tag="41")] RpcMethodInvocationResponse(super::RpcMethodInvocationResponseRequest), + /// Track Publication + #[prost(message, tag="42")] + EnableRemoteTrackPublication(super::EnableRemoteTrackPublicationRequest), + #[prost(message, tag="43")] + UpdateRemoteTrackPublicationDimension(super::UpdateRemoteTrackPublicationDimensionRequest), } } /// This is the output of livekit_ffi_request function. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiResponse { - #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40")] + #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiResponse`. @@ -3876,6 +3909,11 @@ pub mod ffi_response { UnregisterRpcMethod(super::UnregisterRpcMethodResponse), #[prost(message, tag="40")] RpcMethodInvocationResponse(super::RpcMethodInvocationResponseResponse), + /// Track Publication + #[prost(message, tag="41")] + EnableRemoteTrackPublication(super::EnableRemoteTrackPublicationResponse), + #[prost(message, tag="42")] + UpdateRemoteTrackPublicationDimension(super::UpdateRemoteTrackPublicationDimensionResponse), } } /// To minimize complexity, participant events are not included in the protocol. diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index 5c4aa32f..f14ab0d7 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -152,6 +152,36 @@ fn on_set_subscribed( Ok(proto::SetSubscribedResponse {}) } +fn on_enable_remote_track_publication( + server: &'static FfiServer, + request: proto::EnableRemoteTrackPublicationRequest, +) -> FfiResult { + let ffi_publication = + server.retrieve_handle::(request.track_publication_handle)?; + + let TrackPublication::Remote(publication) = &ffi_publication.publication else { + return Err(FfiError::InvalidRequest("publication is not a RemotePublication".into())); + }; + + publication.set_enabled(request.enabled); + Ok(proto::EnableRemoteTrackPublicationResponse {}) +} + +fn on_update_remote_track_publication_dimension( + server: &'static FfiServer, + request: proto::UpdateRemoteTrackPublicationDimensionRequest, +) -> FfiResult { + let ffi_publication = + server.retrieve_handle::(request.track_publication_handle)?; + + let TrackPublication::Remote(publication) = &ffi_publication.publication else { + return Err(FfiError::InvalidRequest("publication is not a RemotePublication".into())); + }; + let dimension = TrackDimension(request.width, request.height); + publication.update_video_dimensions(dimension); + Ok(proto::UpdateRemoteTrackPublicationDimensionResponse {}) +} + fn on_set_local_metadata( server: &'static FfiServer, set_local_metadata: proto::SetLocalMetadataRequest, @@ -995,6 +1025,16 @@ pub fn handle_request( on_rpc_method_invocation_response(server, request)?, ) } + proto::ffi_request::Message::EnableRemoteTrackPublication(request) => { + proto::ffi_response::Message::EnableRemoteTrackPublication( + on_enable_remote_track_publication(server, request)?, + ) + } + proto::ffi_request::Message::UpdateRemoteTrackPublicationDimension(request) => { + proto::ffi_response::Message::UpdateRemoteTrackPublicationDimension( + on_update_remote_track_publication_dimension(server, request)?, + ) + } }); Ok(res) diff --git a/livekit/src/room/participant/remote_participant.rs b/livekit/src/room/participant/remote_participant.rs index 7ca6e6bd..e9e61ea0 100644 --- a/livekit/src/room/participant/remote_participant.rs +++ b/livekit/src/room/participant/remote_participant.rs @@ -384,7 +384,32 @@ impl RemoteParticipant { .await }); } - }) + }); + + publication.on_video_dimensions_changed({ + let rtc_engine = self.inner.rtc_engine.clone(); + move |publication, dimension| { + let rtc_engine = rtc_engine.clone(); + livekit_runtime::spawn(async move { + let tsid: String = publication.sid().into(); + let TrackDimension(width, height) = dimension; + let enabled = publication.is_enabled(); + let update_track_settings = proto::UpdateTrackSettings { + track_sids: vec![tsid.clone()], + disabled: !enabled, + width, + height, + ..Default::default() + }; + + rtc_engine + .send_request(proto::signal_request::Message::TrackSetting( + update_track_settings, + )) + .await + }); + } + }); } pub(crate) fn remove_publication(&self, sid: &TrackSid) -> Option { diff --git a/livekit/src/room/publication/remote.rs b/livekit/src/room/publication/remote.rs index dff2d670..fe86ebf1 100644 --- a/livekit/src/room/publication/remote.rs +++ b/livekit/src/room/publication/remote.rs @@ -28,6 +28,7 @@ type PermissionStatusChangedHandler = Box; // old_status, new_status type SubscriptionUpdateNeededHandler = Box; type EnabledStatusChangedHandler = Box; +type VideoDimensionsChangedHandler = Box; #[derive(Default)] struct RemoteEvents { @@ -37,6 +38,7 @@ struct RemoteEvents { permission_status_changed: Mutex>, subscription_update_needed: Mutex>, enabled_status_changed: Mutex>, + video_dimensions_changed: Mutex>, } #[derive(Debug)] @@ -215,6 +217,13 @@ impl RemoteTrackPublication { *self.remote.events.enabled_status_changed.lock() = Some(Box::new(f)); } + pub(crate) fn on_video_dimensions_changed( + &self, + f: impl Fn(RemoteTrackPublication, TrackDimension) + Send + 'static, + ) { + *self.remote.events.video_dimensions_changed.lock() = Some(Box::new(f)); + } + pub fn set_subscribed(&self, subscribed: bool) { let old_subscription_state = self.subscription_status(); let old_permission_state = self.permission_status(); @@ -262,6 +271,24 @@ impl RemoteTrackPublication { } } + pub fn update_video_dimensions(&self, dimension: TrackDimension) { + if self.is_subscribed() { + if dimension != self.dimension() { + let TrackDimension(width, height) = dimension; + let mut new_info = self.proto_info(); + new_info.width = width; + new_info.height = height; + self.update_info(new_info); + } + // Request to send an update to the SFU + if let Some(video_dimensions_changed) = + self.remote.events.video_dimensions_changed.lock().as_ref() + { + video_dimensions_changed(self.clone(), dimension) + } + } + } + pub fn subscription_status(&self) -> SubscriptionStatus { if !self.remote.info.read().subscribed { return SubscriptionStatus::Unsubscribed;