From 7450810daed4855b7c5a68113ed21bf162df6045 Mon Sep 17 00:00:00 2001 From: Esteve Fernandez <33620+esteve@users.noreply.github.com> Date: Tue, 25 Jul 2023 22:40:29 +0200 Subject: [PATCH] Return Arc from the create_node function to match other create_X functions (#294) * Fine-grained locks. Made create_subscription, create_service, create_client not take a mutable self * Return an Arc from rclrs::create_node to match other create_X functions * Update spin* to take an Arc * Fix clippy warning --- docs/writing-your-first-rclrs-node.md | 10 ++--- examples/message_demo/src/message_demo.rs | 7 ++-- .../src/minimal_client.rs | 4 +- .../src/minimal_client_async.rs | 4 +- .../src/minimal_service.rs | 4 +- .../minimal_pub_sub/src/minimal_subscriber.rs | 4 +- .../src/zero_copy_subscriber.rs | 4 +- rclrs/src/lib.rs | 19 +++++---- rclrs/src/node.rs | 42 +++++++++++-------- rclrs/src/node/builder.rs | 8 ++-- rclrs_tests/src/graph_tests.rs | 8 ++-- 11 files changed, 62 insertions(+), 52 deletions(-) diff --git a/docs/writing-your-first-rclrs-node.md b/docs/writing-your-first-rclrs-node.md index 4af713bf..b6b1dadd 100644 --- a/docs/writing-your-first-rclrs-node.md +++ b/docs/writing-your-first-rclrs-node.md @@ -54,7 +54,7 @@ struct RepublisherNode { impl RepublisherNode { fn new(context: &rclrs::Context) -> Result { - let mut node = rclrs::Node::new(context, "republisher")?; + let node = rclrs::Node::new(context, "republisher")?; let data = None; let _subscription = node.create_subscription( "in_topic", @@ -76,7 +76,7 @@ Next, add a main function to launch it: fn main() -> Result<(), rclrs::RclrsError> { let context = rclrs::Context::new(std::env::args())?; let republisher = RepublisherNode::new(&context)?; - rclrs::spin(&republisher.node) + rclrs::spin(republisher.node) } ``` @@ -121,7 +121,7 @@ struct RepublisherNode { impl RepublisherNode { fn new(context: &rclrs::Context) -> Result { - let mut node = rclrs::Node::new(context, "republisher")?; + let node = rclrs::Node::new(context, "republisher")?; let data = Arc::new(Mutex::new(None)); // (3) let data_cb = Arc::clone(&data); let _subscription = { @@ -190,7 +190,7 @@ fn main() -> Result<(), rclrs::RclrsError> { republisher.republish()?; } }); - rclrs::spin(&republisher.node) + rclrs::spin(republisher.node) } ``` @@ -212,7 +212,7 @@ fn main() -> Result<(), rclrs::RclrsError> { republisher_other_thread.republish()?; } }); - rclrs::spin(&republisher.node) + rclrs::spin(republisher.node) } ``` diff --git a/examples/message_demo/src/message_demo.rs b/examples/message_demo/src/message_demo.rs index 1f794921..76d46f67 100644 --- a/examples/message_demo/src/message_demo.rs +++ b/examples/message_demo/src/message_demo.rs @@ -1,5 +1,6 @@ use std::convert::TryInto; use std::env; +use std::sync::Arc; use anyhow::{Error, Result}; use rosidl_runtime_rs::{seq, BoundedSequence, Message, Sequence}; @@ -132,7 +133,7 @@ fn demonstrate_pubsub() -> Result<(), Error> { println!("================== Interoperability demo =================="); // Demonstrate interoperability between idiomatic and RMW-native message types let context = rclrs::Context::new(env::args())?; - let mut node = rclrs::create_node(&context, "message_demo")?; + let node = rclrs::create_node(&context, "message_demo")?; let idiomatic_publisher = node.create_publisher::( "topic", @@ -159,10 +160,10 @@ fn demonstrate_pubsub() -> Result<(), Error> { )?; println!("Sending idiomatic message."); idiomatic_publisher.publish(rclrs_example_msgs::msg::VariousTypes::default())?; - rclrs::spin_once(&node, None)?; + rclrs::spin_once(Arc::clone(&node), None)?; println!("Sending RMW-native message."); direct_publisher.publish(rclrs_example_msgs::msg::rmw::VariousTypes::default())?; - rclrs::spin_once(&node, None)?; + rclrs::spin_once(Arc::clone(&node), None)?; Ok(()) } diff --git a/examples/minimal_client_service/src/minimal_client.rs b/examples/minimal_client_service/src/minimal_client.rs index 685dbf6a..c91cedc9 100644 --- a/examples/minimal_client_service/src/minimal_client.rs +++ b/examples/minimal_client_service/src/minimal_client.rs @@ -5,7 +5,7 @@ use anyhow::{Error, Result}; fn main() -> Result<(), Error> { let context = rclrs::Context::new(env::args())?; - let mut node = rclrs::create_node(&context, "minimal_client")?; + let node = rclrs::create_node(&context, "minimal_client")?; let client = node.create_client::("add_two_ints")?; @@ -28,5 +28,5 @@ fn main() -> Result<(), Error> { std::thread::sleep(std::time::Duration::from_millis(500)); println!("Waiting for response"); - rclrs::spin(&node).map_err(|err| err.into()) + rclrs::spin(node).map_err(|err| err.into()) } diff --git a/examples/minimal_client_service/src/minimal_client_async.rs b/examples/minimal_client_service/src/minimal_client_async.rs index 38f55f12..26f41e07 100644 --- a/examples/minimal_client_service/src/minimal_client_async.rs +++ b/examples/minimal_client_service/src/minimal_client_async.rs @@ -6,7 +6,7 @@ use anyhow::{Error, Result}; async fn main() -> Result<(), Error> { let context = rclrs::Context::new(env::args())?; - let mut node = rclrs::create_node(&context, "minimal_client")?; + let node = rclrs::create_node(&context, "minimal_client")?; let client = node.create_client::("add_two_ints")?; @@ -20,7 +20,7 @@ async fn main() -> Result<(), Error> { println!("Waiting for response"); - let rclrs_spin = tokio::task::spawn_blocking(move || rclrs::spin(&node)); + let rclrs_spin = tokio::task::spawn_blocking(move || rclrs::spin(node)); let response = future.await?; println!( diff --git a/examples/minimal_client_service/src/minimal_service.rs b/examples/minimal_client_service/src/minimal_service.rs index 0150aeaa..b4149c81 100644 --- a/examples/minimal_client_service/src/minimal_service.rs +++ b/examples/minimal_client_service/src/minimal_service.rs @@ -15,11 +15,11 @@ fn handle_service( fn main() -> Result<(), Error> { let context = rclrs::Context::new(env::args())?; - let mut node = rclrs::create_node(&context, "minimal_service")?; + let node = rclrs::create_node(&context, "minimal_service")?; let _server = node .create_service::("add_two_ints", handle_service)?; println!("Starting server"); - rclrs::spin(&node).map_err(|err| err.into()) + rclrs::spin(node).map_err(|err| err.into()) } diff --git a/examples/minimal_pub_sub/src/minimal_subscriber.rs b/examples/minimal_pub_sub/src/minimal_subscriber.rs index eb34a91b..ebc5fc19 100644 --- a/examples/minimal_pub_sub/src/minimal_subscriber.rs +++ b/examples/minimal_pub_sub/src/minimal_subscriber.rs @@ -5,7 +5,7 @@ use anyhow::{Error, Result}; fn main() -> Result<(), Error> { let context = rclrs::Context::new(env::args())?; - let mut node = rclrs::create_node(&context, "minimal_subscriber")?; + let node = rclrs::create_node(&context, "minimal_subscriber")?; let mut num_messages: usize = 0; @@ -19,5 +19,5 @@ fn main() -> Result<(), Error> { }, )?; - rclrs::spin(&node).map_err(|err| err.into()) + rclrs::spin(node).map_err(|err| err.into()) } diff --git a/examples/minimal_pub_sub/src/zero_copy_subscriber.rs b/examples/minimal_pub_sub/src/zero_copy_subscriber.rs index b71faec2..9551dba0 100644 --- a/examples/minimal_pub_sub/src/zero_copy_subscriber.rs +++ b/examples/minimal_pub_sub/src/zero_copy_subscriber.rs @@ -5,7 +5,7 @@ use anyhow::{Error, Result}; fn main() -> Result<(), Error> { let context = rclrs::Context::new(env::args())?; - let mut node = rclrs::create_node(&context, "minimal_subscriber")?; + let node = rclrs::create_node(&context, "minimal_subscriber")?; let mut num_messages: usize = 0; @@ -19,5 +19,5 @@ fn main() -> Result<(), Error> { }, )?; - rclrs::spin(&node).map_err(|err| err.into()) + rclrs::spin(node).map_err(|err| err.into()) } diff --git a/rclrs/src/lib.rs b/rclrs/src/lib.rs index 4fd69a84..cda4a382 100644 --- a/rclrs/src/lib.rs +++ b/rclrs/src/lib.rs @@ -23,6 +23,7 @@ mod rcl_bindings; #[cfg(feature = "dyn_msg")] pub mod dynamic_message; +use std::sync::Arc; use std::time::Duration; pub use arguments::*; @@ -49,8 +50,8 @@ pub use wait::*; /// This can usually be ignored. /// /// [1]: crate::RclReturnCode -pub fn spin_once(node: &Node, timeout: Option) -> Result<(), RclrsError> { - let wait_set = WaitSet::new_for_node(node)?; +pub fn spin_once(node: Arc, timeout: Option) -> Result<(), RclrsError> { + let wait_set = WaitSet::new_for_node(&node)?; let ready_entities = wait_set.wait(timeout)?; for ready_subscription in ready_entities.subscriptions { @@ -71,14 +72,16 @@ pub fn spin_once(node: &Node, timeout: Option) -> Result<(), RclrsErro /// Convenience function for calling [`spin_once`] in a loop. /// /// This function additionally checks that the context is still valid. -pub fn spin(node: &Node) -> Result<(), RclrsError> { +pub fn spin(node: Arc) -> Result<(), RclrsError> { // The context_is_valid functions exists only to abstract away ROS distro differences // SAFETY: No preconditions for this function. - let context_is_valid = - || unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) }; + let context_is_valid = { + let node = Arc::clone(&node); + move || unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) } + }; while context_is_valid() { - match spin_once(node, None) { + match spin_once(Arc::clone(&node), None) { Ok(_) | Err(RclrsError::RclError { code: RclReturnCode::Timeout, @@ -105,8 +108,8 @@ pub fn spin(node: &Node) -> Result<(), RclrsError> { /// assert!(node.is_ok()); /// # Ok::<(), RclrsError>(()) /// ``` -pub fn create_node(context: &Context, node_name: &str) -> Result { - Node::builder(context, node_name).build() +pub fn create_node(context: &Context, node_name: &str) -> Result, RclrsError> { + Ok(Arc::new(Node::builder(context, node_name).build()?)) } /// Creates a [`NodeBuilder`][1]. diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index 52fd3b3c..74eb5ac8 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -67,10 +67,10 @@ unsafe impl Send for rcl_node_t {} pub struct Node { pub(crate) rcl_node_mtx: Arc>, pub(crate) rcl_context_mtx: Arc>, - pub(crate) clients: Vec>, - pub(crate) guard_conditions: Vec>, - pub(crate) services: Vec>, - pub(crate) subscriptions: Vec>, + pub(crate) clients_mtx: Mutex>>, + pub(crate) guard_conditions_mtx: Mutex>>, + pub(crate) services_mtx: Mutex>>, + pub(crate) subscriptions_mtx: Mutex>>, _parameter_map: ParameterOverrideMap, } @@ -180,13 +180,12 @@ impl Node { /// /// [1]: crate::Client // TODO: make client's lifetime depend on node's lifetime - pub fn create_client(&mut self, topic: &str) -> Result>, RclrsError> + pub fn create_client(&self, topic: &str) -> Result>, RclrsError> where T: rosidl_runtime_rs::Service, { let client = Arc::new(Client::::new(Arc::clone(&self.rcl_node_mtx), topic)?); - self.clients - .push(Arc::downgrade(&client) as Weak); + { self.clients_mtx.lock().unwrap() }.push(Arc::downgrade(&client) as Weak); Ok(client) } @@ -199,12 +198,12 @@ impl Node { /// /// [1]: crate::GuardCondition /// [2]: crate::spin_once - pub fn create_guard_condition(&mut self) -> Arc { + pub fn create_guard_condition(&self) -> Arc { let guard_condition = Arc::new(GuardCondition::new_with_rcl_context( &mut self.rcl_context_mtx.lock().unwrap(), None, )); - self.guard_conditions + { self.guard_conditions_mtx.lock().unwrap() } .push(Arc::downgrade(&guard_condition) as Weak); guard_condition } @@ -226,7 +225,7 @@ impl Node { &mut self.rcl_context_mtx.lock().unwrap(), Some(Box::new(callback) as Box), )); - self.guard_conditions + { self.guard_conditions_mtx.lock().unwrap() } .push(Arc::downgrade(&guard_condition) as Weak); guard_condition } @@ -251,7 +250,7 @@ impl Node { /// [1]: crate::Service // TODO: make service's lifetime depend on node's lifetime pub fn create_service( - &mut self, + &self, topic: &str, callback: F, ) -> Result>, RclrsError> @@ -264,7 +263,7 @@ impl Node { topic, callback, )?); - self.services + { self.services_mtx.lock().unwrap() } .push(Arc::downgrade(&service) as Weak); Ok(service) } @@ -274,7 +273,7 @@ impl Node { /// [1]: crate::Subscription // TODO: make subscription's lifetime depend on node's lifetime pub fn create_subscription( - &mut self, + &self, topic: &str, qos: QoSProfile, callback: impl SubscriptionCallback, @@ -288,32 +287,39 @@ impl Node { qos, callback, )?); - self.subscriptions + { self.subscriptions_mtx.lock() } + .unwrap() .push(Arc::downgrade(&subscription) as Weak); Ok(subscription) } /// Returns the subscriptions that have not been dropped yet. pub(crate) fn live_subscriptions(&self) -> Vec> { - self.subscriptions + { self.subscriptions_mtx.lock().unwrap() } .iter() .filter_map(Weak::upgrade) .collect() } pub(crate) fn live_clients(&self) -> Vec> { - self.clients.iter().filter_map(Weak::upgrade).collect() + { self.clients_mtx.lock().unwrap() } + .iter() + .filter_map(Weak::upgrade) + .collect() } pub(crate) fn live_guard_conditions(&self) -> Vec> { - self.guard_conditions + { self.guard_conditions_mtx.lock().unwrap() } .iter() .filter_map(Weak::upgrade) .collect() } pub(crate) fn live_services(&self) -> Vec> { - self.services.iter().filter_map(Weak::upgrade).collect() + { self.services_mtx.lock().unwrap() } + .iter() + .filter_map(Weak::upgrade) + .collect() } /// Returns the ROS domain ID that the node is using. diff --git a/rclrs/src/node/builder.rs b/rclrs/src/node/builder.rs index 91cd6fbe..60e3afe2 100644 --- a/rclrs/src/node/builder.rs +++ b/rclrs/src/node/builder.rs @@ -275,10 +275,10 @@ impl NodeBuilder { Ok(Node { rcl_node_mtx, rcl_context_mtx: self.context.clone(), - clients: vec![], - guard_conditions: vec![], - services: vec![], - subscriptions: vec![], + clients_mtx: Mutex::new(vec![]), + guard_conditions_mtx: Mutex::new(vec![]), + services_mtx: Mutex::new(vec![]), + subscriptions_mtx: Mutex::new(vec![]), _parameter_map, }) } diff --git a/rclrs_tests/src/graph_tests.rs b/rclrs_tests/src/graph_tests.rs index e8e2d7a0..b32447ca 100644 --- a/rclrs_tests/src/graph_tests.rs +++ b/rclrs_tests/src/graph_tests.rs @@ -85,7 +85,7 @@ fn test_publishers() -> Result<(), RclrsError> { #[test] fn test_subscriptions() -> Result<(), RclrsError> { let namespace = "/test_subscriptions_graph"; - let mut graph = construct_test_graph(namespace)?; + let graph = construct_test_graph(namespace)?; let node_2_empty_subscription = graph.node2.create_subscription::( "graph_test_topic_1", @@ -149,7 +149,7 @@ fn test_subscriptions() -> Result<(), RclrsError> { #[test] fn test_topic_names_and_types() -> Result<(), RclrsError> { - let mut graph = construct_test_graph("test_topics_graph")?; + let graph = construct_test_graph("test_topics_graph")?; let _node_1_defaults_subscription = graph.node1.create_subscription::( "graph_test_topic_3", @@ -191,7 +191,7 @@ fn test_topic_names_and_types() -> Result<(), RclrsError> { #[test] fn test_services() -> Result<(), RclrsError> { let namespace = "/test_services_graph"; - let mut graph = construct_test_graph(namespace)?; + let graph = construct_test_graph(namespace)?; let check_names_and_types = |names_and_types: TopicNamesAndTypes| { let types = names_and_types .get("/test_services_graph/graph_test_topic_4") @@ -225,7 +225,7 @@ fn test_services() -> Result<(), RclrsError> { #[test] fn test_clients() -> Result<(), RclrsError> { let namespace = "/test_clients_graph"; - let mut graph = construct_test_graph(namespace)?; + let graph = construct_test_graph(namespace)?; let _node_2_empty_client = graph .node2 .create_client::("graph_test_topic_4")?;