From 690f191ee389e599e8b129b0925d845c800204eb Mon Sep 17 00:00:00 2001 From: nerdtomars Date: Mon, 25 Dec 2023 20:11:56 +0800 Subject: [PATCH] add rosbag reader --- Cargo.toml | 2 +- README.md | 1 + src/lib.rs | 23 +++++ src/reader.rs | 227 +++++++++++++++++++++++++++++++++++++++++ src/sqlite3_storage.rs | 177 ++++++++++++++++++++++++++++++++ tests/reader_tests.rs | 60 +++++++++++ tests/writer_tests.rs | 59 +++++------ 7 files changed, 517 insertions(+), 32 deletions(-) create mode 100644 src/reader.rs create mode 100644 src/sqlite3_storage.rs create mode 100644 tests/reader_tests.rs diff --git a/Cargo.toml b/Cargo.toml index 62c27cc..39105be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rosbag2-rs" -version = "0.1.1" +version = "0.2.0" edition = "2021" description = "Rosbag2 writer and more..." authors = ["CT "] diff --git a/README.md b/README.md index 0d24fd6..42aee79 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ Rosbag2 Rust is a Rust crate designed to provide functionalities for handling RO ### Current Features - [ ] Write ROS Bag Files (in progress, only support version 5 rosbag2 (humble)) +- [ ] Read ROS Bag Files ### Planned Features diff --git a/src/lib.rs b/src/lib.rs index 4d5cc68..eeff1da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,12 @@ pub mod metadata; pub use metadata::*; +pub mod sqlite3_storage; +pub use sqlite3_storage::*; + +pub mod reader; +pub use reader::*; + pub mod writer; pub use writer::*; @@ -21,3 +27,20 @@ pub struct ConnectionExt { pub offered_qos_profiles: String, // Add other fields specific to ROS bag version 2 } + +#[derive(Clone, Debug)] +pub struct TopicInfo { + pub msgtype: String, + pub msgcount: i32, + pub connections: Vec, +} + +impl TopicInfo { + pub fn new(msgtype: String, msgcount: i32, connections: Vec) -> Self { + TopicInfo { + msgtype, + msgcount, + connections, + } + } +} diff --git a/src/reader.rs b/src/reader.rs new file mode 100644 index 0000000..9e972b9 --- /dev/null +++ b/src/reader.rs @@ -0,0 +1,227 @@ +use crate::*; +use anyhow::{anyhow, Result}; +use std::collections::HashMap; +use std::path::Path; +use std::{fs, vec}; + +// Define other structs like Metadata, FileInformation, Connection, etc. + +pub struct Reader { + pub metadata: Metadata, + pub connections: Vec, + storage: Sqlite3Reader, +} + +/// The `Reader` struct provides an interface for reading message data from a ROS bag file. +/// +/// The `Reader` initializes with the path to a ROS bag directory and reads metadata +/// and message data from the storage. It supports filtering messages by time and handling +/// each message through a user-defined function. +/// +/// # Usage +/// +/// ``` +/// use std::rc::Rc; +/// use std::cell::RefCell; +/// use rosbag2_rs::{Reader, Writer}; +/// use anyhow::Result; +/// use tempfile::tempdir; + +/// fn main() -> Result<()> { +/// let dir = tempdir()?; +/// # let mut writer = Writer::new(dir.path()); +/// # writer.open()?; +/// # let connection = writer.add_connection("topic1", "msgtype1", "cdr", "")?; + +/// # for i in 0..10 { +/// # writer.write(&connection, i as i64, &[i * 2 + 1 as u8])?; +/// # } +/// # let connection = writer.add_connection("topic2", "msgtype2", "cdr", "")?; + +/// # for i in 0..10 { +/// # writer.write(&connection, i as i64, &[i * 2 as u8])?; +/// # } +/// # writer.close()?; + +/// let mut reader = Reader::new(dir.path())?; +/// let msg_data: Vec<(i64, i64, Vec)> = vec![]; +/// let msg_data = Rc::new(RefCell::new(msg_data)); // Wrap the vector in Rc and RefCell +/// reader.handle_messages( +/// |(id, timestamp, data)| { +/// // Use `borrow_mut` to get a mutable reference to the vector +/// println!("processed message: {:?} {:?} {:?}", id, timestamp, data); +/// +/// msg_data.borrow_mut().push((id, timestamp, data)); +/// Ok(()) +/// }, +/// None, +/// None, +/// )?; +/// +/// Ok(()) +/// } +/// ``` +/// +/// # Errors +/// +/// - Returns an error if the ROS bag version is not supported. +/// - Returns an error if the compression mode is not supported. +/// - Returns an error if a non-CDR serialization format is found in any topic. +/// - Returns an error if the storage identifier is not 'sqlite3'. +/// +/// # Note +/// +/// - This struct assumes that the ROS bag files are in `sqlite3` format. +/// - The `handle_messages` method allows for processing of individual messages. + +impl Reader { + pub fn new(path: impl AsRef) -> Result { + let path = path.as_ref().to_path_buf(); + let metapath = path.join("metadata.yaml"); + + let metadata_contents = fs::read_to_string(metapath).unwrap(); + let bag_info: BagFileInfo = serde_yaml::from_str(&metadata_contents).unwrap(); + + let metadata = bag_info.rosbag2_bagfile_information; + + // Check version and storage identifier + if metadata.version > 5 { + return Err(anyhow!("Not supported version: {}", metadata.version)); + } + + if !metadata.compression_mode.is_empty() { + return Err(anyhow!( + "Not supported compression mode: {}", + metadata.compression_mode + )); + } + + // metadata.topics_with_message_count.iter().map(|topi) + if let Some(topic_info) = metadata + .topics_with_message_count + .iter() + .find(|t| t.topic_metadata.serialization_format != "cdr") + { + return Err(anyhow!( + "Only CDR serialization format is supported: {:?}", + topic_info.topic_metadata.serialization_format + )); + } + + if metadata.storage_identifier != "sqlite3" { + return Err(anyhow!( + "Not supported storage identifier: {}", + metadata.storage_identifier + )); + } + + // Initialize connections + let connections = metadata + .topics_with_message_count + .iter() + .enumerate() + .map(|(idx, topic_info)| TopicConnection { + id: idx as i32 + 1, + topic: topic_info.topic_metadata.name.clone(), + msgtype: topic_info.topic_metadata.type_.clone(), + msgcount: topic_info.message_count, + ext: ConnectionExt { + serialization_format: topic_info.topic_metadata.serialization_format.clone(), + offered_qos_profiles: topic_info.topic_metadata.offered_qos_profiles.clone(), + }, + }) + .collect::>(); + + let paths: Vec = metadata + .relative_file_paths + .iter() + .map(|relative_path| path.join(relative_path).to_string_lossy().into_owned()) + .collect(); + let mut storage = Sqlite3Reader::new(paths); + + println!("Opening storage"); + storage.open()?; + println!("Opening storage Done"); + + Ok(Self { + metadata, + connections, + storage, + }) + } + + pub fn open(&mut self) -> Result<()> { + self.storage.open()?; + + Ok(()) + } + + pub fn handle_messages( + &mut self, + handle_func: impl Fn((i64, i64, Vec)) -> Result<()>, + start: Option, + stop: Option, + ) -> Result<()> { + let statement = self + .storage + .messages_statement(&self.connections, start, stop)?; + handle_messages(statement, handle_func)?; + Ok(()) + } + + pub fn duration(&self) -> i64 { + let nsecs = self.metadata.duration.nanoseconds; + if self.message_count() > 0 { + nsecs + 1 + } else { + 0 + } + } + + pub fn start_time(&self) -> i64 { + let nsecs = self.metadata.starting_time.nanoseconds_since_epoch; + if self.message_count() > 0 { + nsecs + } else { + i64::MAX + } + } + + pub fn end_time(&self) -> i64 { + self.start_time() + self.duration() + } + + pub fn message_count(&self) -> i32 { + self.metadata.message_count + } + + pub fn compression_format(&self) -> String { + self.metadata.compression_format.clone() + } + + pub fn compression_mode(&self) -> Option { + let mode = self.metadata.compression_mode.to_lowercase(); + if mode != "none" { + Some(mode) + } else { + None + } + } + + pub fn topics(&self) -> HashMap { + // Assuming TopicInfo is already defined + self.connections + .iter() + .map(|conn| { + ( + conn.topic.clone(), + TopicInfo::new(conn.msgtype.clone(), conn.msgcount, vec![conn.clone()]), + ) + }) + .collect() + } + + pub fn ros_distro(&self) -> String { + self.metadata.ros_distro.clone() + } +} diff --git a/src/sqlite3_storage.rs b/src/sqlite3_storage.rs new file mode 100644 index 0000000..7785481 --- /dev/null +++ b/src/sqlite3_storage.rs @@ -0,0 +1,177 @@ +use crate::*; +use anyhow::Result; +use rusqlite::{Connection, Statement}; +use std::path::Path; + +pub struct Sqlite3Reader { + paths: Vec, // Assuming paths are stored as strings + dbconns: Vec, + schema: i32, + // msgtypes: Vec, + // connections: Vec, +} + +impl Sqlite3Reader { + pub fn new(paths: Vec) -> Self { + Sqlite3Reader { + paths, + dbconns: Vec::new(), + schema: 0, + } + } + + pub fn open(&mut self) -> Result<()> { + for path_str in &self.paths { + let path = Path::new(path_str); + println!("opening db using path {path:?}"); + let conn = Connection::open_with_flags( + path, + rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_URI, + )?; + + { + let mut stmt = conn.prepare( + "SELECT count(*) FROM sqlite_master WHERE type='table' AND name IN ('messages', 'topics')", + )?; + + let table_count: i32 = stmt.query_row([], |row| row.get(0))?; + + if table_count != 2 { + return Err(anyhow::anyhow!( + "Cannot open database {} or database missing tables.", + path_str + )); + } + } + + self.dbconns.push(conn); + } + + // Check the schema version and initialize `self.schema` and `self.msgtypes` + if let Some(conn) = self.dbconns.last() { + let mut stmt = conn.prepare("PRAGMA table_info(schema)")?; + + self.schema = if stmt.exists([])? { + let schema_version: i32 = + conn.query_row("SELECT schema_version FROM schema", [], |row| row.get(0))?; + schema_version + } else { + let mut stmt = conn.prepare("PRAGMA table_info(topics)")?; + let rows = stmt.query_map([], |row| row.get::<_, String>(1))?; + + if rows + .filter( + |r| matches!(r, Ok(column_name) if column_name == "offered_qos_profiles"), + ) + .count() + > 0 + { + 2 + } else { + 1 + } + }; + + // TODO: Initialize `self.msgtypes` based on the schema version + } + + Ok(()) + } + + /// clear all SQLite connections + pub fn close(&mut self) { + self.dbconns.clear(); + } + + // pub fn get_statuement(&self) -> Statement {} + + pub fn messages_statement( + &self, + connections: &[TopicConnection], + start: Option, + stop: Option, + ) -> Result { + if self.dbconns.is_empty() { + return Err(anyhow::anyhow!("Rosbag has not been opened.")); + } + + let mut query = String::from( + "SELECT topics.id, messages.timestamp, messages.data FROM messages JOIN topics ON messages.topic_id=topics.id", + ); + let mut args: Vec = vec![]; + let mut clause = "WHERE"; + + if !connections.is_empty() { + let topics: Vec = connections.iter().map(|c| c.topic.clone()).collect(); + let formated_topics = topics + .iter() + .map(|t| format!("'{}'", t)) + .collect::>() + .join(", "); + query.push_str(&format!(" {clause} topics.name IN ({formated_topics})")); + clause = "AND"; + } + + if let Some(start_ns) = start { + // query.push_str(&format!(" {clause} messages.timestamp >= ?", clause)); + query.push_str(&format!(" {clause} messages.timestamp >= {start_ns}")); + args.push(start_ns.to_string()); + clause = "AND"; + } + + if let Some(stop_ns) = stop { + // query.push_str(&format!(" {} messages.timestamp < ?", clause)); + query.push_str(&format!(" {clause} messages.timestamp < {stop_ns}")); + args.push(stop_ns.to_string()); + } + + query.push_str(" ORDER BY messages.timestamp"); + + if self.dbconns.len() > 1 { + panic!("not support multiple db3 files"); + } + + return if let Some(conn) = self.dbconns.last() { + println!("query string is {query}"); + let stmt = conn.prepare(&query)?; + // let rows = stmt.query([])?; parse_row + // let messages = stmt.query_map([], parse_row)?; + // let messages = stmt.query_map([], |row| { + // let id: i32 = row.get(0)?; + // let timestamp: i64 = row.get(1)?; + // let data: Vec = row.get(2)?; + // Ok((id, timestamp, data)) + // })?; + // let + Ok(stmt) + // let t = messages.into_iter(); + // Ok(messages) + // Err(anyhow::anyhow!("Cannot open database.")) + } else { + Err(anyhow::anyhow!("Cannot open database.")) + }; + } +} + +pub fn handle_messages)) -> Result<()>>( + mut stmt: Statement, + handle_func: F, +) -> Result<()> { + let handler = stmt.query_map([], |row| { + let id = row.get::<_, i64>(0)?; + let timestamp = row.get(1)?; + let data = row.get(2)?; + let result = handle_func((id, timestamp, data)); + if let Err(e) = result { + return Err(rusqlite::Error::InvalidParameterName(format!("error: {e}"))); + } + Ok(()) + })?; + + for res in handler { + if let Err(e) = res { + println!("error: {e:?} when handle messages"); + } + } + Ok(()) +} diff --git a/tests/reader_tests.rs b/tests/reader_tests.rs new file mode 100644 index 0000000..06549a3 --- /dev/null +++ b/tests/reader_tests.rs @@ -0,0 +1,60 @@ +use anyhow::Result; +use rosbag2_rs::{Reader, Writer}; +use std::{cell::RefCell, rc::Rc}; +use tempfile::tempdir; + +#[test] +fn test_messages_iterator_initialization() -> Result<()> { + // Initialize with mock data and verify properties + + let dir = tempdir().unwrap(); + + let mut writer = Writer::new(dir.path()); + writer.open()?; + + // Add a dummy connection + let connection = writer.add_connection("topic1", "msgtype1", "cdr", "")?; + + // Write dummy messages + for i in 0..10 { + writer.write(&connection, i as i64, &[(i * 2 + 1) as u8])?; + } + + // Add a dummy connection + let connection = writer.add_connection("topic2", "msgtype2", "cdr", "")?; + + // Write dummy messages + for i in 0..10 { + writer.write(&connection, i as i64, &[(i * 2) as u8])?; + } + + writer.close()?; + + let mut reader = Reader::new(dir.path())?; + + assert_eq!(reader.connections.len(), 2); + assert_eq!(reader.connections[0].topic, "topic1"); + assert_eq!(reader.connections[0].msgtype, "msgtype1"); + assert_eq!(reader.connections[0].msgcount, 10); + + assert_eq!(reader.duration(), 10); + + let msg_data: Vec<(i64, i64, Vec)> = vec![]; + let msg_data = Rc::new(RefCell::new(msg_data)); // Wrap the vector in Rc and RefCell + + reader.handle_messages( + |(id, timestamp, data)| { + // Use `borrow_mut` to get a mutable reference to the vector + print!("processed message: {:?} {:?} {:?} \n", id, timestamp, data); + + msg_data.borrow_mut().push((id, timestamp, data)); + Ok(()) + }, + None, + None, + )?; + + assert_eq!(msg_data.borrow().len(), 20); + + Ok(()) +} diff --git a/tests/writer_tests.rs b/tests/writer_tests.rs index 84a9c05..604fb34 100644 --- a/tests/writer_tests.rs +++ b/tests/writer_tests.rs @@ -1,11 +1,11 @@ +use anyhow::{Ok, Result}; use rosbag2_rs::{BagFileInfo, Writer}; use rusqlite::Connection; -use tempfile::tempdir; - use std::fs::{self, File}; +use tempfile::tempdir; #[test] -fn test_writer_creation_and_opening() { +fn test_writer_creation_and_opening() -> Result<()> { let test_bag_path = "test_bag"; let _ = fs::remove_dir_all(test_bag_path); @@ -16,34 +16,33 @@ fn test_writer_creation_and_opening() { assert!(writer.dbpath.exists()); // Open the SQLite database and check if the 'topics' table exists - let conn = Connection::open(&writer.dbpath).unwrap(); + let conn = Connection::open(&writer.dbpath)?; let tables = ["schema", "metadata", "topics", "messages"]; for &table in &tables { - let exists = conn - .query_row( - "SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?1;", - [table], - |row| row.get::<_, i32>(0), - ) - .unwrap(); + let exists = conn.query_row( + "SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?1;", + [table], + |row| row.get::<_, i32>(0), + )?; assert_eq!(exists, 1, "Table {} does not exist", table); } let _ = fs::remove_dir_all(test_bag_path); + Ok(()) } #[test] -fn test_open_existing_db() { - let dir = tempdir().unwrap(); +fn test_open_existing_db() -> Result<()> { + let dir = tempdir()?; let db_path = dir.path().join(format!( "{}.db3", dir.path().file_name().unwrap().to_str().unwrap() )); // Create a dummy file to simulate existing database - File::create(db_path).unwrap(); + File::create(db_path)?; let mut writer = Writer::new(dir.path()); let result = writer.open(); @@ -51,38 +50,35 @@ fn test_open_existing_db() { result.is_err(), "Expected error when opening existing database, but got Ok" ); + Ok(()) } #[test] -fn test_write_operations() { - let dir = tempdir().unwrap(); +fn test_write_operations() -> Result<()> { + let dir = tempdir()?; let mut writer = Writer::new(dir.path()); - writer.open().unwrap(); + writer.open()?; // Add a dummy connection - let connection = writer - .add_connection("topic1", "msgtype1", "cdr", "") - .unwrap(); + let connection = writer.add_connection("topic1", "msgtype1", "cdr", "")?; // Write dummy messages for i in 0..10 { - writer.write(&connection, i as i64, &[i as u8]).unwrap(); + writer.write(&connection, i as i64, &[i as u8])?; } // Close the writer to flush all data to the database - writer.close().unwrap(); + writer.close()?; // Open the SQLite database to verify the data - let db_conn = Connection::open(writer.dbpath).unwrap(); - let mut stmt = db_conn - .prepare("SELECT data FROM messages WHERE topic_id = ?") - .unwrap(); - let mut rows = stmt.query([connection.id.to_string().as_str()]).unwrap(); + let db_conn = Connection::open(writer.dbpath)?; + let mut stmt = db_conn.prepare("SELECT data FROM messages WHERE topic_id = ?")?; + let mut rows = stmt.query([connection.id.to_string().as_str()])?; let mut i = 0; - while let Some(row) = rows.next().unwrap() { - let data: Vec = row.get(0).unwrap(); + while let Some(row) = rows.next()? { + let data: Vec = row.get(0)?; assert_eq!(data, vec![i as u8]); i += 1; } @@ -91,10 +87,11 @@ fn test_write_operations() { // also check metadata.yaml // Read and verify the YAML metadata file - let metadata_contents = fs::read_to_string(writer.metapath).unwrap(); - let bag_info: BagFileInfo = serde_yaml::from_str(&metadata_contents).unwrap(); + let metadata_contents = fs::read_to_string(writer.metapath)?; + let bag_info: BagFileInfo = serde_yaml::from_str(&metadata_contents)?; let metadata = bag_info.rosbag2_bagfile_information; assert_eq!(metadata.message_count, 10); assert_eq!(metadata.topics_with_message_count.len(), 1); assert_eq!(metadata.topics_with_message_count[0].message_count, 10); + Ok(()) }