diff --git a/.github/banner_dark.png b/.github/banner_dark.png index 26faeae1..7f9fe2a9 100644 Binary files a/.github/banner_dark.png and b/.github/banner_dark.png differ diff --git a/.github/banner_light.png b/.github/banner_light.png index da67f6ae..1fa73715 100644 Binary files a/.github/banner_light.png and b/.github/banner_light.png differ diff --git a/Cargo.lock b/Cargo.lock index 7fc730b6..996eabe6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1547,7 +1547,7 @@ checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" [[package]] name = "livekit" -version = "0.3.2" +version = "0.4.0" dependencies = [ "futures-util", "lazy_static", @@ -1592,7 +1592,7 @@ dependencies = [ [[package]] name = "livekit-ffi" -version = "0.5.0" +version = "0.6.0" dependencies = [ "console-subscriber", "dashmap", @@ -1616,7 +1616,7 @@ dependencies = [ [[package]] name = "livekit-protocol" -version = "0.3.2" +version = "0.3.3" dependencies = [ "futures-util", "livekit-runtime", diff --git a/README.md b/README.md index 7e7e55eb..426bcffe 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ - The LiveKit icon, the name of the repository and some sample code in the background. + The LiveKit icon, the name of the repository and some sample code in the background. @@ -188,8 +188,8 @@ We'll first use it as a basis for our Unity SDK (under development), but over ti
- - + + diff --git a/libwebrtc/Cargo.toml b/libwebrtc/Cargo.toml index b099a72b..22c35c95 100644 --- a/libwebrtc/Cargo.toml +++ b/libwebrtc/Cargo.toml @@ -8,7 +8,7 @@ description = "Livekit safe bindings to libwebrtc" repository = "https://github.com/livekit/rust-sdks" [dependencies] -livekit-protocol = { path = "../livekit-protocol", version = "0.3.2" } +livekit-protocol = { path = "../livekit-protocol", version = "0.3.3" } log = "0.4" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/livekit-api/Cargo.toml b/livekit-api/Cargo.toml index 70ab0281..406bcd44 100644 --- a/livekit-api/Cargo.toml +++ b/livekit-api/Cargo.toml @@ -65,7 +65,7 @@ rustls-tls-webpki-roots = [ __rustls-tls = ["tokio-tungstenite?/__rustls-tls", "reqwest?/__rustls"] [dependencies] -livekit-protocol = { path = "../livekit-protocol", version = "0.3.2" } +livekit-protocol = { path = "../livekit-protocol", version = "0.3.3" } thiserror = "1.0" serde = { version = "1.0", features = ["derive"] } sha2 = "0.10" diff --git a/livekit-ffi/Cargo.toml b/livekit-ffi/Cargo.toml index 5343413f..dc5ed6b6 100644 --- a/livekit-ffi/Cargo.toml +++ b/livekit-ffi/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "livekit-ffi" -version = "0.5.0" +version = "0.6.0" edition = "2021" license = "Apache-2.0" description = "FFI interface for bindings in other languages" @@ -18,8 +18,8 @@ __rustls-tls = ["livekit/__rustls-tls"] tracing = ["tokio/tracing", "console-subscriber"] [dependencies] -livekit = { path = "../livekit", version = "0.3.2" } -livekit-protocol = { path = "../livekit-protocol", version = "0.3.2" } +livekit = { path = "../livekit", version = "0.4.0" } +livekit-protocol = { path = "../livekit-protocol", version = "0.3.3" } tokio = { version = "1", features = ["full", "parking_lot"] } futures-util = { version = "0.3", default-features = false, features = ["sink"] } parking_lot = { version = "0.12", features = ["deadlock_detection"] } diff --git a/livekit-ffi/protocol/room.proto b/livekit-ffi/protocol/room.proto index 4187aacf..1e9efc9e 100644 --- a/livekit-ffi/protocol/room.proto +++ b/livekit-ffi/protocol/room.proto @@ -88,9 +88,10 @@ message PublishDataRequest { uint64 local_participant_handle = 1; uint64 data_ptr = 2; uint64 data_len = 3; - DataPacketKind kind = 4; - repeated string destination_sids = 5; // destination + bool reliable = 4; + repeated string destination_sids = 5 [deprecated=true]; optional string topic = 6; + repeated string destination_identities = 7; } message PublishDataResponse { uint64 async_id = 1; @@ -160,7 +161,6 @@ message GetSessionStatsCallback { repeated RtcStats subscriber_stats = 4; } - // // Options // @@ -273,9 +273,10 @@ message RoomEvent { TrackUnmuted track_unmuted = 12; ActiveSpeakersChanged active_speakers_changed = 13; RoomMetadataChanged room_metadata_changed = 14; - ParticipantMetadataChanged participant_metadata_changed = 15; - ParticipantNameChanged participant_name_changed = 16; - ConnectionQualityChanged connection_quality_changed = 17; + RoomSidChanged room_sid_changed = 15; + ParticipantMetadataChanged participant_metadata_changed = 16; + ParticipantNameChanged participant_name_changed = 17; + ConnectionQualityChanged connection_quality_changed = 18; ConnectionStateChanged connection_state_changed = 19; // Connected connected = 20; Disconnected disconnected = 21; @@ -364,6 +365,10 @@ message RoomMetadataChanged { string metadata = 1; } +message RoomSidChanged { + string sid = 1; +} + message ParticipantMetadataChanged { string participant_sid = 1; string metadata = 2; diff --git a/livekit-ffi/src/conversion/room.rs b/livekit-ffi/src/conversion/room.rs index 1dd1bd8a..cee4e44f 100644 --- a/livekit-ffi/src/conversion/room.rs +++ b/livekit-ffi/src/conversion/room.rs @@ -215,6 +215,10 @@ impl From for AudioEncoding { impl From<&FfiRoom> for proto::RoomInfo { fn from(value: &FfiRoom) -> Self { let room = &value.inner.room; - Self { sid: room.sid().into(), name: room.name(), metadata: room.metadata() } + Self { + sid: room.maybe_sid().unwrap_or_default().into(), + name: room.name(), + metadata: room.metadata(), + } } } diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index 6b4df6c6..19260c86 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -1,5 +1,4 @@ // @generated -// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FrameCryptor { @@ -2019,13 +2018,15 @@ pub struct PublishDataRequest { pub data_ptr: u64, #[prost(uint64, tag="3")] pub data_len: u64, - #[prost(enumeration="DataPacketKind", tag="4")] - pub kind: i32, - /// destination + #[prost(bool, tag="4")] + pub reliable: bool, + #[deprecated] #[prost(string, repeated, tag="5")] pub destination_sids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, #[prost(string, optional, tag="6")] pub topic: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, repeated, tag="7")] + pub destination_identities: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -2044,20 +2045,6 @@ pub struct PublishDataCallback { /// Publish transcription messages to room #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct TranscriptionSegment { - #[prost(string, tag="1")] - pub id: ::prost::alloc::string::String, - #[prost(string, tag="2")] - pub text: ::prost::alloc::string::String, - #[prost(uint64, tag="3")] - pub start_time: u64, - #[prost(uint64, tag="4")] - pub end_time: u64, - #[prost(bool, tag="5")] - pub r#final: bool, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] pub struct PublishTranscriptionRequest { #[prost(uint64, tag="1")] pub local_participant_handle: u64, @@ -2240,6 +2227,20 @@ pub struct RoomOptions { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct TranscriptionSegment { + #[prost(string, tag="1")] + pub id: ::prost::alloc::string::String, + #[prost(string, tag="2")] + pub text: ::prost::alloc::string::String, + #[prost(uint64, tag="3")] + pub start_time: u64, + #[prost(uint64, tag="4")] + pub end_time: u64, + #[prost(bool, tag="5")] + pub r#final: bool, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct BufferInfo { #[prost(uint64, tag="1")] pub data_ptr: u64, @@ -2259,7 +2260,7 @@ pub struct OwnedBuffer { pub struct RoomEvent { #[prost(uint64, tag="1")] pub room_handle: u64, - #[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 19, 21, 22, 23, 24, 25, 26")] + #[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21, 22, 23, 24, 25, 26")] pub message: ::core::option::Option, } /// Nested message and enum types in `RoomEvent`. @@ -2294,10 +2295,12 @@ pub mod room_event { #[prost(message, tag="14")] RoomMetadataChanged(super::RoomMetadataChanged), #[prost(message, tag="15")] - ParticipantMetadataChanged(super::ParticipantMetadataChanged), + RoomSidChanged(super::RoomSidChanged), #[prost(message, tag="16")] - ParticipantNameChanged(super::ParticipantNameChanged), + ParticipantMetadataChanged(super::ParticipantMetadataChanged), #[prost(message, tag="17")] + ParticipantNameChanged(super::ParticipantNameChanged), + #[prost(message, tag="18")] ConnectionQualityChanged(super::ConnectionQualityChanged), #[prost(message, tag="19")] ConnectionStateChanged(super::ConnectionStateChanged), @@ -2445,6 +2448,12 @@ pub struct RoomMetadataChanged { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct RoomSidChanged { + #[prost(string, tag="1")] + pub sid: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ParticipantMetadataChanged { #[prost(string, tag="1")] pub participant_sid: ::prost::alloc::string::String, diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index 4de7ff2e..7e99c401 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -166,6 +166,21 @@ impl FfiRoom { }, )); + // Update Room SID on promise resolve + let room_handle = inner.handle_id.clone(); + server.async_runtime.spawn(async move { + let _ = server.send_event(proto::ffi_event::Message::RoomEvent( + proto::RoomEvent { + room_handle, + message: Some(proto::room_event::Message::RoomSidChanged( + proto::RoomSidChanged { + sid: ffi_room.inner.room.sid().await.into(), + }, + )), + }, + )); + }); + // Forward events let event_handle = server.watch_panic({ let close_rx = close_rx.resubscribe(); @@ -237,20 +252,20 @@ impl RoomInner { } .to_vec(); - let kind = publish.kind(); - let destination_sids = publish.destination_sids; + let reliable = publish.reliable; + let topic = publish.topic; + let destination_identities = publish.destination_identities; let async_id = server.next_id(); if let Err(err) = self.data_tx.send(FfiDataPacket { payload: DataPacket { payload: data.to_vec(), // Avoid copy? - kind: kind.into(), - topic: publish.topic, - destination_sids: destination_sids + reliable, + topic, + destination_identities: destination_identities .into_iter() .map(|str| str.try_into().unwrap()) .collect(), - destination_identities: Vec::new(), // TODO }, async_id, }) { diff --git a/livekit-protocol/Cargo.toml b/livekit-protocol/Cargo.toml index 98df43ff..fc9f49c8 100644 --- a/livekit-protocol/Cargo.toml +++ b/livekit-protocol/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "livekit-protocol" -version = "0.3.2" +version = "0.3.3" edition = "2021" license = "Apache-2.0" description = "Livekit protocol and utilities for the Rust SDK" @@ -8,7 +8,7 @@ repository = "https://github.com/livekit/rust-sdks" [dependencies] livekit-runtime = { path = "../livekit-runtime", version = "0.3.0", features = [ "tokio" ] } -tokio = { version = "1", default-features = false, features = [ "sync", "macros", "time", "net" ] } +tokio = { version = "1", default-features = false, features = [ "sync", "macros", "time", "net", "rt" ] } futures-util = { version = "0.3", features = ["sink"] } parking_lot = "0.12" prost = "0.12" diff --git a/livekit-protocol/src/lib.rs b/livekit-protocol/src/lib.rs index 60b5a0dc..1af88acd 100644 --- a/livekit-protocol/src/lib.rs +++ b/livekit-protocol/src/lib.rs @@ -18,6 +18,7 @@ pub mod debouncer; pub mod enum_dispatch; pub mod observer; +pub mod promise; include!("livekit.rs"); diff --git a/livekit-protocol/src/promise.rs b/livekit-protocol/src/promise.rs new file mode 100644 index 00000000..475f32bc --- /dev/null +++ b/livekit-protocol/src/promise.rs @@ -0,0 +1,50 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use tokio::sync::{oneshot, Mutex}; + +pub struct Promise { + tx: Mutex>>, + rx: Mutex>>, + result: Mutex>, +} + +impl Promise { + pub fn new() -> Self { + let (tx, rx) = oneshot::channel(); + Self { tx: Mutex::new(Some(tx)), rx: Mutex::new(Some(rx)), result: Default::default() } + } + + pub fn resolve(&self, result: T) -> Result<(), &'static str> { + let mut tx = self.tx.try_lock().unwrap(); + if tx.is_some() { + let _ = tx.take().unwrap().send(result); + Ok(()) + } else { + Err("promise already used") + } + } + + pub async fn result(&self) -> T { + let mut rx = self.rx.lock().await; + if rx.is_some() { + self.result.lock().await.replace(rx.take().unwrap().await.unwrap()); + } + self.result.lock().await.clone().unwrap() + } + + pub fn try_result(&self) -> Option { + self.result.try_lock().unwrap().clone() + } +} diff --git a/livekit/Cargo.toml b/livekit/Cargo.toml index 9e22a725..0224fc94 100644 --- a/livekit/Cargo.toml +++ b/livekit/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "livekit" -version = "0.3.2" +version = "0.4.0" edition = "2021" license = "Apache-2.0" description = "Rust Client SDK for LiveKit" @@ -30,7 +30,7 @@ __lk-internal = [] livekit-runtime = { path = "../livekit-runtime", version = "0.3.0", default-features = false } livekit-api = { path = "../livekit-api", version = "0.3.2", default-features = false } libwebrtc = { path = "../libwebrtc", version = "0.3.2" } -livekit-protocol = { path = "../livekit-protocol", version = "0.3.2" } +livekit-protocol = { path = "../livekit-protocol", version = "0.3.3" } prost = "0.12" serde = { version = "1", features = ["derive"] } serde_json = "1.0" diff --git a/livekit/src/room/id.rs b/livekit/src/room/id.rs index c5f26451..bd0b42aa 100644 --- a/livekit/src/room/id.rs +++ b/livekit/src/room/id.rs @@ -15,7 +15,7 @@ pub struct ParticipantIdentity(pub String); #[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] pub struct TrackSid(String); -#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] +#[derive(Clone, Default, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] pub struct RoomSid(String); impl From for ParticipantIdentity { diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 73bc8876..6fccde3b 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -29,7 +29,7 @@ use livekit_protocol::observer::Dispatcher; use livekit_runtime::JoinHandle; use parking_lot::RwLock; pub use proto::DisconnectReason; -use proto::SignalTarget; +use proto::{promise::Promise, SignalTarget}; use thiserror::Error; use tokio::sync::{mpsc, oneshot, Mutex as AsyncMutex}; @@ -179,8 +179,7 @@ pub enum DataPacketKind { pub struct DataPacket { pub payload: Vec, pub topic: Option, - pub kind: DataPacketKind, - pub destination_sids: Vec, + pub reliable: bool, pub destination_identities: Vec, } @@ -189,8 +188,7 @@ impl Default for DataPacket { Self { payload: Vec::new(), topic: None, - kind: DataPacketKind::Reliable, - destination_sids: Vec::new(), + reliable: false, destination_identities: Vec::new(), } } @@ -250,7 +248,7 @@ pub struct Room { impl Debug for Room { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("Room") - .field("sid", &self.sid()) + .field("sid", &self.maybe_sid()) .field("name", &self.name()) .field("connection_state", &self.connection_state()) .finish() @@ -264,18 +262,14 @@ struct RoomInfo { pub(crate) struct RoomSession { rtc_engine: Arc, - sid: RoomSid, + sid: Promise, name: String, info: RwLock, dispatcher: Dispatcher, options: RoomOptions, active_speakers: RwLock>, local_participant: LocalParticipant, - participants: RwLock<( - // Keep track of participants by sid and identity - HashMap, - HashMap, - )>, + remote_participants: RwLock>, e2ee_manager: E2eeManager, room_task: AsyncMutex, oneshot::Sender<()>)>>, } @@ -283,7 +277,7 @@ pub(crate) struct RoomSession { impl Debug for RoomSession { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SessionInner") - .field("sid", &self.sid) + .field("sid", &self.sid.try_result()) .field("name", &self.name) .field("rtc_engine", &self.rtc_engine) .finish() @@ -393,13 +387,13 @@ impl Room { let room_info = join_response.room.unwrap(); let inner = Arc::new(RoomSession { - sid: room_info.sid.try_into().unwrap(), + sid: Promise::new(), name: room_info.name, info: RwLock::new(RoomInfo { state: ConnectionState::Disconnected, metadata: room_info.metadata, }), - participants: Default::default(), + remote_participants: Default::default(), active_speakers: Default::default(), options, rtc_engine: rtc_engine.clone(), @@ -421,7 +415,7 @@ impl Room { { Participant::Local(inner.local_participant.clone()) } else if let Some(participant) = - inner.participants.read().1.get(&participant_identity) + inner.remote_participants.read().get(&participant_identity) { Participant::Remote(participant.clone()) } else { @@ -449,10 +443,10 @@ impl Room { // Get the initial states (Can be useful on some usecases, like the FfiServer) // Getting them here ensure nothing happening before (Like a new participant joining) // because the room task is not started yet - let participants = inner.participants.read().0.clone(); + let participants = inner.remote_participants.read().clone(); let participants_with_tracks = participants .into_values() - .map(|p| (p.clone(), p.tracks().into_values().collect())) + .map(|p| (p.clone(), p.track_publications().into_values().collect())) .collect(); let events = inner.dispatcher.register(); @@ -482,8 +476,12 @@ impl Room { self.inner.dispatcher.register() } - pub fn sid(&self) -> RoomSid { - self.inner.sid.clone() + pub async fn sid(&self) -> RoomSid { + self.inner.sid.result().await + } + + pub fn maybe_sid(&self) -> Option { + self.inner.sid.try_result() } pub fn name(&self) -> String { @@ -502,8 +500,8 @@ impl Room { self.inner.info.read().state } - pub fn participants(&self) -> HashMap { - self.inner.participants.read().0.clone() + pub fn remote_participants(&self) -> HashMap { + self.inner.remote_participants.read().clone() } pub fn e2ee_manager(&self) -> &E2eeManager { @@ -679,14 +677,20 @@ impl RoomSession { return; } - let (participant_sid, track_sid) = lk_stream_id.unwrap(); - let participant_sid = participant_sid.to_owned().try_into().unwrap(); - let track_sid = track_sid.to_owned().try_into().unwrap(); - let remote_participant = self.get_participant_by_sid(&participant_sid); + let (participant_sid, stream_id) = lk_stream_id.unwrap(); + let participant_sid: ParticipantSid = participant_sid.to_owned().try_into().unwrap(); + let stream_id = stream_id.to_owned().try_into().unwrap(); + + let remote_participant = self + .remote_participants + .read() + .values() + .find(|x| &x.sid() == &participant_sid) + .cloned(); if let Some(remote_participant) = remote_participant { livekit_runtime::spawn(async move { - remote_participant.add_subscribed_media_track(track_sid, track, transceiver).await; + remote_participant.add_subscribed_media_track(stream_id, track, transceiver).await; }); } else { // The server should send participant updates before sending a new offer, this should @@ -757,8 +761,8 @@ impl RoomSession { } let mut track_sids = Vec::new(); - for (_, participant) in self.participants.read().0.clone() { - for (track_sid, track) in participant.tracks() { + for (_, participant) in self.remote_participants.read().clone() { + for (track_sid, track) in participant.track_publications() { if track.is_desired() != auto_subscribe { track_sids.push(track_sid.to_string()); } @@ -841,6 +845,9 @@ impl RoomSession { metadata: info.metadata.clone(), }); } + if !room.sid.is_empty() { + let _ = self.sid.resolve(room.sid.try_into().unwrap()); + } } fn handle_resuming(self: &Arc, tx: oneshot::Sender<()>) { @@ -877,7 +884,7 @@ impl RoomSession { fn handle_restarting(self: &Arc, tx: oneshot::Sender<()>) { // Remove existing participants/subscriptions on full reconnect - let participants = self.participants.read().0.clone(); + let participants = self.remote_participants.read().clone(); for (_, participant) in participants.iter() { self.clone().handle_participant_disconnect(participant.clone()); } @@ -894,7 +901,7 @@ impl RoomSession { // Unpublish and republish every track // At this time we know that the RtcSession is successfully restarted - let published_tracks = self.local_participant.tracks(); + let published_tracks = self.local_participant.track_publications(); // Spawining a new task because we need to wait for the RtcEngine to close the reconnection // lock. @@ -1123,34 +1130,32 @@ impl RoomSession { } }); - let mut participants = self.participants.write(); - participants.0.insert(sid, participant.clone()); - participants.1.insert(identity, participant.clone()); + let mut participants = self.remote_participants.write(); + participants.insert(identity, participant.clone()); participant } /// A participant has disconnected /// Cleanup the participant and emit an event fn handle_participant_disconnect(self: Arc, remote_participant: RemoteParticipant) { - for (sid, _) in remote_participant.tracks() { + for (sid, _) in remote_participant.track_publications() { remote_participant.unpublish_track(&sid); } - let mut participants = self.participants.write(); - participants.0.remove(&remote_participant.sid()); - participants.1.remove(&remote_participant.identity()); + let mut participants = self.remote_participants.write(); + participants.remove(&remote_participant.identity()); self.dispatcher.dispatch(&RoomEvent::ParticipantDisconnected(remote_participant)); } fn get_participant_by_sid(&self, sid: &ParticipantSid) -> Option { - self.participants.read().0.get(sid).cloned() + self.remote_participants.read().values().find(|x| &x.sid() == sid).cloned() } fn get_participant_by_identity( &self, identity: &ParticipantIdentity, ) -> Option { - self.participants.read().1.get(identity).cloned() + self.remote_participants.read().get(identity).cloned() } } diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index 344b5f54..73730748 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -73,8 +73,8 @@ impl LocalParticipant { } } - pub(crate) fn internal_tracks(&self) -> HashMap { - self.inner.tracks.read().clone() + pub(crate) fn internal_track_publications(&self) -> HashMap { + self.inner.track_publications.read().clone() } pub(crate) fn update_info(&self, info: proto::ParticipantInfo) { @@ -144,7 +144,7 @@ impl LocalParticipant { } pub(crate) fn published_tracks_info(&self) -> Vec { - let tracks = self.tracks(); + let tracks = self.track_publications(); let mut vec = Vec::with_capacity(tracks.len()); for p in tracks.values() { @@ -271,22 +271,24 @@ impl LocalParticipant { } pub async fn publish_data(&self, packet: DataPacket) -> RoomResult<()> { + let kind = match packet.reliable { + true => DataPacketKind::Reliable, + false => DataPacketKind::Lossy, + }; let destination_identities: Vec = packet.destination_identities.into_iter().map(Into::into).collect(); let data = proto::DataPacket { - kind: packet.kind as i32, + kind: kind as i32, destination_identities: destination_identities.clone(), value: Some(proto::data_packet::Value::User(proto::UserPacket { payload: packet.payload, topic: packet.topic, - destination_sids: packet.destination_sids.into_iter().map(Into::into).collect(), - destination_identities: destination_identities.clone(), ..Default::default() })), ..Default::default() }; - self.inner.rtc_engine.publish_data(&data, packet.kind).await.map_err(Into::into) + self.inner.rtc_engine.publish_data(&data, kind).await.map_err(Into::into) } pub async fn publish_transcription(&self, packet: Transcription) -> RoomResult<()> { @@ -321,7 +323,7 @@ impl LocalParticipant { } pub fn get_track_publication(&self, sid: &TrackSid) -> Option { - self.inner.tracks.read().get(sid).map(|track| { + self.inner.track_publications.read().get(sid).map(|track| { if let TrackPublication::Local(local) = track { return local.clone(); } @@ -350,9 +352,9 @@ impl LocalParticipant { self.inner.info.read().speaking } - pub fn tracks(&self) -> HashMap { + pub fn track_publications(&self) -> HashMap { self.inner - .tracks + .track_publications .read() .clone() .into_iter() diff --git a/livekit/src/room/participant/mod.rs b/livekit/src/room/participant/mod.rs index b3daa07a..0e3ea93d 100644 --- a/livekit/src/room/participant/mod.rs +++ b/livekit/src/room/participant/mod.rs @@ -61,10 +61,10 @@ impl Participant { pub(crate) fn remove_publication(self: &Self, sid: &TrackSid) -> Option; ); - pub fn tracks(&self) -> HashMap { + pub fn track_publications(&self) -> HashMap { match self { - Participant::Local(p) => p.internal_tracks(), - Participant::Remote(p) => p.internal_tracks(), + Participant::Local(p) => p.internal_track_publications(), + Participant::Remote(p) => p.internal_track_publications(), } } } @@ -95,7 +95,7 @@ struct ParticipantEvents { pub(super) struct ParticipantInner { rtc_engine: Arc, info: RwLock, - tracks: RwLock>, + track_publications: RwLock>, events: Arc, } @@ -117,7 +117,7 @@ pub(super) fn new_inner( audio_level: 0.0, connection_quality: ConnectionQuality::Excellent, }), - tracks: Default::default(), + track_publications: Default::default(), events: Default::default(), }) } @@ -203,7 +203,7 @@ pub(super) fn remove_publication( _participant: &Participant, sid: &TrackSid, ) -> Option { - let mut tracks = inner.tracks.write(); + let mut tracks = inner.track_publications.write(); let publication = tracks.remove(sid); if let Some(publication) = publication.clone() { // remove events @@ -222,7 +222,7 @@ pub(super) fn add_publication( participant: &Participant, publication: TrackPublication, ) { - let mut tracks = inner.tracks.write(); + let mut tracks = inner.track_publications.write(); tracks.insert(publication.sid(), publication.clone()); publication.on_muted({ diff --git a/livekit/src/room/participant/remote_participant.rs b/livekit/src/room/participant/remote_participant.rs index 22fe5fb5..12277ac3 100644 --- a/livekit/src/room/participant/remote_participant.rs +++ b/livekit/src/room/participant/remote_participant.rs @@ -82,8 +82,8 @@ impl RemoteParticipant { } } - pub(crate) fn internal_tracks(&self) -> HashMap { - self.inner.tracks.read().clone() + pub(crate) fn internal_track_publications(&self) -> HashMap { + self.inner.track_publications.read().clone() } pub(crate) async fn add_subscribed_media_track( @@ -206,7 +206,7 @@ impl RemoteParticipant { } // remove tracks that are no longer valid - let tracks = self.inner.tracks.read().clone(); + let tracks = self.inner.track_publications.read().clone(); for sid in tracks.keys() { if valid_tracks.contains(sid) { continue; @@ -372,7 +372,7 @@ impl RemoteParticipant { } pub fn get_track_publication(&self, sid: &TrackSid) -> Option { - self.inner.tracks.read().get(sid).map(|track| { + self.inner.track_publications.read().get(sid).map(|track| { if let TrackPublication::Remote(remote) = track { return remote.clone(); } @@ -400,9 +400,9 @@ impl RemoteParticipant { self.inner.info.read().speaking } - pub fn tracks(&self) -> HashMap { + pub fn track_publications(&self) -> HashMap { self.inner - .tracks + .track_publications .read() .clone() .into_iter()
LiveKit Ecosystem
Real-time SDKsReact Components · JavaScript · iOS/macOS · Android · Flutter · React Native · Rust · Python · Unity (web) · Unity (beta)
Server APIsNode.js · Golang · Ruby · Java/Kotlin · Python · Rust · PHP (community)
Real-time SDKsReact Components · Browser · iOS/macOS · Android · Flutter · React Native · Rust · Node.js · Python · Unity (web) · Unity (beta)
Server APIsNode.js · Golang · Ruby · Java/Kotlin · Python · Rust · PHP (community)
Agents FrameworksPython · Playground
ServicesLivekit server · Egress · Ingress · SIP
ResourcesDocs · Example apps · Cloud · Self-hosting · CLI