Skip to content

Commit

Permalink
add adaptive streaming; (livekit#488)
Browse files Browse the repository at this point in the history
  • Loading branch information
zesun96 authored and mikayla-maki committed Dec 2, 2024
1 parent c59af3e commit b131b74
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 3 deletions.
1 change: 1 addition & 0 deletions livekit-ffi/generate_proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ protoc \
$PROTOCOL/handle.proto \
$PROTOCOL/room.proto \
$PROTOCOL/track.proto \
$PROTOCOL/track_publication.proto \
$PROTOCOL/participant.proto \
$PROTOCOL/video_frame.proto \
$PROTOCOL/audio_frame.proto \
Expand Down
8 changes: 8 additions & 0 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ option csharp_namespace = "LiveKit.Proto";
// import "handle.proto";
import "e2ee.proto";
import "track.proto";
import "track_publication.proto";
import "room.proto";
import "video_frame.proto";
import "audio_frame.proto";
Expand Down Expand Up @@ -106,6 +107,10 @@ message FfiRequest {
RegisterRpcMethodRequest register_rpc_method = 39;
UnregisterRpcMethodRequest unregister_rpc_method = 40;
RpcMethodInvocationResponseRequest rpc_method_invocation_response = 41;

// Track Publication
EnableRemoteTrackPublicationRequest enable_remote_track_publication = 42;
UpdateRemoteTrackPublicationDimensionRequest update_remote_track_publication_dimension = 43;
}
}

Expand Down Expand Up @@ -160,6 +165,9 @@ message FfiResponse {
RegisterRpcMethodResponse register_rpc_method = 38;
UnregisterRpcMethodResponse unregister_rpc_method = 39;
RpcMethodInvocationResponseResponse rpc_method_invocation_response = 40;
// Track Publication
EnableRemoteTrackPublicationResponse enable_remote_track_publication = 41;
UpdateRemoteTrackPublicationDimensionResponse update_remote_track_publication_dimension = 42;
}
}

Expand Down
38 changes: 38 additions & 0 deletions livekit-ffi/protocol/track_publication.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto2";

package livekit.proto;
option csharp_namespace = "LiveKit.Proto";

// Enable/Disable a remote track publication
message EnableRemoteTrackPublicationRequest {
required uint64 track_publication_handle = 1;
required bool enabled = 2;
}

message EnableRemoteTrackPublicationResponse {}

// update a remote track publication dimension
message UpdateRemoteTrackPublicationDimensionRequest {
required uint64 track_publication_handle = 1;
required uint32 width = 2;
required uint32 height = 3;
}

message UpdateRemoteTrackPublicationDimensionResponse {}



42 changes: 40 additions & 2 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,34 @@ impl StreamState {
}
}
}
/// Enable/Disable a remote track publication
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EnableRemoteTrackPublicationRequest {
#[prost(uint64, required, tag="1")]
pub track_publication_handle: u64,
#[prost(bool, required, tag="2")]
pub enabled: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct EnableRemoteTrackPublicationResponse {
}
/// update a remote track publication dimension
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UpdateRemoteTrackPublicationDimensionRequest {
#[prost(uint64, required, tag="1")]
pub track_publication_handle: u64,
#[prost(uint32, required, tag="2")]
pub width: u32,
#[prost(uint32, required, tag="3")]
pub height: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct UpdateRemoteTrackPublicationDimensionResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ParticipantInfo {
Expand Down Expand Up @@ -3686,7 +3714,7 @@ pub struct RpcMethodInvocationEvent {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiRequest {
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41")]
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43")]
pub message: ::core::option::Option<ffi_request::Message>,
}
/// Nested message and enum types in `FfiRequest`.
Expand Down Expand Up @@ -3779,13 +3807,18 @@ pub mod ffi_request {
UnregisterRpcMethod(super::UnregisterRpcMethodRequest),
#[prost(message, tag="41")]
RpcMethodInvocationResponse(super::RpcMethodInvocationResponseRequest),
/// Track Publication
#[prost(message, tag="42")]
EnableRemoteTrackPublication(super::EnableRemoteTrackPublicationRequest),
#[prost(message, tag="43")]
UpdateRemoteTrackPublicationDimension(super::UpdateRemoteTrackPublicationDimensionRequest),
}
}
/// This is the output of livekit_ffi_request function.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiResponse {
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40")]
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42")]
pub message: ::core::option::Option<ffi_response::Message>,
}
/// Nested message and enum types in `FfiResponse`.
Expand Down Expand Up @@ -3876,6 +3909,11 @@ pub mod ffi_response {
UnregisterRpcMethod(super::UnregisterRpcMethodResponse),
#[prost(message, tag="40")]
RpcMethodInvocationResponse(super::RpcMethodInvocationResponseResponse),
/// Track Publication
#[prost(message, tag="41")]
EnableRemoteTrackPublication(super::EnableRemoteTrackPublicationResponse),
#[prost(message, tag="42")]
UpdateRemoteTrackPublicationDimension(super::UpdateRemoteTrackPublicationDimensionResponse),
}
}
/// To minimize complexity, participant events are not included in the protocol.
Expand Down
40 changes: 40 additions & 0 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,36 @@ fn on_set_subscribed(
Ok(proto::SetSubscribedResponse {})
}

fn on_enable_remote_track_publication(
server: &'static FfiServer,
request: proto::EnableRemoteTrackPublicationRequest,
) -> FfiResult<proto::EnableRemoteTrackPublicationResponse> {
let ffi_publication =
server.retrieve_handle::<FfiPublication>(request.track_publication_handle)?;

let TrackPublication::Remote(publication) = &ffi_publication.publication else {
return Err(FfiError::InvalidRequest("publication is not a RemotePublication".into()));
};

publication.set_enabled(request.enabled);
Ok(proto::EnableRemoteTrackPublicationResponse {})
}

fn on_update_remote_track_publication_dimension(
server: &'static FfiServer,
request: proto::UpdateRemoteTrackPublicationDimensionRequest,
) -> FfiResult<proto::UpdateRemoteTrackPublicationDimensionResponse> {
let ffi_publication =
server.retrieve_handle::<FfiPublication>(request.track_publication_handle)?;

let TrackPublication::Remote(publication) = &ffi_publication.publication else {
return Err(FfiError::InvalidRequest("publication is not a RemotePublication".into()));
};
let dimension = TrackDimension(request.width, request.height);
publication.update_video_dimensions(dimension);
Ok(proto::UpdateRemoteTrackPublicationDimensionResponse {})
}

fn on_set_local_metadata(
server: &'static FfiServer,
set_local_metadata: proto::SetLocalMetadataRequest,
Expand Down Expand Up @@ -995,6 +1025,16 @@ pub fn handle_request(
on_rpc_method_invocation_response(server, request)?,
)
}
proto::ffi_request::Message::EnableRemoteTrackPublication(request) => {
proto::ffi_response::Message::EnableRemoteTrackPublication(
on_enable_remote_track_publication(server, request)?,
)
}
proto::ffi_request::Message::UpdateRemoteTrackPublicationDimension(request) => {
proto::ffi_response::Message::UpdateRemoteTrackPublicationDimension(
on_update_remote_track_publication_dimension(server, request)?,
)
}
});

Ok(res)
Expand Down
27 changes: 26 additions & 1 deletion livekit/src/room/participant/remote_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,32 @@ impl RemoteParticipant {
.await
});
}
})
});

publication.on_video_dimensions_changed({
let rtc_engine = self.inner.rtc_engine.clone();
move |publication, dimension| {
let rtc_engine = rtc_engine.clone();
livekit_runtime::spawn(async move {
let tsid: String = publication.sid().into();
let TrackDimension(width, height) = dimension;
let enabled = publication.is_enabled();
let update_track_settings = proto::UpdateTrackSettings {
track_sids: vec![tsid.clone()],
disabled: !enabled,
width,
height,
..Default::default()
};

rtc_engine
.send_request(proto::signal_request::Message::TrackSetting(
update_track_settings,
))
.await
});
}
});
}

pub(crate) fn remove_publication(&self, sid: &TrackSid) -> Option<TrackPublication> {
Expand Down
27 changes: 27 additions & 0 deletions livekit/src/room/publication/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type PermissionStatusChangedHandler =
Box<dyn Fn(RemoteTrackPublication, PermissionStatus, PermissionStatus) + Send>; // old_status, new_status
type SubscriptionUpdateNeededHandler = Box<dyn Fn(RemoteTrackPublication, bool) + Send>;
type EnabledStatusChangedHandler = Box<dyn Fn(RemoteTrackPublication, bool) + Send>;
type VideoDimensionsChangedHandler = Box<dyn Fn(RemoteTrackPublication, TrackDimension) + Send>;

#[derive(Default)]
struct RemoteEvents {
Expand All @@ -37,6 +38,7 @@ struct RemoteEvents {
permission_status_changed: Mutex<Option<PermissionStatusChangedHandler>>,
subscription_update_needed: Mutex<Option<SubscriptionUpdateNeededHandler>>,
enabled_status_changed: Mutex<Option<EnabledStatusChangedHandler>>,
video_dimensions_changed: Mutex<Option<VideoDimensionsChangedHandler>>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -215,6 +217,13 @@ impl RemoteTrackPublication {
*self.remote.events.enabled_status_changed.lock() = Some(Box::new(f));
}

pub(crate) fn on_video_dimensions_changed(
&self,
f: impl Fn(RemoteTrackPublication, TrackDimension) + Send + 'static,
) {
*self.remote.events.video_dimensions_changed.lock() = Some(Box::new(f));
}

pub fn set_subscribed(&self, subscribed: bool) {
let old_subscription_state = self.subscription_status();
let old_permission_state = self.permission_status();
Expand Down Expand Up @@ -262,6 +271,24 @@ impl RemoteTrackPublication {
}
}

pub fn update_video_dimensions(&self, dimension: TrackDimension) {
if self.is_subscribed() {
if dimension != self.dimension() {
let TrackDimension(width, height) = dimension;
let mut new_info = self.proto_info();
new_info.width = width;
new_info.height = height;
self.update_info(new_info);
}
// Request to send an update to the SFU
if let Some(video_dimensions_changed) =
self.remote.events.video_dimensions_changed.lock().as_ref()
{
video_dimensions_changed(self.clone(), dimension)
}
}
}

pub fn subscription_status(&self) -> SubscriptionStatus {
if !self.remote.info.read().subscribed {
return SubscriptionStatus::Unsubscribed;
Expand Down

0 comments on commit b131b74

Please sign in to comment.