Skip to content

Commit

Permalink
Return Arc from the create_node function to match other create_X func…
Browse files Browse the repository at this point in the history
…tions (#294)

* Fine-grained locks. Made create_subscription, create_service, create_client not take a mutable self

* Return an Arc from rclrs::create_node to match other create_X functions

* Update spin* to take an Arc

* Fix clippy warning
  • Loading branch information
esteve authored Jul 25, 2023
1 parent f9e7263 commit 7450810
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 52 deletions.
10 changes: 5 additions & 5 deletions docs/writing-your-first-rclrs-node.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ struct RepublisherNode {

impl RepublisherNode {
fn new(context: &rclrs::Context) -> Result<Self, rclrs::RclrsError> {
let mut node = rclrs::Node::new(context, "republisher")?;
let node = rclrs::Node::new(context, "republisher")?;
let data = None;
let _subscription = node.create_subscription(
"in_topic",
Expand All @@ -76,7 +76,7 @@ Next, add a main function to launch it:
fn main() -> Result<(), rclrs::RclrsError> {
let context = rclrs::Context::new(std::env::args())?;
let republisher = RepublisherNode::new(&context)?;
rclrs::spin(&republisher.node)
rclrs::spin(republisher.node)
}
```

Expand Down Expand Up @@ -121,7 +121,7 @@ struct RepublisherNode {

impl RepublisherNode {
fn new(context: &rclrs::Context) -> Result<Self, rclrs::RclrsError> {
let mut node = rclrs::Node::new(context, "republisher")?;
let node = rclrs::Node::new(context, "republisher")?;
let data = Arc::new(Mutex::new(None)); // (3)
let data_cb = Arc::clone(&data);
let _subscription = {
Expand Down Expand Up @@ -190,7 +190,7 @@ fn main() -> Result<(), rclrs::RclrsError> {
republisher.republish()?;
}
});
rclrs::spin(&republisher.node)
rclrs::spin(republisher.node)
}
```

Expand All @@ -212,7 +212,7 @@ fn main() -> Result<(), rclrs::RclrsError> {
republisher_other_thread.republish()?;
}
});
rclrs::spin(&republisher.node)
rclrs::spin(republisher.node)
}
```

Expand Down
7 changes: 4 additions & 3 deletions examples/message_demo/src/message_demo.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::convert::TryInto;
use std::env;
use std::sync::Arc;

use anyhow::{Error, Result};
use rosidl_runtime_rs::{seq, BoundedSequence, Message, Sequence};
Expand Down Expand Up @@ -132,7 +133,7 @@ fn demonstrate_pubsub() -> Result<(), Error> {
println!("================== Interoperability demo ==================");
// Demonstrate interoperability between idiomatic and RMW-native message types
let context = rclrs::Context::new(env::args())?;
let mut node = rclrs::create_node(&context, "message_demo")?;
let node = rclrs::create_node(&context, "message_demo")?;

let idiomatic_publisher = node.create_publisher::<rclrs_example_msgs::msg::VariousTypes>(
"topic",
Expand All @@ -159,10 +160,10 @@ fn demonstrate_pubsub() -> Result<(), Error> {
)?;
println!("Sending idiomatic message.");
idiomatic_publisher.publish(rclrs_example_msgs::msg::VariousTypes::default())?;
rclrs::spin_once(&node, None)?;
rclrs::spin_once(Arc::clone(&node), None)?;
println!("Sending RMW-native message.");
direct_publisher.publish(rclrs_example_msgs::msg::rmw::VariousTypes::default())?;
rclrs::spin_once(&node, None)?;
rclrs::spin_once(Arc::clone(&node), None)?;

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions examples/minimal_client_service/src/minimal_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::{Error, Result};
fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;

let mut node = rclrs::create_node(&context, "minimal_client")?;
let node = rclrs::create_node(&context, "minimal_client")?;

let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;

Expand All @@ -28,5 +28,5 @@ fn main() -> Result<(), Error> {
std::thread::sleep(std::time::Duration::from_millis(500));

println!("Waiting for response");
rclrs::spin(&node).map_err(|err| err.into())
rclrs::spin(node).map_err(|err| err.into())
}
4 changes: 2 additions & 2 deletions examples/minimal_client_service/src/minimal_client_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use anyhow::{Error, Result};
async fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;

let mut node = rclrs::create_node(&context, "minimal_client")?;
let node = rclrs::create_node(&context, "minimal_client")?;

let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;

Expand All @@ -20,7 +20,7 @@ async fn main() -> Result<(), Error> {

println!("Waiting for response");

let rclrs_spin = tokio::task::spawn_blocking(move || rclrs::spin(&node));
let rclrs_spin = tokio::task::spawn_blocking(move || rclrs::spin(node));

let response = future.await?;
println!(
Expand Down
4 changes: 2 additions & 2 deletions examples/minimal_client_service/src/minimal_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ fn handle_service(
fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;

let mut node = rclrs::create_node(&context, "minimal_service")?;
let node = rclrs::create_node(&context, "minimal_service")?;

let _server = node
.create_service::<example_interfaces::srv::AddTwoInts, _>("add_two_ints", handle_service)?;

println!("Starting server");
rclrs::spin(&node).map_err(|err| err.into())
rclrs::spin(node).map_err(|err| err.into())
}
4 changes: 2 additions & 2 deletions examples/minimal_pub_sub/src/minimal_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::{Error, Result};
fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;

let mut node = rclrs::create_node(&context, "minimal_subscriber")?;
let node = rclrs::create_node(&context, "minimal_subscriber")?;

let mut num_messages: usize = 0;

Expand All @@ -19,5 +19,5 @@ fn main() -> Result<(), Error> {
},
)?;

rclrs::spin(&node).map_err(|err| err.into())
rclrs::spin(node).map_err(|err| err.into())
}
4 changes: 2 additions & 2 deletions examples/minimal_pub_sub/src/zero_copy_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::{Error, Result};
fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;

let mut node = rclrs::create_node(&context, "minimal_subscriber")?;
let node = rclrs::create_node(&context, "minimal_subscriber")?;

let mut num_messages: usize = 0;

Expand All @@ -19,5 +19,5 @@ fn main() -> Result<(), Error> {
},
)?;

rclrs::spin(&node).map_err(|err| err.into())
rclrs::spin(node).map_err(|err| err.into())
}
19 changes: 11 additions & 8 deletions rclrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod rcl_bindings;
#[cfg(feature = "dyn_msg")]
pub mod dynamic_message;

use std::sync::Arc;
use std::time::Duration;

pub use arguments::*;
Expand All @@ -49,8 +50,8 @@ pub use wait::*;
/// This can usually be ignored.
///
/// [1]: crate::RclReturnCode
pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsError> {
let wait_set = WaitSet::new_for_node(node)?;
pub fn spin_once(node: Arc<Node>, timeout: Option<Duration>) -> Result<(), RclrsError> {
let wait_set = WaitSet::new_for_node(&node)?;
let ready_entities = wait_set.wait(timeout)?;

for ready_subscription in ready_entities.subscriptions {
Expand All @@ -71,14 +72,16 @@ pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsErro
/// Convenience function for calling [`spin_once`] in a loop.
///
/// This function additionally checks that the context is still valid.
pub fn spin(node: &Node) -> Result<(), RclrsError> {
pub fn spin(node: Arc<Node>) -> Result<(), RclrsError> {
// The context_is_valid functions exists only to abstract away ROS distro differences
// SAFETY: No preconditions for this function.
let context_is_valid =
|| unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) };
let context_is_valid = {
let node = Arc::clone(&node);
move || unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) }
};

while context_is_valid() {
match spin_once(node, None) {
match spin_once(Arc::clone(&node), None) {
Ok(_)
| Err(RclrsError::RclError {
code: RclReturnCode::Timeout,
Expand All @@ -105,8 +108,8 @@ pub fn spin(node: &Node) -> Result<(), RclrsError> {
/// assert!(node.is_ok());
/// # Ok::<(), RclrsError>(())
/// ```
pub fn create_node(context: &Context, node_name: &str) -> Result<Node, RclrsError> {
Node::builder(context, node_name).build()
pub fn create_node(context: &Context, node_name: &str) -> Result<Arc<Node>, RclrsError> {
Ok(Arc::new(Node::builder(context, node_name).build()?))
}

/// Creates a [`NodeBuilder`][1].
Expand Down
42 changes: 24 additions & 18 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ unsafe impl Send for rcl_node_t {}
pub struct Node {
pub(crate) rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
pub(crate) rcl_context_mtx: Arc<Mutex<rcl_context_t>>,
pub(crate) clients: Vec<Weak<dyn ClientBase>>,
pub(crate) guard_conditions: Vec<Weak<GuardCondition>>,
pub(crate) services: Vec<Weak<dyn ServiceBase>>,
pub(crate) subscriptions: Vec<Weak<dyn SubscriptionBase>>,
pub(crate) clients_mtx: Mutex<Vec<Weak<dyn ClientBase>>>,
pub(crate) guard_conditions_mtx: Mutex<Vec<Weak<GuardCondition>>>,
pub(crate) services_mtx: Mutex<Vec<Weak<dyn ServiceBase>>>,
pub(crate) subscriptions_mtx: Mutex<Vec<Weak<dyn SubscriptionBase>>>,
_parameter_map: ParameterOverrideMap,
}

Expand Down Expand Up @@ -180,13 +180,12 @@ impl Node {
///
/// [1]: crate::Client
// TODO: make client's lifetime depend on node's lifetime
pub fn create_client<T>(&mut self, topic: &str) -> Result<Arc<Client<T>>, RclrsError>
pub fn create_client<T>(&self, topic: &str) -> Result<Arc<Client<T>>, RclrsError>
where
T: rosidl_runtime_rs::Service,
{
let client = Arc::new(Client::<T>::new(Arc::clone(&self.rcl_node_mtx), topic)?);
self.clients
.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
{ self.clients_mtx.lock().unwrap() }.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
Ok(client)
}

Expand All @@ -199,12 +198,12 @@ impl Node {
///
/// [1]: crate::GuardCondition
/// [2]: crate::spin_once
pub fn create_guard_condition(&mut self) -> Arc<GuardCondition> {
pub fn create_guard_condition(&self) -> Arc<GuardCondition> {
let guard_condition = Arc::new(GuardCondition::new_with_rcl_context(
&mut self.rcl_context_mtx.lock().unwrap(),
None,
));
self.guard_conditions
{ self.guard_conditions_mtx.lock().unwrap() }
.push(Arc::downgrade(&guard_condition) as Weak<GuardCondition>);
guard_condition
}
Expand All @@ -226,7 +225,7 @@ impl Node {
&mut self.rcl_context_mtx.lock().unwrap(),
Some(Box::new(callback) as Box<dyn Fn() + Send + Sync>),
));
self.guard_conditions
{ self.guard_conditions_mtx.lock().unwrap() }
.push(Arc::downgrade(&guard_condition) as Weak<GuardCondition>);
guard_condition
}
Expand All @@ -251,7 +250,7 @@ impl Node {
/// [1]: crate::Service
// TODO: make service's lifetime depend on node's lifetime
pub fn create_service<T, F>(
&mut self,
&self,
topic: &str,
callback: F,
) -> Result<Arc<Service<T>>, RclrsError>
Expand All @@ -264,7 +263,7 @@ impl Node {
topic,
callback,
)?);
self.services
{ self.services_mtx.lock().unwrap() }
.push(Arc::downgrade(&service) as Weak<dyn ServiceBase>);
Ok(service)
}
Expand All @@ -274,7 +273,7 @@ impl Node {
/// [1]: crate::Subscription
// TODO: make subscription's lifetime depend on node's lifetime
pub fn create_subscription<T, Args>(
&mut self,
&self,
topic: &str,
qos: QoSProfile,
callback: impl SubscriptionCallback<T, Args>,
Expand All @@ -288,32 +287,39 @@ impl Node {
qos,
callback,
)?);
self.subscriptions
{ self.subscriptions_mtx.lock() }
.unwrap()
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
Ok(subscription)
}

/// Returns the subscriptions that have not been dropped yet.
pub(crate) fn live_subscriptions(&self) -> Vec<Arc<dyn SubscriptionBase>> {
self.subscriptions
{ self.subscriptions_mtx.lock().unwrap() }
.iter()
.filter_map(Weak::upgrade)
.collect()
}

pub(crate) fn live_clients(&self) -> Vec<Arc<dyn ClientBase>> {
self.clients.iter().filter_map(Weak::upgrade).collect()
{ self.clients_mtx.lock().unwrap() }
.iter()
.filter_map(Weak::upgrade)
.collect()
}

pub(crate) fn live_guard_conditions(&self) -> Vec<Arc<GuardCondition>> {
self.guard_conditions
{ self.guard_conditions_mtx.lock().unwrap() }
.iter()
.filter_map(Weak::upgrade)
.collect()
}

pub(crate) fn live_services(&self) -> Vec<Arc<dyn ServiceBase>> {
self.services.iter().filter_map(Weak::upgrade).collect()
{ self.services_mtx.lock().unwrap() }
.iter()
.filter_map(Weak::upgrade)
.collect()
}

/// Returns the ROS domain ID that the node is using.
Expand Down
8 changes: 4 additions & 4 deletions rclrs/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,10 @@ impl NodeBuilder {
Ok(Node {
rcl_node_mtx,
rcl_context_mtx: self.context.clone(),
clients: vec![],
guard_conditions: vec![],
services: vec![],
subscriptions: vec![],
clients_mtx: Mutex::new(vec![]),
guard_conditions_mtx: Mutex::new(vec![]),
services_mtx: Mutex::new(vec![]),
subscriptions_mtx: Mutex::new(vec![]),
_parameter_map,
})
}
Expand Down
8 changes: 4 additions & 4 deletions rclrs_tests/src/graph_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fn test_publishers() -> Result<(), RclrsError> {
#[test]
fn test_subscriptions() -> Result<(), RclrsError> {
let namespace = "/test_subscriptions_graph";
let mut graph = construct_test_graph(namespace)?;
let graph = construct_test_graph(namespace)?;

let node_2_empty_subscription = graph.node2.create_subscription::<msg::Empty, _>(
"graph_test_topic_1",
Expand Down Expand Up @@ -149,7 +149,7 @@ fn test_subscriptions() -> Result<(), RclrsError> {

#[test]
fn test_topic_names_and_types() -> Result<(), RclrsError> {
let mut graph = construct_test_graph("test_topics_graph")?;
let graph = construct_test_graph("test_topics_graph")?;

let _node_1_defaults_subscription = graph.node1.create_subscription::<msg::Defaults, _>(
"graph_test_topic_3",
Expand Down Expand Up @@ -191,7 +191,7 @@ fn test_topic_names_and_types() -> Result<(), RclrsError> {
#[test]
fn test_services() -> Result<(), RclrsError> {
let namespace = "/test_services_graph";
let mut graph = construct_test_graph(namespace)?;
let graph = construct_test_graph(namespace)?;
let check_names_and_types = |names_and_types: TopicNamesAndTypes| {
let types = names_and_types
.get("/test_services_graph/graph_test_topic_4")
Expand Down Expand Up @@ -225,7 +225,7 @@ fn test_services() -> Result<(), RclrsError> {
#[test]
fn test_clients() -> Result<(), RclrsError> {
let namespace = "/test_clients_graph";
let mut graph = construct_test_graph(namespace)?;
let graph = construct_test_graph(namespace)?;
let _node_2_empty_client = graph
.node2
.create_client::<srv::Empty>("graph_test_topic_4")?;
Expand Down

0 comments on commit 7450810

Please sign in to comment.