diff --git a/rclrs/src/action.rs b/rclrs/src/action.rs index c2ef0e7a..e84944c4 100644 --- a/rclrs/src/action.rs +++ b/rclrs/src/action.rs @@ -1,4 +1,4 @@ -mod client; +pub(crate) mod client; pub(crate) mod server; mod server_goal_handle; diff --git a/rclrs/src/action/client.rs b/rclrs/src/action/client.rs index deef8a16..98d7ca37 100644 --- a/rclrs/src/action/client.rs +++ b/rclrs/src/action/client.rs @@ -48,9 +48,18 @@ impl Drop for ActionClientHandle { pub trait ActionClientBase: Send + Sync { /// Internal function to get a reference to the `rcl` handle. fn handle(&self) -> &ActionClientHandle; + /// Returns the number of underlying entities for the action client. fn num_entities(&self) -> &WaitableNumEntities; - // /// Tries to take a new request and run the callback with it. - // fn execute(&self) -> Result<(), RclrsError>; + /// Tries to run the callback for the given readiness mode. + fn execute(&self, mode: ReadyMode) -> Result<(), RclrsError>; +} + +pub(crate) enum ReadyMode { + Feedback, + Status, + GoalResponse, + CancelResponse, + ResultResponse, } pub struct ActionClient @@ -143,4 +152,8 @@ where fn num_entities(&self) -> &WaitableNumEntities { &self.num_entities } + + fn execute(&self, mode: ReadyMode) -> Result<(), RclrsError> { + todo!() + } } diff --git a/rclrs/src/action/server.rs b/rclrs/src/action/server.rs index b9d0b2c6..0a5a81e0 100644 --- a/rclrs/src/action/server.rs +++ b/rclrs/src/action/server.rs @@ -50,9 +50,17 @@ impl Drop for ActionServerHandle { pub trait ActionServerBase: Send + Sync { /// Internal function to get a reference to the `rcl` handle. fn handle(&self) -> &ActionServerHandle; + /// Returns the number of underlying entities for the action server. fn num_entities(&self) -> &WaitableNumEntities; - // /// Tries to take a new request and run the callback with it. - // fn execute(&self) -> Result<(), RclrsError>; + /// Tries to run the callback for the given readiness mode. + fn execute(&self, mode: ReadyMode) -> Result<(), RclrsError>; +} + +pub(crate) enum ReadyMode { + GoalRequest, + CancelRequest, + ResultRequest, + GoalExpired, } pub type GoalCallback = dyn Fn(GoalUuid, ::Goal) -> GoalResponse + 'static + Send + Sync; @@ -163,4 +171,8 @@ where fn num_entities(&self) -> &WaitableNumEntities { &self.num_entities } + + fn execute(&self, mode: ReadyMode) -> Result<(), RclrsError> { + todo!() + } } diff --git a/rclrs/src/executor.rs b/rclrs/src/executor.rs index 37c43a68..7bd96c58 100644 --- a/rclrs/src/executor.rs +++ b/rclrs/src/executor.rs @@ -25,13 +25,15 @@ impl SingleThreadedExecutor { /// Add a node to the executor. pub fn add_node(&self, node: &Arc) -> Result<(), RclrsError> { - { self.nodes_mtx.lock().unwrap() }.push(Arc::downgrade(node)); + self.nodes_mtx.lock().unwrap().push(Arc::downgrade(node)); Ok(()) } /// Remove a node from the executor. pub fn remove_node(&self, node: Arc) -> Result<(), RclrsError> { - { self.nodes_mtx.lock().unwrap() } + self.nodes_mtx + .lock() + .unwrap() .retain(|n| !n.upgrade().map(|n| Arc::ptr_eq(&n, &node)).unwrap_or(false)); Ok(()) } @@ -40,7 +42,10 @@ impl SingleThreadedExecutor { /// /// This function additionally checks that the context is still valid. pub fn spin_once(&self, timeout: Option) -> Result<(), RclrsError> { - for node in { self.nodes_mtx.lock().unwrap() } + for node in self + .nodes_mtx + .lock() + .unwrap() .iter() .filter_map(Weak::upgrade) .filter(|node| unsafe { @@ -61,6 +66,14 @@ impl SingleThreadedExecutor { for ready_service in ready_entities.services { ready_service.execute()?; } + + for (ready_action_client, mode) in ready_entities.action_clients { + ready_action_client.execute(mode)?; + } + + for (ready_action_server, mode) in ready_entities.action_servers { + ready_action_server.execute(mode)?; + } } Ok(()) @@ -68,7 +81,7 @@ impl SingleThreadedExecutor { /// Convenience function for calling [`SingleThreadedExecutor::spin_once`] in a loop. pub fn spin(&self) -> Result<(), RclrsError> { - while !{ self.nodes_mtx.lock().unwrap() }.is_empty() { + while !self.nodes_mtx.lock().unwrap().is_empty() { match self.spin_once(None) { Ok(_) | Err(RclrsError::RclError { diff --git a/rclrs/src/wait.rs b/rclrs/src/wait.rs index 68d6d962..68c3570f 100644 --- a/rclrs/src/wait.rs +++ b/rclrs/src/wait.rs @@ -18,6 +18,9 @@ use std::{sync::Arc, time::Duration, vec::Vec}; use crate::{ + action::{ + client::ReadyMode as ActionClientReadyMode, server::ReadyMode as ActionServerReadyMode, + }, error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult}, rcl_bindings::*, ActionClientBase, ActionServerBase, ClientBase, Context, ContextHandle, Node, ServiceBase, @@ -66,6 +69,10 @@ pub struct ReadyEntities { pub guard_conditions: Vec>, /// A list of services that have potentially received requests. pub services: Vec>, + /// A list of action clients and the ways in which they are ready. + pub action_clients: Vec<(Arc, ActionClientReadyMode)>, + /// A list of action servers and the ways in which they are ready. + pub action_servers: Vec<(Arc, ActionServerReadyMode)>, } impl Drop for rcl_wait_set_t { @@ -156,8 +163,12 @@ impl WaitSet { let mut num_services = live_services.len(); let mut num_events = 0; - let action_client_entities = live_action_clients.iter().map(|client| client.num_entities()); - let action_server_entities = live_action_servers.iter().map(|server| server.num_entities()); + let action_client_entities = live_action_clients + .iter() + .map(|client| client.num_entities()); + let action_server_entities = live_action_servers + .iter() + .map(|server| server.num_entities()); for num_entities in action_client_entities.chain(action_server_entities) { num_subscriptions += num_entities.num_subscriptions; num_timers += num_entities.num_timers; @@ -451,8 +462,9 @@ impl WaitSet { }; // SAFETY: The comments in rcl mention "This function cannot operate on the same wait set // in multiple threads, and the wait sets may not share content." - // We cannot currently guarantee that the wait sets may not share content, but it is - // mentioned in the doc comment for `add_subscription`. + // By taking exclusive ownership of `self`, we can guarantee that the wait set is not in + // use from another thread. We guarantee that waits sets may not share content using + // `ExclusivityGuard`s on each entity added. // Also, the rcl_wait_set is obviously valid. match unsafe { rcl_wait(&mut self.handle.rcl_wait_set, timeout_ns) }.ok() { Ok(_) => (), @@ -469,6 +481,8 @@ impl WaitSet { clients: Vec::new(), guard_conditions: Vec::new(), services: Vec::new(), + action_clients: Vec::new(), + action_servers: Vec::new(), }; for (i, subscription) in self.subscriptions.iter().enumerate() { // SAFETY: The `subscriptions` entry is an array of pointers, and this dereferencing is @@ -513,6 +527,103 @@ impl WaitSet { ready_entities.services.push(Arc::clone(&service.waitable)); } } + + for action_client in &self.action_clients { + let mut is_feedback_ready = false; + let mut is_status_ready = false; + let mut is_goal_response_ready = false; + let mut is_cancel_response_ready = false; + let mut is_result_response_ready = false; + // SAFETY: The wait set is exclusively owned by this function, which guarantees thread + // safety. + unsafe { + rcl_action_client_wait_set_get_entities_ready( + &self.handle.rcl_wait_set, + &*action_client.waitable.handle().lock(), + &mut is_feedback_ready, + &mut is_status_ready, + &mut is_goal_response_ready, + &mut is_cancel_response_ready, + &mut is_result_response_ready, + ) + .ok()?; + } + if is_feedback_ready { + ready_entities.action_clients.push(( + Arc::clone(&action_client.waitable), + ActionClientReadyMode::Feedback, + )); + } + if is_status_ready { + ready_entities.action_clients.push(( + Arc::clone(&action_client.waitable), + ActionClientReadyMode::Status, + )); + } + if is_goal_response_ready { + ready_entities.action_clients.push(( + Arc::clone(&action_client.waitable), + ActionClientReadyMode::GoalResponse, + )); + } + if is_cancel_response_ready { + ready_entities.action_clients.push(( + Arc::clone(&action_client.waitable), + ActionClientReadyMode::CancelResponse, + )); + } + if is_result_response_ready { + ready_entities.action_clients.push(( + Arc::clone(&action_client.waitable), + ActionClientReadyMode::ResultResponse, + )); + } + } + + for action_server in &self.action_servers { + let mut is_goal_request_ready = false; + let mut is_cancel_request_ready = false; + let mut is_result_request_ready = false; + let mut is_goal_expired = false; + // SAFETY: The wait set is exclusively owned by this function, which guarantees thread + // safety. + unsafe { + rcl_action_server_wait_set_get_entities_ready( + &self.handle.rcl_wait_set, + &*action_server.waitable.handle().lock(), + &mut is_goal_request_ready, + &mut is_cancel_request_ready, + &mut is_result_request_ready, + &mut is_goal_expired, + ) + .ok()?; + } + if is_goal_request_ready { + ready_entities.action_servers.push(( + Arc::clone(&action_server.waitable), + ActionServerReadyMode::GoalRequest, + )); + } + if is_cancel_request_ready { + ready_entities.action_servers.push(( + Arc::clone(&action_server.waitable), + ActionServerReadyMode::CancelRequest, + )); + } + if is_result_request_ready { + ready_entities.action_servers.push(( + Arc::clone(&action_server.waitable), + ActionServerReadyMode::ResultRequest, + )); + } + if is_goal_expired { + ready_entities.action_servers.push(( + Arc::clone(&action_server.waitable), + ActionServerReadyMode::GoalExpired, + )); + } + } + Ok(ready_entities) } }