Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New FFI for data packets. Support receiving DTMF. #319

Merged
merged 3 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -252,14 +252,14 @@ message RoomEvent {
ParticipantMetadataChanged participant_metadata_changed = 15;
ParticipantNameChanged participant_name_changed = 16;
ConnectionQualityChanged connection_quality_changed = 17;
DataReceived data_received = 18;
ConnectionStateChanged connection_state_changed = 19;
// Connected connected = 20;
Disconnected disconnected = 21;
Reconnecting reconnecting = 22;
Reconnected reconnected = 23;
E2eeStateChanged e2ee_state_changed = 24;
RoomEOS eos = 25; // The stream of room events has ended
DataPacketReceived data_packet_received = 26;
}
}

Expand Down Expand Up @@ -355,11 +355,24 @@ message ConnectionQualityChanged {
ConnectionQuality quality = 2;
}

message DataReceived {
message UserPacket {
OwnedBuffer data = 1;
optional string participant_sid = 2; // Can be empty if the data is sent a server SDK
DataPacketKind kind = 3;
optional string topic = 4;
optional string topic = 2;
}

message SipDTMF {
uint32 code = 1;
optional string digit = 2;
}

message DataPacketReceived {
DataPacketKind kind = 1;
string participant_identity = 2; // Can be empty if the data is sent a server SDK
optional string participant_sid = 3 [deprecated=true]; // Can be empty if the data is sent a server SDK
oneof value {
UserPacket user = 4;
SipDTMF sip_dtmf = 5;
}
}

message ConnectionStateChanged { ConnectionState state = 1; }
Expand Down
47 changes: 38 additions & 9 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2215,7 +2215,7 @@ pub struct OwnedBuffer {
pub struct RoomEvent {
#[prost(uint64, tag="1")]
pub room_handle: u64,
#[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21, 22, 23, 24, 25")]
#[prost(oneof="room_event::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 19, 21, 22, 23, 24, 25, 26")]
pub message: ::core::option::Option<room_event::Message>,
}
/// Nested message and enum types in `RoomEvent`.
Expand Down Expand Up @@ -2255,8 +2255,6 @@ pub mod room_event {
ParticipantNameChanged(super::ParticipantNameChanged),
#[prost(message, tag="17")]
ConnectionQualityChanged(super::ConnectionQualityChanged),
#[prost(message, tag="18")]
DataReceived(super::DataReceived),
#[prost(message, tag="19")]
ConnectionStateChanged(super::ConnectionStateChanged),
/// Connected connected = 20;
Expand All @@ -2271,6 +2269,8 @@ pub mod room_event {
/// The stream of room events has ended
#[prost(message, tag="25")]
Eos(super::RoomEos),
#[prost(message, tag="26")]
DataPacketReceived(super::DataPacketReceived),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -2425,19 +2425,48 @@ pub struct ConnectionQualityChanged {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DataReceived {
pub struct UserPacket {
#[prost(message, optional, tag="1")]
pub data: ::core::option::Option<OwnedBuffer>,
/// Can be empty if the data is sent a server SDK
#[prost(string, optional, tag="2")]
pub participant_sid: ::core::option::Option<::prost::alloc::string::String>,
#[prost(enumeration="DataPacketKind", tag="3")]
pub kind: i32,
#[prost(string, optional, tag="4")]
pub topic: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SipDtmf {
#[prost(uint32, tag="1")]
pub code: u32,
#[prost(string, optional, tag="2")]
pub digit: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DataPacketReceived {
#[prost(enumeration="DataPacketKind", tag="1")]
pub kind: i32,
/// Can be empty if the data is sent a server SDK
#[prost(string, tag="2")]
pub participant_identity: ::prost::alloc::string::String,
/// Can be empty if the data is sent a server SDK
#[deprecated]
#[prost(string, optional, tag="3")]
pub participant_sid: ::core::option::Option<::prost::alloc::string::String>,
#[prost(oneof="data_packet_received::Value", tags="4, 5")]
pub value: ::core::option::Option<data_packet_received::Value>,
}
/// Nested message and enum types in `DataPacketReceived`.
pub mod data_packet_received {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Value {
#[prost(message, tag="4")]
User(super::UserPacket),
#[prost(message, tag="5")]
SipDtmf(super::SipDtmf),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ConnectionStateChanged {
#[prost(enumeration="ConnectionState", tag="1")]
pub state: i32,
Expand Down
45 changes: 36 additions & 9 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::{collections::HashSet, slice, sync::Arc, time::Duration};

use livekit::participant;
use livekit::prelude::*;
use parking_lot::Mutex;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex as AsyncMutex};
Expand Down Expand Up @@ -645,17 +646,43 @@ async fn forward_event(
data_ptr: payload.as_ptr() as u64,
data_len: payload.len() as u64,
};
let (sid, identity) = match participant {
Some(p) => (Some(p.sid().to_string()), p.identity().to_string()),
None => (None, String::new()),
};

server.store_handle(handle_id, FfiDataBuffer { handle: handle_id, data: payload });
let _ = send_event(proto::room_event::Message::DataReceived(proto::DataReceived {
data: Some(proto::OwnedBuffer {
handle: Some(proto::FfiOwnedHandle { id: handle_id }),
data: Some(buffer_info),
}),
participant_sid: participant.map(|p| p.sid().to_string()),
kind: proto::DataPacketKind::from(kind).into(),
topic,
}));
let _ = send_event(proto::room_event::Message::DataPacketReceived(
proto::DataPacketReceived {
value: Some(proto::data_packet_received::Value::User(proto::UserPacket {
data: Some(proto::OwnedBuffer {
handle: Some(proto::FfiOwnedHandle { id: handle_id }),
data: Some(buffer_info),
}),
topic,
})),
participant_identity: identity,
participant_sid: sid,
kind: proto::DataPacketKind::from(kind).into(),
},
));
}
RoomEvent::SipDTMFReceived { code, digit, participant } => {
let (sid, identity) = match participant {
Some(p) => (Some(p.sid().to_string()), p.identity().to_string()),
None => (None, String::new()),
};
let _ = send_event(proto::room_event::Message::DataPacketReceived(
proto::DataPacketReceived {
value: Some(proto::data_packet_received::Value::SipDtmf(proto::SipDtmf {
code,
digit,
})),
participant_identity: identity,
participant_sid: sid,
kind: proto::DataPacketKind::KindReliable.into(),
},
));
}
RoomEvent::ConnectionStateChanged(state) => {
let _ = send_event(proto::room_event::Message::ConnectionStateChanged(
Expand Down
Loading