diff --git a/Cargo.lock b/Cargo.lock index 6a97b48ee4bc..3ee3ac187fac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4014,6 +4014,7 @@ dependencies = [ "document-features", "rand", "re_log", + "re_log_encoding", "re_log_types", "re_smart_channel", "tokio", diff --git a/crates/re_log_encoding/src/decoder.rs b/crates/re_log_encoding/src/decoder.rs index a4ba2592b757..ee6976a54dbf 100644 --- a/crates/re_log_encoding/src/decoder.rs +++ b/crates/re_log_encoding/src/decoder.rs @@ -48,6 +48,17 @@ pub enum DecodeError { MsgPack(#[from] rmp_serde::decode::Error), } +// ---------------------------------------------------------------------------- + +pub fn decode_bytes(bytes: &[u8]) -> Result, DecodeError> { + let decoder = Decoder::new(std::io::Cursor::new(bytes))?; + let mut msgs = vec![]; + for msg in decoder { + msgs.push(msg?); + } + Ok(msgs) +} + // ---------------------------------------------------------------------------- // native decode: diff --git a/crates/re_log_encoding/src/encoder.rs b/crates/re_log_encoding/src/encoder.rs index 6d444746b6e1..06aa629887b9 100644 --- a/crates/re_log_encoding/src/encoder.rs +++ b/crates/re_log_encoding/src/encoder.rs @@ -1,5 +1,7 @@ //! Encoding of [`LogMsg`]es as a binary stream, e.g. to store in an `.rrd` file, or send over network. +// TODO(1316): switch to using lz4flex - see https://github.com/rerun-io/rerun/issues/1316#issuecomment-1510967893 + use std::io::Write as _; use re_log_types::LogMsg; @@ -20,6 +22,24 @@ pub enum EncodeError { AlreadyFinished, } +// ---------------------------------------------------------------------------- + +pub fn encode_to_bytes<'a>( + msgs: impl IntoIterator, +) -> Result, EncodeError> { + let mut bytes: Vec = vec![]; + { + let mut encoder = Encoder::new(std::io::Cursor::new(&mut bytes))?; + for msg in msgs { + encoder.append(msg)?; + } + encoder.finish()?; + } + Ok(bytes) +} + +// ---------------------------------------------------------------------------- + /// Encode a stream of [`LogMsg`] into an `.rrd` file. pub struct Encoder { /// Set to None when finished. diff --git a/crates/re_sdk_comms/Cargo.toml b/crates/re_sdk_comms/Cargo.toml index 485b13488d72..a7c5842c9107 100644 --- a/crates/re_sdk_comms/Cargo.toml +++ b/crates/re_sdk_comms/Cargo.toml @@ -26,6 +26,7 @@ server = [] [dependencies] re_log.workspace = true +re_log_encoding.workspace = true re_log_types = { workspace = true, features = ["serde"] } re_smart_channel.workspace = true diff --git a/crates/re_sdk_comms/src/buffered_client.rs b/crates/re_sdk_comms/src/buffered_client.rs index 797a4a23b34e..d12b49dd6974 100644 --- a/crates/re_sdk_comms/src/buffered_client.rs +++ b/crates/re_sdk_comms/src/buffered_client.rs @@ -188,26 +188,35 @@ fn msg_encode( loop { select! { recv(msg_rx) -> msg_msg => { - if let Ok(msg_msg) = msg_msg { - let packet_msg = match &msg_msg { - MsgMsg::LogMsg(log_msg) => { - let packet = crate::encode_log_msg(log_msg); - re_log::trace!("Encoded message of size {}", packet.len()); - PacketMsg::Packet(packet) + let Ok(msg_msg) = msg_msg else { + return; // channel has closed + }; + + let packet_msg = match &msg_msg { + MsgMsg::LogMsg(log_msg) => { + match re_log_encoding::encoder::encode_to_bytes(std::iter::once(log_msg)) { + Ok(packet) => { + re_log::trace!("Encoded message of size {}", packet.len()); + Some(PacketMsg::Packet(packet)) + } + Err(err) => { + re_log::error_once!("Failed to encode log message: {err}"); + None + } } - MsgMsg::Flush => PacketMsg::Flush, - }; + } + MsgMsg::Flush => Some(PacketMsg::Flush), + }; + if let Some(packet_msg) = packet_msg { if packet_tx.send(packet_msg).is_err() { re_log::error!("Failed to send message to tcp_sender thread. Likely a shutdown race-condition."); return; } - if msg_drop_tx.send(msg_msg).is_err() { - re_log::error!("Failed to send message to msg_drop thread. Likely a shutdown race-condition"); - return; - } - } else { - return; // channel has closed + } + if msg_drop_tx.send(msg_msg).is_err() { + re_log::error!("Failed to send message to msg_drop thread. Likely a shutdown race-condition"); + return; } } recv(quit_rx) -> _quit_msg => { diff --git a/crates/re_sdk_comms/src/lib.rs b/crates/re_sdk_comms/src/lib.rs index 210c2881069f..44c61432af9b 100644 --- a/crates/re_sdk_comms/src/lib.rs +++ b/crates/re_sdk_comms/src/lib.rs @@ -19,8 +19,6 @@ mod server; #[cfg(feature = "server")] pub use server::{serve, ServerOptions}; -use re_log_types::LogMsg; - pub type Result = anyhow::Result; pub const PROTOCOL_VERSION: u16 = 0; @@ -31,26 +29,3 @@ pub const DEFAULT_SERVER_PORT: u16 = 9876; pub fn default_server_addr() -> std::net::SocketAddr { std::net::SocketAddr::from(([127, 0, 0, 1], DEFAULT_SERVER_PORT)) } - -const PREFIX: [u8; 4] = *b"RR00"; - -pub fn encode_log_msg(log_msg: &LogMsg) -> Vec { - use bincode::Options as _; - let mut bytes = PREFIX.to_vec(); - bincode::DefaultOptions::new() - .serialize_into(&mut bytes, log_msg) - .unwrap(); - bytes -} - -pub fn decode_log_msg(data: &[u8]) -> Result { - let payload = data - .strip_prefix(&PREFIX) - .ok_or_else(|| anyhow::format_err!("Message didn't start with the correct prefix"))?; - - use anyhow::Context as _; - use bincode::Options as _; - bincode::DefaultOptions::new() - .deserialize(payload) - .context("bincode") -} diff --git a/crates/re_sdk_comms/src/server.rs b/crates/re_sdk_comms/src/server.rs index 75f766d40b7c..682bddbe05a1 100644 --- a/crates/re_sdk_comms/src/server.rs +++ b/crates/re_sdk_comms/src/server.rs @@ -151,25 +151,25 @@ async fn run_client( packet.resize(packet_size as usize, 0_u8); stream.read_exact(&mut packet).await?; - re_log::trace!("Received log message of size {packet_size}."); + re_log::trace!("Received packet of size {packet_size}."); congestion_manager.register_latency(tx.latency_sec()); - let msg = crate::decode_log_msg(&packet)?; - - if matches!(msg, LogMsg::Goodbye(_)) { - re_log::debug!("Received goodbye message."); - tx.send(msg)?; - return Ok(()); - } + for msg in re_log_encoding::decoder::decode_bytes(&packet)? { + if matches!(msg, LogMsg::Goodbye(_)) { + re_log::debug!("Received goodbye message."); + tx.send(msg)?; + return Ok(()); + } - if congestion_manager.should_send(&msg) { - tx.send(msg)?; - } else { - re_log::warn_once!( - "Input latency is over the max ({} s) - dropping packets.", - options.max_latency_sec - ); + if congestion_manager.should_send(&msg) { + tx.send(msg)?; + } else { + re_log::warn_once!( + "Input latency is over the max ({} s) - dropping packets.", + options.max_latency_sec + ); + } } } }