diff --git a/Cargo.lock b/Cargo.lock index f1893c205ec9..a3aac33ac878 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4486,6 +4486,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "tempfile", "toml", "ureq", "url", diff --git a/crates/build/re_dev_tools/Cargo.toml b/crates/build/re_dev_tools/Cargo.toml index 9946b8b6461d..a19751087764 100644 --- a/crates/build/re_dev_tools/Cargo.toml +++ b/crates/build/re_dev_tools/Cargo.toml @@ -38,6 +38,7 @@ rustdoc-types = "0.24.0" serde = { workspace = true, features = ["derive"] } serde_json.workspace = true serde_yaml.workspace = true +tempfile.workspace = true toml = { workspace = true, features = ["parse", "preserve_order"] } ureq = { workspace = true, features = ["json"] } url.workspace = true diff --git a/crates/build/re_dev_tools/src/build_examples/install.rs b/crates/build/re_dev_tools/src/build_examples/install.rs index aa4df5795456..2dd3df99d513 100644 --- a/crates/build/re_dev_tools/src/build_examples/install.rs +++ b/crates/build/re_dev_tools/src/build_examples/install.rs @@ -36,21 +36,9 @@ impl Install { } let progress = MultiProgress::new(); - let output = wait_for_output(cmd, "installing examples", &progress)?; + wait_for_output(cmd, "installing examples", &progress)?; - if output.status.success() { - println!("Successfully installed examples"); - } else { - anyhow::bail!( - "Failed to install examples: \ - \nstdout: \ - \n{} \ - \nstderr: \ - \n{}", - String::from_utf8(output.stdout)?, - String::from_utf8(output.stderr)?, - ); - } + println!("Successfully installed examples"); Ok(()) } diff --git a/crates/build/re_dev_tools/src/build_examples/rrd.rs b/crates/build/re_dev_tools/src/build_examples/rrd.rs index faaa679edaf5..f9f8009bdbab 100644 --- a/crates/build/re_dev_tools/src/build_examples/rrd.rs +++ b/crates/build/re_dev_tools/src/build_examples/rrd.rs @@ -74,39 +74,39 @@ impl Rrd { impl Example { fn build(self, progress: &MultiProgress, output_dir: &Path) -> anyhow::Result { - let rrd_path = output_dir.join(&self.name).with_extension("rrd"); + let tempdir = tempfile::tempdir()?; - let mut cmd = Command::new("python3"); - cmd.arg("-m").arg(&self.name); - cmd.arg("--save").arg(&rrd_path); - cmd.args(self.script_args); - - let final_args = cmd - .get_args() - .map(|arg| arg.to_string_lossy().to_string()) - .collect::>(); + let initial_rrd_path = tempdir.path().join(&self.name).with_extension("rrd"); - // Configure flushing so that: - // * the resulting file size is deterministic - // * the file is chunked into small batches for better streaming - cmd.env("RERUN_FLUSH_TICK_SECS", 1_000_000_000.to_string()); - cmd.env("RERUN_FLUSH_NUM_BYTES", (128 * 1024).to_string()); + { + let mut cmd = Command::new("python3"); + cmd.arg("-m").arg(&self.name); + cmd.arg("--save").arg(&initial_rrd_path); + cmd.args(self.script_args); - let output = wait_for_output(cmd, &self.name, progress)?; + // Configure flushing so that: + // * the resulting file size is deterministic + // * the file is chunked into small batches for better streaming + cmd.env("RERUN_FLUSH_TICK_SECS", 1_000_000_000.to_string()); + cmd.env("RERUN_FLUSH_NUM_BYTES", (128 * 1024).to_string()); - if output.status.success() { - Ok(rrd_path) - } else { - anyhow::bail!( - "Failed to run `python3 {}`: \ - \nstdout: \ - \n{} \ - \nstderr: \ - \n{}", - final_args.join(" "), - String::from_utf8(output.stdout)?, - String::from_utf8(output.stderr)?, - ); + wait_for_output(cmd, &self.name, progress)?; } + + // Now run compaction on the result: + let final_rrd_path = output_dir.join(&self.name).with_extension("rrd"); + + let mut cmd = Command::new("python3"); + cmd.arg("-m").arg("rerun"); + cmd.arg("rrd"); + cmd.arg("compact"); + // Small chunks for better streaming: + cmd.arg("--max-bytes").arg((128 * 1024).to_string()); + cmd.arg(&initial_rrd_path); + cmd.arg("-o").arg(&final_rrd_path); + + wait_for_output(cmd, &format!("{} compaction", self.name), progress)?; + + Ok(final_rrd_path) } } diff --git a/crates/build/re_dev_tools/src/build_examples/snippets.rs b/crates/build/re_dev_tools/src/build_examples/snippets.rs index 02c0774f606d..7454c556b41c 100644 --- a/crates/build/re_dev_tools/src/build_examples/snippets.rs +++ b/crates/build/re_dev_tools/src/build_examples/snippets.rs @@ -160,11 +160,6 @@ impl Snippet { cmd.arg(&self.path); cmd.args(&self.extra_args); - let final_args = cmd - .get_args() - .map(|arg| arg.to_string_lossy().to_string()) - .collect::>(); - cmd.envs([ ("RERUN_FLUSH_NUM_ROWS", "0"), ("RERUN_STRICT", "1"), @@ -175,22 +170,9 @@ impl Snippet { ), ]); - let output = wait_for_output(cmd, &self.name, progress)?; - - if output.status.success() { - Ok(rrd_path) - } else { - anyhow::bail!( - "Failed to run `python3 {}`: \ - \nstdout: \ - \n{} \ - \nstderr: \ - \n{}", - final_args.join(" "), - String::from_utf8(output.stdout)?, - String::from_utf8(output.stderr)?, - ); - } + wait_for_output(cmd, &self.name, progress)?; + + Ok(rrd_path) } } diff --git a/crates/build/re_dev_tools/src/build_examples/wait_for_output.rs b/crates/build/re_dev_tools/src/build_examples/wait_for_output.rs index ca809f49aa67..113290bacf07 100644 --- a/crates/build/re_dev_tools/src/build_examples/wait_for_output.rs +++ b/crates/build/re_dev_tools/src/build_examples/wait_for_output.rs @@ -1,17 +1,24 @@ use std::io::stdout; use std::io::IsTerminal; use std::process::Command; -use std::process::Output; use std::time::Duration; use indicatif::MultiProgress; use indicatif::ProgressBar; +/// Returns an error on non-zero returncode. pub fn wait_for_output( mut cmd: Command, name: &str, progress: &MultiProgress, -) -> anyhow::Result { +) -> anyhow::Result<()> { + // Remember what we tried to run, for a better error message: + let program = cmd.get_program().to_string_lossy().to_string(); + let args = cmd + .get_args() + .map(|arg| arg.to_string_lossy().to_string()) + .collect::>(); + let progress = progress.add(ProgressBar::new_spinner().with_message(name.to_owned())); progress.enable_steady_tick(Duration::from_millis(100)); @@ -32,5 +39,18 @@ pub fn wait_for_output( println!("{message}"); } - Ok(output) + if !output.status.success() { + let args = args.join(" "); + let stdout = String::from_utf8(output.stdout)?; + let stderr = String::from_utf8(output.stderr)?; + anyhow::bail!( + "Failed to run `{program} {args}`: \ + \nstdout: \ + \n{stdout} \ + \nstderr: \ + \n{stderr}", + ); + } + + Ok(()) } diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs index 31cd30e3b6fb..0b50b6f418d6 100644 --- a/crates/store/re_chunk/src/chunk.rs +++ b/crates/store/re_chunk/src/chunk.rs @@ -833,6 +833,9 @@ impl Chunk { (times, counters) } + /// All the [`RowId`] in this chunk. + /// + /// This could be in any order if this chunk is unsorted. #[inline] pub fn row_ids(&self) -> impl Iterator + '_ { let (times, counters) = self.row_ids_raw(); diff --git a/crates/store/re_chunk/src/transport.rs b/crates/store/re_chunk/src/transport.rs index 97e696874e37..4507d506a84a 100644 --- a/crates/store/re_chunk/src/transport.rs +++ b/crates/store/re_chunk/src/transport.rs @@ -613,6 +613,7 @@ impl Chunk { #[inline] pub fn to_arrow_msg(&self) -> ChunkResult { + re_tracing::profile_function!(); self.sanity_check()?; let transport = self.to_transport()?; diff --git a/crates/store/re_entity_db/src/entity_db.rs b/crates/store/re_entity_db/src/entity_db.rs index 9b213d284272..3f5ea2125143 100644 --- a/crates/store/re_entity_db/src/entity_db.rs +++ b/crates/store/re_entity_db/src/entity_db.rs @@ -466,35 +466,44 @@ impl EntityDb { .store_info_msg() .map(|msg| Ok(LogMsg::SetStoreInfo(msg.clone()))); - let time_filter = time_selection.map(|(timeline, range)| { - ( - timeline, - ResolvedTimeRange::new(range.min.floor(), range.max.ceil()), - ) - }); - - let data_messages = self - .store() - .iter_chunks() - .filter(move |chunk| { - let Some((timeline, time_range)) = time_filter else { - return true; - }; + let data_messages = { + let time_filter = time_selection.map(|(timeline, range)| { + ( + timeline, + ResolvedTimeRange::new(range.min.floor(), range.max.ceil()), + ) + }); - // TODO(cmc): chunk.slice_time_selection(time_selection) - chunk - .timelines() - .get(&timeline) - .map_or(false, |time_column| { - time_range.contains(time_column.time_range().min()) - || time_range.contains(time_column.time_range().max()) - }) - }) - .map(|chunk| { + let mut chunks: Vec<&Arc> = self + .store() + .iter_chunks() + .filter(move |chunk| { + let Some((timeline, time_range)) = time_filter else { + return true; + }; + + // TODO(cmc): chunk.slice_time_selection(time_selection) + chunk + .timelines() + .get(&timeline) + .map_or(false, |time_column| { + time_range.contains(time_column.time_range().min()) + || time_range.contains(time_column.time_range().max()) + }) + }) + .collect(); + + // Try to roughly preserve the order of the chunks + // from how they were originally logged. + // See https://github.com/rerun-io/rerun/issues/7175 for why. + chunks.sort_by_key(|chunk| chunk.row_id_range().map(|(min, _)| min)); + + chunks.into_iter().map(|chunk| { chunk .to_arrow_msg() .map(|msg| LogMsg::ArrowMsg(self.store_id().clone(), msg)) - }); + }) + }; // If this is a blueprint, make sure to include the `BlueprintActivationCommand` message. // We generally use `to_messages` to export a blueprint via "save". In that