diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 80c2f14c1005..70a92fbd78e7 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -216,6 +216,9 @@ jobs: - name: Run tests run: cd rerun_py/tests && pytest + - name: Run e2e test + run: scripts/run_python_e2e_test.py + - name: Unpack the wheel shell: bash run: | diff --git a/Cargo.lock b/Cargo.lock index c7b167fedbe4..81e7f2a90a21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4313,6 +4313,7 @@ dependencies = [ "re_analytics", "re_build_build_info", "re_build_info", + "re_data_store", "re_format", "re_log", "re_log_encoding", diff --git a/crates/re_sdk_comms/src/server.rs b/crates/re_sdk_comms/src/server.rs index fd1ceca50ea0..75f766d40b7c 100644 --- a/crates/re_sdk_comms/src/server.rs +++ b/crates/re_sdk_comms/src/server.rs @@ -158,7 +158,8 @@ async fn run_client( let msg = crate::decode_log_msg(&packet)?; if matches!(msg, LogMsg::Goodbye(_)) { - re_log::debug!("Client sent goodbye message."); + re_log::debug!("Received goodbye message."); + tx.send(msg)?; return Ok(()); } diff --git a/crates/rerun/Cargo.toml b/crates/rerun/Cargo.toml index 643ca8b25deb..15e31264d47f 100644 --- a/crates/rerun/Cargo.toml +++ b/crates/rerun/Cargo.toml @@ -64,6 +64,7 @@ web_viewer = [ [dependencies] re_build_info.workspace = true +re_data_store.workspace = true re_format.workspace = true re_log_encoding = { workspace = true, features = ["decoder", "encoder"] } re_log_types.workspace = true diff --git a/crates/rerun/src/run.rs b/crates/rerun/src/run.rs index 691b3c359250..4dbbb2e87617 100644 --- a/crates/rerun/src/run.rs +++ b/crates/rerun/src/run.rs @@ -36,58 +36,67 @@ use crate::web_viewer::host_web_viewer; #[derive(Debug, clap::Parser)] #[clap(author, about)] struct Args { - /// Print version and quit + // Note: arguments are sorted lexicographically for nicer `--help` message: + #[command(subcommand)] + commands: Option, + + /// Set a maximum input latency, e.g. "200ms" or "10s". + /// + /// If we go over this, we start dropping packets. + /// + /// The default is no limit, which means Rerun might eat more and more memory, + /// and have longer and longer latency, if you are logging data faster + /// than Rerun can index it. #[clap(long)] - version: bool, + drop_at_latency: Option, - /// Either a path to a `.rrd` file to load, an http url to an `.rrd` file, - /// or a websocket url to a Rerun Server from which to read data + /// An upper limit on how much memory the Rerun Viewer should use. /// - /// If none is given, a server will be hosted which the Rerun SDK can connect to. - url_or_path: Option, + /// When this limit is used, Rerun will purge the oldest data. + /// + /// Example: `16GB` + #[clap(long)] + memory_limit: Option, /// What TCP port do we listen to (for SDK:s to connect to)? #[cfg(feature = "server")] #[clap(long, default_value_t = re_sdk_comms::DEFAULT_SERVER_PORT)] port: u16, - /// Start the viewer in the browser (instead of locally). - /// Requires Rerun to have been compiled with the 'web_viewer' feature. + /// Start with the puffin profiler running. #[clap(long)] - web_viewer: bool, + profile: bool, /// Stream incoming log events to an .rrd file at the given path. #[clap(long)] save: Option, - /// Start with the puffin profiler running. - #[clap(long)] - profile: bool, - /// Exit with a non-zero exit code if any warning or error is logged. Useful for tests. #[clap(long)] strict: bool, - /// An upper limit on how much memory the Rerun Viewer should use. + /// Ingest data and then quit once the goodbye message has been received. /// - /// When this limit is used, Rerun will purge the oldest data. + /// Used for testing together with the `--strict` argument. /// - /// Example: `16GB` + /// Fails if no messages are received, or if no messages are received within a dozen or so seconds. #[clap(long)] - memory_limit: Option, + test_receive: bool, - /// Set a maximum input latency, e.g. "200ms" or "10s". - /// - /// If we go over this, we start dropping packets. + /// Either a path to a `.rrd` file to load, an http url to an `.rrd` file, + /// or a websocket url to a Rerun Server from which to read data /// - /// The default is no limit, which means Rerun might eat more and more memory, - /// and have longer and longer latency, if you are logging data faster - /// than Rerun can index it. + /// If none is given, a server will be hosted which the Rerun SDK can connect to. + url_or_path: Option, + + /// Print version and quit #[clap(long)] - drop_at_latency: Option, + version: bool, - #[command(subcommand)] - commands: Option, + /// Start the viewer in the browser (instead of locally). + /// Requires Rerun to have been compiled with the 'web_viewer' feature. + #[clap(long)] + web_viewer: bool, } #[derive(Debug, Clone, Subcommand)] @@ -329,7 +338,9 @@ async fn run_impl( // Now what do we do with the data? - if let Some(rrd_path) = args.save { + if args.test_receive { + receive_into_log_db(&rx).map(|_db| ()) + } else if let Some(rrd_path) = args.save { Ok(stream_to_rrd(&rx, &rrd_path.into(), &shutdown_bool)?) } else if args.web_viewer { #[cfg(feature = "web_viewer")] @@ -404,6 +415,44 @@ async fn run_impl( } } +fn receive_into_log_db(rx: &Receiver) -> anyhow::Result { + use re_smart_channel::RecvTimeoutError; + + re_log::info!("Receiving messages into a LogDb…"); + + let mut db = re_data_store::LogDb::default(); + + let mut num_messages = 0; + + let timeout = std::time::Duration::from_secs(12); + + loop { + match rx.recv_timeout(timeout) { + Ok(msg) => { + re_log::info_once!("Received first message."); + let is_goodbye = matches!(msg, re_log_types::LogMsg::Goodbye(_)); + db.add(msg)?; + num_messages += 1; + if is_goodbye { + db.entity_db.data_store.sanity_check()?; + anyhow::ensure!(0 < num_messages, "No messages received"); + re_log::info!("Successfully ingested {num_messages} messages."); + return Ok(db); + } + } + Err(RecvTimeoutError::Timeout) => { + anyhow::bail!( + "Didn't receive any messages within {} seconds. Giving up.", + timeout.as_secs() + ); + } + Err(RecvTimeoutError::Disconnected) => { + anyhow::bail!("Channel disconnected without a Goodbye message."); + } + } + } +} + enum ArgumentCategory { /// A remote RRD file, served over http. RrdHttpUrl(String), diff --git a/justfile b/justfile index 8c0378b036c7..fac1282d06ce 100644 --- a/justfile +++ b/justfile @@ -38,14 +38,15 @@ py-run-all: py-build fd main.py | xargs -I _ sh -c "echo _ && python3 _" # Build and install the package into the venv -py-build: +py-build *ARGS: #!/usr/bin/env bash set -euo pipefail unset CONDA_PREFIX && \ source venv/bin/activate && \ maturin develop \ -m rerun_py/Cargo.toml \ - --extras="tests" + --extras="tests" \ + {{ARGS}} # Run autoformatting py-format: diff --git a/scripts/run_python_e2e_test.py b/scripts/run_python_e2e_test.py new file mode 100755 index 000000000000..2a67f656a98e --- /dev/null +++ b/scripts/run_python_e2e_test.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 + +""" +Run some of our python exeamples, piping their log stream to the rerun process. + +This is an end-to-end test for testing: +* Our Python API +* LogMsg encoding/decoding +* Arrow encoding/decoding +* TCP connection +* Data store ingestion +""" + +import os +import subprocess +import sys +import time + + +def main() -> None: + build_env = os.environ.copy() + if "RUST_LOG" in build_env: + del build_env["RUST_LOG"] # The user likely only meant it for the actual tests; not the setup + + print("----------------------------------------------------------") + print("Building rerun-sdk…") + start_time = time.time() + subprocess.Popen(["just", "py-build", "--quiet"], env=build_env).wait() + elapsed = time.time() - start_time + print(f"rerun-sdk built in {elapsed:.1f} seconds") + print("") + + examples = [ + # Trivial examples that don't require weird dependencies, or downloading data + "examples/python/api_demo/main.py", + "examples/python/car/main.py", + "examples/python/multithreading/main.py", + "examples/python/plots/main.py", + "examples/python/text_logging/main.py", + ] + for example in examples: + print("----------------------------------------------------------") + print(f"Testing {example}…\n") + start_time = time.time() + run_example(example) + elapsed = time.time() - start_time + print(f"{example} done in {elapsed:.1f} seconds") + print() + + print() + print("All tests passed successfully!") + + +def run_example(example: str) -> None: + port = 9752 + + # sys.executable: the absolute path of the executable binary for the Python interpreter + python_executable = sys.executable + if python_executable is None: + python_executable = "python3" + + rerun_process = subprocess.Popen( + [python_executable, "-m", "rerun", "--port", str(port), "--strict", "--test-receive"] + ) + time.sleep(0.3) # Wait for rerun server to start to remove a logged warning + + python_process = subprocess.Popen([python_executable, example, "--connect", "--addr", f"127.0.0.1:{port}"]) + + print("Waiting for python process to finish…") + returncode = python_process.wait(timeout=30) + assert returncode == 0, f"python process exited with error code {returncode}" + + print("Waiting for rerun process to finish…") + returncode = rerun_process.wait(timeout=30) + assert returncode == 0, f"rerun process exited with error code {returncode}" + + +if __name__ == "__main__": + main()