Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add adaptive streaming; #488

Merged
merged 4 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions livekit-ffi/generate_proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ protoc \
$PROTOCOL/handle.proto \
$PROTOCOL/room.proto \
$PROTOCOL/track.proto \
$PROTOCOL/track_publication.proto \
$PROTOCOL/participant.proto \
$PROTOCOL/video_frame.proto \
$PROTOCOL/audio_frame.proto \
Expand Down
8 changes: 8 additions & 0 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ option csharp_namespace = "LiveKit.Proto";
// import "handle.proto";
import "e2ee.proto";
import "track.proto";
import "track_publication.proto";
import "room.proto";
import "video_frame.proto";
import "audio_frame.proto";
Expand Down Expand Up @@ -106,6 +107,10 @@ message FfiRequest {
RegisterRpcMethodRequest register_rpc_method = 39;
UnregisterRpcMethodRequest unregister_rpc_method = 40;
RpcMethodInvocationResponseRequest rpc_method_invocation_response = 41;

// Track Publication
EnableRemoteTrackPublicationRequest enable_remote_track_publication = 42;
UpdateRemoteTrackPublicationDimensionRequest update_remote_track_publication_dimension = 43;
}
}

Expand Down Expand Up @@ -160,6 +165,9 @@ message FfiResponse {
RegisterRpcMethodResponse register_rpc_method = 38;
UnregisterRpcMethodResponse unregister_rpc_method = 39;
RpcMethodInvocationResponseResponse rpc_method_invocation_response = 40;
// Track Publication
EnableRemoteTrackPublicationResponse enable_remote_track_publication = 41;
UpdateRemoteTrackPublicationDimensionResponse update_remote_track_publication_dimension = 42;
}
}

Expand Down
38 changes: 38 additions & 0 deletions livekit-ffi/protocol/track_publication.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto2";

package livekit.proto;
option csharp_namespace = "LiveKit.Proto";

// Enable/Disable a remote track publication
message EnableRemoteTrackPublicationRequest {
required uint64 track_publication_handle = 1;
required bool enabled = 2;
}

message EnableRemoteTrackPublicationResponse {}

// update a remote track publication dimension
message UpdateRemoteTrackPublicationDimensionRequest {
required uint64 track_publication_handle = 1;
required uint32 width = 2;
required uint32 height = 3;
}

message UpdateRemoteTrackPublicationDimensionResponse {}



42 changes: 40 additions & 2 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,34 @@ impl StreamState {
}
}
}
/// Enable/Disable a remote track publication
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EnableRemoteTrackPublicationRequest {
#[prost(uint64, required, tag="1")]
pub track_publication_handle: u64,
#[prost(bool, required, tag="2")]
pub enabled: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EnableRemoteTrackPublicationResponse {
}
/// update a remote track publication dimension
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UpdateRemoteTrackPublicationDimensionRequest {
#[prost(uint64, required, tag="1")]
pub track_publication_handle: u64,
#[prost(uint32, required, tag="2")]
pub width: u32,
#[prost(uint32, required, tag="3")]
pub height: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UpdateRemoteTrackPublicationDimensionResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ParticipantInfo {
Expand Down Expand Up @@ -3686,7 +3714,7 @@ pub struct RpcMethodInvocationEvent {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiRequest {
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41")]
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43")]
pub message: ::core::option::Option<ffi_request::Message>,
}
/// Nested message and enum types in `FfiRequest`.
Expand Down Expand Up @@ -3779,13 +3807,18 @@ pub mod ffi_request {
UnregisterRpcMethod(super::UnregisterRpcMethodRequest),
#[prost(message, tag="41")]
RpcMethodInvocationResponse(super::RpcMethodInvocationResponseRequest),
/// Track Publication
#[prost(message, tag="42")]
EnableRemoteTrackPublication(super::EnableRemoteTrackPublicationRequest),
#[prost(message, tag="43")]
UpdateRemoteTrackPublicationDimension(super::UpdateRemoteTrackPublicationDimensionRequest),
}
}
/// This is the output of livekit_ffi_request function.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiResponse {
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40")]
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42")]
pub message: ::core::option::Option<ffi_response::Message>,
}
/// Nested message and enum types in `FfiResponse`.
Expand Down Expand Up @@ -3876,6 +3909,11 @@ pub mod ffi_response {
UnregisterRpcMethod(super::UnregisterRpcMethodResponse),
#[prost(message, tag="40")]
RpcMethodInvocationResponse(super::RpcMethodInvocationResponseResponse),
/// Track Publication
#[prost(message, tag="41")]
EnableRemoteTrackPublication(super::EnableRemoteTrackPublicationResponse),
#[prost(message, tag="42")]
UpdateRemoteTrackPublicationDimension(super::UpdateRemoteTrackPublicationDimensionResponse),
}
}
/// To minimize complexity, participant events are not included in the protocol.
Expand Down
40 changes: 40 additions & 0 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,36 @@ fn on_set_subscribed(
Ok(proto::SetSubscribedResponse {})
}

fn on_enable_remote_track_publication(
server: &'static FfiServer,
request: proto::EnableRemoteTrackPublicationRequest,
) -> FfiResult<proto::EnableRemoteTrackPublicationResponse> {
let ffi_publication =
server.retrieve_handle::<FfiPublication>(request.track_publication_handle)?;

let TrackPublication::Remote(publication) = &ffi_publication.publication else {
return Err(FfiError::InvalidRequest("publication is not a RemotePublication".into()));
};

publication.set_enabled(request.enabled);
Ok(proto::EnableRemoteTrackPublicationResponse {})
}

fn on_update_remote_track_publication_dimension(
server: &'static FfiServer,
request: proto::UpdateRemoteTrackPublicationDimensionRequest,
) -> FfiResult<proto::UpdateRemoteTrackPublicationDimensionResponse> {
let ffi_publication =
server.retrieve_handle::<FfiPublication>(request.track_publication_handle)?;

let TrackPublication::Remote(publication) = &ffi_publication.publication else {
return Err(FfiError::InvalidRequest("publication is not a RemotePublication".into()));
};
let dimension = TrackDimension(request.width, request.height);
publication.update_video_dimensions(dimension);
Ok(proto::UpdateRemoteTrackPublicationDimensionResponse {})
}

fn on_set_local_metadata(
server: &'static FfiServer,
set_local_metadata: proto::SetLocalMetadataRequest,
Expand Down Expand Up @@ -995,6 +1025,16 @@ pub fn handle_request(
on_rpc_method_invocation_response(server, request)?,
)
}
proto::ffi_request::Message::EnableRemoteTrackPublication(request) => {
proto::ffi_response::Message::EnableRemoteTrackPublication(
on_enable_remote_track_publication(server, request)?,
)
}
proto::ffi_request::Message::UpdateRemoteTrackPublicationDimension(request) => {
proto::ffi_response::Message::UpdateRemoteTrackPublicationDimension(
on_update_remote_track_publication_dimension(server, request)?,
)
}
});

Ok(res)
Expand Down
27 changes: 26 additions & 1 deletion livekit/src/room/participant/remote_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,32 @@ impl RemoteParticipant {
.await
});
}
})
});

publication.on_video_dimensions_changed({
let rtc_engine = self.inner.rtc_engine.clone();
move |publication, dimension| {
let rtc_engine = rtc_engine.clone();
livekit_runtime::spawn(async move {
let tsid: String = publication.sid().into();
let TrackDimension(width, height) = dimension;
let enabled = publication.is_enabled();
let update_track_settings = proto::UpdateTrackSettings {
track_sids: vec![tsid.clone()],
disabled: !enabled,
width,
height,
..Default::default()
};

rtc_engine
.send_request(proto::signal_request::Message::TrackSetting(
update_track_settings,
))
.await
});
}
});
}

pub(crate) fn remove_publication(&self, sid: &TrackSid) -> Option<TrackPublication> {
Expand Down
27 changes: 27 additions & 0 deletions livekit/src/room/publication/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type PermissionStatusChangedHandler =
Box<dyn Fn(RemoteTrackPublication, PermissionStatus, PermissionStatus) + Send>; // old_status, new_status
type SubscriptionUpdateNeededHandler = Box<dyn Fn(RemoteTrackPublication, bool) + Send>;
type EnabledStatusChangedHandler = Box<dyn Fn(RemoteTrackPublication, bool) + Send>;
type VideoDimensionsChangedHandler = Box<dyn Fn(RemoteTrackPublication, TrackDimension) + Send>;

#[derive(Default)]
struct RemoteEvents {
Expand All @@ -37,6 +38,7 @@ struct RemoteEvents {
permission_status_changed: Mutex<Option<PermissionStatusChangedHandler>>,
subscription_update_needed: Mutex<Option<SubscriptionUpdateNeededHandler>>,
enabled_status_changed: Mutex<Option<EnabledStatusChangedHandler>>,
video_dimensions_changed: Mutex<Option<VideoDimensionsChangedHandler>>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -215,6 +217,13 @@ impl RemoteTrackPublication {
*self.remote.events.enabled_status_changed.lock() = Some(Box::new(f));
}

pub(crate) fn on_video_dimensions_changed(
&self,
f: impl Fn(RemoteTrackPublication, TrackDimension) + Send + 'static,
) {
*self.remote.events.video_dimensions_changed.lock() = Some(Box::new(f));
}

pub fn set_subscribed(&self, subscribed: bool) {
let old_subscription_state = self.subscription_status();
let old_permission_state = self.permission_status();
Expand Down Expand Up @@ -262,6 +271,24 @@ impl RemoteTrackPublication {
}
}

pub fn update_video_dimensions(&self, dimension: TrackDimension) {
if self.is_subscribed() {
if dimension != self.dimension() {
let TrackDimension(width, height) = dimension;
let mut new_info = self.proto_info();
new_info.width = width;
new_info.height = height;
self.update_info(new_info);
}
// Request to send an update to the SFU
if let Some(video_dimensions_changed) =
self.remote.events.video_dimensions_changed.lock().as_ref()
{
video_dimensions_changed(self.clone(), dimension)
}
}
}

pub fn subscription_status(&self) -> SubscriptionStatus {
if !self.remote.info.read().subscribed {
return SubscriptionStatus::Unsubscribed;
Expand Down
Loading