diff --git a/Cargo.lock b/Cargo.lock index 891729bc94d94..c093f6b6f45f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2513,6 +2513,15 @@ dependencies = [ "log", ] +[[package]] +name = "lz4_flex" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b8c72594ac26bfd34f2d99dfced2edfaddfe8a476e3ff2ca0eb293d925c4f83" +dependencies = [ + "twox-hash", +] + [[package]] name = "macaw" version = "0.18.0" @@ -3837,6 +3846,7 @@ dependencies = [ "ehttp", "instant", "js-sys", + "lz4_flex", "mimalloc", "parking_lot 0.12.1", "puffin", @@ -3845,13 +3855,11 @@ dependencies = [ "re_log_types", "re_smart_channel", "rmp-serde", - "ruzstd", "serde_test", "thiserror", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "zstd", ] [[package]] diff --git a/crates/re_log_encoding/Cargo.toml b/crates/re_log_encoding/Cargo.toml index b1c5c2943af62..b275ea660a070 100644 --- a/crates/re_log_encoding/Cargo.toml +++ b/crates/re_log_encoding/Cargo.toml @@ -20,10 +20,10 @@ all-features = true default = [] ## Enable loading data from an .rrd file. -decoder = ["dep:rmp-serde", "dep:zstd", "dep:ruzstd"] +decoder = ["dep:rmp-serde", "dep:lz4_flex"] # Enable encoding of log messages to an .rrd file/stream: -encoder = ["dep:rmp-serde", "dep:zstd"] +encoder = ["dep:rmp-serde", "dep:lz4_flex"] [dependencies] @@ -41,17 +41,16 @@ thiserror.workspace = true # Optional external dependencies: rmp-serde = { version = "1", optional = true } +lz4_flex = { version = "0.10", optional = true } # Native dependencies: [target.'cfg(not(target_arch = "wasm32"))'.dependencies] puffin.workspace = true -zstd = { version = "0.11.0", optional = true } # native only # Web dependencies: [target.'cfg(target_arch = "wasm32")'.dependencies] instant = { version = "0.1", features = ["wasm-bindgen"] } js-sys = "0.3" -ruzstd = { version = "0.3.0", optional = true } # works on wasm, in contrast to zstd wasm-bindgen = "0.2" wasm-bindgen-futures = "0.4" web-sys = { version = "0.3.52", features = ["Window"] } diff --git a/crates/re_log_encoding/src/decoder.rs b/crates/re_log_encoding/src/decoder.rs index ee6976a54dbf3..48f3565c0f831 100644 --- a/crates/re_log_encoding/src/decoder.rs +++ b/crates/re_log_encoding/src/decoder.rs @@ -32,17 +32,8 @@ pub enum DecodeError { #[error("Failed to read: {0}")] Read(std::io::Error), - #[cfg(not(target_arch = "wasm32"))] - #[error("Zstd error: {0}")] - Zstd(std::io::Error), - - #[cfg(target_arch = "wasm32")] - #[error("Zstd error: {0}")] - RuzstdInit(ruzstd::frame_decoder::FrameDecoderError), - - #[cfg(target_arch = "wasm32")] - #[error("Zstd read error: {0}")] - RuzstdRead(std::io::Error), + #[error("lz4 error: {0}")] + Lz4(std::io::Error), #[error("MsgPack error: {0}")] MsgPack(#[from] rmp_serde::decode::Error), @@ -60,74 +51,12 @@ pub fn decode_bytes(bytes: &[u8]) -> Result, DecodeError> { } // ---------------------------------------------------------------------------- -// native decode: - -#[cfg(not(target_arch = "wasm32"))] -pub struct Decoder<'r, R: std::io::BufRead> { - zdecoder: zstd::stream::Decoder<'r, R>, - buffer: Vec, -} - -#[cfg(not(target_arch = "wasm32"))] -impl<'r, R: std::io::Read> Decoder<'r, std::io::BufReader> { - pub fn new(mut read: R) -> Result { - crate::profile_function!(); - - let mut header = [0_u8; 4]; - read.read_exact(&mut header).map_err(DecodeError::Read)?; - if &header != b"RRF0" { - return Err(DecodeError::NotAnRrd); - } - read.read_exact(&mut header).map_err(DecodeError::Read)?; - warn_on_version_mismatch(header); - - let zdecoder = zstd::stream::read::Decoder::new(read).map_err(DecodeError::Zstd)?; - Ok(Self { - zdecoder, - buffer: vec![], - }) - } -} - -#[cfg(not(target_arch = "wasm32"))] -impl<'r, R: std::io::BufRead> Iterator for Decoder<'r, R> { - type Item = Result; - - fn next(&mut self) -> Option { - crate::profile_function!(); - use std::io::Read as _; - - let mut len = [0_u8; 8]; - self.zdecoder.read_exact(&mut len).ok()?; - let len = u64::from_le_bytes(len) as usize; - - self.buffer.resize(len, 0); - - { - crate::profile_scope!("zstd"); - if let Err(err) = self.zdecoder.read_exact(&mut self.buffer) { - return Some(Err(DecodeError::Zstd(err))); - } - } - - crate::profile_scope!("MsgPack deser"); - match rmp_serde::from_read(&mut self.buffer.as_slice()) { - Ok(msg) => Some(Ok(msg)), - Err(err) => Some(Err(err.into())), - } - } -} - -// ---------------------------------------------------------------------------- -// wasm decode: -#[cfg(target_arch = "wasm32")] pub struct Decoder { - zdecoder: ruzstd::StreamingDecoder, + lz4_decoder: lz4_flex::frame::FrameDecoder, buffer: Vec, } -#[cfg(target_arch = "wasm32")] impl Decoder { pub fn new(mut read: R) -> Result { crate::profile_function!(); @@ -140,15 +69,14 @@ impl Decoder { read.read_exact(&mut header).map_err(DecodeError::Read)?; warn_on_version_mismatch(header); - let zdecoder = ruzstd::StreamingDecoder::new(read).map_err(DecodeError::RuzstdInit)?; + let lz4_decoder = lz4_flex::frame::FrameDecoder::new(read); Ok(Self { - zdecoder, + lz4_decoder, buffer: vec![], }) } } -#[cfg(target_arch = "wasm32")] impl Iterator for Decoder { type Item = Result; @@ -157,15 +85,15 @@ impl Iterator for Decoder { use std::io::Read as _; let mut len = [0_u8; 8]; - self.zdecoder.read_exact(&mut len).ok()?; + self.lz4_decoder.read_exact(&mut len).ok()?; let len = u64::from_le_bytes(len) as usize; self.buffer.resize(len, 0); { - crate::profile_scope!("ruzstd"); - if let Err(err) = self.zdecoder.read_exact(&mut self.buffer) { - return Some(Err(DecodeError::RuzstdRead(err))); + crate::profile_scope!("lz4"); + if let Err(err) = self.lz4_decoder.read_exact(&mut self.buffer) { + return Some(Err(DecodeError::Lz4(err))); } } diff --git a/crates/re_log_encoding/src/encoder.rs b/crates/re_log_encoding/src/encoder.rs index 06aa629887b95..556f39285acf8 100644 --- a/crates/re_log_encoding/src/encoder.rs +++ b/crates/re_log_encoding/src/encoder.rs @@ -12,8 +12,11 @@ pub enum EncodeError { #[error("Failed to write: {0}")] Write(std::io::Error), - #[error("Zstd error: {0}")] - Zstd(std::io::Error), + #[error("lz4 error: {0}")] + Lz4Write(std::io::Error), + + #[error("lz4 error: {0}")] + Lz4Finish(lz4_flex::frame::Error), #[error("MsgPack error: {0}")] MsgPack(#[from] rmp_serde::encode::Error), @@ -43,13 +46,13 @@ pub fn encode_to_bytes<'a>( /// Encode a stream of [`LogMsg`] into an `.rrd` file. pub struct Encoder { /// Set to None when finished. - zstd_encoder: Option>, + lz4_encoder: Option>, buffer: Vec, } impl Drop for Encoder { fn drop(&mut self) { - if self.zstd_encoder.is_some() { + if self.lz4_encoder.is_some() { re_log::warn!("Encoder dropped without calling finish()!"); if let Err(err) = self.finish() { re_log::error!("Failed to finish encoding: {err}"); @@ -67,29 +70,30 @@ impl Encoder { .write_all(&rerun_version.to_bytes()) .map_err(EncodeError::Write)?; - let level = 3; - let zstd_encoder = zstd::stream::Encoder::new(write, level).map_err(EncodeError::Zstd)?; + let lz4_encoder = lz4_flex::frame::FrameEncoder::new(write); Ok(Self { - zstd_encoder: Some(zstd_encoder), + lz4_encoder: Some(lz4_encoder), buffer: vec![], }) } pub fn append(&mut self, message: &LogMsg) -> Result<(), EncodeError> { let Self { - zstd_encoder, + lz4_encoder, buffer, } = self; - if let Some(zstd_encoder) = zstd_encoder { + if let Some(lz4_encoder) = lz4_encoder { buffer.clear(); rmp_serde::encode::write_named(buffer, message)?; - zstd_encoder + lz4_encoder .write_all(&(buffer.len() as u64).to_le_bytes()) - .map_err(EncodeError::Zstd)?; - zstd_encoder.write_all(buffer).map_err(EncodeError::Zstd)?; + .map_err(EncodeError::Lz4Write)?; + lz4_encoder + .write_all(buffer) + .map_err(EncodeError::Lz4Write)?; Ok(()) } else { @@ -98,8 +102,8 @@ impl Encoder { } pub fn finish(&mut self) -> Result<(), EncodeError> { - if let Some(zstd_encoder) = self.zstd_encoder.take() { - zstd_encoder.finish().map_err(EncodeError::Zstd)?; + if let Some(lz4_encoder) = self.lz4_encoder.take() { + lz4_encoder.finish().map_err(EncodeError::Lz4Finish)?; Ok(()) } else { re_log::warn!("Encoder::finish called twice");