From 4cef2116281805a4d83c0766087104986bfa2139 Mon Sep 17 00:00:00 2001 From: nerdtomars Date: Sat, 23 Dec 2023 15:51:54 +0800 Subject: [PATCH] add version 5 rosbag2 writer --- .github/workflows/CI.yaml | 84 ++++++++++++ .gitignore | 2 + .vscode/settings.json | 6 + Cargo.toml | 21 +++ README.md | 25 ++++ docker/Dockerfile.no_ros | 18 +++ examples/create_rosbag.rs | 48 +++++++ src/lib.rs | 38 ++++++ src/metadata.rs | 58 +++++++++ src/writer.rs | 267 ++++++++++++++++++++++++++++++++++++++ tests/writer_tests.rs | 100 ++++++++++++++ 11 files changed, 667 insertions(+) create mode 100644 .github/workflows/CI.yaml create mode 100644 .gitignore create mode 100644 .vscode/settings.json create mode 100644 Cargo.toml create mode 100644 README.md create mode 100644 docker/Dockerfile.no_ros create mode 100644 examples/create_rosbag.rs create mode 100644 src/lib.rs create mode 100644 src/metadata.rs create mode 100644 src/writer.rs create mode 100644 tests/writer_tests.rs diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml new file mode 100644 index 0000000..b96c009 --- /dev/null +++ b/.github/workflows/CI.yaml @@ -0,0 +1,84 @@ +name: Rosbag2-rs CI + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + run-all: + name: run all + runs-on: ubuntu-latest + needs: + - basics + - fmt + - clippy + - docs + steps: + - run: exit 0 + + # Basic actions that must pass before we kick off more expensive tests. + basics: + name: basic checks + runs-on: ubuntu-latest + needs: + - clippy + - fmt + - docs + steps: + - run: exit 0 + + clippy: + name: clippy + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Install Rust ${{ env.rust_clippy }} + uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{ env.rust_clippy }} + components: clippy + - uses: Swatinem/rust-cache@v2 + - name: cargo fmt + run: cargo fmt + - name: "clippy --all" + run: cargo clippy --all --tests --all-features --no-deps + + docs: + name: docs + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Install Rust ${{ env.rust_nightly }} + uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{ env.rust_nightly }} + - uses: Swatinem/rust-cache@v2 + - name: "doc --lib --all-features" + run: | + cargo doc --lib --no-deps --all-features --document-private-items + + test-full: + needs: basics + name: all tests + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: + - windows-latest + - ubuntu-latest + - macos-latest + steps: + - uses: actions/checkout@v3 + - name: Install Rust ${{ env.rust_stable }} + uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{ env.rust_stable }} + + - uses: Swatinem/rust-cache@v2 + + - name: test + run: | + set -euxo pipefail + cargo test diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4fffb2f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +/Cargo.lock diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..8abb9ad --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,6 @@ +{ + "rust-analyzer.cargo.extraArgs": [ + "--profile", + "rust-analyzer" + ] +} \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..34e5f13 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "rosbag2-rs" +version = "0.1.0" +edition = "2021" +description = "Rosbag2 writer and more..." +authors = ["CT "] +license = "MIT AND Apache-2.0" +readme = "README.md" +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +rusqlite = "0.30.0" +serde = { version = "1.0", features = ["derive"] } +serde_yaml = "0.9.25" +anyhow = "1.0.40" + +[dev-dependencies] +tempfile = "3.8.1" + +[profile.rust-analyzer] +inherits = "dev" diff --git a/README.md b/README.md new file mode 100644 index 0000000..95b1f56 --- /dev/null +++ b/README.md @@ -0,0 +1,25 @@ +# Rosbag2 Rust + +## Introduction + +Rosbag2 Rust is a Rust crate designed to provide functionalities for handling ROS2 bag files. This crate aims to enable efficient reading, writing, and manipulation of ROS bag files, making it easier for developers working with ROS to manage and analyze their data. + +## Features + +### Current Features + +- [ ] Write ROS Bag Files (in progress) + +### Planned Features + +- Read ROS Bag Files +- Advanced message manipulation tools +- ... + +## Contributing + +We welcome contributions to Rosbags Rust! Whether it's reporting a bug, proposing a feature, or submitting a pull request, all contributions are appreciated. + +## License + +Rosbags Rust is open-source and is licensed under the MIT License. diff --git a/docker/Dockerfile.no_ros b/docker/Dockerfile.no_ros new file mode 100644 index 0000000..f3b9664 --- /dev/null +++ b/docker/Dockerfile.no_ros @@ -0,0 +1,18 @@ +FROM ubuntu:latest + +# Update default packages +RUN apt-get update + +# Get Ubuntu packages +RUN apt-get install -y \ + build-essential \ + curl \ + libclang-dev + +# Get Rust +RUN curl --proto '=https' --tlsv1.2 https://sh.rustup.rs -sSf | bash -s -- -y +RUN echo 'source $HOME/.cargo/env' >> $HOME/.bashrc + +COPY . /r2r +RUN chmod +x /r2r/tests/test.bash +ENTRYPOINT [ "/r2r/tests/test.bash" ] diff --git a/examples/create_rosbag.rs b/examples/create_rosbag.rs new file mode 100644 index 0000000..5f7d8eb --- /dev/null +++ b/examples/create_rosbag.rs @@ -0,0 +1,48 @@ +use anyhow::Result; +use rosbag2_rs::Writer; +use std::path::Path; + +fn main() -> Result<()> { + // Specify the path for the new ROS2 bag + let bag_path = Path::new("example_rosbag_by_rust"); + + // Initialize the Writer + let mut writer = Writer::new(bag_path); + writer.open()?; + + // Define a standard ROS2 message type (adjust this according to actual ROS2 message types) + let topic = "example_topic"; + let msgtype = "std_msgs/msg/Int32"; + + // Add a connection for this message type + // adjust qos profile according to actual ROS2 message types (https://docs.ros2.org/latest/api/rmw/structrmw__qos__profile__t.html) + const LATCH: &str = r#"- history: 3 + depth: 0 + reliability: 1 + durability: 1 + deadline: + sec: 2147483647 + nsec: 4294967295 + lifespan: + sec: 2147483647 + nsec: 4294967295 + liveliness: 1 + liveliness_lease_duration: + sec: 2147483647 + nsec: 4294967295 + avoid_ros_namespace_conventions: false +"#; + + let connection = writer.add_connection(topic, msgtype, "cdr", LATCH)?; + + // Write some dummy messages + for i in 0..50 { + let dummy_data = [0, 1, 0, 1, 43, 42, 0, 0]; // dummy data 0x2a2b = (int32)10795 + writer.write(&connection, 1_000_000_000 * i, &dummy_data)?; + } + + writer.close()?; + + println!("ROS2 bag created at {:?}", bag_path); + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..a3a889c --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,38 @@ +pub mod metadata; +pub use metadata::*; + +pub mod writer; +pub use writer::*; + +#[derive(Clone, Debug, PartialEq)] +pub struct TopicConnection { + pub id: i32, + pub topic: String, + pub msgtype: String, + // pub msgdef: String, + // pub digest: String, + pub msgcount: i32, + pub ext: ConnectionExt, +} + +#[derive(Clone, Debug, PartialEq)] +pub struct ConnectionExt { + pub serialization_format: String, + pub offered_qos_profiles: String, + // Add other fields specific to ROS bag version 2 +} + +pub fn add(left: usize, right: usize) -> usize { + left + right +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let result = add(2, 2); + assert_eq!(result, 4); + } +} diff --git a/src/metadata.rs b/src/metadata.rs new file mode 100644 index 0000000..a9411e8 --- /dev/null +++ b/src/metadata.rs @@ -0,0 +1,58 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Serialize, Deserialize)] +pub struct StartingTime { + pub nanoseconds_since_epoch: i64, +} + +#[derive(Serialize, Deserialize)] +pub struct BagDuration { + pub nanoseconds: i64, +} + +#[derive(Serialize, Deserialize)] +pub struct TopicMetadata { + pub name: String, + + #[serde(rename = "type")] + pub type_: String, // `type` is a reserved keyword in Rust + pub serialization_format: String, + pub offered_qos_profiles: String, + // pub type_description_hash: String // TODO: humble rosbag2 does not need this field +} + +#[derive(Serialize, Deserialize)] +pub struct TopicWithMessageCount { + pub message_count: i32, + pub topic_metadata: TopicMetadata, +} + +#[derive(Serialize, Deserialize)] +pub struct FileInformation { + pub path: String, + pub starting_time: StartingTime, + pub duration: BagDuration, + pub message_count: i32, +} + +#[derive(Serialize, Deserialize)] +pub struct Metadata { + pub version: i32, + pub storage_identifier: String, + pub relative_file_paths: Vec, + pub starting_time: StartingTime, + pub duration: BagDuration, + pub message_count: i32, + pub compression_format: String, + pub compression_mode: String, + pub topics_with_message_count: Vec, + pub files: Vec, + pub custom_data: HashMap, + pub ros_distro: String, +} + +#[derive(Serialize, Deserialize)] +pub struct BagFileInfo { + pub rosbag2_bagfile_information: Metadata, +} diff --git a/src/writer.rs b/src/writer.rs new file mode 100644 index 0000000..d290349 --- /dev/null +++ b/src/writer.rs @@ -0,0 +1,267 @@ +use crate::*; +use anyhow::Result; +use rusqlite::params; +use rusqlite::Connection; +use std::collections::HashMap; +use std::fs::File; +use std::path::{Path, PathBuf}; + +/// This class implements writing of rosbag2 files in version 8. It should be +/// used as a contextmanager. +pub struct Writer { + pub path: PathBuf, + pub metapath: PathBuf, + pub dbpath: PathBuf, + pub connections: Vec, + pub counts: HashMap, + pub conn: Option, + pub custom_data: HashMap, + pub added_types: Vec, + pub compression_mode: String, + pub compression_format: String, +} + +impl Writer { + pub fn new>(path: P) -> Self { + let path = path.as_ref().to_path_buf(); + let metapath = path.join("metadata.yaml"); + let dbpath = path.join(format!( + "{}.db3", + path.file_name().unwrap().to_str().unwrap() + )); + + Writer { + path, + metapath, + dbpath, + connections: Vec::new(), + counts: HashMap::new(), + conn: None, + custom_data: HashMap::new(), + added_types: Vec::new(), + compression_mode: "".to_string(), + compression_format: "".to_string(), + } + } + + /// humble schema can be found here: + /// Rosbag2 + pub fn open(&mut self) -> Result<()> { + if self.dbpath.exists() { + return Err(anyhow::anyhow!( + "Database file {:?} already exists.", + self.dbpath + )); + } + + std::fs::create_dir_all(&self.path)?; + let conn = Connection::open(&self.dbpath)?; + + // TODO: add support for beyond humble ros2bag + // related discussion: https://github.com/ros2/ros2/issues/1159 + conn.execute_batch( + r#" + CREATE TABLE schema( + schema_version INTEGER PRIMARY KEY, + ros_distro TEXT NOT NULL + ); + CREATE TABLE metadata( + id INTEGER PRIMARY KEY, + metadata_version INTEGER NOT NULL, + metadata TEXT NOT NULL + ); + CREATE TABLE topics( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + type TEXT NOT NULL, + serialization_format TEXT NOT NULL, + offered_qos_profiles TEXT NOT NULL + ); + CREATE TABLE messages( + id INTEGER PRIMARY KEY, + topic_id INTEGER NOT NULL, + timestamp INTEGER NOT NULL, + data BLOB NOT NULL + ); + CREATE INDEX timestamp_idx ON messages (timestamp ASC); + INSERT INTO schema(schema_version, ros_distro) VALUES (4, 'rosbags'); + "#, + )?; + self.conn = Some(conn); + Ok(()) + } + + pub fn add_connection( + &mut self, + topic: &str, + msgtype: &str, + serialization_format: &str, + offered_qos_profiles: &str, + ) -> Result { + if self.conn.is_none() { + return Err(anyhow::anyhow!("Bag was not opened.")); + } + + let conn = self.conn.as_ref().unwrap(); + + let new_id = self.connections.len() as i32 + 1; + let new_connection = TopicConnection { + id: new_id, + topic: topic.to_string(), + msgtype: msgtype.to_string(), + msgcount: 0, + ext: ConnectionExt { + serialization_format: serialization_format.to_string(), + offered_qos_profiles: offered_qos_profiles.to_string(), + }, + // owner field is omitted in Rust + }; + + if self + .connections + .iter() + .any(|conn| conn.topic == topic && conn.msgtype == msgtype) + { + return Err(anyhow::anyhow!( + "Connection can only be added once: {:?}", + new_connection + )); + } + + self.connections.push(new_connection.clone()); + self.counts.insert(new_id, 0); + conn.execute( + "INSERT INTO topics VALUES(?, ?, ?, ?, ?)", + [ + new_id.to_string().as_str(), + topic, + msgtype, + serialization_format, + offered_qos_profiles, + ], + )?; + + Ok(new_connection) + } + + pub fn write( + &mut self, + connection: &TopicConnection, + timestamp: i64, + data: &[u8], + ) -> Result<()> { + if self.conn.is_none() { + return Err(anyhow::anyhow!("Bag was not opened.")); + } + + let conn = self.conn.as_ref().unwrap(); + + if !self.connections.contains(connection) { + return Err(anyhow::anyhow!( + "Tried to write to unknown connection {:?}", + connection + )); + } + + // TODO: Handle compression if needed + // if self.compression_mode == "message" { + // // Compress data + // // data_to_write = self.compressor.compress(data)?; + // } + + conn.execute( + "INSERT INTO messages (topic_id, timestamp, data) VALUES(?1, ?2, ?3)", + params![ + connection.id.to_string().as_str(), + ×tamp.to_string(), + data + ], + )?; + + if let Some(count) = self.counts.get_mut(&connection.id) { + *count += 1; + } + + Ok(()) + } + + pub fn close(&mut self) -> Result<()> { + if let Some(conn) = self.conn.take() { + // Calculate duration, start time, and message count + let (duration, start, count) = conn.query_row( + "SELECT max(timestamp) - min(timestamp), min(timestamp), count(*) FROM messages", + [], + |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)), + )?; + + // Commit and optimize the database + conn.execute("PRAGMA optimize", [])?; + + // Handle compression (skipped for now as compression is not implemented) + + // Generate metadata + let metadata = BagFileInfo { + rosbag2_bagfile_information: self.generate_metadata(duration, start, count)?, + }; + let file = File::create(&self.metapath)?; + serde_yaml::to_writer(file, &metadata)?; + } + + Ok(()) + } + + fn generate_metadata(&self, duration: i64, start: i64, count: i32) -> Result { + // Placeholder for topics_with_message_count + let topics_with_message_count: Vec = self + .connections + .iter() + .map(|conn| TopicWithMessageCount { + message_count: *self.counts.get(&conn.id).unwrap_or(&0), + topic_metadata: TopicMetadata { + name: conn.topic.clone(), + type_: conn.msgtype.clone(), + serialization_format: conn.ext.serialization_format.clone(), + offered_qos_profiles: conn.ext.offered_qos_profiles.clone(), + // type_description_hash: conn.digest.clone(), // TODO: implement type_description_hash + }, + }) + .collect(); + + // Generate files information + let files = vec![FileInformation { + path: self.dbpath.to_str().unwrap().to_string(), + starting_time: StartingTime { + nanoseconds_since_epoch: start, + }, + duration: BagDuration { + nanoseconds: duration, + }, + message_count: count, + }]; + + Ok(Metadata { + version: 5, + storage_identifier: "sqlite3".to_string(), + relative_file_paths: vec![self + .dbpath + .file_name() + .unwrap() + .to_str() + .unwrap() + .to_string()], + starting_time: StartingTime { + nanoseconds_since_epoch: start, + }, + duration: BagDuration { + nanoseconds: duration, + }, + message_count: count, + compression_format: self.compression_format.clone(), + compression_mode: self.compression_mode.clone(), + topics_with_message_count, + files, + custom_data: self.custom_data.clone(), + ros_distro: "rosbags".to_string(), + }) + } +} diff --git a/tests/writer_tests.rs b/tests/writer_tests.rs new file mode 100644 index 0000000..84a9c05 --- /dev/null +++ b/tests/writer_tests.rs @@ -0,0 +1,100 @@ +use rosbag2_rs::{BagFileInfo, Writer}; +use rusqlite::Connection; +use tempfile::tempdir; + +use std::fs::{self, File}; + +#[test] +fn test_writer_creation_and_opening() { + let test_bag_path = "test_bag"; + + let _ = fs::remove_dir_all(test_bag_path); + + let mut writer = Writer::new(test_bag_path); + assert!(writer.open().is_ok()); + + assert!(writer.dbpath.exists()); + + // Open the SQLite database and check if the 'topics' table exists + let conn = Connection::open(&writer.dbpath).unwrap(); + + let tables = ["schema", "metadata", "topics", "messages"]; + + for &table in &tables { + let exists = conn + .query_row( + "SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?1;", + [table], + |row| row.get::<_, i32>(0), + ) + .unwrap(); + assert_eq!(exists, 1, "Table {} does not exist", table); + } + + let _ = fs::remove_dir_all(test_bag_path); +} + +#[test] +fn test_open_existing_db() { + let dir = tempdir().unwrap(); + let db_path = dir.path().join(format!( + "{}.db3", + dir.path().file_name().unwrap().to_str().unwrap() + )); + + // Create a dummy file to simulate existing database + File::create(db_path).unwrap(); + + let mut writer = Writer::new(dir.path()); + let result = writer.open(); + assert!( + result.is_err(), + "Expected error when opening existing database, but got Ok" + ); +} + +#[test] +fn test_write_operations() { + let dir = tempdir().unwrap(); + + let mut writer = Writer::new(dir.path()); + writer.open().unwrap(); + + // Add a dummy connection + let connection = writer + .add_connection("topic1", "msgtype1", "cdr", "") + .unwrap(); + + // Write dummy messages + for i in 0..10 { + writer.write(&connection, i as i64, &[i as u8]).unwrap(); + } + + // Close the writer to flush all data to the database + writer.close().unwrap(); + + // Open the SQLite database to verify the data + let db_conn = Connection::open(writer.dbpath).unwrap(); + let mut stmt = db_conn + .prepare("SELECT data FROM messages WHERE topic_id = ?") + .unwrap(); + let mut rows = stmt.query([connection.id.to_string().as_str()]).unwrap(); + + let mut i = 0; + while let Some(row) = rows.next().unwrap() { + let data: Vec = row.get(0).unwrap(); + assert_eq!(data, vec![i as u8]); + i += 1; + } + + assert_eq!(i, 10); // Ensure we read back 10 messages + + // also check metadata.yaml + // Read and verify the YAML metadata file + let metadata_contents = fs::read_to_string(writer.metapath).unwrap(); + let bag_info: BagFileInfo = serde_yaml::from_str(&metadata_contents).unwrap(); + let metadata = bag_info.rosbag2_bagfile_information; + assert_eq!(metadata.message_count, 10); + assert_eq!(metadata.topics_with_message_count.len(), 1); + assert_eq!(metadata.topics_with_message_count[0].message_count, 10); +}