Skip to content

Commit

Permalink
Support identities and SIP DTMF in data packets.
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc committed Mar 28, 2024
1 parent 299382b commit 56545db
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 6 deletions.
1 change: 1 addition & 0 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ impl RoomInner {
.into_iter()
.map(|str| str.try_into().unwrap())
.collect(),
destination_identities: Vec::new(), // TODO
},
async_id,
}) {
Expand Down
53 changes: 48 additions & 5 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ pub enum RoomEvent {
kind: DataPacketKind,
participant: Option<RemoteParticipant>,
},
SipDTMFReceived {
code: u32,
digit: Option<String>,
participant: Option<RemoteParticipant>,
},
E2eeStateChanged {
participant: Participant,
state: EncryptionState,
Expand Down Expand Up @@ -178,6 +183,7 @@ pub struct DataPacket {
pub topic: Option<String>,
pub kind: DataPacketKind,
pub destination_sids: Vec<ParticipantSid>,
pub destination_identities: Vec<ParticipantIdentity>,
}

impl Default for DataPacket {
Expand All @@ -187,6 +193,7 @@ impl Default for DataPacket {
topic: None,
kind: DataPacketKind::Reliable,
destination_sids: Vec::new(),
destination_identities: Vec::new(),
}
}
}
Expand Down Expand Up @@ -561,8 +568,11 @@ impl RoomSession {
self.handle_signal_restarted(join_response, tx)
}
EngineEvent::Disconnected { reason } => self.handle_disconnected(reason),
EngineEvent::Data { payload, topic, kind, participant_sid } => {
self.handle_data(payload, topic, kind, participant_sid);
EngineEvent::Data { payload, topic, kind, participant_sid, participant_identity } => {
self.handle_data(payload, topic, kind, participant_sid, participant_identity);
}
EngineEvent::SipDTMF { code, digit, participant_identity } => {
self.handle_dtmf(code, digit, participant_identity);
}
EngineEvent::SpeakersChanged { speakers } => self.handle_speakers_changed(speakers),
EngineEvent::ConnectionQuality { updates } => {
Expand Down Expand Up @@ -946,11 +956,21 @@ impl RoomSession {
topic: Option<String>,
kind: DataPacketKind,
participant_sid: Option<ParticipantSid>,
participant_identity: Option<ParticipantIdentity>,
) {
let participant =
participant_sid.as_ref().map(|sid| self.get_participant_by_sid(sid)).unwrap_or(None);
let mut participant = participant_identity
.as_ref()
.map(|identity| self.get_participant_by_identity(identity))
.unwrap_or(None);

if participant.is_none() {
participant = participant_sid
.as_ref()
.map(|sid| self.get_participant_by_sid(sid))
.unwrap_or(None);
}

if participant.is_none() && participant_sid.is_some() {
if participant.is_none() && (participant_identity.is_some() || participant_sid.is_some()) {
// We received a data packet from a participant that is not in the participants list
return;
}
Expand All @@ -963,6 +983,29 @@ impl RoomSession {
});
}

fn handle_dtmf(
&self,
code: u32,
digit: Option<String>,
participant_identity: Option<ParticipantIdentity>,
) {
let participant = participant_identity
.as_ref()
.map(|identity| self.get_participant_by_identity(identity))
.unwrap_or(None);

if participant.is_none() && participant_identity.is_some() {
// We received a DTMF from a participant that is not in the participants list
return;
}

self.dispatcher.dispatch(&RoomEvent::SipDTMFReceived {
code,
digit,
participant,
});
}

/// Create a new participant
/// Also add it to the participants list
fn create_participant(
Expand Down
4 changes: 4 additions & 0 deletions livekit/src/room/participant/local_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,18 @@ impl LocalParticipant {
}

pub async fn publish_data(&self, packet: DataPacket) -> RoomResult<()> {
let destination_identities: Vec<String> = packet.destination_identities.into_iter().map(Into::into).collect();
let data = proto::DataPacket {
kind: packet.kind as i32,
destination_identities: destination_identities.clone(),
value: Some(proto::data_packet::Value::User(proto::UserPacket {
payload: packet.payload,
topic: packet.topic,
destination_sids: packet.destination_sids.into_iter().map(Into::into).collect(),
destination_identities: destination_identities.clone(),
..Default::default()
})),
..Default::default()
};

self.inner.rtc_engine.publish_data(&data, packet.kind).await.map_err(Into::into)
Expand Down
17 changes: 16 additions & 1 deletion livekit/src/rtc_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::{
},
DataPacketKind,
};
use crate::prelude::ParticipantIdentity;

pub mod lk_runtime;
mod peer_transport;
Expand Down Expand Up @@ -95,10 +96,16 @@ pub enum EngineEvent {
},
Data {
participant_sid: Option<ParticipantSid>,
participant_identity: Option<ParticipantIdentity>,
payload: Vec<u8>,
topic: Option<String>,
kind: DataPacketKind,
},
SipDTMF {
participant_identity: Option<ParticipantIdentity>,
code: u32,
digit: Option<String>,
},
SpeakersChanged {
speakers: Vec<proto::SpeakerInfo>,
},
Expand Down Expand Up @@ -387,14 +394,22 @@ impl EngineInner {
});
}
}
SessionEvent::Data { participant_sid, payload, topic, kind } => {
SessionEvent::Data { participant_sid,participant_identity, payload, topic, kind } => {
let _ = self.engine_tx.send(EngineEvent::Data {
participant_sid,
participant_identity,
payload,
topic,
kind,
});
}
SessionEvent::SipDTMF { participant_identity, code, digit } => {
let _ = self.engine_tx.send(EngineEvent::SipDTMF {
participant_identity,
code,
digit,
});
}
SessionEvent::MediaTrack { track, stream, transceiver } => {
let _ = self.engine_tx.send(EngineEvent::MediaTrack { track, stream, transceiver });
}
Expand Down
35 changes: 35 additions & 0 deletions livekit/src/rtc_engine/rtc_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use crate::{
track::LocalTrack,
DataPacketKind,
};
use crate::id::ParticipantIdentity;

pub const ICE_CONNECT_TIMEOUT: Duration = Duration::from_secs(15);
pub const TRACK_PUBLISH_TIMEOUT: Duration = Duration::from_secs(10);
Expand All @@ -78,10 +79,17 @@ pub enum SessionEvent {
Data {
// None when the data comes from the ServerSDK (So no real participant)
participant_sid: Option<ParticipantSid>,
participant_identity: Option<ParticipantIdentity>,
payload: Vec<u8>,
topic: Option<String>,
kind: DataPacketKind,
},
SipDTMF {
// None when the data comes from the ServerSDK (So no real participant)
participant_identity: Option<ParticipantIdentity>,
code: u32,
digit: Option<String>,
},
MediaTrack {
track: MediaStreamTrack,
stream: MediaStream,
Expand Down Expand Up @@ -565,13 +573,40 @@ impl SessionInner {
.not()
.then_some(user.participant_sid.clone());

let participant_identity = if !data.participant_identity.is_empty() {
Some(data.participant_identity.clone())
} else if !user.participant_identity.is_empty() {
Some(user.participant_identity.clone())
} else {
None
};

let _ = self.emitter.send(SessionEvent::Data {
kind: data.kind().into(),
participant_sid: participant_sid.map(|s| s.try_into().unwrap()),
participant_identity: participant_identity.map(|s| s.try_into().unwrap()),
payload: user.payload.clone(),
topic: user.topic.clone(),
});
}
proto::data_packet::Value::SipDtmf(dtmf) => {
let participant_identity = data
.participant_identity
.is_empty()
.not()
.then_some(data.participant_identity.clone());
let digit = dtmf
.digit
.is_empty()
.not()
.then_some(dtmf.digit.clone());

let _ = self.emitter.send(SessionEvent::SipDTMF {
participant_identity: participant_identity.map(|s| s.try_into().unwrap()),
digit: digit.map(|s| s.try_into().unwrap()),
code: dtmf.code,
});
}
proto::data_packet::Value::Speaker(_) => {}
}
}
Expand Down

0 comments on commit 56545db

Please sign in to comment.