From f858454c53a7b0fb5df6b42567afeb062abd64cd Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 20 Dec 2024 12:51:28 +0100 Subject: [PATCH] fix compilation errors --- livekit-ffi/src/conversion/room.rs | 49 ++++++++++++++++++++++++++++++ livekit-ffi/src/server/requests.rs | 4 +-- livekit-ffi/src/server/room.rs | 34 +++++++++++++-------- 3 files changed, 73 insertions(+), 14 deletions(-) diff --git a/livekit-ffi/src/conversion/room.rs b/livekit-ffi/src/conversion/room.rs index 521c5c8c..c02b624a 100644 --- a/livekit-ffi/src/conversion/room.rs +++ b/livekit-ffi/src/conversion/room.rs @@ -317,6 +317,42 @@ impl From for proto::data_stream::Header } } +impl From for livekit_protocol::data_stream::Header { + fn from(msg: proto::data_stream::Header) -> Self { + let content_header = match msg.content_header { + Some(proto::data_stream::header::ContentHeader::TextHeader(text_header)) => { + Some(livekit_protocol::data_stream::header::ContentHeader::TextHeader( + livekit_protocol::data_stream::TextHeader { + operation_type: text_header.operation_type, + version: text_header.version, + reply_to_stream_id: text_header.reply_to_stream_id, + attached_stream_ids: text_header.attached_stream_ids, + generated: text_header.generated, + }, + )) + } + Some(proto::data_stream::header::ContentHeader::FileHeader(file_header)) => { + Some(livekit_protocol::data_stream::header::ContentHeader::FileHeader( + livekit_protocol::data_stream::FileHeader { file_name: file_header.file_name }, + )) + } + None => None, + }; + + livekit_protocol::data_stream::Header { + stream_id: msg.stream_id, + timestamp: msg.timestamp, + topic: msg.topic, + mime_type: msg.mime_type, + total_chunks: msg.total_chunks, + total_length: msg.total_length, + extensions: msg.extensions, + content_header, + encryption_type: 0, + } + } +} + impl From for proto::data_stream::Chunk { fn from(msg: livekit_protocol::data_stream::Chunk) -> Self { proto::data_stream::Chunk { @@ -329,3 +365,16 @@ impl From for proto::data_stream::Chunk { } } } + +impl From for livekit_protocol::data_stream::Chunk { + fn from(msg: proto::data_stream::Chunk) -> Self { + livekit_protocol::data_stream::Chunk { + stream_id: msg.stream_id, + content: msg.content, + complete: msg.complete, + chunk_index: msg.chunk_index, + version: msg.version, + iv: msg.iv, + } + } +} diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index 0521822e..274a9ac0 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -239,7 +239,7 @@ fn on_edit_chat_message( fn on_send_stream_header( server: &'static FfiServer, stream_header_message: proto::SendStreamHeaderRequest, -) { +) -> FfiResult { let ffi_participant = server .retrieve_handle::(stream_header_message.local_participant_handle)? .clone(); @@ -250,7 +250,7 @@ fn on_send_stream_header( fn on_send_stream_chunk( server: &'static FfiServer, stream_chunk_message: proto::SendStreamChunkRequest, -) { +) -> FfiResult { let ffi_participant = server .retrieve_handle::(stream_chunk_message.local_participant_handle)? .clone(); diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs index 2d502099..7b6dc742 100644 --- a/livekit-ffi/src/server/room.rs +++ b/livekit-ffi/src/server/room.rs @@ -17,8 +17,8 @@ use std::time::Duration; use std::{collections::HashSet, slice, sync::Arc}; use livekit::prelude::*; -use livekit::proto as lk_proto; use livekit::ChatMessage; +use livekit_protocol as lk_proto; use parking_lot::Mutex; use tokio::sync::{broadcast, mpsc, oneshot, Mutex as AsyncMutex}; use tokio::task::JoinHandle; @@ -678,39 +678,49 @@ impl RoomInner { } pub fn send_stream_header( - &self, + self: &Arc, server: &'static FfiServer, send_stream_header: proto::SendStreamHeaderRequest, - ) { + ) -> proto::SendStreamHeaderResponse { let packet = lk_proto::DataPacket { - kind: lk_proto::DataPacketKind::Reliable, - participant_identity: send_stream_header.sender_identity, + kind: proto::DataPacketKind::KindReliable.into(), + participant_identity: send_stream_header.sender_identity.unwrap(), destination_identities: send_stream_header.destination_identities, - value: send_stream_header.header, + value: livekit_protocol::data_packet::Value::StreamHeader( + send_stream_header.header.into(), + ) + .into(), }; let async_id = server.next_id(); let inner = self.clone(); let handle = server.async_runtime.spawn(async move { let res = inner.room.local_participant().publish_raw_data(packet, true).await; }); + server.watch_panic(handle); + proto::SendStreamHeaderResponse { async_id } } pub fn send_stream_chunk( - &self, + self: &Arc, server: &'static FfiServer, send_stream_chunk: proto::SendStreamChunkRequest, - ) { + ) -> proto::SendStreamChunkResponse { let packet = lk_proto::DataPacket { - kind: lk_proto::DataPacketKind::Reliable, - participant_identity: send_stream_header.sender_identity, - destination_identities: send_stream_header.destination_identities, - value: send_stream_header.chunk, + kind: proto::DataPacketKind::KindReliable.into(), + participant_identity: send_stream_chunk.sender_identity.unwrap(), + destination_identities: send_stream_chunk.destination_identities, + value: livekit_protocol::data_packet::Value::StreamChunk( + send_stream_chunk.chunk.into(), + ) + .into(), }; let async_id = server.next_id(); let inner = self.clone(); let handle = server.async_runtime.spawn(async move { let res = inner.room.local_participant().publish_raw_data(packet, true).await; }); + server.watch_panic(handle); + proto::SendStreamChunkResponse { async_id } } pub fn store_rpc_method_invocation_waiter(