From 2e1d7485a8fe57c2d1a6f2b7acd532083c0f99c8 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 8 Oct 2024 21:40:35 +0200 Subject: [PATCH 01/33] Initial ffmpeg H.264 support in frames example --- Cargo.lock | 10 + Cargo.toml | 1 + crates/store/re_video/Cargo.toml | 10 +- crates/store/re_video/build.rs | 1 + crates/store/re_video/examples/frames.rs | 32 ++- crates/store/re_video/src/decode/ffmpeg.rs | 216 +++++++++++++++++++++ crates/store/re_video/src/decode/mod.rs | 5 +- crates/store/re_video/src/demux/mod.rs | 4 + 8 files changed, 273 insertions(+), 6 deletions(-) create mode 100644 crates/store/re_video/src/decode/ffmpeg.rs diff --git a/Cargo.lock b/Cargo.lock index 26e9132e2bed..650342573024 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2463,6 +2463,15 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "ffmpeg-sidecar" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bd1e249e0ceeb0f5c9f84a3c6941c3bde3ebc2815f4b94531a7e806af61c4c0" +dependencies = [ + "anyhow", +] + [[package]] name = "filetime" version = "0.2.23" @@ -6516,6 +6525,7 @@ dependencies = [ "criterion", "crossbeam", "econtext", + "ffmpeg-sidecar", "indicatif", "itertools 0.13.0", "js-sys", diff --git a/Cargo.toml b/Cargo.toml index d5df6f87a744..a4edee627548 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -177,6 +177,7 @@ econtext = "0.2" # Prints error contexts on crashes ehttp = "0.5.0" enumset = "1.0.12" env_logger = { version = "0.10", default-features = false } +ffmpeg-sidecar = "1.1.2" fixed = { version = "<1.28", default-features = false } # 1.28+ is MSRV 1.79+ flatbuffers = "23.0" futures-channel = "0.3" diff --git a/crates/store/re_video/Cargo.toml b/crates/store/re_video/Cargo.toml index f9dd00bf199f..458d68b4403c 100644 --- a/crates/store/re_video/Cargo.toml +++ b/crates/store/re_video/Cargo.toml @@ -23,7 +23,7 @@ features = ["all"] [features] -default = ["av1"] +default = ["av1", "ffmpeg"] ## Enable serialization for data structures that support it. serde = ["dep:serde"] @@ -31,6 +31,9 @@ serde = ["dep:serde"] ## Native AV1 decoding. av1 = ["dep:dav1d"] +## Decode H.264 using ffmpeg over CLI. +ffmpeg = ["dep:ffmpeg-sidecar"] + ## Enable faster native video decoding with assembly. ## You need to install [nasm](https://nasm.us/) to compile with this feature. 
nasm = [
@@ -49,9 +52,11 @@ econtext.workspace = true
itertools.workspace = true
parking_lot.workspace = true
re_mp4.workspace = true
-serde = { workspace = true, optional = true }
thiserror.workspace = true

+ffmpeg-sidecar = { workspace = true, optional = true }
+serde = { workspace = true, optional = true }
+
# We enable re_rav1d on native, UNLESS we're on Linux Arm64
# See https://github.com/rerun-io/rerun/issues/7755
[target.'cfg(all(not(target_arch = "wasm32"), not(all(target_os = "linux", target_arch = "aarch64"))))'.dependencies]
@@ -83,6 +88,7 @@ web-sys = { workspace = true, features = [
] }

[dev-dependencies]
+# For the `frames` example:
indicatif.workspace = true

criterion.workspace = true
diff --git a/crates/store/re_video/build.rs b/crates/store/re_video/build.rs
index 261ccb770ba3..e9fdfb6aa24f 100644
--- a/crates/store/re_video/build.rs
+++ b/crates/store/re_video/build.rs
@@ -10,5 +10,6 @@ fn main() {
        native: { not(target_arch = "wasm32") },
        linux_arm64: { all(target_os = "linux", target_arch = "aarch64") },
        with_dav1d: { all(feature = "av1", native, not(linux_arm64)) }, // https://github.com/rerun-io/rerun/issues/7755
+        with_ffmpeg: { all(feature= "ffmpeg", native) }
    }
}
diff --git a/crates/store/re_video/examples/frames.rs b/crates/store/re_video/examples/frames.rs
index cd931f31baa0..4837a09cf554 100644
--- a/crates/store/re_video/examples/frames.rs
+++ b/crates/store/re_video/examples/frames.rs
@@ -14,6 +14,8 @@ use indicatif::ProgressBar;
use parking_lot::Mutex;

fn main() {
+    re_log::setup_logging();
+
    // frames
    let args: Vec<_> = std::env::args().collect();
    let Some(video_path) = args.get(1) else {
@@ -83,8 +85,15 @@ fn main() {
            .create(true)
            .truncate(true)
            .open(output_dir.join(format!("{i:0width$}.ppm")))
-            .expect("failed to open file");
-        write_binary_ppm(&mut file, frame.width, frame.height, &frame.data);
+            .expect("failed to open file");
+        match frame.format {
+            re_video::PixelFormat::Rgb8Unorm => {
+                write_ppm_rgb24(&mut file, frame.width, frame.height, &frame.data);
+            }
+            re_video::PixelFormat::Rgba8Unorm => {
+                write_ppm_rgba32(&mut file, frame.width, frame.height, &frame.data);
+            }
+        }
    }
}

@@ -93,7 +102,24 @@ fn num_digits(n: usize) -> usize {
    (n as f64).log10().floor() as usize + 1
}

-fn write_binary_ppm(file: &mut File, width: u32, height: u32, rgba: &[u8]) {
+fn write_ppm_rgb24(file: &mut File, width: u32, height: u32, rgb: &[u8]) {
+    assert_eq!(width as usize * height as usize * 3, rgb.len());
+
+    let header = format!("P6\n{width} {height}\n255\n");
+
+    let mut data = Vec::with_capacity(header.len() + width as usize * height as usize * 3);
+    data.extend_from_slice(header.as_bytes());
+
+    for rgb in rgb.chunks(3) {
+        data.extend_from_slice(&[rgb[0], rgb[1], rgb[2]]);
+    }
+
+    file.write_all(&data).expect("failed to write frame data");
+}
+
+fn write_ppm_rgba32(file: &mut File, width: u32, height: u32, rgba: &[u8]) {
+    assert_eq!(width as usize * height as usize * 4, rgba.len());
+
    let header = format!("P6\n{width} {height}\n255\n");

    let mut data = Vec::with_capacity(header.len() + width as usize * height as usize * 3);
diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs
new file mode 100644
index 000000000000..c6d244f7e0ac
--- /dev/null
+++ b/crates/store/re_video/src/decode/ffmpeg.rs
@@ -0,0 +1,216 @@
+//! Send video data to `ffmpeg` over CLI to decode it.
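(A note on the overall data flow before the rest of this new file: the decoder spawns `ffmpeg` as a child process, feeds it an Annex B H.264 stream on stdin, and reads decoded rgb24 frames back from stdout. Below is a minimal standalone sketch of that round-trip with `ffmpeg-sidecar`, not taken from this patch: error handling is elided, `video.h264` is a hypothetical input file, and `anyhow` is assumed only for brevity.)

```rust
use std::io::Write as _;

use ffmpeg_sidecar::{command::FfmpegCommand, event::FfmpegEvent};

fn main() -> anyhow::Result<()> {
    let mut ffmpeg = FfmpegCommand::new()
        .format("h264") // input format; must come before `.input`
        .input("-") // read the bitstream from stdin
        .rawvideo() // write raw rgb24 frames to stdout
        .spawn()?;

    let mut stdin = ffmpeg.take_stdin().expect("no stdin");
    stdin.write_all(&std::fs::read("video.h264")?)?; // hypothetical Annex B file
    drop(stdin); // closing stdin tells ffmpeg the stream has ended

    for event in ffmpeg.iter()? {
        if let FfmpegEvent::OutputFrame(frame) = event {
            println!("frame {}: {}x{}", frame.frame_num, frame.width, frame.height);
        }
    }
    Ok(())
}
```

Writing all input before reading only works for short clips; on a long stream the stdout pipe fills up and both processes deadlock, which is why the decoder below drains frames on a dedicated thread.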
+ +use crossbeam::channel::Receiver; +use ffmpeg_sidecar::{ + child::FfmpegChild, + command::FfmpegCommand, + event::{FfmpegEvent, LogLevel}, +}; + +use crate::{Time, Timescale}; + +use super::{async_decoder_wrapper::SyncDecoder, Frame, Result}; + +/// Decode H.264 video via ffmpeg over CLI + +pub struct FfmpegCliH264Decoder { + /// How we send more data to the ffmpeg process + ffmpeg_stdin: std::process::ChildStdin, + + /// How we receive new frames back from ffmpeg + frame_rx: Receiver>, + + avcc: re_mp4::Avc1Box, + timescale: Timescale, +} + +impl FfmpegCliH264Decoder { + pub fn new(avcc: re_mp4::Avc1Box, timescale: Timescale) -> Result { + re_tracing::profile_function!(); + + let mut ffmpeg = { + re_tracing::profile_scope!("spawn-ffmpeg"); + + FfmpegCommand::new() + .hide_banner() + // Keep in mind that all arguments that are about the input, need to go before! + .format("h264") // High risk here: What's is available? + .input("-") // stdin is our input! + .rawvideo() // Output rgb24 on stdout. (TODO(emilk) for later: any format we can read directly on re_renderer would be better!) + .spawn() + .expect("Failed to spawn ffmpeg") + }; + + let mut ffmpeg_stdin = ffmpeg.take_stdin().unwrap(); + let ffmpeg_iterator = ffmpeg.iter().unwrap(); + + let (frame_tx, frame_rx) = crossbeam::channel::unbounded(); + + let thread_handle = std::thread::Builder::new() + .name("ffmpeg-reader".to_owned()) + .spawn(move || { + for event in ffmpeg_iterator { + match event { + FfmpegEvent::Log(LogLevel::Warning, msg) => re_log::warn_once!("{msg}"), + FfmpegEvent::Log(LogLevel::Error, msg) => re_log::error_once!("{msg}"), // TODO: report errors + FfmpegEvent::Progress(p) => { + re_log::debug!("Progress: {}", p.time) + } + FfmpegEvent::OutputFrame(frame) => { + re_log::trace!( + "Received frame: d[0] {} time {:?} fmt {:?} size {}x{}", + frame.data[0], + frame.timestamp, + frame.pix_fmt, + frame.width, + frame.height + ); + + debug_assert_eq!(frame.pix_fmt, "rgb24"); + debug_assert_eq!( + frame.width as usize * frame.height as usize * 3, + frame.data.len() + ); + + frame_tx.send(Ok(super::Frame { + width: frame.width, + height: frame.height, + data: frame.data, + format: crate::PixelFormat::Rgb8Unorm, + presentation_timestamp: Time::from_secs( + frame.timestamp as f64, + timescale, + ), + duration: Time::from_secs(0.1, timescale), // TODO + })); // TODO: handle disconnect + } + // TODO: handle all events + event => re_log::debug!("Event: {event:?}"), + } + } + re_log::debug!("Shutting down ffmpeg"); + }); + + Ok(Self { + ffmpeg_stdin, + frame_rx, + avcc, + timescale, + }) + } +} + +impl SyncDecoder for FfmpegCliH264Decoder { + fn submit_chunk( + &mut self, + should_stop: &std::sync::atomic::AtomicBool, + chunk: super::Chunk, + on_output: &super::OutputCallback, + ) { + re_tracing::profile_function!(); + + let mut state = NaluStreamState::default(); + write_avc_chunk_to_nalu_stream(&self.avcc, &mut self.ffmpeg_stdin, &chunk, &mut state) + .unwrap(); + // consider writing samples while at the same time reading frames, for even lower latency + // and maybe reuse the same ffmpeg process. + + // TODO: handle errors + while let Ok(frame_result) = self.frame_rx.try_recv() { + on_output(frame_result); + } + } + + fn reset(&mut self) { + // TODO: restart ffmpeg process + } +} + +/// Before every NAL unit, here is a nal start code. +/// Can also be 2 bytes of 0x00 and 1 byte of 0x01. +/// +/// This is used in byte stream formats such as h264 files. +/// Packet transform systems (RTP) may omit these. 
+pub const NAL_START_CODE: &[u8] = &[0x00, 0x00, 0x00, 0x01]; + +#[derive(Default)] +struct NaluStreamState { + previous_frame_was_idr: bool, +} + +fn write_avc_chunk_to_nalu_stream( + avcc: &re_mp4::Avc1Box, + nalu_stream: &mut dyn std::io::Write, + chunk: &super::Chunk, + state: &mut NaluStreamState, +) -> Result<(), Box> { + re_tracing::profile_function!(); + let avcc = &avcc.avcc; + + // Append SPS (Sequence Parameter Set) & PPS (Picture Parameter Set) NAL unit whenever encountering + // an IDR frame unless the previous frame was an IDR frame. + // TODO(andreas): Should we detect this rather from the NALU stream rather than the samples? + if chunk.is_sync && !state.previous_frame_was_idr { + for sps in (&avcc.sequence_parameter_sets).iter() { + nalu_stream.write_all(&NAL_START_CODE)?; + nalu_stream.write_all(&sps.bytes)?; + } + for pps in (&avcc.picture_parameter_sets).iter() { + nalu_stream.write_all(&NAL_START_CODE)?; + nalu_stream.write_all(&pps.bytes)?; + } + state.previous_frame_was_idr = true; + } else { + state.previous_frame_was_idr = false; + } + + // A single cjhunk may consist of multiple NAL units, each of which need our special treatment. + // (most of the time it's 1:1, but there might be extra NAL units for info, especially at the start). + let mut buffer_offset: usize = 0; + let sample_end = chunk.data.len(); + while buffer_offset < sample_end { + re_tracing::profile_scope!("nalu"); + + // Each NAL unit in mp4 is prefixed with a length prefix. + // In Annex B this doesn't exist. + let length_prefix_size = avcc.length_size_minus_one as usize + 1; + + // TODO: improve the error handling here. + let nal_unit_size = match length_prefix_size { + 4 => u32::from_be_bytes( + chunk.data[buffer_offset..(buffer_offset + 4)] + .try_into() + .unwrap(), + ) as usize, + 2 => u16::from_be_bytes( + chunk.data[buffer_offset..(buffer_offset + 2)] + .try_into() + .unwrap(), + ) as usize, + 1 => chunk.data[buffer_offset] as usize, + _ => panic!("invalid length prefix size"), + }; + //re_log::debug!("nal unit size: {}", nal_unit_size); + + if chunk.data.len() < nal_unit_size { + panic!( + "sample size {} is smaller than nal unit size {nal_unit_size}", + chunk.data.len() + ); + } + + nalu_stream.write_all(&NAL_START_CODE)?; + let data_start = buffer_offset + length_prefix_size; // Skip the size. + let data_end = buffer_offset + nal_unit_size + length_prefix_size; + let data = &chunk.data[data_start..data_end]; + + // Note that we don't have to insert "emulation prevention bytes" since mp4 NALU still use them. + // (unlike the NAL start code, the presentation bytes are part of the NAL spec!) + + nalu_stream.write_all(data)?; + + buffer_offset = data_end; + } + + Ok(()) +} diff --git a/crates/store/re_video/src/decode/mod.rs b/crates/store/re_video/src/decode/mod.rs index b870488029fd..d081881c6f22 100644 --- a/crates/store/re_video/src/decode/mod.rs +++ b/crates/store/re_video/src/decode/mod.rs @@ -77,11 +77,14 @@ //! supporting HDR content at which point more properties will be important! //! 
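(For context on what `write_avc_chunk_to_nalu_stream` above does: MP4/AVCC stores each NAL unit behind a big-endian length prefix, while ffmpeg's `h264` input expects Annex B start codes instead. Here is a minimal sketch of that transform for the common 4-byte-prefix case; it is an illustration only, not the patch's implementation, which also handles 1- and 2-byte prefixes and prepends SPS/PPS before IDR frames.)

```rust
/// Convert a buffer of 4-byte-length-prefixed NAL units into an Annex B stream.
fn avcc_to_annex_b(avcc: &[u8]) -> Vec<u8> {
    const START_CODE: [u8; 4] = [0x00, 0x00, 0x00, 0x01];
    let mut out = Vec::with_capacity(avcc.len() + 16);
    let mut offset = 0;
    while offset + 4 <= avcc.len() {
        let len = u32::from_be_bytes(avcc[offset..offset + 4].try_into().unwrap()) as usize;
        offset += 4;
        let Some(nal_unit) = avcc.get(offset..offset + len) else {
            break; // truncated input
        };
        out.extend_from_slice(&START_CODE); // the start code replaces the length prefix
        out.extend_from_slice(nal_unit); // NAL unit payload is copied unchanged
        offset += len;
    }
    out
}
```

For example, the input `[0, 0, 0, 2, 0x09, 0xF0]` (one 2-byte NAL unit) becomes `[0, 0, 0, 1, 0x09, 0xF0]`.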
-#[cfg(with_dav1d)] +#[cfg(any(with_dav1d, with_ffmpeg))] mod async_decoder_wrapper; #[cfg(with_dav1d)] mod av1; +#[cfg(with_ffmpeg)] +pub mod ffmpeg; + #[cfg(target_arch = "wasm32")] mod webcodecs; diff --git a/crates/store/re_video/src/demux/mod.rs b/crates/store/re_video/src/demux/mod.rs index d93083a21cb1..4a642113e39b 100644 --- a/crates/store/re_video/src/demux/mod.rs +++ b/crates/store/re_video/src/demux/mod.rs @@ -337,6 +337,10 @@ impl Config { pub fn is_av1(&self) -> bool { matches!(self.stsd.contents, re_mp4::StsdBoxContent::Av01 { .. }) } + + pub fn is_h264(&self) -> bool { + matches!(self.stsd.contents, re_mp4::StsdBoxContent::Avc1 { .. }) + } } /// Errors that can occur when loading a video. From a06107ef1c75bb8e7f4c01deec2d045418383f43 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Tue, 8 Oct 2024 22:01:20 +0200 Subject: [PATCH 02/33] Actual H.264 video playback inside of Rerun viewer --- crates/store/re_video/src/decode/mod.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/crates/store/re_video/src/decode/mod.rs b/crates/store/re_video/src/decode/mod.rs index d081881c6f22..e7e9ff173afd 100644 --- a/crates/store/re_video/src/decode/mod.rs +++ b/crates/store/re_video/src/decode/mod.rs @@ -181,6 +181,19 @@ pub fn new_decoder( } } + #[cfg(with_ffmpeg)] + re_mp4::StsdBoxContent::Avc1(avc1_box) => { + re_log::trace!("Decoding H.264…"); + return Ok(Box::new(async_decoder_wrapper::AsyncDecoderWrapper::new( + debug_name.to_owned(), + Box::new(ffmpeg::FfmpegCliH264Decoder::new( + avc1_box.clone(), + video.timescale, + )?), + on_output, + ))); + } + _ => Err(Error::UnsupportedCodec(video.human_readable_codec_string())), } } From 770f24c4dcedc7b734166394d0fa393037b44318 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 9 Oct 2024 06:00:16 +0200 Subject: [PATCH 03/33] Fix timestamps and seeking --- crates/store/re_video/src/decode/ffmpeg.rs | 111 +++++++++++++++------ crates/store/re_video/src/decode/mod.rs | 8 ++ 2 files changed, 88 insertions(+), 31 deletions(-) diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs index c6d244f7e0ac..cf5de2cf3424 100644 --- a/crates/store/re_video/src/decode/ffmpeg.rs +++ b/crates/store/re_video/src/decode/ffmpeg.rs @@ -1,6 +1,6 @@ //! Send video data to `ffmpeg` over CLI to decode it. -use crossbeam::channel::Receiver; +use crossbeam::channel::{Receiver, Sender}; use ffmpeg_sidecar::{ child::FfmpegChild, command::FfmpegCommand, @@ -9,14 +9,29 @@ use ffmpeg_sidecar::{ use crate::{Time, Timescale}; -use super::{async_decoder_wrapper::SyncDecoder, Frame, Result}; +use super::{async_decoder_wrapper::SyncDecoder, Error, Frame, Result}; + +/// ffmpeg does not tell us the timestamp/duration of a given frame, so we need to remember it. +struct FrameInfo { + /// Monotonic index, from start + frame_num: u32, + + timestamp: Time, + duration: Time, +} /// Decode H.264 video via ffmpeg over CLI pub struct FfmpegCliH264Decoder { + /// Monotonically increasing + frame_num: u32, + /// How we send more data to the ffmpeg process ffmpeg_stdin: std::process::ChildStdin, + /// For sending frame timestamps to the decoder thread + frame_info_tx: Sender, + /// How we receive new frames back from ffmpeg frame_rx: Receiver>, @@ -38,61 +53,82 @@ impl FfmpegCliH264Decoder { .input("-") // stdin is our input! .rawvideo() // Output rgb24 on stdout. (TODO(emilk) for later: any format we can read directly on re_renderer would be better!) 
.spawn() - .expect("Failed to spawn ffmpeg") + .map_err(Error::FailedToStartFfmpeg)? }; - let mut ffmpeg_stdin = ffmpeg.take_stdin().unwrap(); + let ffmpeg_stdin = ffmpeg.take_stdin().unwrap(); let ffmpeg_iterator = ffmpeg.iter().unwrap(); + let (frame_info_tx, frame_info_rx) = crossbeam::channel::unbounded(); let (frame_tx, frame_rx) = crossbeam::channel::unbounded(); - let thread_handle = std::thread::Builder::new() + std::thread::Builder::new() .name("ffmpeg-reader".to_owned()) .spawn(move || { for event in ffmpeg_iterator { match event { - FfmpegEvent::Log(LogLevel::Warning, msg) => re_log::warn_once!("{msg}"), + FfmpegEvent::Log(LogLevel::Warning, msg) => { + if !msg.contains( + "No accelerated colorspace conversion found from yuv420p to rgb24", + ) { + re_log::warn_once!("{msg}"); + } + } FfmpegEvent::Log(LogLevel::Error, msg) => re_log::error_once!("{msg}"), // TODO: report errors FfmpegEvent::Progress(p) => { - re_log::debug!("Progress: {}", p.time) + re_log::debug!("Progress: {}", p.time); } FfmpegEvent::OutputFrame(frame) => { - re_log::trace!( - "Received frame: d[0] {} time {:?} fmt {:?} size {}x{}", - frame.data[0], - frame.timestamp, - frame.pix_fmt, - frame.width, - frame.height - ); + // The `frame.timestamp` is monotonically increasing, + // so it is not the actual timestamp in the stream. + + let frame_info: FrameInfo = frame_info_rx.recv().unwrap(); + + let ffmpeg_sidecar::event::OutputVideoFrame { + frame_num, + pix_fmt, + width, + height, + data, + .. + } = frame; + + debug_assert_eq!(frame_info.frame_num, frame_num, "We are out-of-sync"); // TODO: fix somehow - debug_assert_eq!(frame.pix_fmt, "rgb24"); - debug_assert_eq!( - frame.width as usize * frame.height as usize * 3, - frame.data.len() + re_log::trace!( + "Received frame {frame_num}: fmt {pix_fmt:?} size {width}x{height}" ); - frame_tx.send(Ok(super::Frame { - width: frame.width, - height: frame.height, - data: frame.data, - format: crate::PixelFormat::Rgb8Unorm, - presentation_timestamp: Time::from_secs( - frame.timestamp as f64, - timescale, - ), - duration: Time::from_secs(0.1, timescale), // TODO - })); // TODO: handle disconnect + debug_assert_eq!(pix_fmt, "rgb24"); + debug_assert_eq!(width as usize * height as usize * 3, data.len()); + + if frame_tx + .send(Ok(super::Frame { + width, + height, + data, + format: crate::PixelFormat::Rgb8Unorm, + presentation_timestamp: frame_info.timestamp, + duration: frame_info.duration, + })) + .is_err() + { + re_log::debug!("Receiver disconnected"); + break; + } } // TODO: handle all events event => re_log::debug!("Event: {event:?}"), } } re_log::debug!("Shutting down ffmpeg"); - }); + }) + .expect("Failed to spawn ffmpeg thread"); Ok(Self { + frame_num: 0, ffmpeg_stdin, + frame_info_tx, frame_rx, avcc, timescale, @@ -109,6 +145,16 @@ impl SyncDecoder for FfmpegCliH264Decoder { ) { re_tracing::profile_function!(); + // NOTE: this assumes each sample/chunk will result in exactly one frame. + self.frame_info_tx.send(FrameInfo { + frame_num: self.frame_num, + timestamp: chunk.timestamp, + duration: chunk.duration, + }); + + // NOTE: a 60 FPS video can go for two years before wrapping a u32. 
+        self.frame_num = self.frame_num.wrapping_add(1);
+
        let mut state = NaluStreamState::default();
        write_avc_chunk_to_nalu_stream(&self.avcc, &mut self.ffmpeg_stdin, &chunk, &mut state)
            .unwrap();
        // consider writing samples while at the same time reading frames, for even lower latency
        // and maybe reuse the same ffmpeg process.

        // TODO: handle errors
        while let Ok(frame_result) = self.frame_rx.try_recv() {
+            if should_stop.load(std::sync::atomic::Ordering::Relaxed) {
+                return;
+            }
            on_output(frame_result);
        }
    }
diff --git a/crates/store/re_video/src/decode/mod.rs b/crates/store/re_video/src/decode/mod.rs
index e7e9ff173afd..5a53357f96a3 100644
--- a/crates/store/re_video/src/decode/mod.rs
+++ b/crates/store/re_video/src/decode/mod.rs
@@ -114,6 +114,14 @@ pub enum Error {
    #[cfg(target_arch = "wasm32")]
    #[error(transparent)]
    WebDecoderError(#[from] webcodecs::Error),
+
+    #[cfg(with_ffmpeg)]
+    #[error("Failed to start ffmpeg: {0}")]
+    FailedToStartFfmpeg(std::io::Error),
+
+    #[cfg(with_ffmpeg)]
+    #[error("Failed to spawn ffmpeg thread: {0}")]
+    FailedToSpawnThread(std::io::Error),
}

pub type Result = std::result::Result;

From ab3d52aa75f71d98e01f5337add5e5f849915680 Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt 
Date: Wed, 9 Oct 2024 06:31:05 +0200
Subject: [PATCH 04/33] Code cleanup and better error handling

---
 crates/store/re_video/src/decode/ffmpeg.rs    | 241 ++++++++++++------
 crates/store/re_video/src/decode/mod.rs       |  15 +-
 crates/viewer/re_renderer/src/video/mod.rs    |   2 +-
 crates/viewer/re_renderer/src/video/player.rs |   2 +-
 4 files changed, 175 insertions(+), 85 deletions(-)

diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs
index cf5de2cf3424..3a369ee7b0cd 100644
--- a/crates/store/re_video/src/decode/ffmpeg.rs
+++ b/crates/store/re_video/src/decode/ffmpeg.rs
@@ -1,15 +1,35 @@
 //! Send video data to `ffmpeg` over CLI to decode it.

-use crossbeam::channel::{Receiver, Sender};
+use crossbeam::channel::{Receiver, Sender, TryRecvError};
 use ffmpeg_sidecar::{
-    child::FfmpegChild,
     command::FfmpegCommand,
     event::{FfmpegEvent, LogLevel},
 };

-use crate::{Time, Timescale};
+use crate::Time;

-use super::{async_decoder_wrapper::SyncDecoder, Error, Frame, Result};
+use super::{async_decoder_wrapper::SyncDecoder, Frame, Result};
+
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+    #[error("Failed to start ffmpeg: {0}")]
+    FailedToStartFfmpeg(std::io::Error),
+
+    #[error("Failed to get stdin handle")]
+    NoStdin,
+
+    #[error("Failed to get iterator: {0}")]
+    NoIterator(String),
+
+    #[error("There's a bug in Rerun")]
+    NoFrameInfo,
+}
+
+impl From<Error> for super::Error {
+    fn from(err: Error) -> Self {
+        Self::Ffmpeg(std::sync::Arc::new(err))
+    }
+}

/// ffmpeg does not tell us the timestamp/duration of a given frame, so we need to remember it.
struct FrameInfo {
    /// Monotonic index, from start
    frame_num: u32,

    timestamp: Time,
    duration: Time,
}

/// Decode H.264 video via ffmpeg over CLI

pub struct FfmpegCliH264Decoder {
    /// Monotonically increasing
    frame_num: u32,

    /// How we send more data to the ffmpeg process
    ffmpeg_stdin: std::process::ChildStdin,

    /// For sending frame timestamps to the decoder thread
    frame_info_tx: Sender,

    /// How we receive new frames back from ffmpeg
-    frame_rx: Receiver>,
+    frame_rx: Receiver>,

    avcc: re_mp4::Avc1Box,
-    timescale: Timescale,
}

impl FfmpegCliH264Decoder {
-    pub fn new(avcc: re_mp4::Avc1Box, timescale: Timescale) -> Result {
+    pub fn new(avcc: re_mp4::Avc1Box) -> Result {
        re_tracing::profile_function!();

        let mut ffmpeg = {
@@ -56,8 +75,10 @@ impl FfmpegCliH264Decoder {
            .map_err(Error::FailedToStartFfmpeg)?
}; - let ffmpeg_stdin = ffmpeg.take_stdin().unwrap(); - let ffmpeg_iterator = ffmpeg.iter().unwrap(); + let ffmpeg_stdin = ffmpeg.take_stdin().ok_or(Error::NoStdin)?; + let ffmpeg_iterator = ffmpeg + .iter() + .map_err(|err| Error::NoIterator(err.to_string()))?; let (frame_info_tx, frame_info_rx) = crossbeam::channel::unbounded(); let (frame_tx, frame_rx) = crossbeam::channel::unbounded(); @@ -65,62 +86,7 @@ impl FfmpegCliH264Decoder { std::thread::Builder::new() .name("ffmpeg-reader".to_owned()) .spawn(move || { - for event in ffmpeg_iterator { - match event { - FfmpegEvent::Log(LogLevel::Warning, msg) => { - if !msg.contains( - "No accelerated colorspace conversion found from yuv420p to rgb24", - ) { - re_log::warn_once!("{msg}"); - } - } - FfmpegEvent::Log(LogLevel::Error, msg) => re_log::error_once!("{msg}"), // TODO: report errors - FfmpegEvent::Progress(p) => { - re_log::debug!("Progress: {}", p.time); - } - FfmpegEvent::OutputFrame(frame) => { - // The `frame.timestamp` is monotonically increasing, - // so it is not the actual timestamp in the stream. - - let frame_info: FrameInfo = frame_info_rx.recv().unwrap(); - - let ffmpeg_sidecar::event::OutputVideoFrame { - frame_num, - pix_fmt, - width, - height, - data, - .. - } = frame; - - debug_assert_eq!(frame_info.frame_num, frame_num, "We are out-of-sync"); // TODO: fix somehow - - re_log::trace!( - "Received frame {frame_num}: fmt {pix_fmt:?} size {width}x{height}" - ); - - debug_assert_eq!(pix_fmt, "rgb24"); - debug_assert_eq!(width as usize * height as usize * 3, data.len()); - - if frame_tx - .send(Ok(super::Frame { - width, - height, - data, - format: crate::PixelFormat::Rgb8Unorm, - presentation_timestamp: frame_info.timestamp, - duration: frame_info.duration, - })) - .is_err() - { - re_log::debug!("Receiver disconnected"); - break; - } - } - // TODO: handle all events - event => re_log::debug!("Event: {event:?}"), - } - } + read_ffmpeg_output(ffmpeg_iterator, &frame_info_rx, &frame_tx); re_log::debug!("Shutting down ffmpeg"); }) .expect("Failed to spawn ffmpeg thread"); @@ -131,11 +97,142 @@ impl FfmpegCliH264Decoder { frame_info_tx, frame_rx, avcc, - timescale, }) } } +fn read_ffmpeg_output( + ffmpeg_iterator: ffmpeg_sidecar::iter::FfmpegIterator, + frame_info_rx: &Receiver, + frame_tx: &Sender>, +) { + for event in ffmpeg_iterator { + #[allow(clippy::match_same_arms)] + match event { + FfmpegEvent::Log(LogLevel::Info, msg) => { + re_log::debug!("{msg}"); + } + + FfmpegEvent::Log(LogLevel::Warning, msg) => { + if !msg.contains("No accelerated colorspace conversion found from yuv420p to rgb24") + { + re_log::warn_once!("{msg}"); + } + } + + FfmpegEvent::Log(LogLevel::Error, msg) => { + // TODO: report errors + re_log::error_once!("{msg}"); + } + + // Usefuless info in these: + FfmpegEvent::ParsedInput(_) => {} + FfmpegEvent::ParsedOutput(_) => {} + FfmpegEvent::ParsedStreamMapping(_) => {} + + FfmpegEvent::ParsedInputStream(stream) => { + let ffmpeg_sidecar::event::AVStream { + stream_type, + format, + pix_fmt, // Often 'yuv420p' + width, + height, + fps, + .. + } = stream; + + re_log::debug!("ParsedInputStream {stream_type} {format} {pix_fmt} {width}x{height} @ {fps} FPS"); + + debug_assert_eq!(stream_type.to_ascii_lowercase(), "video"); + } + + FfmpegEvent::ParsedOutputStream(stream) => { + // This just repeats what we told ffmpeg to output, e.g. "rawvideo rgb24" + let ffmpeg_sidecar::event::AVStream { + stream_type, + format, + pix_fmt, + width, + height, + fps, + .. 
+ } = stream; + + re_log::debug!("ParsedOutputStream {stream_type} {format} {pix_fmt} {width}x{height} @ {fps} FPS"); + + debug_assert_eq!(stream_type.to_ascii_lowercase(), "video"); + } + + FfmpegEvent::Progress(_) => { + // We can get out frame number etc here to know how far behind we are. + } + + FfmpegEvent::OutputFrame(frame) => { + // NOTE: `frame.timestamp` is monotonically increasing, + // and is not the actual timestamp in the stream. + + let frame_info: FrameInfo = match frame_info_rx.try_recv() { + Ok(frame_info) => frame_info, + + Err(TryRecvError::Disconnected) => { + re_log::debug!("Receiver disconnected"); + return; + } + + Err(TryRecvError::Empty) => { + // This shouldn't happen + if frame_tx.send(Err(Error::NoFrameInfo.into())).is_err() { + re_log::warn!("Got no frame-info, and failed to send error"); + } + return; + } + }; + + let ffmpeg_sidecar::event::OutputVideoFrame { + frame_num, + pix_fmt, + width, + height, + data, + .. + } = frame; + + debug_assert_eq!( + frame_info.frame_num, frame_num, + "We are out-of-sync with ffmpeg" + ); // TODO: fix somehow + + re_log::trace!("Received frame {frame_num}: fmt {pix_fmt:?} size {width}x{height}"); + + debug_assert_eq!(pix_fmt, "rgb24"); + debug_assert_eq!(width as usize * height as usize * 3, data.len()); + + if frame_tx + .send(Ok(super::Frame { + width, + height, + data, + format: crate::PixelFormat::Rgb8Unorm, + presentation_timestamp: frame_info.timestamp, + duration: frame_info.duration, + })) + .is_err() + { + re_log::debug!("Receiver disconnected"); + return; + } + } + + FfmpegEvent::Done => { + re_log::debug!("ffmpeg is Done"); + return; + } + // TODO: handle all events + event => re_log::debug!("Event: {event:?}"), + } + } +} + impl SyncDecoder for FfmpegCliH264Decoder { fn submit_chunk( &mut self, @@ -148,7 +245,7 @@ impl SyncDecoder for FfmpegCliH264Decoder { // NOTE: this assumes each sample/chunk will result in exactly one frame. self.frame_info_tx.send(FrameInfo { frame_num: self.frame_num, - timestamp: chunk.timestamp, + timestamp: chunk.composition_timestamp, duration: chunk.duration, }); @@ -200,12 +297,12 @@ fn write_avc_chunk_to_nalu_stream( // an IDR frame unless the previous frame was an IDR frame. // TODO(andreas): Should we detect this rather from the NALU stream rather than the samples? if chunk.is_sync && !state.previous_frame_was_idr { - for sps in (&avcc.sequence_parameter_sets).iter() { - nalu_stream.write_all(&NAL_START_CODE)?; + for sps in &avcc.sequence_parameter_sets { + nalu_stream.write_all(NAL_START_CODE)?; nalu_stream.write_all(&sps.bytes)?; } - for pps in (&avcc.picture_parameter_sets).iter() { - nalu_stream.write_all(&NAL_START_CODE)?; + for pps in &avcc.picture_parameter_sets { + nalu_stream.write_all(NAL_START_CODE)?; nalu_stream.write_all(&pps.bytes)?; } state.previous_frame_was_idr = true; @@ -248,7 +345,7 @@ fn write_avc_chunk_to_nalu_stream( ); } - nalu_stream.write_all(&NAL_START_CODE)?; + nalu_stream.write_all(NAL_START_CODE)?; let data_start = buffer_offset + length_prefix_size; // Skip the size. 
let data_end = buffer_offset + nal_unit_size + length_prefix_size;
        let data = &chunk.data[data_start..data_end];

        // Note that we don't have to insert "emulation prevention bytes" since mp4 NALU still use them.
        // (unlike the NAL start code, the presentation bytes are part of the NAL spec!)

        nalu_stream.write_all(data)?;

        buffer_offset = data_end;
    }

    Ok(())
}
diff --git a/crates/store/re_video/src/decode/mod.rs b/crates/store/re_video/src/decode/mod.rs
index 5a53357f96a3..3877bdb08018 100644
--- a/crates/store/re_video/src/decode/mod.rs
+++ b/crates/store/re_video/src/decode/mod.rs
@@ -90,7 +90,7 @@ mod webcodecs;

use crate::Time;

-#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
+#[derive(thiserror::Error, Debug, Clone)]
pub enum Error {
    #[error("Unsupported codec: {0}")]
    UnsupportedCodec(String),
@@ -116,12 +116,8 @@ pub enum Error {
    WebDecoderError(#[from] webcodecs::Error),

    #[cfg(with_ffmpeg)]
-    #[error("Failed to start ffmpeg: {0}")]
-    FailedToStartFfmpeg(std::io::Error),
-
-    #[cfg(with_ffmpeg)]
-    #[error("Failed to spawn ffmpeg thread: {0}")]
-    FailedToSpawnThread(std::io::Error),
+    #[error(transparent)]
+    Ffmpeg(std::sync::Arc),
}

pub type Result = std::result::Result;
@@ -194,10 +190,7 @@ pub fn new_decoder(
            re_log::trace!("Decoding H.264…");
            return Ok(Box::new(async_decoder_wrapper::AsyncDecoderWrapper::new(
                debug_name.to_owned(),
-                Box::new(ffmpeg::FfmpegCliH264Decoder::new(
-                    avc1_box.clone(),
-                    video.timescale,
-                )?),
+                Box::new(ffmpeg::FfmpegCliH264Decoder::new(avc1_box.clone())?),
                on_output,
            )));
        }
diff --git a/crates/viewer/re_renderer/src/video/mod.rs b/crates/viewer/re_renderer/src/video/mod.rs
index 7404c1cc2d0b..d1398b42b817 100644
--- a/crates/viewer/re_renderer/src/video/mod.rs
+++ b/crates/viewer/re_renderer/src/video/mod.rs
@@ -11,7 +11,7 @@ use re_video::{decode::DecodeHardwareAcceleration, VideoData};
use crate::{resource_managers::GpuTexture2D, RenderContext};

/// Error that can occur during playing videos.
-#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
+#[derive(thiserror::Error, Debug, Clone)]
pub enum VideoPlayerError {
    #[error("The decoder is lagging behind")]
    EmptyBuffer,
diff --git a/crates/viewer/re_renderer/src/video/player.rs b/crates/viewer/re_renderer/src/video/player.rs
index 492032cc66cd..da831dbb3545 100644
--- a/crates/viewer/re_renderer/src/video/player.rs
+++ b/crates/viewer/re_renderer/src/video/player.rs
@@ -280,7 +280,7 @@ impl VideoPlayer {
        );

        if let Err(err) = result {
-            if err == VideoPlayerError::EmptyBuffer {
+            if matches!(err, VideoPlayerError::EmptyBuffer) {
                // No buffered frames

                // Might this be due to an error?

From b25a53980820b23fd7cf50c915ba5d73b78af8d1 Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt 
Date: Wed, 9 Oct 2024 12:43:33 +0200
Subject: [PATCH 05/33] Better error handling

---
 crates/store/re_video/src/decode/ffmpeg.rs | 125 +++++++++++++++------
 1 file changed, 91 insertions(+), 34 deletions(-)

diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs
index 3a369ee7b0cd..2cdea0d4b881 100644
--- a/crates/store/re_video/src/decode/ffmpeg.rs
+++ b/crates/store/re_video/src/decode/ffmpeg.rs
@@ -1,5 +1,7 @@
 //! Send video data to `ffmpeg` over CLI to decode it.
+use std::sync::atomic::Ordering;
+
use crossbeam::channel::{Receiver, Sender, TryRecvError};
use ffmpeg_sidecar::{
    command::FfmpegCommand,
    event::{FfmpegEvent, LogLevel},
@@ -23,6 +25,12 @@ pub enum Error {

    #[error("There's a bug in Rerun")]
    NoFrameInfo,
+
+    #[error("Failed to write data to ffmpeg: {0}")]
+    FailedToWriteToFfmpeg(std::io::Error),
+
+    #[error("Bad video data: {0}")]
+    BadVideoData(String),
}

impl From<Error> for super::Error {
@@ -67,7 +75,7 @@ pub struct FfmpegCliH264Decoder {
}

impl FfmpegCliH264Decoder {
-    pub fn new(avcc: re_mp4::Avc1Box) -> Result {
+    pub fn new(avcc: re_mp4::Avc1Box) -> Result {
        re_tracing::profile_function!();

        let mut ffmpeg = {
@@ -242,29 +250,53 @@ impl SyncDecoder for FfmpegCliH264Decoder {
    ) {
        re_tracing::profile_function!();

-        // TODO: handle errors
+        // First read any outstanding messages (e.g. error reports),
+        // so they get orderer correctly.
        while let Ok(frame_result) = self.frame_rx.try_recv() {
+            if should_stop.load(Ordering::Relaxed) {
+                return;
+            }
            on_output(frame_result);
        }

+        // We send the information about this chunk first.
+        // This assumes each sample/chunk will result in exactly one frame.
+        // If this assumption is not held, we will get weird errors, like videos playing too slowly.
+        let frame_info = FrameInfo {
            frame_num: self.frame_num,
            timestamp: chunk.composition_timestamp,
            duration: chunk.duration,
-        });
+        };

        // NOTE: a 60 FPS video can go for two years before wrapping a u32.
        self.frame_num = self.frame_num.wrapping_add(1);

-        let mut state = NaluStreamState::default();
-        write_avc_chunk_to_nalu_stream(&self.avcc, &mut self.ffmpeg_stdin, &chunk, &mut state)
-            .unwrap();
-        // consider writing samples while at the same time reading frames, for even lower latency
-        // and maybe reuse the same ffmpeg process.
+        if self.frame_info_tx.send(frame_info).is_err() {
+            // The other thread must be down, e.g. because `ffmpeg` crashed.
+            // It should already have reported that as an error - no need to repeat it here.
+        } else {
+            // Write chunk to ffmpeg:
+            let mut state = NaluStreamState::default(); // TODO: remove state?
+            if let Err(err) = write_avc_chunk_to_nalu_stream(
+                should_stop,
+                &self.avcc,
+                &mut self.ffmpeg_stdin,
+                &chunk,
+                &mut state,
+            ) {
+                on_output(Err(err.into()));
+            }
        }

-        // TODO: handle errors
+        // Read results and/or errors:
        while let Ok(frame_result) = self.frame_rx.try_recv() {
            if should_stop.load(Ordering::Relaxed) {
                return;
            }
            on_output(frame_result);
        }
-
-        // TODO: block until we have processed the frame!
    }

    fn reset(&mut self) {
@@ -285,11 +317,12 @@ struct NaluStreamState {
}

fn write_avc_chunk_to_nalu_stream(
+    should_stop: &std::sync::atomic::AtomicBool,
    avcc: &re_mp4::Avc1Box,
    nalu_stream: &mut dyn std::io::Write,
    chunk: &super::Chunk,
    state: &mut NaluStreamState,
-) -> Result<(), Box> {
+) -> Result<(), Error> {
    re_tracing::profile_function!();
    let avcc = &avcc.avcc;

    // Append SPS (Sequence Parameter Set) & PPS (Picture Parameter Set) NAL unit whenever encountering
    // an IDR frame unless the previous frame was an IDR frame.
    // TODO(andreas): Should we detect this rather from the NALU stream rather than the samples?
if chunk.is_sync && !state.previous_frame_was_idr {
        for sps in &avcc.sequence_parameter_sets {
-            nalu_stream.write_all(NAL_START_CODE)?;
-            nalu_stream.write_all(&sps.bytes)?;
+            nalu_stream
+                .write_all(NAL_START_CODE)
+                .map_err(Error::FailedToWriteToFfmpeg)?;
+            nalu_stream
+                .write_all(&sps.bytes)
+                .map_err(Error::FailedToWriteToFfmpeg)?;
        }
        for pps in &avcc.picture_parameter_sets {
-            nalu_stream.write_all(NAL_START_CODE)?;
-            nalu_stream.write_all(&pps.bytes)?;
+            nalu_stream
+                .write_all(NAL_START_CODE)
+                .map_err(Error::FailedToWriteToFfmpeg)?;
+            nalu_stream
+                .write_all(&pps.bytes)
+                .map_err(Error::FailedToWriteToFfmpeg)?;
        }
        state.previous_frame_was_idr = true;
    } else {
@@ -314,46 +355,62 @@ fn write_avc_chunk_to_nalu_stream(
    // (most of the time it's 1:1, but there might be extra NAL units for info, especially at the start).
    let mut buffer_offset: usize = 0;
    let sample_end = chunk.data.len();
-    while buffer_offset < sample_end {
+    while buffer_offset < sample_end && !should_stop.load(Ordering::Relaxed) {
        re_tracing::profile_scope!("nalu");

        // Each NAL unit in mp4 is prefixed with a length prefix.
        // In Annex B this doesn't exist.
        let length_prefix_size = avcc.length_size_minus_one as usize + 1;

+        if sample_end < buffer_offset + length_prefix_size {
+            return Err(Error::BadVideoData(
+                "Not enough bytes to fit the length prefix".to_owned(),
+            ));
+        }
+
        let nal_unit_size = match length_prefix_size {
-            4 => u32::from_be_bytes(
-                chunk.data[buffer_offset..(buffer_offset + 4)]
-                    .try_into()
-                    .unwrap(),
-            ) as usize,
-            2 => u16::from_be_bytes(
-                chunk.data[buffer_offset..(buffer_offset + 2)]
+            1 => chunk.data[buffer_offset] as usize,
+
+            2 => u16::from_be_bytes(
+                #[allow(clippy::unwrap_used)] // can't fail
+                chunk.data[buffer_offset..(buffer_offset + 2)]
                    .try_into()
                    .unwrap(),
            ) as usize,
-            1 => chunk.data[buffer_offset] as usize,
-            _ => panic!("invalid length prefix size"),
-        };
-        //re_log::debug!("nal unit size: {}", nal_unit_size);
+
+            4 => u32::from_be_bytes(
+                #[allow(clippy::unwrap_used)] // can't fail
+                chunk.data[buffer_offset..(buffer_offset + 4)]
+                    .try_into()
+                    .unwrap(),
+            ) as usize,

-        if chunk.data.len() < nal_unit_size {
-            panic!(
-                "sample size {} is smaller than nal unit size {nal_unit_size}",
-                chunk.data.len()
-            );
-        }
+            _ => {
+                return Err(Error::BadVideoData(format!(
+                    "Bad length prefix size: {length_prefix_size}"
+                )));
+            }
+        };

-        nalu_stream.write_all(NAL_START_CODE)?;
        let data_start = buffer_offset + length_prefix_size; // Skip the size.
        let data_end = buffer_offset + nal_unit_size + length_prefix_size;
+
+        if chunk.data.len() < data_end {
+            return Err(Error::BadVideoData("Not enough bytes to fit the NAL unit".to_owned()));
+        }
+
        let data = &chunk.data[data_start..data_end];

+        nalu_stream
+            .write_all(NAL_START_CODE)
+            .map_err(Error::FailedToWriteToFfmpeg)?;
+
        // Note that we don't have to insert "emulation prevention bytes" since mp4 NALU still use them.
        // (unlike the NAL start code, the presentation bytes are part of the NAL spec!)
nalu_stream
+            .write_all(data)
+            .map_err(Error::FailedToWriteToFfmpeg)?;

        buffer_offset = data_end;
    }

    Ok(())
}

From 32c58460831f2baba67fb5bac71af30ca98cd615 Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt 
Date: Wed, 9 Oct 2024 13:24:58 +0200
Subject: [PATCH 06/33] Improve log output and thread names

---
 .../src/decode/async_decoder_wrapper.rs       |  2 +-
 crates/store/re_video/src/decode/ffmpeg.rs    | 60 ++++++++++++++++---
 2 files changed, 52 insertions(+), 10 deletions(-)

diff --git a/crates/store/re_video/src/decode/async_decoder_wrapper.rs b/crates/store/re_video/src/decode/async_decoder_wrapper.rs
index d572e1cbc0c6..8bfac6842fb4 100644
--- a/crates/store/re_video/src/decode/async_decoder_wrapper.rs
+++ b/crates/store/re_video/src/decode/async_decoder_wrapper.rs
@@ -73,7 +73,7 @@ impl AsyncDecoderWrapper {
        let comms = Comms::default();

        let thread = std::thread::Builder::new()
-            .name("av1_decoder".into())
+            .name(format!("decoder thread for {debug_name}"))
            .spawn({
                let comms = comms.clone();
                move || {
diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs
index 2cdea0d4b881..e81e13a8299f 100644
--- a/crates/store/re_video/src/decode/ffmpeg.rs
+++ b/crates/store/re_video/src/decode/ffmpeg.rs
@@ -31,6 +31,12 @@ pub enum Error {

    #[error("Bad video data: {0}")]
    BadVideoData(String),
+
+    #[error("FFMPEG error: {0}")]
+    Ffmpeg(String),
+
+    #[error("FFMPEG IPC error: {0}")]
+    FfmpegSidecar(String),
}

impl From<Error> for super::Error {
@@ -49,7 +55,6 @@ struct FrameInfo {
}

/// Decode H.264 video via ffmpeg over CLI
-
pub struct FfmpegCliH264Decoder {
    /// Monotonically increasing
    frame_num: u32,
@@ -67,6 +72,8 @@ pub struct FfmpegCliH264Decoder {
}

impl FfmpegCliH264Decoder {
+    // TODO: make this robust against `pkill ffmpeg` somehow.
+    // Maybe `AsyncDecoder` can auto-restart us, or we wrap ourselves in a new struct that restarts us on certain errors?
    pub fn new(avcc: re_mp4::Avc1Box) -> Result {
        re_tracing::profile_function!();
@@ -114,23 +121,51 @@ fn read_ffmpeg_output(
    frame_info_rx: &Receiver,
    frame_tx: &Sender>,
) {
+    /// Ignore some common output from ffmpeg:
+    fn should_ignore_log_msg(msg: &str) -> bool {
+        let patterns = [
+            "Duration: N/A, bitrate: N/A",
+            "frame=    0 fps=0.0 q=0.0 size=       0kB time=N/A bitrate=N/A speed=N/A",
+            "Metadata:",
+            "No accelerated colorspace conversion found from yuv420p to rgb24",
+            "Stream mapping:",
+        ];
+
+        for pattern in patterns {
+            if msg.contains(pattern) {
+                return true;
+            }
+        }
+
+        false
+    }
+
    for event in ffmpeg_iterator {
        #[allow(clippy::match_same_arms)]
        match event {
            FfmpegEvent::Log(LogLevel::Info, msg) => {
-                re_log::debug!("{msg}");
+                if !should_ignore_log_msg(&msg) {
+                    re_log::debug!("{msg}");
+                }
            }

            FfmpegEvent::Log(LogLevel::Warning, msg) => {
-                if !msg.contains("No accelerated colorspace conversion found from yuv420p to rgb24")
-                {
+                if !should_ignore_log_msg(&msg) {
                    re_log::warn_once!("{msg}");
                }
            }

            FfmpegEvent::Log(LogLevel::Error, msg) => {
-                // TODO: report errors
-                re_log::error_once!("{msg}");
+                frame_tx.send(Err(Error::Ffmpeg(msg).into())).ok();
+            }
+
+            FfmpegEvent::LogEOF => {
+                // This event precedes `FfmpegEvent::Done`.
+                // This happens on `pkill ffmpeg`, for instance.
+            }
+
+            FfmpegEvent::Error(error) => {
+                frame_tx.send(Err(Error::FfmpegSidecar(error).into())).ok();
            }

            // Usefuless info in these:
            FfmpegEvent::ParsedInput(_) => {}
            FfmpegEvent::ParsedOutput(_) => {}
            FfmpegEvent::ParsedStreamMapping(_) => {}

            FfmpegEvent::ParsedInputStream(stream) => {
                let ffmpeg_sidecar::event::AVStream {
                    stream_type,
                    format,
                    pix_fmt, // Often 'yuv420p'
                    width,
                    height,
                    fps,
                    ..
} = stream; - re_log::debug!("ParsedInputStream {stream_type} {format} {pix_fmt} {width}x{height} @ {fps} FPS"); + re_log::debug!( + "Input: {stream_type} {format} {pix_fmt} {width}x{height} @ {fps} FPS" + ); debug_assert_eq!(stream_type.to_ascii_lowercase(), "video"); } @@ -166,7 +203,9 @@ fn read_ffmpeg_output( .. } = stream; - re_log::debug!("ParsedOutputStream {stream_type} {format} {pix_fmt} {width}x{height} @ {fps} FPS"); + re_log::debug!( + "Output: {stream_type} {format} {pix_fmt} {width}x{height} @ {fps} FPS" + ); debug_assert_eq!(stream_type.to_ascii_lowercase(), "video"); } @@ -232,9 +271,11 @@ fn read_ffmpeg_output( } FfmpegEvent::Done => { + // This happens on `pkill ffmpeg`, for instance. re_log::debug!("ffmpeg is Done"); return; } + // TODO: handle all events event => re_log::debug!("Event: {event:?}"), } @@ -356,7 +397,7 @@ fn write_avc_chunk_to_nalu_stream( let mut buffer_offset: usize = 0; let sample_end = chunk.data.len(); while buffer_offset < sample_end && !should_stop.load(Ordering::Relaxed) { - re_tracing::profile_scope!("nalu"); + re_tracing::profile_scope!("write_nalu"); // Each NAL unit in mp4 is prefixed with a length prefix. // In Annex B this doesn't exist. @@ -408,6 +449,7 @@ fn write_avc_chunk_to_nalu_stream( // Note that we don't have to insert "emulation prevention bytes" since mp4 NALU still use them. // (unlike the NAL start code, the presentation bytes are part of the NAL spec!) + re_tracing::profile_scope!("write_bytes", data.len().to_string()); nalu_stream .write_all(data) .map_err(Error::FailedToWriteToFfmpeg)?; From 32b673994803592203f290a9ec93e7fda07fe144 Mon Sep 17 00:00:00 2001 From: Andreas Reich Date: Fri, 25 Oct 2024 17:14:46 +0200 Subject: [PATCH 07/33] reduce ffmpeg decode delay --- crates/store/re_video/src/decode/ffmpeg.rs | 23 +++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs index e81e13a8299f..ceca6a6f26ae 100644 --- a/crates/store/re_video/src/decode/ffmpeg.rs +++ b/crates/store/re_video/src/decode/ffmpeg.rs @@ -82,10 +82,31 @@ impl FfmpegCliH264Decoder { FfmpegCommand::new() .hide_banner() + // "Reduce the latency introduced by buffering during initial input streams analysis." + //.arg("-fflags nobuffer") + // + // .. instead use these more aggressive options found here + // https://stackoverflow.com/a/49273163 + .args([ + "-probesize", + "32", // 32 bytes is the minimum probe size. + "-analyzeduration", + "0", + ]) // Keep in mind that all arguments that are about the input, need to go before! .format("h264") // High risk here: What's is available? .input("-") // stdin is our input! - .rawvideo() // Output rgb24 on stdout. (TODO(emilk) for later: any format we can read directly on re_renderer would be better!) + // TODO: Do we have to do this instead? + // Set constant frame rate. + // We can't properly handle variable frame rate since `rawvideo` output won't report timestamps. + // To work around this we'd first need to establish a mapping of frame numbers to timestamps. + // This isn't entirely trivial since individual chunks may have arbitrary composition & decode timestamps. + //.fps_mode(1) + // + // TODO(andreas): at least do `rgba`. But we could also do `yuv420p` for instance if that's what the video is specifying + // (should be faster overall at no quality loss if the video is in this format). + // Check `ffmpeg -pix_fmts` for full list. + .rawvideo() // Output rgb24 on stdout. 
.spawn()
            .map_err(Error::FailedToStartFfmpeg)?
    };

From 19fe5d0ce3149a9cddb711f7b4f19f3dd6472f22 Mon Sep 17 00:00:00 2001
From: Andreas Reich 
Date: Fri, 25 Oct 2024 17:14:58 +0200
Subject: [PATCH 08/33] fix re_video example build

---
 crates/store/re_video/examples/frames.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/crates/store/re_video/examples/frames.rs b/crates/store/re_video/examples/frames.rs
index 4837a09cf554..18b9c55bd59a 100644
--- a/crates/store/re_video/examples/frames.rs
+++ b/crates/store/re_video/examples/frames.rs
@@ -93,6 +93,9 @@ fn main() {
            re_video::PixelFormat::Rgba8Unorm => {
                write_ppm_rgba32(&mut file, frame.width, frame.height, &frame.data);
            }
+            re_video::PixelFormat::Yuv { .. } => {
+                re_log::error_once!("YUV frame writing is not supported");
+            }
        }
    }
}

From e4d37402138cbefbf9c5299725fdbb24418a5b60 Mon Sep 17 00:00:00 2001
From: Andreas Reich 
Date: Fri, 25 Oct 2024 17:31:39 +0200
Subject: [PATCH 09/33] add decode timestamp, nal header parsing, various
 comments, todo notes

---
 crates/store/re_video/src/decode/ffmpeg.rs    | 97 ++++++++++++++++---
 crates/store/re_video/src/decode/mod.rs       |  8 ++
 crates/store/re_video/src/demux/mod.rs        |  1 +
 crates/viewer/re_renderer/src/video/player.rs |  3 +-
 4 files changed, 97 insertions(+), 12 deletions(-)

diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs
index ceca6a6f26ae..e68cb4fd207e 100644
--- a/crates/store/re_video/src/decode/ffmpeg.rs
+++ b/crates/store/re_video/src/decode/ffmpeg.rs
@@ -1,6 +1,6 @@
 //! Send video data to `ffmpeg` over CLI to decode it.

-use std::sync::atomic::Ordering;
+use std::{io::Write, sync::atomic::Ordering};

use crossbeam::channel::{Receiver, Sender, TryRecvError};
use ffmpeg_sidecar::{
    command::FfmpegCommand,
@@ -23,7 +23,7 @@ pub enum Error {
    #[error("Failed to get iterator: {0}")]
    NoIterator(String),

-    #[error("There's a bug in Rerun")]
+    #[error("No frame info received, this is likely a bug in Rerun")]
    NoFrameInfo,

    #[error("Failed to write data to ffmpeg: {0}")]
    FailedToWriteToFfmpeg(std::io::Error),
@@ -236,9 +236,6 @@ fn read_ffmpeg_output(
            }

            FfmpegEvent::OutputFrame(frame) => {
-                // NOTE: `frame.timestamp` is monotonically increasing,
-                // and is not the actual timestamp in the stream.
-
                let frame_info: FrameInfo = match frame_info_rx.try_recv() {
@@ -313,7 +311,7 @@ impl SyncDecoder for FfmpegCliH264Decoder {
        re_tracing::profile_function!();

        // First read any outstanding messages (e.g. error reports),
-        // so they get orderer correctly.
+        // so they get ordered correctly.
        while let Ok(frame_result) = self.frame_rx.try_recv() {
            if should_stop.load(Ordering::Relaxed) {
                return;
            }
            on_output(frame_result);
        }

        // We send the information about this chunk first.
        // This assumes each sample/chunk will result in exactly one frame.
        // If this assumption is not held, we will get weird errors, like videos playing too slowly.
+        // TODO: this also assumes that the frame comes back in this order.
+        // Which is definitely wrong, as we know that frames are not necessarily in composition time stamp order!
let frame_info = FrameInfo { frame_num: self.frame_num, timestamp: chunk.composition_timestamp, @@ -348,17 +348,16 @@ impl SyncDecoder for FfmpegCliH264Decoder { ) { on_output(Err(err.into())); } + + self.ffmpeg_stdin.flush().ok(); } - // Read results and/or errors: while let Ok(frame_result) = self.frame_rx.try_recv() { if should_stop.load(Ordering::Relaxed) { return; } on_output(frame_result); } - - // TODO: block until we have processed the frame! } fn reset(&mut self) { @@ -413,7 +412,7 @@ fn write_avc_chunk_to_nalu_stream( state.previous_frame_was_idr = false; } - // A single cjhunk may consist of multiple NAL units, each of which need our special treatment. + // A single chunk may consist of multiple NAL units, each of which need our special treatment. // (most of the time it's 1:1, but there might be extra NAL units for info, especially at the start). let mut buffer_offset: usize = 0; let sample_end = chunk.data.len(); @@ -461,6 +460,13 @@ fn write_avc_chunk_to_nalu_stream( return Err(Error::BadVideoData("Not enough bytes to".to_owned())); } + let nal_header = NalHeader(chunk.data[data_start]); + re_log::trace!( + "nal_header: {:?}, {}", + nal_header.unit_type(), + nal_header.ref_idc() + ); + let data = &chunk.data[data_start..data_end]; nalu_stream @@ -480,3 +486,72 @@ fn write_avc_chunk_to_nalu_stream( Ok(()) } + +/// Possible values for `nal_unit_type` field in `nal_unit`. +/// +/// Encodes to 5 bits. +/// Via: https://docs.rs/less-avc/0.1.5/src/less_avc/nal_unit.rs.html#232 +#[derive(PartialEq, Eq)] +#[non_exhaustive] +#[repr(u8)] +#[derive(Copy, Clone, Debug)] +pub enum NalUnitType { + /// Unspecified + Unspecified = 0, + + /// Coded slice of a non-IDR picture + CodedSliceOfANonIDRPicture = 1, + + /// Coded slice data partition A + CodedSliceDataPartitionA = 2, + + /// Coded slice data partition B + CodedSliceDataPartitionB = 3, + + /// Coded slice data partition C + CodedSliceDataPartitionC = 4, + + /// Coded slice of an IDR picture + CodedSliceOfAnIDRPicture = 5, + + /// Supplemental enhancement information (SEI) + SupplementalEnhancementInformation = 6, + + /// Sequence parameter set + SequenceParameterSet = 7, + + /// Picture parameter set + PictureParameterSet = 8, + + /// Header type not listed here. + Other, +} + +/// Header of the "Network Abstraction Layer" unit that is used by H.264/AVC & H.265/HEVC. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +struct NalHeader(pub u8); + +impl NalHeader { + pub fn unit_type(self) -> NalUnitType { + match self.0 & 0b111 { + 0 => NalUnitType::Unspecified, + 1 => NalUnitType::CodedSliceOfANonIDRPicture, + 2 => NalUnitType::CodedSliceDataPartitionA, + 3 => NalUnitType::CodedSliceDataPartitionB, + 4 => NalUnitType::CodedSliceDataPartitionC, + 5 => NalUnitType::CodedSliceOfAnIDRPicture, + 6 => NalUnitType::SupplementalEnhancementInformation, + 7 => NalUnitType::SequenceParameterSet, + 8 => NalUnitType::PictureParameterSet, + _ => NalUnitType::Other, + } + } + + /// Ref idc is a value from 0-3 that tells us how "important" the frame/sample is. + /// + /// For details see: + /// + fn ref_idc(self) -> u8 { + (self.0 >> 5) & 0b11 + } +} diff --git a/crates/store/re_video/src/decode/mod.rs b/crates/store/re_video/src/decode/mod.rs index 3877bdb08018..bcc49ffddffb 100644 --- a/crates/store/re_video/src/decode/mod.rs +++ b/crates/store/re_video/src/decode/mod.rs @@ -208,8 +208,16 @@ pub struct Chunk { pub data: Vec, + /// Decode timestamp of this sample. + /// Chunks are expected to be submitted in the order of decode timestamp. 
+ /// + /// `decode_timestamp <= composition_timestamp` + pub decode_timestamp: Time, + /// Presentation/composition timestamp for the sample in this chunk. /// *not* decode timestamp. + /// + /// `decode_timestamp <= composition_timestamp` pub composition_timestamp: Time, pub duration: Time, diff --git a/crates/store/re_video/src/demux/mod.rs b/crates/store/re_video/src/demux/mod.rs index 4a642113e39b..672d024d1b05 100644 --- a/crates/store/re_video/src/demux/mod.rs +++ b/crates/store/re_video/src/demux/mod.rs @@ -310,6 +310,7 @@ impl Sample { .to_vec(); Some(Chunk { data, + decode_timestamp: self.decode_timestamp, composition_timestamp: self.composition_timestamp, duration: self.duration, is_sync: self.is_sync, diff --git a/crates/viewer/re_renderer/src/video/player.rs b/crates/viewer/re_renderer/src/video/player.rs index da831dbb3545..a6c17d4d7904 100644 --- a/crates/viewer/re_renderer/src/video/player.rs +++ b/crates/viewer/re_renderer/src/video/player.rs @@ -188,8 +188,9 @@ impl VideoPlayer { // = determines the decoding order of samples // // Note: `decode <= composition` for any given sample. - // For some codecs, the two timestamps are the same. + // For some codecs & videos, the two timestamps are the same. // We must enqueue samples in decode order, but show them in composition order. + // In the presence of b-frames this order may be different! // 1. Find the latest sample where `decode_timestamp <= presentation_timestamp`. // Because `decode <= composition`, we never have to look further ahead in the From 6aa86939210726fd5a8ba54bb9c066de5c5ef2c1 Mon Sep 17 00:00:00 2001 From: Andreas Reich Date: Fri, 25 Oct 2024 18:07:04 +0200 Subject: [PATCH 10/33] better timestamp syncing strategy? --- crates/store/re_video/src/decode/av1.rs | 4 +- crates/store/re_video/src/decode/ffmpeg.rs | 73 +++++++++++-------- crates/store/re_video/src/decode/mod.rs | 10 +-- crates/store/re_video/src/demux/mod.rs | 11 +-- crates/store/re_video/src/demux/mp4.rs | 4 +- crates/viewer/re_renderer/src/video/player.rs | 4 +- 6 files changed, 58 insertions(+), 48 deletions(-) diff --git a/crates/store/re_video/src/decode/av1.rs b/crates/store/re_video/src/decode/av1.rs index 61e821622388..a1c8528d5c5a 100644 --- a/crates/store/re_video/src/decode/av1.rs +++ b/crates/store/re_video/src/decode/av1.rs @@ -76,14 +76,14 @@ impl SyncDav1dDecoder { re_tracing::profile_function!(); econtext::econtext_function_data!(format!( "chunk timestamp: {:?}", - chunk.composition_timestamp + chunk.presentation_timestamp )); re_tracing::profile_scope!("send_data"); match self.decoder.send_data( chunk.data, None, - Some(chunk.composition_timestamp.0), + Some(chunk.presentation_timestamp.0), Some(chunk.duration.0), ) { Ok(()) => {} diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs index e68cb4fd207e..863b01350fcb 100644 --- a/crates/store/re_video/src/decode/ffmpeg.rs +++ b/crates/store/re_video/src/decode/ffmpeg.rs @@ -1,8 +1,8 @@ //! Send video data to `ffmpeg` over CLI to decode it. -use std::{io::Write, sync::atomic::Ordering}; +use std::{collections::BTreeMap, io::Write, sync::atomic::Ordering}; -use crossbeam::channel::{Receiver, Sender, TryRecvError}; +use crossbeam::channel::{Receiver, Sender}; use ffmpeg_sidecar::{ command::FfmpegCommand, event::{FfmpegEvent, LogLevel}, @@ -47,10 +47,8 @@ impl From for super::Error { /// ffmpeg does not tell us the timestamp/duration of a given frame, so we need to remember it. 
struct FrameInfo {
-    /// Monotonic index, from start
-    frame_num: u32,
-
-    timestamp: Time,
+    decode_timestamp: Time,
+    presentation_timestamp: Time,
    duration: Time,
}

/// Decode H.264 video via ffmpeg over CLI
@@ -139,6 +139,8 @@ fn read_ffmpeg_output(
        false
    }

+    let mut pending_frames = BTreeMap::new();
+
    for event in ffmpeg_iterator {
        #[allow(clippy::match_same_arms)]
        match event {
@@ -236,25 +238,37 @@ fn read_ffmpeg_output(
            FfmpegEvent::OutputFrame(frame) => {
-                let frame_info: FrameInfo = match frame_info_rx.try_recv() {
-                    Ok(frame_info) => frame_info,
-
-                    Err(TryRecvError::Disconnected) => {
-                        re_log::debug!("Receiver disconnected");
-                        return;
-                    }
-
-                    Err(TryRecvError::Empty) => {
-                        // This shouldn't happen
-                        if frame_tx.send(Err(Error::NoFrameInfo.into())).is_err() {
-                            re_log::warn!("Got no frame-info, and failed to send error");
-                        }
-                        return;
-                    }
-                };
+                let frame_info = match pending_frames.pop_first() {
+                    Some((_, frame_info)) => frame_info,
+                    None => {
+                        // Retrieve frame infos until decode timestamp is no longer behind composition timestamp.
+                        // This is important because frame infos do not come in composition order,
+                        // but ffmpeg will report frames in composition order!
+                        loop {
+                            let Ok(frame_info) = frame_info_rx.try_recv() else {
+                                re_log::debug!("Receiver disconnected");
+                                return;
+                            };
+
+                            // Example of how presentation timestamps and decode timestamps can play out:
+                            // PTS: 1 4 2 3
+                            // DTS: 1 2 3 4
+                            // Stream: I P B B
+                            //
+                            // Essentially we need to wait until the dts has "caught up" with the pts!
+                            let highest_pts = pending_frames
+                                .last_key_value()
+                                .map_or(frame_info.presentation_timestamp, |(pts, _)| *pts);
+                            if frame_info.decode_timestamp <= highest_pts {
+                                break frame_info;
+                            }
+                            pending_frames.insert(frame_info.presentation_timestamp, frame_info);
+                        }
+                    }
+                };

                let ffmpeg_sidecar::event::OutputVideoFrame {
-                    frame_num,
+                    frame_num: _, // This is made up by ffmpeg sidecar.
                    pix_fmt,
                    width,
                    height,
                    data,
                    output_index: _, // This is the stream index. for all we do it's always 0.
                    timestamp: _, // This is a timestamp made up by ffmpeg_sidecar based on limited information it has.
                } = frame;

-                debug_assert_eq!(
-                    frame_info.frame_num, frame_num,
-                    "We are out-of-sync with ffmpeg"
-                ); // TODO: fix somehow
-
-                re_log::trace!("Received frame {frame_num}: fmt {pix_fmt:?} size {width}x{height}");
+                re_log::trace!(
+                    "Received frame: dts {:?} cts {:?} fmt {pix_fmt:?} size {width}x{height}",
+                    frame_info.decode_timestamp,
+                    frame_info.presentation_timestamp
+                );

                debug_assert_eq!(pix_fmt, "rgb24");
                debug_assert_eq!(width as usize * height as usize * 3, data.len());

                if frame_tx
                    .send(Ok(super::Frame {
                        width,
                        height,
                        data,
                        format: crate::PixelFormat::Rgb8Unorm,
-                        presentation_timestamp: frame_info.timestamp,
+                        presentation_timestamp: frame_info.presentation_timestamp,
                        duration: frame_info.duration,
                    }))
                    .is_err()
@@ -329,9 +333,9 @@ impl SyncDecoder for FfmpegCliH264Decoder {
        // We send the information about this chunk first.
        // This assumes each sample/chunk will result in exactly one frame.
        // If this assumption is not held, we will get weird errors, like videos playing too slowly.
         let frame_info = FrameInfo {
-            frame_num: self.frame_num,
-            timestamp: chunk.composition_timestamp,
+            presentation_timestamp: chunk.presentation_timestamp,
+            decode_timestamp: chunk.decode_timestamp,
             duration: chunk.duration,
         };
@@ -490,7 +499,7 @@
 /// Possible values for `nal_unit_type` field in `nal_unit`.
 ///
 /// Encodes to 5 bits.
-/// Via: https://docs.rs/less-avc/0.1.5/src/less_avc/nal_unit.rs.html#232
+/// Via: <https://docs.rs/less-avc/0.1.5/src/less_avc/nal_unit.rs.html#232>
 #[derive(PartialEq, Eq)]
 #[non_exhaustive]
 #[repr(u8)]
diff --git a/crates/store/re_video/src/decode/mod.rs b/crates/store/re_video/src/decode/mod.rs
index bcc49ffddffb..3483aaaa0988 100644
--- a/crates/store/re_video/src/decode/mod.rs
+++ b/crates/store/re_video/src/decode/mod.rs
@@ -211,14 +211,14 @@ pub struct Chunk {
     /// Decode timestamp of this sample.
     /// Chunks are expected to be submitted in the order of decode timestamp.
     ///
-    /// `decode_timestamp <= composition_timestamp`
+    /// `decode_timestamp <= presentation_timestamp`
     pub decode_timestamp: Time,

-    /// Presentation/composition timestamp for the sample in this chunk.
-    /// *not* decode timestamp.
+    /// Presentation timestamp for the sample in this chunk.
+    /// Often synonymous with `composition_timestamp`.
     ///
-    /// `decode_timestamp <= composition_timestamp`
-    pub composition_timestamp: Time,
+    /// `decode_timestamp <= presentation_timestamp`
+    pub presentation_timestamp: Time,

     pub duration: Time,
 }
diff --git a/crates/store/re_video/src/demux/mod.rs b/crates/store/re_video/src/demux/mod.rs
index 672d024d1b05..1f4fd189991b 100644
--- a/crates/store/re_video/src/demux/mod.rs
+++ b/crates/store/re_video/src/demux/mod.rs
@@ -238,7 +238,7 @@ impl VideoData {
         self.gops.iter().flat_map(|seg| {
             self.samples[seg.range()]
                 .iter()
-                .map(|sample| sample.composition_timestamp.into_nanos(self.timescale))
+                .map(|sample| sample.presentation_timestamp.into_nanos(self.timescale))
                 .sorted()
         })
     }
@@ -276,15 +276,16 @@ pub struct Sample {
     ///
     /// Samples should be decoded in this order.
     ///
-    /// `decode_timestamp <= composition_timestamp`
+    /// `decode_timestamp <= presentation_timestamp`
     pub decode_timestamp: Time,

     /// Time at which this sample appears in the frame stream, in time units.
+    /// Often synonymous with `composition_timestamp`.
     ///
     /// The frame should be shown at this time.
     ///
-    /// `decode_timestamp <= composition_timestamp`
-    pub composition_timestamp: Time,
+    /// `decode_timestamp <= presentation_timestamp`
+    pub presentation_timestamp: Time,

     /// Duration of the sample, in time units.
     pub duration: Time,
@@ -311,7 +312,7 @@ impl Sample {
         Some(Chunk {
             data,
             decode_timestamp: self.decode_timestamp,
-            composition_timestamp: self.composition_timestamp,
+            presentation_timestamp: self.presentation_timestamp,
             duration: self.duration,
             is_sync: self.is_sync,
         })
diff --git a/crates/store/re_video/src/demux/mp4.rs b/crates/store/re_video/src/demux/mp4.rs
index 3bff80f0a63e..a886dd7429e2 100644
--- a/crates/store/re_video/src/demux/mp4.rs
+++ b/crates/store/re_video/src/demux/mp4.rs
@@ -54,7 +54,7 @@ impl VideoData {
             }

             let decode_timestamp = Time::new(sample.decode_timestamp as i64);
-            let composition_timestamp = Time::new(sample.composition_timestamp as i64);
+            let presentation_timestamp = Time::new(sample.composition_timestamp as i64);
             let duration = Time::new(sample.duration as i64);

             let byte_offset = sample.offset as u32;
@@ -63,7 +63,7 @@
             samples.push(Sample {
                 is_sync: sample.is_sync,
                 decode_timestamp,
-                composition_timestamp,
+                presentation_timestamp,
                 duration,
                 byte_offset,
                 byte_length,
diff --git a/crates/viewer/re_renderer/src/video/player.rs b/crates/viewer/re_renderer/src/video/player.rs
index a6c17d4d7904..0dc30812cf4e 100644
--- a/crates/viewer/re_renderer/src/video/player.rs
+++ b/crates/viewer/re_renderer/src/video/player.rs
@@ -204,11 +204,11 @@ impl VideoPlayer {
         };

         // 2. Search _backwards_, starting at `decode_sample_idx`, looking for
-        //    the first sample where `sample.composition_timestamp <= presentation_timestamp`.
+        //    the first sample where `sample.presentation_timestamp <= presentation_timestamp`.
         //    This is the sample which when decoded will be presented at the timestamp the user requested.
         let Some(requested_sample_idx) = self.data.samples[..=decode_sample_idx]
             .iter()
-            .rposition(|sample| sample.composition_timestamp <= presentation_timestamp)
+            .rposition(|sample| sample.presentation_timestamp <= presentation_timestamp)
         else {
             return Err(VideoPlayerError::EmptyVideo);
         };

From 84d0a55e9176c9732fd3fde8e7fe89e705081c81 Mon Sep 17 00:00:00 2001
From: Andreas Reich
Date: Mon, 28 Oct 2024 11:11:18 +0100
Subject: [PATCH 11/33] make ffmpeg an async decoder

---
 crates/store/re_video/src/decode/ffmpeg.rs | 96 +++++++++-------------
 crates/store/re_video/src/decode/mod.rs    | 14 ++--
 2 files changed, 47 insertions(+), 63 deletions(-)

diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs
index 863b01350fcb..9c6681941aab 100644
--- a/crates/store/re_video/src/decode/ffmpeg.rs
+++ b/crates/store/re_video/src/decode/ffmpeg.rs
@@ -1,6 +1,6 @@
 //! Send video data to `ffmpeg` over CLI to decode it.

-use std::{collections::BTreeMap, io::Write, sync::atomic::Ordering};
+use std::{collections::BTreeMap, io::Write, sync::Arc};

 use crossbeam::channel::{Receiver, Sender};
 use ffmpeg_sidecar::{
@@ -10,7 +10,7 @@ use ffmpeg_sidecar::{

 use crate::Time;

-use super::{async_decoder_wrapper::SyncDecoder, Frame, Result};
+use super::{AsyncDecoder, Frame, OutputCallback};

 #[derive(thiserror::Error, Debug)]
 pub enum Error {
@@ -63,16 +63,18 @@ pub struct FfmpegCliH264Decoder {
     /// For sending frame timestamps to the decoder thread
     frame_info_tx: Sender<FrameInfo>,

-    /// How we receive new frames back from ffmpeg
-    frame_rx: Receiver<Result<Frame>>,
-
     avcc: re_mp4::Avc1Box,
+
+    on_output: Arc<OutputCallback>,
 }

 impl FfmpegCliH264Decoder {
     // TODO: make this robust against `pkill ffmpeg` somehow.
     // Maybe `AsyncDecoder` can auto-restart us, or we wrap ourselves in a new struct that restarts us on certain errors?
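One way the auto-restart idea floated above could take shape is a wrapper that resets and retries once on error. This is a hedged sketch with hypothetical names (`Decoder` here is not the crate's real trait, and the error type is simplified):

    trait Decoder {
        fn submit_chunk(&mut self, chunk: &[u8]) -> Result<(), String>;
        fn reset(&mut self) -> Result<(), String>;
    }

    struct RestartOnError<D: Decoder>(D);

    impl<D: Decoder> RestartOnError<D> {
        fn submit_chunk(&mut self, chunk: &[u8]) -> Result<(), String> {
            self.0.submit_chunk(chunk).or_else(|_| {
                // e.g. the ffmpeg process was killed: restart it and retry once.
                self.0.reset()?;
                self.0.submit_chunk(chunk)
            })
        }
    }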
-    pub fn new(avcc: re_mp4::Avc1Box) -> Result<Self> {
+    pub fn new(
+        avcc: re_mp4::Avc1Box,
+        on_output: impl Fn(super::Result<Frame>) + Send + Sync + 'static,
+    ) -> Result<Self> {
         re_tracing::profile_function!();

         let mut ffmpeg = {
@@ -115,13 +117,17 @@
             .map_err(|err| Error::NoIterator(err.to_string()))?;

         let (frame_info_tx, frame_info_rx) = crossbeam::channel::unbounded();
-        let (frame_tx, frame_rx) = crossbeam::channel::unbounded();
+
+        let on_output = Arc::new(on_output);

         std::thread::Builder::new()
             .name("ffmpeg-reader".to_owned())
-            .spawn(move || {
-                read_ffmpeg_output(ffmpeg_iterator, &frame_info_rx, &frame_tx);
-                re_log::debug!("Shutting down ffmpeg");
+            .spawn({
+                let on_output = on_output.clone();
+                move || {
+                    read_ffmpeg_output(ffmpeg_iterator, &frame_info_rx, on_output.as_ref());
+                    re_log::debug!("Shutting down ffmpeg");
+                }
             })
             .expect("Failed to spawn ffmpeg thread");

@@ -129,8 +135,8 @@
             frame_num: 0,
             ffmpeg_stdin,
             frame_info_tx,
-            frame_rx,
             avcc,
+            on_output,
         })
     }
 }

@@ -138,7 +144,7 @@
 fn read_ffmpeg_output(
     ffmpeg_iterator: ffmpeg_sidecar::iter::FfmpegIterator,
     frame_info_rx: &Receiver<FrameInfo>,
-    frame_tx: &Sender<super::Result<Frame>>,
+    on_output: &OutputCallback,
 ) {
     /// Ignore some common output from ffmpeg:
     fn should_ignore_log_msg(msg: &str) -> bool {
@@ -177,7 +183,7 @@
             }

             FfmpegEvent::Log(LogLevel::Error, msg) => {
-                frame_tx.send(Err(Error::Ffmpeg(msg).into())).ok();
+                on_output(Err(Error::Ffmpeg(msg).into()));
             }

             FfmpegEvent::LogEOF => {
@@ -186,7 +192,7 @@
             }

             FfmpegEvent::Error(error) => {
-                frame_tx.send(Err(Error::FfmpegSidecar(error).into())).ok();
+                on_output(Err(Error::FfmpegSidecar(error).into()));
             }

             // Useful info in these:
@@ -284,20 +290,14 @@
                 debug_assert_eq!(pix_fmt, "rgb24");
                 debug_assert_eq!(width as usize * height as usize * 3, data.len());

-                if frame_tx
-                    .send(Ok(super::Frame {
-                        width,
-                        height,
-                        data,
-                        format: crate::PixelFormat::Rgb8Unorm,
-                        presentation_timestamp: frame_info.presentation_timestamp,
-                        duration: frame_info.duration,
-                    }))
-                    .is_err()
-                {
-                    re_log::debug!("Receiver disconnected");
-                    return;
-                }
+                on_output(Ok(super::Frame {
+                    width,
+                    height,
+                    data,
+                    format: crate::PixelFormat::Rgb8Unorm,
+                    presentation_timestamp: frame_info.presentation_timestamp,
+                    duration: frame_info.duration,
+                }));
             }

             FfmpegEvent::Done => {
@@ -312,24 +312,10 @@
     }
 }

-impl SyncDecoder for FfmpegCliH264Decoder {
-    fn submit_chunk(
-        &mut self,
-        should_stop: &std::sync::atomic::AtomicBool,
-        chunk: super::Chunk,
-        on_output: &super::OutputCallback,
-    ) {
+impl AsyncDecoder for FfmpegCliH264Decoder {
+    fn submit_chunk(&mut self, chunk: super::Chunk) -> super::Result<()> {
         re_tracing::profile_function!();

-        // First read any outstanding messages (e.g. error reports),
-        // so they get ordered correctly.
-        while let Ok(frame_result) = self.frame_rx.try_recv() {
-            if should_stop.load(Ordering::Relaxed) {
-                return;
-            }
-            on_output(frame_result);
-        }
-
         // We send the information about this chunk first.
         // This assumes each sample/chunk will result in exactly one frame.
         // If this assumption is not held, we will get weird errors, like videos playing too slowly.
@@ -349,28 +335,29 @@
         // Write chunk to ffmpeg:
         let mut state = NaluStreamState::default(); // TODO: remove state?
         if let Err(err) = write_avc_chunk_to_nalu_stream(
-            should_stop,
             &self.avcc,
             &mut self.ffmpeg_stdin,
             &chunk,
             &mut state,
         ) {
-            on_output(Err(err.into()));
+            (self.on_output)(Err(err.into()));
         }

         self.ffmpeg_stdin.flush().ok();

-        while let Ok(frame_result) = self.frame_rx.try_recv() {
-            if should_stop.load(Ordering::Relaxed) {
-                return;
-            }
-            on_output(frame_result);
-        }
+        Ok(())
     }

-    fn reset(&mut self) {
+    fn reset(&mut self) -> super::Result<()> {
         // TODO: restart ffmpeg process
+        Ok(())
+    }
+}
+
+impl Drop for FfmpegCliH264Decoder {
+    fn drop(&mut self) {
+        // TODO: stop ffmpeg thread
     }
 }
@@ -387,7 +374,6 @@
 struct NaluStreamState {
 }

 fn write_avc_chunk_to_nalu_stream(
-    should_stop: &std::sync::atomic::AtomicBool,
     avcc: &re_mp4::Avc1Box,
     nalu_stream: &mut dyn std::io::Write,
     chunk: &super::Chunk,
@@ -425,7 +411,7 @@
     // (most of the time it's 1:1, but there might be extra NAL units for info, especially at the start).
     let mut buffer_offset: usize = 0;
     let sample_end = chunk.data.len();
-    while buffer_offset < sample_end && !should_stop.load(Ordering::Relaxed) {
+    while buffer_offset < sample_end {
         re_tracing::profile_scope!("write_nalu");

         // Each NAL unit in mp4 is prefixed with a length prefix.
diff --git a/crates/store/re_video/src/decode/mod.rs b/crates/store/re_video/src/decode/mod.rs
index 3483aaaa0988..e79bd1736d72 100644
--- a/crates/store/re_video/src/decode/mod.rs
+++ b/crates/store/re_video/src/decode/mod.rs
@@ -77,14 +77,12 @@
 //! supporting HDR content at which point more properties will be important!
 //!

-#[cfg(any(with_dav1d, with_ffmpeg))]
+#[cfg(with_dav1d)]
 mod async_decoder_wrapper;
 #[cfg(with_dav1d)]
 mod av1;
-
 #[cfg(with_ffmpeg)]
-pub mod ffmpeg;
-
+mod ffmpeg;
 #[cfg(target_arch = "wasm32")]
 mod webcodecs;
@@ -187,12 +185,12 @@ pub fn new_decoder(
         #[cfg(with_ffmpeg)]
         re_mp4::StsdBoxContent::Avc1(avc1_box) => {
+            // TODO: check if we have ffmpeg ONCE, and remember
             re_log::trace!("Decoding H.264…");
-            return Ok(Box::new(async_decoder_wrapper::AsyncDecoderWrapper::new(
-                debug_name.to_owned(),
-                Box::new(ffmpeg::FfmpegCliH264Decoder::new(avc1_box.clone())?),
+            Ok(Box::new(ffmpeg::FfmpegCliH264Decoder::new(
+                avc1_box.clone(),
                 on_output,
-            )));
+            )?))
         }

         _ => Err(Error::UnsupportedCodec(video.human_readable_codec_string())),

From 4897f1964435f7fd7de56a6bb731fc007a570c55 Mon Sep 17 00:00:00 2001
From: Andreas Reich
Date: Mon, 28 Oct 2024 12:54:02 +0100
Subject: [PATCH 12/33] set fps mode to passthrough

---
 crates/store/re_video/src/decode/ffmpeg.rs | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs
index 9c6681941aab..2d770c9a8aac 100644
--- a/crates/store/re_video/src/decode/ffmpeg.rs
+++ b/crates/store/re_video/src/decode/ffmpeg.rs
@@ -94,15 +94,12 @@ impl FfmpegCliH264Decoder {
                 "0",
             ])
             // Keep in mind that all arguments that are about the input, need to go before!
-            .format("h264") // High risk here: What's is available?
+            .format("h264") // TODO(andreas): should we check ahead of time whether this is available?
+            //.fps_mode("0")
             .input("-") // stdin is our input!
-            // TODO: Do we have to do this instead?
-            // Set constant frame rate.
-            // We can't properly handle variable frame rate since `rawvideo` output won't report timestamps.
-            // To work around this we'd first need to establish a mapping of frame numbers to timestamps.
-            // This isn't entirely trivial since individual chunks may have arbitrary composition & decode timestamps.
-            //.fps_mode(1)
-            //
+            // h264 bitstreams don't have timestamp information. Whatever ffmpeg tries to make up about timestamps is wrong.
+            // If we don't tell it to just pass the frames through, variable framerate (VFR) video will just not play at all.
+            .fps_mode("passthrough")
             // TODO(andreas): at least do `rgba`. But we could also do `yuv420p` for instance if that's what the video is specifying
             // (should be faster overall at no quality loss if the video is in this format).
             // Check `ffmpeg -pix_fmts` for full list.
             .rawvideo() // Output rgb24 on stdout.
             .spawn()
             .map_err(Error::FailedToStartFfmpeg)?
     };

From f8c70fcfb0e74c6ea3ba746cc30f59f9829c288f Mon Sep 17 00:00:00 2001
From: Andreas Reich
Date: Mon, 28 Oct 2024 14:40:33 +0100
Subject: [PATCH 13/33] comments & debug output

---
 crates/store/re_video/src/decode/ffmpeg.rs | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs
index 2d770c9a8aac..d1445df324ea 100644
--- a/crates/store/re_video/src/decode/ffmpeg.rs
+++ b/crates/store/re_video/src/decode/ffmpeg.rs
@@ -54,9 +54,6 @@
 /// Decode H.264 video via ffmpeg over CLI
 pub struct FfmpegCliH264Decoder {
-    /// Monotonically increasing
-    frame_num: u32,
-
     /// How we send more data to the ffmpeg process
     ffmpeg_stdin: std::process::ChildStdin,
@@ -97,7 +94,7 @@
             .format("h264") // TODO(andreas): should we check ahead of time whether this is available?
             //.fps_mode("0")
             .input("-") // stdin is our input!
-            // h264 bitstreams don't have timestamp information. Whatever ffmpeg tries to make up about timestamps is wrong.
+            // h264 bitstreams don't have timestamp information. Whatever ffmpeg tries to make up about timing & framerates is wrong!
             // If we don't tell it to just pass the frames through, variable framerate (VFR) video will just not play at all.
             .fps_mode("passthrough")
@@ -129,7 +126,6 @@
         Ok(Self {
-            frame_num: 0,
             ffmpeg_stdin,
             frame_info_tx,
             avcc,
             on_output,
         })
@@ -193,8 +189,13 @@
             }

             // Useful info in these:
-            FfmpegEvent::ParsedInput(_) => {}
-            FfmpegEvent::ParsedOutput(_) => {}
+            FfmpegEvent::ParsedInput(input) => {
+                re_log::debug!("{input:?}");
+            }
+            FfmpegEvent::ParsedOutput(output) => {
+                re_log::debug!("{output:?}");
+            }
+
             FfmpegEvent::ParsedStreamMapping(_) => {}

             FfmpegEvent::ParsedInputStream(stream) => {
@@ -236,6 +237,7 @@
             FfmpegEvent::Progress(_) => {
                 // We can get our frame number etc. here to know how far behind we are.
+                // By default this triggers every 0.5s.
             }
@@ -322,9 +324,7 @@
             duration: chunk.duration,
         };

-        // NOTE: a 60 FPS video can go for two years before wrapping a u32.
-        self.frame_num = self.frame_num.wrapping_add(1);
-
+        // TODO: schedule this.
         if self.frame_info_tx.send(frame_info).is_err() {
             // The other thread must be down, e.g. because `ffmpeg` crashed.
             // It should already have reported that as an error - no need to repeat it here.
From edceed6af69d59e571ed739d667a097cef360fc2 Mon Sep 17 00:00:00 2001
From: Andreas Reich
Date: Mon, 28 Oct 2024 15:02:25 +0100
Subject: [PATCH 14/33] crude reset implementation

---
 crates/store/re_video/src/decode/ffmpeg.rs | 107 +++++++++++----------
 1 file changed, 58 insertions(+), 49 deletions(-)

diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs
index d1445df324ea..edc7bfcffd41 100644
--- a/crates/store/re_video/src/decode/ffmpeg.rs
+++ b/crates/store/re_video/src/decode/ffmpeg.rs
@@ -59,6 +59,7 @@ pub struct FfmpegCliH264Decoder {
     /// For sending frame timestamps to the decoder thread
     frame_info_tx: Sender<FrameInfo>,
+    frame_info_rx: Receiver<FrameInfo>,

     avcc: re_mp4::Avc1Box,

     on_output: Arc<OutputCallback>,
@@ -74,66 +75,70 @@ impl FfmpegCliH264Decoder {
     ) -> Result<Self> {
         re_tracing::profile_function!();

-        let mut ffmpeg = {
-            re_tracing::profile_scope!("spawn-ffmpeg");
-
-            FfmpegCommand::new()
-                .hide_banner()
-                // "Reduce the latency introduced by buffering during initial input streams analysis."
-                //.arg("-fflags nobuffer")
-                //
-                // .. instead use these more aggressive options found here
-                // https://stackoverflow.com/a/49273163
-                .args([
-                    "-probesize",
-                    "32", // 32 bytes is the minimum probe size.
-                    "-analyzeduration",
-                    "0",
-                ])
-                // Keep in mind that all arguments that are about the input, need to go before!
-                .format("h264") // TODO(andreas): should we check ahead of time whether this is available?
-                //.fps_mode("0")
-                .input("-") // stdin is our input!
-                // h264 bitstreams don't have timestamp information. Whatever ffmpeg tries to make up about timing & framerates is wrong!
-                // If we don't tell it to just pass the frames through, variable framerate (VFR) video will just not play at all.
-                .fps_mode("passthrough")
-                // TODO(andreas): at least do `rgba`. But we could also do `yuv420p` for instance if that's what the video is specifying
-                // (should be faster overall at no quality loss if the video is in this format).
-                // Check `ffmpeg -pix_fmts` for full list.
-                .rawvideo() // Output rgb24 on stdout.
-                .spawn()
-                .map_err(Error::FailedToStartFfmpeg)?
-        };
-
-        let ffmpeg_stdin = ffmpeg.take_stdin().ok_or(Error::NoStdin)?;
-        let ffmpeg_iterator = ffmpeg
-            .iter()
-            .map_err(|err| Error::NoIterator(err.to_string()))?;
-
-        let (frame_info_tx, frame_info_rx) = crossbeam::channel::unbounded();
         let on_output = Arc::new(on_output);
+        let (frame_info_tx, frame_info_rx) = crossbeam::channel::unbounded();

-        std::thread::Builder::new()
-            .name("ffmpeg-reader".to_owned())
-            .spawn({
-                let on_output = on_output.clone();
-                move || {
-                    read_ffmpeg_output(ffmpeg_iterator, &frame_info_rx, on_output.as_ref());
-                    re_log::debug!("Shutting down ffmpeg");
-                }
-            })
-            .expect("Failed to spawn ffmpeg thread");
+        let ffmpeg_stdin = start_ffmpeg_process(on_output.clone(), frame_info_rx.clone())?;

         Ok(Self {
             ffmpeg_stdin,
             frame_info_tx,
             avcc,
             on_output,
+            frame_info_rx,
         })
     }
 }

+fn start_ffmpeg_process(
+    on_output: Arc<OutputCallback>,
+    frame_info_rx: Receiver<FrameInfo>,
+) -> Result<std::process::ChildStdin> {
+    let mut ffmpeg = {
+        re_tracing::profile_scope!("spawn-ffmpeg");
+
+        FfmpegCommand::new()
+            .hide_banner()
+            // "Reduce the latency introduced by buffering during initial input streams analysis."
+            //.arg("-fflags nobuffer")
+            //
+            // .. instead use these more aggressive options found here
+            // https://stackoverflow.com/a/49273163
+            .args([
+                "-probesize",
+                "32", // 32 bytes is the minimum probe size.
+                "-analyzeduration",
+                "0",
+            ])
+            // Keep in mind that all arguments that are about the input, need to go before!
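+            // For reference, the invocation built here corresponds roughly to this
+            // command line (assumed equivalent, modulo ffmpeg-sidecar defaults):
+            //   ffmpeg -hide_banner -probesize 32 -analyzeduration 0 \
+            //          -f h264 -i - -fps_mode passthrough -f rawvideo -pix_fmt rgb24 -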
+            .format("h264") // TODO(andreas): should we check ahead of time whether this is available?
+            //.fps_mode("0")
+            .input("-") // stdin is our input!
+            // h264 bitstreams don't have timestamp information. Whatever ffmpeg tries to make up about timing & framerates is wrong!
+            // If we don't tell it to just pass the frames through, variable framerate (VFR) video will just not play at all.
+            .fps_mode("passthrough")
+            // TODO(andreas): at least do `rgba`. But we could also do `yuv420p` for instance if that's what the video is specifying
+            // (should be faster overall at no quality loss if the video is in this format).
+            // Check `ffmpeg -pix_fmts` for full list.
+            .rawvideo() // Output rgb24 on stdout.
+            .spawn()
+            .map_err(Error::FailedToStartFfmpeg)?
+    };
+
+    let ffmpeg_stdin = ffmpeg.take_stdin().ok_or(Error::NoStdin)?;
+    let ffmpeg_iterator = ffmpeg
+        .iter()
+        .map_err(|err| Error::NoIterator(err.to_string()))?;
+
+    std::thread::Builder::new()
+        .name("ffmpeg-reader".to_owned())
+        .spawn(move || {
+            read_ffmpeg_output(ffmpeg_iterator, &frame_info_rx, on_output.as_ref());
+        })
+        .expect("Failed to spawn ffmpeg thread");
+
+    Ok(ffmpeg_stdin)
+}
+
 fn read_ffmpeg_output(
     ffmpeg_iterator: ffmpeg_sidecar::iter::FfmpegIterator,
     frame_info_rx: &Receiver<FrameInfo>,
@@ -347,13 +352,17 @@
     }

     fn reset(&mut self) -> super::Result<()> {
-        // TODO: restart ffmpeg process
+        re_log::debug!("Resetting ffmpeg decoder");
+        // TODO: ensure previous ffmpeg process is dead and thread has stopped.
+        self.ffmpeg_stdin =
+            start_ffmpeg_process(self.on_output.clone(), self.frame_info_rx.clone())?;
         Ok(())
     }
 }

 impl Drop for FfmpegCliH264Decoder {
     fn drop(&mut self) {
+        re_log::debug!("Dropping ffmpeg decoder");
         // TODO: stop ffmpeg thread
     }
 }

From 3f21b3dfecfce80fb48dd78c0c2e9d45898181cf Mon Sep 17 00:00:00 2001
From: Andreas Reich
Date: Mon, 28 Oct 2024 17:30:32 +0100
Subject: [PATCH 15/33] fix frame reordering

---
 crates/store/re_video/src/decode/ffmpeg.rs | 69 +++++++++++---------
 1 file changed, 40 insertions(+), 29 deletions(-)

diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs
index edc7bfcffd41..0070472a81b8 100644
--- a/crates/store/re_video/src/decode/ffmpeg.rs
+++ b/crates/store/re_video/src/decode/ffmpeg.rs
@@ -163,7 +163,9 @@ fn read_ffmpeg_output(
         false
     }

-    let mut pending_frames = BTreeMap::new();
+    // Pending frames, sorted by their presentation timestamp.
+    let mut pending_frame_infos = BTreeMap::new();
+    let mut highest_dts = Time(i64::MIN); // Highest DTS encountered so far.

     for event in ffmpeg_iterator {
         #[allow(clippy::match_same_arms)]
         match event {
@@ -242,36 +244,43 @@
             FfmpegEvent::Progress(_) => {
                 // We can get our frame number etc. here to know how far behind we are.
-                // By default this triggers every 0.5s.
+                // By default this triggers every 5s.
             }

             FfmpegEvent::OutputFrame(frame) => {
-                let frame_info = match pending_frames.pop_first() {
-                    Some((_, frame_info)) => frame_info,
-                    None => {
-                        // Retrieve frame infos until the decode timestamp has caught up with the composition timestamp.
-                        // This is important because frame infos do not arrive in composition order,
-                        // but ffmpeg reports frames in composition order!
-                        loop {
-                            let Ok(frame_info) = frame_info_rx.try_recv() else {
-                                re_log::debug!("Receiver disconnected");
-                                return;
-                            };
-
-                            // Example of how presentation timestamps and decode timestamps can play out:
-                            //    PTS: 1 4 2 3
-                            //    DTS: 1 2 3 4
-                            // Stream: I P B B
-                            //
-                            // Essentially we need to wait until the DTS has "caught up" with the PTS!
-                            let highest_pts = pending_frames
-                                .last_key_value()
-                                .map_or(frame_info.presentation_timestamp, |(pts, _)| *pts);
-                            if frame_info.decode_timestamp <= highest_pts {
-                                break frame_info;
-                            }
-                            pending_frames.insert(frame_info.presentation_timestamp, frame_info);
-                        }
+                // DTS <= PTS, and chunks are submitted sorted by DTS.
+                //
+                // Frames arrive in PTS order, but "frame info" arrives in DTS order!
+                //
+                // Whenever the highest known DTS is behind the PTS, we need to wait until the DTS catches up.
+                // Otherwise, we'd assign the wrong PTS to the frame that just came in.
+                //
+                // Example of how presentation timestamps and decode timestamps
+                // can play out in the presence of B-frames to illustrate this:
+                //    PTS: 1 4 2 3
+                //    DTS: 1 2 3 4
+                // Stream: I P B B
+                let frame_info = loop {
+                    if pending_frame_infos
+                        .first_key_value()
+                        .map_or(true, |(pts, _)| *pts > highest_dts)
+                    {
+                        let Ok(frame_info) = frame_info_rx.try_recv() else {
+                            re_log::debug!("Receiver disconnected");
+                            return;
+                        };
+
+                        debug_assert!(
+                            frame_info.decode_timestamp > highest_dts,
+                            "Decode timestamps are expected to increase monotonically"
+                        );
+                        highest_dts = frame_info.decode_timestamp;
+                        pending_frame_infos.insert(frame_info.presentation_timestamp, frame_info);
+                    } else {
+                        // There must be an element here, otherwise we wouldn't be in this branch.
+                        #[allow(clippy::unwrap_used)]
+                        break pending_frame_infos.pop_first().unwrap().1;
+                    }
+                };

         };

-        // TODO: schedule this.
+        // TODO: schedule this in a thread.
         if self.frame_info_tx.send(frame_info).is_err() {

         buffer_offset = data_end;
     }

+    // TODO: Write an Access Unit Delimiter (AUD) NAL unit to the stream?
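+    // (An Access Unit Delimiter is a NAL unit of type 9 that marks the boundary
+    // between access units; emitting one after each sample might let the decoder
+    // flush a frame sooner. Left here as a possible follow-up, not implemented.)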
+
     Ok(())
 }

From 67843f706eff5d0e273b2d9e1be3c5280d4c930e Mon Sep 17 00:00:00 2001
From: Andreas Reich
Date: Wed, 30 Oct 2024 11:57:17 +0100
Subject: [PATCH 16/33] expose DTS through frame info

---
 crates/store/re_video/src/decode/av1.rs    |  1 +
 crates/store/re_video/src/decode/ffmpeg.rs |  1 +
 crates/store/re_video/src/decode/mod.rs    |  6 ++++++
 crates/viewer/re_data_ui/src/video.rs      | 10 +++++++++-
 4 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/crates/store/re_video/src/decode/av1.rs b/crates/store/re_video/src/decode/av1.rs
index eae47c2a2a67..422aa9e55aec 100644
--- a/crates/store/re_video/src/decode/av1.rs
+++ b/crates/store/re_video/src/decode/av1.rs
@@ -237,6 +237,7 @@ fn output_picture(
         info: FrameInfo {
             presentation_timestamp: Time(picture.timestamp().unwrap_or(0)),
             duration: Time(picture.duration()),
+            ..Default::default()
         },
     };
     on_output(Ok(frame));
diff --git a/crates/store/re_video/src/decode/ffmpeg.rs b/crates/store/re_video/src/decode/ffmpeg.rs
index a07848c63e30..4aad8d17b4cb 100644
--- a/crates/store/re_video/src/decode/ffmpeg.rs
+++ b/crates/store/re_video/src/decode/ffmpeg.rs
@@ -313,6 +313,7 @@ fn read_ffmpeg_output(
                     info: super::FrameInfo {
                         presentation_timestamp: frame_info.presentation_timestamp,
                         duration: frame_info.duration,
+                        latest_decode_timestamp: Some(frame_info.decode_timestamp),
                     },
                 }));
             }
diff --git a/crates/store/re_video/src/decode/mod.rs b/crates/store/re_video/src/decode/mod.rs
index e46edee2f98d..dfc1dc262d48 100644
--- a/crates/store/re_video/src/decode/mod.rs
+++ b/crates/store/re_video/src/decode/mod.rs
@@ -249,6 +249,11 @@ pub struct FrameInfo {
     /// A duration of [`Time::MAX`] indicates that the frame is invalid or not yet available.
     // Implementation note: unlike with the presentation timestamp, we may be fine with making this optional.
     pub duration: Time,
+
+    /// The decode timestamp of the last chunk that was needed to decode this frame.
+    ///
+    /// None indicates that the information is not available.
+    pub latest_decode_timestamp: Option<Time>,