Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure example .rrd files have description first #7179

Merged
merged 7 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4486,6 +4486,7 @@ dependencies = [
"serde",
"serde_json",
"serde_yaml",
"tempfile",
"toml",
"ureq",
"url",
Expand Down
1 change: 1 addition & 0 deletions crates/build/re_dev_tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ rustdoc-types = "0.24.0"
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
serde_yaml.workspace = true
tempfile.workspace = true
toml = { workspace = true, features = ["parse", "preserve_order"] }
ureq = { workspace = true, features = ["json"] }
url.workspace = true
Expand Down
16 changes: 2 additions & 14 deletions crates/build/re_dev_tools/src/build_examples/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,9 @@ impl Install {
}

let progress = MultiProgress::new();
let output = wait_for_output(cmd, "installing examples", &progress)?;
wait_for_output(cmd, "installing examples", &progress)?;

if output.status.success() {
println!("Successfully installed examples");
} else {
anyhow::bail!(
"Failed to install examples: \
\nstdout: \
\n{} \
\nstderr: \
\n{}",
String::from_utf8(output.stdout)?,
String::from_utf8(output.stderr)?,
);
}
println!("Successfully installed examples");

Ok(())
}
Expand Down
58 changes: 29 additions & 29 deletions crates/build/re_dev_tools/src/build_examples/rrd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,39 +74,39 @@ impl Rrd {

impl Example {
fn build(self, progress: &MultiProgress, output_dir: &Path) -> anyhow::Result<PathBuf> {
let rrd_path = output_dir.join(&self.name).with_extension("rrd");
let tempdir = tempfile::tempdir()?;

let mut cmd = Command::new("python3");
cmd.arg("-m").arg(&self.name);
cmd.arg("--save").arg(&rrd_path);
cmd.args(self.script_args);

let final_args = cmd
.get_args()
.map(|arg| arg.to_string_lossy().to_string())
.collect::<Vec<_>>();
let initial_rrd_path = tempdir.path().join(&self.name).with_extension("rrd");

// Configure flushing so that:
// * the resulting file size is deterministic
// * the file is chunked into small batches for better streaming
cmd.env("RERUN_FLUSH_TICK_SECS", 1_000_000_000.to_string());
cmd.env("RERUN_FLUSH_NUM_BYTES", (128 * 1024).to_string());
{
let mut cmd = Command::new("python3");
cmd.arg("-m").arg(&self.name);
cmd.arg("--save").arg(&initial_rrd_path);
cmd.args(self.script_args);

let output = wait_for_output(cmd, &self.name, progress)?;
// Configure flushing so that:
// * the resulting file size is deterministic
// * the file is chunked into small batches for better streaming
cmd.env("RERUN_FLUSH_TICK_SECS", 1_000_000_000.to_string());
cmd.env("RERUN_FLUSH_NUM_BYTES", (128 * 1024).to_string());

if output.status.success() {
Ok(rrd_path)
} else {
anyhow::bail!(
"Failed to run `python3 {}`: \
\nstdout: \
\n{} \
\nstderr: \
\n{}",
final_args.join(" "),
String::from_utf8(output.stdout)?,
String::from_utf8(output.stderr)?,
);
wait_for_output(cmd, &self.name, progress)?;
}

// Now run compaction on the result:
let final_rrd_path = output_dir.join(&self.name).with_extension("rrd");

let mut cmd = Command::new("python3");
cmd.arg("-m").arg("rerun");
cmd.arg("rrd");
cmd.arg("compact");
// Small chunks for better streaming:
cmd.arg("--max-bytes").arg((128 * 1024).to_string());
cmd.arg(&initial_rrd_path);
cmd.arg("-o").arg(&final_rrd_path);

wait_for_output(cmd, &format!("{} compaction", self.name), progress)?;

Ok(final_rrd_path)
}
}
24 changes: 3 additions & 21 deletions crates/build/re_dev_tools/src/build_examples/snippets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,6 @@ impl Snippet {
cmd.arg(&self.path);
cmd.args(&self.extra_args);

let final_args = cmd
.get_args()
.map(|arg| arg.to_string_lossy().to_string())
.collect::<Vec<_>>();

cmd.envs([
("RERUN_FLUSH_NUM_ROWS", "0"),
("RERUN_STRICT", "1"),
Expand All @@ -175,22 +170,9 @@ impl Snippet {
),
]);

let output = wait_for_output(cmd, &self.name, progress)?;

if output.status.success() {
Ok(rrd_path)
} else {
anyhow::bail!(
"Failed to run `python3 {}`: \
\nstdout: \
\n{} \
\nstderr: \
\n{}",
final_args.join(" "),
String::from_utf8(output.stdout)?,
String::from_utf8(output.stderr)?,
);
}
wait_for_output(cmd, &self.name, progress)?;

Ok(rrd_path)
}
}

Expand Down
26 changes: 23 additions & 3 deletions crates/build/re_dev_tools/src/build_examples/wait_for_output.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
use std::io::stdout;
use std::io::IsTerminal;
use std::process::Command;
use std::process::Output;
use std::time::Duration;

use indicatif::MultiProgress;
use indicatif::ProgressBar;

/// Returns an error on non-zero returncode.
pub fn wait_for_output(
mut cmd: Command,
name: &str,
progress: &MultiProgress,
) -> anyhow::Result<Output> {
) -> anyhow::Result<()> {
// Remember what we tried to run, for a better error message:
let program = cmd.get_program().to_string_lossy().to_string();
let args = cmd
.get_args()
.map(|arg| arg.to_string_lossy().to_string())
.collect::<Vec<_>>();

let progress = progress.add(ProgressBar::new_spinner().with_message(name.to_owned()));
progress.enable_steady_tick(Duration::from_millis(100));

Expand All @@ -32,5 +39,18 @@ pub fn wait_for_output(
println!("{message}");
}

Ok(output)
if !output.status.success() {
let args = args.join(" ");
let stdout = String::from_utf8(output.stdout)?;
let stderr = String::from_utf8(output.stderr)?;
anyhow::bail!(
"Failed to run `{program} {args}`: \
\nstdout: \
\n{stdout} \
\nstderr: \
\n{stderr}",
);
}

Ok(())
}
1 change: 1 addition & 0 deletions crates/store/re_chunk/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ impl Chunk {

#[inline]
pub fn to_arrow_msg(&self) -> ChunkResult<re_log_types::ArrowMsg> {
re_tracing::profile_function!();
self.sanity_check()?;

let transport = self.to_transport()?;
Expand Down
47 changes: 23 additions & 24 deletions crates/store/re_entity_db/src/entity_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,35 +466,34 @@ impl EntityDb {
.store_info_msg()
.map(|msg| Ok(LogMsg::SetStoreInfo(msg.clone())));

let time_filter = time_selection.map(|(timeline, range)| {
(
timeline,
ResolvedTimeRange::new(range.min.floor(), range.max.ceil()),
)
});
let data_messages = {
let mut chunks: Vec<&Arc<Chunk>> = self.store().iter_chunks().collect();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be possible to apply the time filter before collect()


if let Some((timeline, range)) = time_selection {
let time_range = ResolvedTimeRange::new(range.min.floor(), range.max.ceil());
chunks.retain(|chunk| {
// TODO(cmc): chunk.slice_time_selection(time_selection)
chunk
.timelines()
.get(&timeline)
.map_or(false, |time_column| {
time_range.contains(time_column.time_range().min())
|| time_range.contains(time_column.time_range().max())
})
});
}

let data_messages = self
.store()
.iter_chunks()
.filter(move |chunk| {
let Some((timeline, time_range)) = time_filter else {
return true;
};
// Try to roughly preserve the order of the chunks
// from how they were originally logged.
// See https://github.com/rerun-io/rerun/issues/7175 for why.
chunks.sort_by_key(|chunk| chunk.row_ids().next());
emilk marked this conversation as resolved.
Show resolved Hide resolved

// TODO(cmc): chunk.slice_time_selection(time_selection)
chunk
.timelines()
.get(&timeline)
.map_or(false, |time_column| {
time_range.contains(time_column.time_range().min())
|| time_range.contains(time_column.time_range().max())
})
})
.map(|chunk| {
chunks.into_iter().map(|chunk| {
chunk
.to_arrow_msg()
.map(|msg| LogMsg::ArrowMsg(self.store_id().clone(), msg))
});
})
};

// If this is a blueprint, make sure to include the `BlueprintActivationCommand` message.
// We generally use `to_messages` to export a blueprint via "save". In that
Expand Down
Loading