Skip to content

Commit

Permalink
fix compilation errors
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasIO committed Dec 20, 2024
1 parent 9f6f23d commit f858454
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 14 deletions.
49 changes: 49 additions & 0 deletions livekit-ffi/src/conversion/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,42 @@ impl From<livekit_protocol::data_stream::Header> for proto::data_stream::Header
}
}

impl From<proto::data_stream::Header> for livekit_protocol::data_stream::Header {
fn from(msg: proto::data_stream::Header) -> Self {
let content_header = match msg.content_header {
Some(proto::data_stream::header::ContentHeader::TextHeader(text_header)) => {
Some(livekit_protocol::data_stream::header::ContentHeader::TextHeader(
livekit_protocol::data_stream::TextHeader {
operation_type: text_header.operation_type,
version: text_header.version,
reply_to_stream_id: text_header.reply_to_stream_id,
attached_stream_ids: text_header.attached_stream_ids,
generated: text_header.generated,
},
))
}
Some(proto::data_stream::header::ContentHeader::FileHeader(file_header)) => {
Some(livekit_protocol::data_stream::header::ContentHeader::FileHeader(
livekit_protocol::data_stream::FileHeader { file_name: file_header.file_name },
))
}
None => None,
};

livekit_protocol::data_stream::Header {
stream_id: msg.stream_id,
timestamp: msg.timestamp,
topic: msg.topic,
mime_type: msg.mime_type,
total_chunks: msg.total_chunks,
total_length: msg.total_length,
extensions: msg.extensions,
content_header,
encryption_type: 0,
}
}
}

impl From<livekit_protocol::data_stream::Chunk> for proto::data_stream::Chunk {
fn from(msg: livekit_protocol::data_stream::Chunk) -> Self {
proto::data_stream::Chunk {
Expand All @@ -329,3 +365,16 @@ impl From<livekit_protocol::data_stream::Chunk> for proto::data_stream::Chunk {
}
}
}

impl From<proto::data_stream::Chunk> for livekit_protocol::data_stream::Chunk {
fn from(msg: proto::data_stream::Chunk) -> Self {
livekit_protocol::data_stream::Chunk {
stream_id: msg.stream_id,
content: msg.content,
complete: msg.complete,
chunk_index: msg.chunk_index,
version: msg.version,
iv: msg.iv,
}
}
}
4 changes: 2 additions & 2 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ fn on_edit_chat_message(
fn on_send_stream_header(
server: &'static FfiServer,
stream_header_message: proto::SendStreamHeaderRequest,
) {
) -> FfiResult<proto::SendStreamHeaderResponse> {
let ffi_participant = server
.retrieve_handle::<FfiParticipant>(stream_header_message.local_participant_handle)?
.clone();
Expand All @@ -250,7 +250,7 @@ fn on_send_stream_header(
fn on_send_stream_chunk(
server: &'static FfiServer,
stream_chunk_message: proto::SendStreamChunkRequest,
) {
) -> FfiResult<proto::SendStreamChunkResponse> {
let ffi_participant = server
.retrieve_handle::<FfiParticipant>(stream_chunk_message.local_participant_handle)?
.clone();
Expand Down
34 changes: 22 additions & 12 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::time::Duration;
use std::{collections::HashSet, slice, sync::Arc};

use livekit::prelude::*;
use livekit::proto as lk_proto;
use livekit::ChatMessage;
use livekit_protocol as lk_proto;
use parking_lot::Mutex;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex as AsyncMutex};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -678,39 +678,49 @@ impl RoomInner {
}

pub fn send_stream_header(
&self,
self: &Arc<Self>,
server: &'static FfiServer,
send_stream_header: proto::SendStreamHeaderRequest,
) {
) -> proto::SendStreamHeaderResponse {
let packet = lk_proto::DataPacket {
kind: lk_proto::DataPacketKind::Reliable,
participant_identity: send_stream_header.sender_identity,
kind: proto::DataPacketKind::KindReliable.into(),
participant_identity: send_stream_header.sender_identity.unwrap(),
destination_identities: send_stream_header.destination_identities,
value: send_stream_header.header,
value: livekit_protocol::data_packet::Value::StreamHeader(
send_stream_header.header.into(),
)
.into(),
};
let async_id = server.next_id();
let inner = self.clone();
let handle = server.async_runtime.spawn(async move {
let res = inner.room.local_participant().publish_raw_data(packet, true).await;
});
server.watch_panic(handle);
proto::SendStreamHeaderResponse { async_id }
}

pub fn send_stream_chunk(
&self,
self: &Arc<Self>,
server: &'static FfiServer,
send_stream_chunk: proto::SendStreamChunkRequest,
) {
) -> proto::SendStreamChunkResponse {
let packet = lk_proto::DataPacket {
kind: lk_proto::DataPacketKind::Reliable,
participant_identity: send_stream_header.sender_identity,
destination_identities: send_stream_header.destination_identities,
value: send_stream_header.chunk,
kind: proto::DataPacketKind::KindReliable.into(),
participant_identity: send_stream_chunk.sender_identity.unwrap(),
destination_identities: send_stream_chunk.destination_identities,
value: livekit_protocol::data_packet::Value::StreamChunk(
send_stream_chunk.chunk.into(),
)
.into(),
};
let async_id = server.next_id();
let inner = self.clone();
let handle = server.async_runtime.spawn(async move {
let res = inner.room.local_participant().publish_raw_data(packet, true).await;
});
server.watch_panic(handle);
proto::SendStreamChunkResponse { async_id }
}

pub fn store_rpc_method_invocation_waiter(
Expand Down

0 comments on commit f858454

Please sign in to comment.