From 8ec85f03a7478fd91dc9d23e3bfc3820da9e5bf7 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 14 Aug 2024 12:28:20 +0200 Subject: [PATCH 1/7] Sort chunks by their first row id when exporting to .rrd --- crates/store/re_chunk/src/transport.rs | 1 + crates/store/re_entity_db/src/entity_db.rs | 47 +++++++++++----------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/crates/store/re_chunk/src/transport.rs b/crates/store/re_chunk/src/transport.rs index 97e696874e37..4507d506a84a 100644 --- a/crates/store/re_chunk/src/transport.rs +++ b/crates/store/re_chunk/src/transport.rs @@ -613,6 +613,7 @@ impl Chunk { #[inline] pub fn to_arrow_msg(&self) -> ChunkResult { + re_tracing::profile_function!(); self.sanity_check()?; let transport = self.to_transport()?; diff --git a/crates/store/re_entity_db/src/entity_db.rs b/crates/store/re_entity_db/src/entity_db.rs index 9b213d284272..99902cd928b1 100644 --- a/crates/store/re_entity_db/src/entity_db.rs +++ b/crates/store/re_entity_db/src/entity_db.rs @@ -466,35 +466,34 @@ impl EntityDb { .store_info_msg() .map(|msg| Ok(LogMsg::SetStoreInfo(msg.clone()))); - let time_filter = time_selection.map(|(timeline, range)| { - ( - timeline, - ResolvedTimeRange::new(range.min.floor(), range.max.ceil()), - ) - }); + let data_messages = { + let mut chunks: Vec<&Arc> = self.store().iter_chunks().collect(); + + if let Some((timeline, range)) = time_selection { + let time_range = ResolvedTimeRange::new(range.min.floor(), range.max.ceil()); + chunks.retain(|chunk| { + // TODO(cmc): chunk.slice_time_selection(time_selection) + chunk + .timelines() + .get(&timeline) + .map_or(false, |time_column| { + time_range.contains(time_column.time_range().min()) + || time_range.contains(time_column.time_range().max()) + }) + }); + } - let data_messages = self - .store() - .iter_chunks() - .filter(move |chunk| { - let Some((timeline, time_range)) = time_filter else { - return true; - }; + // Try to roughly preserve the order of the chunks + // from how they were originally logged. + // See https://github.com/rerun-io/rerun/issues/7175 for why. + chunks.sort_by_key(|chunk| chunk.row_ids().next()); - // TODO(cmc): chunk.slice_time_selection(time_selection) - chunk - .timelines() - .get(&timeline) - .map_or(false, |time_column| { - time_range.contains(time_column.time_range().min()) - || time_range.contains(time_column.time_range().max()) - }) - }) - .map(|chunk| { + chunks.into_iter().map(|chunk| { chunk .to_arrow_msg() .map(|msg| LogMsg::ArrowMsg(self.store_id().clone(), msg)) - }); + }) + }; // If this is a blueprint, make sure to include the `BlueprintActivationCommand` message. // We generally use `to_messages` to export a blueprint via "save". In that From a318e712110102325b35391d64d9d79ffe2fc2b4 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 14 Aug 2024 12:37:00 +0200 Subject: [PATCH 2/7] re_dev_tools refactor: handle failure in the same place --- .../src/build_examples/install.rs | 16 ++---------- .../re_dev_tools/src/build_examples/rrd.rs | 22 ++-------------- .../src/build_examples/snippets.rs | 24 +++-------------- .../src/build_examples/wait_for_output.rs | 26 ++++++++++++++++--- 4 files changed, 30 insertions(+), 58 deletions(-) diff --git a/crates/build/re_dev_tools/src/build_examples/install.rs b/crates/build/re_dev_tools/src/build_examples/install.rs index aa4df5795456..2dd3df99d513 100644 --- a/crates/build/re_dev_tools/src/build_examples/install.rs +++ b/crates/build/re_dev_tools/src/build_examples/install.rs @@ -36,21 +36,9 @@ impl Install { } let progress = MultiProgress::new(); - let output = wait_for_output(cmd, "installing examples", &progress)?; + wait_for_output(cmd, "installing examples", &progress)?; - if output.status.success() { - println!("Successfully installed examples"); - } else { - anyhow::bail!( - "Failed to install examples: \ - \nstdout: \ - \n{} \ - \nstderr: \ - \n{}", - String::from_utf8(output.stdout)?, - String::from_utf8(output.stderr)?, - ); - } + println!("Successfully installed examples"); Ok(()) } diff --git a/crates/build/re_dev_tools/src/build_examples/rrd.rs b/crates/build/re_dev_tools/src/build_examples/rrd.rs index faaa679edaf5..6e724efe9cfd 100644 --- a/crates/build/re_dev_tools/src/build_examples/rrd.rs +++ b/crates/build/re_dev_tools/src/build_examples/rrd.rs @@ -81,32 +81,14 @@ impl Example { cmd.arg("--save").arg(&rrd_path); cmd.args(self.script_args); - let final_args = cmd - .get_args() - .map(|arg| arg.to_string_lossy().to_string()) - .collect::>(); - // Configure flushing so that: // * the resulting file size is deterministic // * the file is chunked into small batches for better streaming cmd.env("RERUN_FLUSH_TICK_SECS", 1_000_000_000.to_string()); cmd.env("RERUN_FLUSH_NUM_BYTES", (128 * 1024).to_string()); - let output = wait_for_output(cmd, &self.name, progress)?; + wait_for_output(cmd, &self.name, progress)?; - if output.status.success() { - Ok(rrd_path) - } else { - anyhow::bail!( - "Failed to run `python3 {}`: \ - \nstdout: \ - \n{} \ - \nstderr: \ - \n{}", - final_args.join(" "), - String::from_utf8(output.stdout)?, - String::from_utf8(output.stderr)?, - ); - } + Ok(rrd_path) } } diff --git a/crates/build/re_dev_tools/src/build_examples/snippets.rs b/crates/build/re_dev_tools/src/build_examples/snippets.rs index 02c0774f606d..7454c556b41c 100644 --- a/crates/build/re_dev_tools/src/build_examples/snippets.rs +++ b/crates/build/re_dev_tools/src/build_examples/snippets.rs @@ -160,11 +160,6 @@ impl Snippet { cmd.arg(&self.path); cmd.args(&self.extra_args); - let final_args = cmd - .get_args() - .map(|arg| arg.to_string_lossy().to_string()) - .collect::>(); - cmd.envs([ ("RERUN_FLUSH_NUM_ROWS", "0"), ("RERUN_STRICT", "1"), @@ -175,22 +170,9 @@ impl Snippet { ), ]); - let output = wait_for_output(cmd, &self.name, progress)?; - - if output.status.success() { - Ok(rrd_path) - } else { - anyhow::bail!( - "Failed to run `python3 {}`: \ - \nstdout: \ - \n{} \ - \nstderr: \ - \n{}", - final_args.join(" "), - String::from_utf8(output.stdout)?, - String::from_utf8(output.stderr)?, - ); - } + wait_for_output(cmd, &self.name, progress)?; + + Ok(rrd_path) } } diff --git a/crates/build/re_dev_tools/src/build_examples/wait_for_output.rs b/crates/build/re_dev_tools/src/build_examples/wait_for_output.rs index ca809f49aa67..113290bacf07 100644 --- a/crates/build/re_dev_tools/src/build_examples/wait_for_output.rs +++ b/crates/build/re_dev_tools/src/build_examples/wait_for_output.rs @@ -1,17 +1,24 @@ use std::io::stdout; use std::io::IsTerminal; use std::process::Command; -use std::process::Output; use std::time::Duration; use indicatif::MultiProgress; use indicatif::ProgressBar; +/// Returns an error on non-zero returncode. pub fn wait_for_output( mut cmd: Command, name: &str, progress: &MultiProgress, -) -> anyhow::Result { +) -> anyhow::Result<()> { + // Remember what we tried to run, for a better error message: + let program = cmd.get_program().to_string_lossy().to_string(); + let args = cmd + .get_args() + .map(|arg| arg.to_string_lossy().to_string()) + .collect::>(); + let progress = progress.add(ProgressBar::new_spinner().with_message(name.to_owned())); progress.enable_steady_tick(Duration::from_millis(100)); @@ -32,5 +39,18 @@ pub fn wait_for_output( println!("{message}"); } - Ok(output) + if !output.status.success() { + let args = args.join(" "); + let stdout = String::from_utf8(output.stdout)?; + let stderr = String::from_utf8(output.stderr)?; + anyhow::bail!( + "Failed to run `{program} {args}`: \ + \nstdout: \ + \n{stdout} \ + \nstderr: \ + \n{stderr}", + ); + } + + Ok(()) } From 7066c85551876643a7d6dd8abc944c2ccca9b926 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 14 Aug 2024 12:51:53 +0200 Subject: [PATCH 3/7] Run compaction on final .rrd files --- Cargo.lock | 1 + crates/build/re_dev_tools/Cargo.toml | 1 + .../re_dev_tools/src/build_examples/rrd.rs | 42 +++++++++++++------ 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f1893c205ec9..a3aac33ac878 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4486,6 +4486,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "tempfile", "toml", "ureq", "url", diff --git a/crates/build/re_dev_tools/Cargo.toml b/crates/build/re_dev_tools/Cargo.toml index 9946b8b6461d..a19751087764 100644 --- a/crates/build/re_dev_tools/Cargo.toml +++ b/crates/build/re_dev_tools/Cargo.toml @@ -38,6 +38,7 @@ rustdoc-types = "0.24.0" serde = { workspace = true, features = ["derive"] } serde_json.workspace = true serde_yaml.workspace = true +tempfile.workspace = true toml = { workspace = true, features = ["parse", "preserve_order"] } ureq = { workspace = true, features = ["json"] } url.workspace = true diff --git a/crates/build/re_dev_tools/src/build_examples/rrd.rs b/crates/build/re_dev_tools/src/build_examples/rrd.rs index 6e724efe9cfd..f9f8009bdbab 100644 --- a/crates/build/re_dev_tools/src/build_examples/rrd.rs +++ b/crates/build/re_dev_tools/src/build_examples/rrd.rs @@ -74,21 +74,39 @@ impl Rrd { impl Example { fn build(self, progress: &MultiProgress, output_dir: &Path) -> anyhow::Result { - let rrd_path = output_dir.join(&self.name).with_extension("rrd"); + let tempdir = tempfile::tempdir()?; - let mut cmd = Command::new("python3"); - cmd.arg("-m").arg(&self.name); - cmd.arg("--save").arg(&rrd_path); - cmd.args(self.script_args); + let initial_rrd_path = tempdir.path().join(&self.name).with_extension("rrd"); + + { + let mut cmd = Command::new("python3"); + cmd.arg("-m").arg(&self.name); + cmd.arg("--save").arg(&initial_rrd_path); + cmd.args(self.script_args); - // Configure flushing so that: - // * the resulting file size is deterministic - // * the file is chunked into small batches for better streaming - cmd.env("RERUN_FLUSH_TICK_SECS", 1_000_000_000.to_string()); - cmd.env("RERUN_FLUSH_NUM_BYTES", (128 * 1024).to_string()); + // Configure flushing so that: + // * the resulting file size is deterministic + // * the file is chunked into small batches for better streaming + cmd.env("RERUN_FLUSH_TICK_SECS", 1_000_000_000.to_string()); + cmd.env("RERUN_FLUSH_NUM_BYTES", (128 * 1024).to_string()); + + wait_for_output(cmd, &self.name, progress)?; + } + + // Now run compaction on the result: + let final_rrd_path = output_dir.join(&self.name).with_extension("rrd"); + + let mut cmd = Command::new("python3"); + cmd.arg("-m").arg("rerun"); + cmd.arg("rrd"); + cmd.arg("compact"); + // Small chunks for better streaming: + cmd.arg("--max-bytes").arg((128 * 1024).to_string()); + cmd.arg(&initial_rrd_path); + cmd.arg("-o").arg(&final_rrd_path); - wait_for_output(cmd, &self.name, progress)?; + wait_for_output(cmd, &format!("{} compaction", self.name), progress)?; - Ok(rrd_path) + Ok(final_rrd_path) } } From bf577d07bd6f2f40738dd907987802d604868f47 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 14 Aug 2024 14:34:01 +0200 Subject: [PATCH 4/7] Filter before collecting --- crates/store/re_entity_db/src/entity_db.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/crates/store/re_entity_db/src/entity_db.rs b/crates/store/re_entity_db/src/entity_db.rs index 99902cd928b1..edbfd7dd41e0 100644 --- a/crates/store/re_entity_db/src/entity_db.rs +++ b/crates/store/re_entity_db/src/entity_db.rs @@ -467,11 +467,21 @@ impl EntityDb { .map(|msg| Ok(LogMsg::SetStoreInfo(msg.clone()))); let data_messages = { - let mut chunks: Vec<&Arc> = self.store().iter_chunks().collect(); + let time_filter = time_selection.map(|(timeline, range)| { + ( + timeline, + ResolvedTimeRange::new(range.min.floor(), range.max.ceil()), + ) + }); + + let mut chunks: Vec<&Arc> = self + .store() + .iter_chunks() + .filter(move |chunk| { + let Some((timeline, time_range)) = time_filter else { + return true; + }; - if let Some((timeline, range)) = time_selection { - let time_range = ResolvedTimeRange::new(range.min.floor(), range.max.ceil()); - chunks.retain(|chunk| { // TODO(cmc): chunk.slice_time_selection(time_selection) chunk .timelines() @@ -480,8 +490,8 @@ impl EntityDb { time_range.contains(time_column.time_range().min()) || time_range.contains(time_column.time_range().max()) }) - }); - } + }) + .collect(); // Try to roughly preserve the order of the chunks // from how they were originally logged. From 58dd7ebe8ee5ad165bc6c5b93c7d1039c812504e Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 14 Aug 2024 14:36:28 +0200 Subject: [PATCH 5/7] use row_id_range Co-authored-by: Clement Rey --- crates/store/re_entity_db/src/entity_db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/store/re_entity_db/src/entity_db.rs b/crates/store/re_entity_db/src/entity_db.rs index edbfd7dd41e0..7de19acb0b62 100644 --- a/crates/store/re_entity_db/src/entity_db.rs +++ b/crates/store/re_entity_db/src/entity_db.rs @@ -496,7 +496,7 @@ impl EntityDb { // Try to roughly preserve the order of the chunks // from how they were originally logged. // See https://github.com/rerun-io/rerun/issues/7175 for why. - chunks.sort_by_key(|chunk| chunk.row_ids().next()); + chunks.sort_by_key(|chunk| chunk.row_id_range().map(|(min, _)| min); chunks.into_iter().map(|chunk| { chunk From 0ecf2708982cc789402351203bf96602da8ddc92 Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 14 Aug 2024 14:37:12 +0200 Subject: [PATCH 6/7] build-fix --- crates/store/re_entity_db/src/entity_db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/store/re_entity_db/src/entity_db.rs b/crates/store/re_entity_db/src/entity_db.rs index 7de19acb0b62..3f5ea2125143 100644 --- a/crates/store/re_entity_db/src/entity_db.rs +++ b/crates/store/re_entity_db/src/entity_db.rs @@ -496,7 +496,7 @@ impl EntityDb { // Try to roughly preserve the order of the chunks // from how they were originally logged. // See https://github.com/rerun-io/rerun/issues/7175 for why. - chunks.sort_by_key(|chunk| chunk.row_id_range().map(|(min, _)| min); + chunks.sort_by_key(|chunk| chunk.row_id_range().map(|(min, _)| min)); chunks.into_iter().map(|chunk| { chunk From 8f63a3d6aede91b4994bca985831d688d04a4fab Mon Sep 17 00:00:00 2001 From: Emil Ernerfeldt Date: Wed, 14 Aug 2024 14:37:25 +0200 Subject: [PATCH 7/7] Add docstring to `row_ids` --- crates/store/re_chunk/src/chunk.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs index 31cd30e3b6fb..0b50b6f418d6 100644 --- a/crates/store/re_chunk/src/chunk.rs +++ b/crates/store/re_chunk/src/chunk.rs @@ -833,6 +833,9 @@ impl Chunk { (times, counters) } + /// All the [`RowId`] in this chunk. + /// + /// This could be in any order if this chunk is unsorted. #[inline] pub fn row_ids(&self) -> impl Iterator + '_ { let (times, counters) = self.row_ids_raw();