diff --git a/crates/re_data_store/examples/memory_usage.rs b/crates/re_data_store/examples/memory_usage.rs index 3365a06cf3d0..5010965d10ab 100644 --- a/crates/re_data_store/examples/memory_usage.rs +++ b/crates/re_data_store/examples/memory_usage.rs @@ -65,7 +65,9 @@ fn log_messages() { fn encode_log_msg(log_msg: &LogMsg) -> Vec { let mut bytes = vec![]; - re_log_encoding::encoder::encode(std::iter::once(log_msg), &mut bytes).unwrap(); + let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED; + re_log_encoding::encoder::encode(encoding_options, std::iter::once(log_msg), &mut bytes) + .unwrap(); bytes } diff --git a/crates/re_log_encoding/benches/msg_encode_benchmark.rs b/crates/re_log_encoding/benches/msg_encode_benchmark.rs index dd22adc50697..6750fc985e72 100644 --- a/crates/re_log_encoding/benches/msg_encode_benchmark.rs +++ b/crates/re_log_encoding/benches/msg_encode_benchmark.rs @@ -27,8 +27,9 @@ criterion_group!( criterion_main!(benches); fn encode_log_msgs(messages: &[LogMsg]) -> Vec { + let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED; let mut bytes = vec![]; - re_log_encoding::encoder::encode(messages.iter(), &mut bytes).unwrap(); + re_log_encoding::encoder::encode(encoding_options, messages.iter(), &mut bytes).unwrap(); assert!(bytes.len() > messages.len()); bytes } diff --git a/crates/re_log_encoding/src/decoder.rs b/crates/re_log_encoding/src/decoder.rs index 44f77720cec4..568ef06df25b 100644 --- a/crates/re_log_encoding/src/decoder.rs +++ b/crates/re_log_encoding/src/decoder.rs @@ -2,6 +2,8 @@ use re_log_types::LogMsg; +use crate::{Compression, EncodingOptions, Serializer}; + // ---------------------------------------------------------------------------- fn warn_on_version_mismatch(encoded_version: [u8; 4]) { @@ -29,6 +31,12 @@ pub enum DecodeError { #[error("Not an .rrd file")] NotAnRrd, + #[error("Found an .rrd file from a Rerun version from 0.5.1 or earlier")] + OldRrdVersion, + + #[error("Failed to decode the options: {0}")] + Options(#[from] crate::OptionsError), + #[error("Failed to read: {0}")] Read(std::io::Error), @@ -52,8 +60,33 @@ pub fn decode_bytes(bytes: &[u8]) -> Result, DecodeError> { // ---------------------------------------------------------------------------- +enum Decompressor { + Uncompressed(R), + Lz4(lz4_flex::frame::FrameDecoder), +} + +impl Decompressor { + fn new(compression: Compression, read: R) -> Self { + match compression { + Compression::Off => Self::Uncompressed(read), + Compression::LZ4 => Self::Lz4(lz4_flex::frame::FrameDecoder::new(read)), + } + } + + pub fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), DecodeError> { + use std::io::Read as _; + + match self { + Decompressor::Uncompressed(read) => read.read_exact(buf).map_err(DecodeError::Read), + Decompressor::Lz4(lz4) => lz4.read_exact(buf).map_err(DecodeError::Lz4), + } + } +} + +// ---------------------------------------------------------------------------- + pub struct Decoder { - lz4_decoder: lz4_flex::frame::FrameDecoder, + decompressor: Decompressor, buffer: Vec, } @@ -61,17 +94,36 @@ impl Decoder { pub fn new(mut read: R) -> Result { crate::profile_function!(); - let mut header = [0_u8; 4]; - read.read_exact(&mut header).map_err(DecodeError::Read)?; - if &header != b"RRF0" { - return Err(DecodeError::NotAnRrd); + { + let mut header = [0_u8; 4]; + read.read_exact(&mut header).map_err(DecodeError::Read)?; + if &header == b"RRF0" { + return Err(DecodeError::OldRrdVersion); + } else if &header != crate::RRD_HEADER { + return Err(DecodeError::NotAnRrd); + } + } + + { + let mut version_bytes = [0_u8; 4]; + read.read_exact(&mut version_bytes) + .map_err(DecodeError::Read)?; + warn_on_version_mismatch(version_bytes); + } + + let options = { + let mut options_bytes = [0_u8; 4]; + read.read_exact(&mut options_bytes) + .map_err(DecodeError::Read)?; + EncodingOptions::from_bytes(options_bytes)? + }; + + match options.serializer { + Serializer::MsgPack => {} } - read.read_exact(&mut header).map_err(DecodeError::Read)?; - warn_on_version_mismatch(header); - let lz4_decoder = lz4_flex::frame::FrameDecoder::new(read); Ok(Self { - lz4_decoder, + decompressor: Decompressor::new(options.compression, read), buffer: vec![], }) } @@ -82,18 +134,17 @@ impl Iterator for Decoder { fn next(&mut self) -> Option { crate::profile_function!(); - use std::io::Read as _; let mut len = [0_u8; 8]; - self.lz4_decoder.read_exact(&mut len).ok()?; + self.decompressor.read_exact(&mut len).ok()?; let len = u64::from_le_bytes(len) as usize; self.buffer.resize(len, 0); { crate::profile_scope!("lz4"); - if let Err(err) = self.lz4_decoder.read_exact(&mut self.buffer) { - return Some(Err(DecodeError::Lz4(err))); + if let Err(err) = self.decompressor.read_exact(&mut self.buffer) { + return Some(Err(err)); } } @@ -130,13 +181,26 @@ fn test_encode_decode() { }, })]; - let mut file = vec![]; - crate::encoder::encode(messages.iter(), &mut file).unwrap(); + let options = [ + EncodingOptions { + compression: Compression::Off, + serializer: Serializer::MsgPack, + }, + EncodingOptions { + compression: Compression::LZ4, + serializer: Serializer::MsgPack, + }, + ]; + + for options in options { + let mut file = vec![]; + crate::encoder::encode(options, messages.iter(), &mut file).unwrap(); - let decoded_messages = Decoder::new(&mut file.as_slice()) - .unwrap() - .collect::, DecodeError>>() - .unwrap(); + let decoded_messages = Decoder::new(&mut file.as_slice()) + .unwrap() + .collect::, DecodeError>>() + .unwrap(); - assert_eq!(messages, decoded_messages); + assert_eq!(messages, decoded_messages); + } } diff --git a/crates/re_log_encoding/src/encoder.rs b/crates/re_log_encoding/src/encoder.rs index ce889bf2b149..a7d1e9cd3983 100644 --- a/crates/re_log_encoding/src/encoder.rs +++ b/crates/re_log_encoding/src/encoder.rs @@ -4,6 +4,10 @@ use std::io::Write as _; use re_log_types::LogMsg; +use crate::{Compression, EncodingOptions}; + +// ---------------------------------------------------------------------------- + /// On failure to encode or serialize a [`LogMsg`]. #[derive(thiserror::Error, Debug)] pub enum EncodeError { @@ -26,11 +30,12 @@ pub enum EncodeError { // ---------------------------------------------------------------------------- pub fn encode_to_bytes<'a>( + options: EncodingOptions, msgs: impl IntoIterator, ) -> Result, EncodeError> { let mut bytes: Vec = vec![]; { - let mut encoder = Encoder::new(std::io::Cursor::new(&mut bytes))?; + let mut encoder = Encoder::new(options, std::io::Cursor::new(&mut bytes))?; for msg in msgs { encoder.append(msg)?; } @@ -41,14 +46,42 @@ pub fn encode_to_bytes<'a>( // ---------------------------------------------------------------------------- -/// Encode a stream of [`LogMsg`] into an `.rrd` file. -pub struct Encoder { - /// Set to None when finished. +struct Lz4Compressor { + /// `None` if finished. lz4_encoder: Option>, - buffer: Vec, } -impl Drop for Encoder { +impl Lz4Compressor { + pub fn new(write: W) -> Self { + Self { + lz4_encoder: Some(lz4_flex::frame::FrameEncoder::new(write)), + } + } + + pub fn write(&mut self, bytes: &[u8]) -> Result<(), EncodeError> { + if let Some(lz4_encoder) = &mut self.lz4_encoder { + lz4_encoder + .write_all(bytes) + .map_err(EncodeError::Lz4Write)?; + + Ok(()) + } else { + Err(EncodeError::AlreadyFinished) + } + } + + pub fn finish(&mut self) -> Result<(), EncodeError> { + if let Some(lz4_encoder) = self.lz4_encoder.take() { + lz4_encoder.finish().map_err(EncodeError::Lz4Finish)?; + Ok(()) + } else { + re_log::warn!("Encoder::finish called twice"); + Ok(()) + } + } +} + +impl Drop for Lz4Compressor { fn drop(&mut self) { if self.lz4_encoder.is_some() { re_log::warn!("Encoder dropped without calling finish()!"); @@ -59,62 +92,95 @@ impl Drop for Encoder { } } +#[allow(clippy::large_enum_variant)] +enum Compressor { + Off(W), + Lz4(Lz4Compressor), +} + +impl Compressor { + pub fn new(compression: Compression, write: W) -> Self { + match compression { + Compression::Off => Self::Off(write), + Compression::LZ4 => Self::Lz4(Lz4Compressor::new(write)), + } + } + + pub fn write(&mut self, bytes: &[u8]) -> Result<(), EncodeError> { + let len = (bytes.len() as u64).to_le_bytes(); + + match self { + Compressor::Off(write) => { + write.write_all(&len).map_err(EncodeError::Write)?; + write.write_all(bytes).map_err(EncodeError::Write) + } + Compressor::Lz4(lz4) => { + lz4.write(&len)?; + lz4.write(bytes) + } + } + } + + pub fn finish(&mut self) -> Result<(), EncodeError> { + match self { + Compressor::Off(_) => Ok(()), + Compressor::Lz4(lz4) => lz4.finish(), + } + } +} + +// ---------------------------------------------------------------------------- + +/// Encode a stream of [`LogMsg`] into an `.rrd` file. +pub struct Encoder { + compressor: Compressor, + buffer: Vec, +} + impl Encoder { - pub fn new(mut write: W) -> Result { + pub fn new(options: EncodingOptions, mut write: W) -> Result { let rerun_version = re_build_info::CrateVersion::parse(env!("CARGO_PKG_VERSION")); - write.write_all(b"RRF0").map_err(EncodeError::Write)?; + write + .write_all(crate::RRD_HEADER) + .map_err(EncodeError::Write)?; write .write_all(&rerun_version.to_bytes()) .map_err(EncodeError::Write)?; + write + .write_all(&options.to_bytes()) + .map_err(EncodeError::Write)?; - let lz4_encoder = lz4_flex::frame::FrameEncoder::new(write); + match options.serializer { + crate::Serializer::MsgPack => {} + } Ok(Self { - lz4_encoder: Some(lz4_encoder), + compressor: Compressor::new(options.compression, write), buffer: vec![], }) } pub fn append(&mut self, message: &LogMsg) -> Result<(), EncodeError> { - let Self { - lz4_encoder, - buffer, - } = self; + let Self { compressor, buffer } = self; - if let Some(lz4_encoder) = lz4_encoder { - buffer.clear(); - rmp_serde::encode::write_named(buffer, message)?; + buffer.clear(); + rmp_serde::encode::write_named(buffer, message)?; - lz4_encoder - .write_all(&(buffer.len() as u64).to_le_bytes()) - .map_err(EncodeError::Lz4Write)?; - lz4_encoder - .write_all(buffer) - .map_err(EncodeError::Lz4Write)?; - - Ok(()) - } else { - Err(EncodeError::AlreadyFinished) - } + compressor.write(buffer) } pub fn finish(&mut self) -> Result<(), EncodeError> { - if let Some(lz4_encoder) = self.lz4_encoder.take() { - lz4_encoder.finish().map_err(EncodeError::Lz4Finish)?; - Ok(()) - } else { - re_log::warn!("Encoder::finish called twice"); - Ok(()) - } + self.compressor.finish() } } pub fn encode<'a>( + options: EncodingOptions, messages: impl Iterator, write: &mut impl std::io::Write, ) -> Result<(), EncodeError> { - let mut encoder = Encoder::new(write)?; + let mut encoder = Encoder::new(options, write)?; for message in messages { encoder.append(message)?; } @@ -122,10 +188,11 @@ pub fn encode<'a>( } pub fn encode_owned( + options: EncodingOptions, messages: impl Iterator, write: impl std::io::Write, ) -> Result<(), EncodeError> { - let mut encoder = Encoder::new(write)?; + let mut encoder = Encoder::new(options, write)?; for message in messages { encoder.append(&message)?; } diff --git a/crates/re_log_encoding/src/file_sink.rs b/crates/re_log_encoding/src/file_sink.rs index 121383553eb2..d501bc491e03 100644 --- a/crates/re_log_encoding/src/file_sink.rs +++ b/crates/re_log_encoding/src/file_sink.rs @@ -39,6 +39,9 @@ impl Drop for FileSink { impl FileSink { /// Start writing log messages to a file at the given path. pub fn new(path: impl Into) -> Result { + // We always compress on disk + let encoding_options = crate::EncodingOptions::COMPRESSED; + let (tx, rx) = std::sync::mpsc::channel(); let path = path.into(); @@ -47,7 +50,7 @@ impl FileSink { let file = std::fs::File::create(&path) .map_err(|err| FileSinkError::CreateFile(path.clone(), err))?; - let mut encoder = crate::encoder::Encoder::new(file)?; + let mut encoder = crate::encoder::Encoder::new(encoding_options, file)?; let join_handle = std::thread::Builder::new() .name("file_writer".into()) diff --git a/crates/re_log_encoding/src/lib.rs b/crates/re_log_encoding/src/lib.rs index 16b883448803..4aec0ae79e79 100644 --- a/crates/re_log_encoding/src/lib.rs +++ b/crates/re_log_encoding/src/lib.rs @@ -19,6 +19,89 @@ pub mod stream_rrd_from_http; #[cfg(not(target_arch = "wasm32"))] pub use file_sink::{FileSink, FileSinkError}; +// ---------------------------------------------------------------------------- + +const RRD_HEADER: &[u8; 4] = b"RRF1"; + +// ---------------------------------------------------------------------------- + +/// Compression format used. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[repr(u8)] +pub enum Compression { + Off = 0, + + /// Very fast compression and decompression, but not very good compression ratio. + LZ4 = 1, +} + +/// How we serialize the data +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[repr(u8)] +pub enum Serializer { + MsgPack = 1, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct EncodingOptions { + pub compression: Compression, + pub serializer: Serializer, +} + +impl EncodingOptions { + pub const UNCOMPRESSED: Self = Self { + compression: Compression::Off, + serializer: Serializer::MsgPack, + }; + pub const COMPRESSED: Self = Self { + compression: Compression::LZ4, + serializer: Serializer::MsgPack, + }; + + pub fn from_bytes(bytes: [u8; 4]) -> Result { + match bytes { + [compression, serializer, 0, 0] => { + let compression = match compression { + 0 => Compression::Off, + 1 => Compression::LZ4, + _ => return Err(OptionsError::UnknownCompression(compression)), + }; + let serializer = match serializer { + 1 => Serializer::MsgPack, + _ => return Err(OptionsError::UnknownSerializer(serializer)), + }; + Ok(Self { + compression, + serializer, + }) + } + _ => Err(OptionsError::UnknownReservedBytes), + } + } + + pub fn to_bytes(&self) -> [u8; 4] { + [ + self.compression as u8, + self.serializer as u8, + 0, // reserved + 0, // reserved + ] + } +} + +/// On failure to decode [`EncodingOptions`] +#[derive(thiserror::Error, Debug)] +pub enum OptionsError { + #[error("Reserved bytes not zero")] + UnknownReservedBytes, + + #[error("Unknown compression: {0}")] + UnknownCompression(u8), + + #[error("Unknown serializer: {0}")] + UnknownSerializer(u8), +} + // --------------------------------------------------------------------------- /// Profiling macro for feature "puffin" diff --git a/crates/re_sdk/src/log_sink.rs b/crates/re_sdk/src/log_sink.rs index cd386abafde5..5bf65cb9cb59 100644 --- a/crates/re_sdk/src/log_sink.rs +++ b/crates/re_sdk/src/log_sink.rs @@ -135,7 +135,9 @@ impl MemorySinkStorage { let mut buffer = std::io::Cursor::new(Vec::new()); { - let mut encoder = re_log_encoding::encoder::Encoder::new(&mut buffer)?; + let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED; + let mut encoder = + re_log_encoding::encoder::Encoder::new(encoding_options, &mut buffer)?; for sink in sinks { for message in sink.read().iter() { encoder.append(message)?; diff --git a/crates/re_sdk_comms/src/buffered_client.rs b/crates/re_sdk_comms/src/buffered_client.rs index efddace8c3be..e9105b6d804b 100644 --- a/crates/re_sdk_comms/src/buffered_client.rs +++ b/crates/re_sdk_comms/src/buffered_client.rs @@ -64,10 +64,21 @@ impl Client { let (send_quit_tx, send_quit_rx) = crossbeam::channel::unbounded(); let (drop_quit_tx, drop_quit_rx) = crossbeam::channel::unbounded(); + // We don't compress the stream becausew e assume the SDK + // and server are on the same machine and compression + // can be expensive, see https://github.com/rerun-io/rerun/issues/2216 + let encoding_options = re_log_encoding::EncodingOptions::UNCOMPRESSED; + let encode_join = std::thread::Builder::new() .name("msg_encoder".into()) .spawn(move || { - msg_encode(&msg_rx, &msg_drop_tx, &encode_quit_rx, &packet_tx); + msg_encode( + encoding_options, + &msg_rx, + &msg_drop_tx, + &encode_quit_rx, + &packet_tx, + ); re_log::debug!("Shutting down msg encoder thread"); }) .expect("Failed to spawn thread"); @@ -173,6 +184,7 @@ fn msg_drop(msg_drop_rx: &Receiver, quit_rx: &Receiver) { } fn msg_encode( + encoding_options: re_log_encoding::EncodingOptions, msg_rx: &Receiver, msg_drop_tx: &Sender, quit_rx: &Receiver, @@ -187,7 +199,7 @@ fn msg_encode( let packet_msg = match &msg_msg { MsgMsg::LogMsg(log_msg) => { - match re_log_encoding::encoder::encode_to_bytes(std::iter::once(log_msg)) { + match re_log_encoding::encoder::encode_to_bytes(encoding_options, std::iter::once(log_msg)) { Ok(packet) => { re_log::trace!("Encoded message of size {}", packet.len()); Some(PacketMsg::Packet(packet)) diff --git a/crates/re_viewer/src/app.rs b/crates/re_viewer/src/app.rs index a8beb50be70c..ee795c711a96 100644 --- a/crates/re_viewer/src/app.rs +++ b/crates/re_viewer/src/app.rs @@ -2035,7 +2035,8 @@ fn save_database_to_file( let file = std::fs::File::create(path.as_path()) .with_context(|| format!("Failed to create file at {path:?}"))?; - re_log_encoding::encoder::encode_owned(msgs, file) + let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED; + re_log_encoding::encoder::encode_owned(encoding_options, msgs, file) .map(|_| path) .context("Message encode") }) diff --git a/crates/rerun/src/run.rs b/crates/rerun/src/run.rs index e71c6de9e855..5eeb4af6aa34 100644 --- a/crates/rerun/src/run.rs +++ b/crates/rerun/src/run.rs @@ -811,9 +811,10 @@ fn stream_to_rrd( re_log::info!("Saving incoming log stream to {path:?}. Abort with Ctrl-C."); + let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED; let file = std::fs::File::create(path).map_err(|err| FileSinkError::CreateFile(path.clone(), err))?; - let mut encoder = re_log_encoding::encoder::Encoder::new(file)?; + let mut encoder = re_log_encoding::encoder::Encoder::new(encoding_options, file)?; loop { match rx.recv() {