Skip to content

Commit

Permalink
Handle action server/client readiness in WaitSet and executor
Browse files Browse the repository at this point in the history
Currently, action servers and clients that are ready in multiple ways
are returned to the executor as a list of pairs, with one readiness mode
per entry. This could alternatively be encoded as a bitfield or similar
struct, with any given client/server only occurring once in the list.
However, to ensure that the executor has control over execution order,
we would need to expose individual `execute_readiness_mode()` methods
from the client and server, rather than a unified `execute(Mode)`
method. That's fine too, but something to keep in mind.
  • Loading branch information
nwn committed Jul 7, 2024
1 parent b845051 commit 453c208
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 13 deletions.
2 changes: 1 addition & 1 deletion rclrs/src/action.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mod client;
pub(crate) mod client;
pub(crate) mod server;
mod server_goal_handle;

Expand Down
17 changes: 15 additions & 2 deletions rclrs/src/action/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,18 @@ impl Drop for ActionClientHandle {
pub trait ActionClientBase: Send + Sync {
/// Internal function to get a reference to the `rcl` handle.
fn handle(&self) -> &ActionClientHandle;
/// Returns the number of underlying entities for the action client.
fn num_entities(&self) -> &WaitableNumEntities;
// /// Tries to take a new request and run the callback with it.
// fn execute(&self) -> Result<(), RclrsError>;
/// Tries to run the callback for the given readiness mode.
fn execute(&self, mode: ReadyMode) -> Result<(), RclrsError>;
}

pub(crate) enum ReadyMode {
Feedback,
Status,
GoalResponse,
CancelResponse,
ResultResponse,
}

pub struct ActionClient<ActionT>
Expand Down Expand Up @@ -143,4 +152,8 @@ where
fn num_entities(&self) -> &WaitableNumEntities {
&self.num_entities
}

fn execute(&self, mode: ReadyMode) -> Result<(), RclrsError> {
todo!()
}
}
16 changes: 14 additions & 2 deletions rclrs/src/action/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,17 @@ impl Drop for ActionServerHandle {
pub trait ActionServerBase: Send + Sync {
/// Internal function to get a reference to the `rcl` handle.
fn handle(&self) -> &ActionServerHandle;
/// Returns the number of underlying entities for the action server.
fn num_entities(&self) -> &WaitableNumEntities;
// /// Tries to take a new request and run the callback with it.
// fn execute(&self) -> Result<(), RclrsError>;
/// Tries to run the callback for the given readiness mode.
fn execute(&self, mode: ReadyMode) -> Result<(), RclrsError>;
}

pub(crate) enum ReadyMode {
GoalRequest,
CancelRequest,
ResultRequest,
GoalExpired,
}

pub type GoalCallback<ActionT> = dyn Fn(GoalUuid, <ActionT as rosidl_runtime_rs::Action>::Goal) -> GoalResponse + 'static + Send + Sync;
Expand Down Expand Up @@ -163,4 +171,8 @@ where
fn num_entities(&self) -> &WaitableNumEntities {
&self.num_entities
}

fn execute(&self, mode: ReadyMode) -> Result<(), RclrsError> {
todo!()
}
}
21 changes: 17 additions & 4 deletions rclrs/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ impl SingleThreadedExecutor {

/// Add a node to the executor.
pub fn add_node(&self, node: &Arc<Node>) -> Result<(), RclrsError> {
{ self.nodes_mtx.lock().unwrap() }.push(Arc::downgrade(node));
self.nodes_mtx.lock().unwrap().push(Arc::downgrade(node));
Ok(())
}

/// Remove a node from the executor.
pub fn remove_node(&self, node: Arc<Node>) -> Result<(), RclrsError> {
{ self.nodes_mtx.lock().unwrap() }
self.nodes_mtx
.lock()
.unwrap()
.retain(|n| !n.upgrade().map(|n| Arc::ptr_eq(&n, &node)).unwrap_or(false));
Ok(())
}
Expand All @@ -40,7 +42,10 @@ impl SingleThreadedExecutor {
///
/// This function additionally checks that the context is still valid.
pub fn spin_once(&self, timeout: Option<Duration>) -> Result<(), RclrsError> {
for node in { self.nodes_mtx.lock().unwrap() }
for node in self
.nodes_mtx
.lock()
.unwrap()
.iter()
.filter_map(Weak::upgrade)
.filter(|node| unsafe {
Expand All @@ -61,14 +66,22 @@ impl SingleThreadedExecutor {
for ready_service in ready_entities.services {
ready_service.execute()?;
}

for (ready_action_client, mode) in ready_entities.action_clients {
ready_action_client.execute(mode)?;
}

for (ready_action_server, mode) in ready_entities.action_servers {
ready_action_server.execute(mode)?;
}
}

Ok(())
}

/// Convenience function for calling [`SingleThreadedExecutor::spin_once`] in a loop.
pub fn spin(&self) -> Result<(), RclrsError> {
while !{ self.nodes_mtx.lock().unwrap() }.is_empty() {
while !self.nodes_mtx.lock().unwrap().is_empty() {
match self.spin_once(None) {
Ok(_)
| Err(RclrsError::RclError {
Expand Down
119 changes: 115 additions & 4 deletions rclrs/src/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
use std::{sync::Arc, time::Duration, vec::Vec};

use crate::{
action::{
client::ReadyMode as ActionClientReadyMode, server::ReadyMode as ActionServerReadyMode,
},
error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult},
rcl_bindings::*,
ActionClientBase, ActionServerBase, ClientBase, Context, ContextHandle, Node, ServiceBase,
Expand Down Expand Up @@ -66,6 +69,10 @@ pub struct ReadyEntities {
pub guard_conditions: Vec<Arc<GuardCondition>>,
/// A list of services that have potentially received requests.
pub services: Vec<Arc<dyn ServiceBase>>,
/// A list of action clients and the ways in which they are ready.
pub action_clients: Vec<(Arc<dyn ActionClientBase>, ActionClientReadyMode)>,
/// A list of action servers and the ways in which they are ready.
pub action_servers: Vec<(Arc<dyn ActionServerBase>, ActionServerReadyMode)>,
}

impl Drop for rcl_wait_set_t {
Expand Down Expand Up @@ -156,8 +163,12 @@ impl WaitSet {
let mut num_services = live_services.len();
let mut num_events = 0;

let action_client_entities = live_action_clients.iter().map(|client| client.num_entities());
let action_server_entities = live_action_servers.iter().map(|server| server.num_entities());
let action_client_entities = live_action_clients
.iter()
.map(|client| client.num_entities());
let action_server_entities = live_action_servers
.iter()
.map(|server| server.num_entities());
for num_entities in action_client_entities.chain(action_server_entities) {
num_subscriptions += num_entities.num_subscriptions;
num_timers += num_entities.num_timers;
Expand Down Expand Up @@ -451,8 +462,9 @@ impl WaitSet {
};
// SAFETY: The comments in rcl mention "This function cannot operate on the same wait set
// in multiple threads, and the wait sets may not share content."
// We cannot currently guarantee that the wait sets may not share content, but it is
// mentioned in the doc comment for `add_subscription`.
// By taking exclusive ownership of `self`, we can guarantee that the wait set is not in
// use from another thread. We guarantee that waits sets may not share content using
// `ExclusivityGuard`s on each entity added.
// Also, the rcl_wait_set is obviously valid.
match unsafe { rcl_wait(&mut self.handle.rcl_wait_set, timeout_ns) }.ok() {
Ok(_) => (),
Expand All @@ -469,6 +481,8 @@ impl WaitSet {
clients: Vec::new(),
guard_conditions: Vec::new(),
services: Vec::new(),
action_clients: Vec::new(),
action_servers: Vec::new(),
};
for (i, subscription) in self.subscriptions.iter().enumerate() {
// SAFETY: The `subscriptions` entry is an array of pointers, and this dereferencing is
Expand Down Expand Up @@ -513,6 +527,103 @@ impl WaitSet {
ready_entities.services.push(Arc::clone(&service.waitable));
}
}

for action_client in &self.action_clients {
let mut is_feedback_ready = false;
let mut is_status_ready = false;
let mut is_goal_response_ready = false;
let mut is_cancel_response_ready = false;
let mut is_result_response_ready = false;
// SAFETY: The wait set is exclusively owned by this function, which guarantees thread
// safety.
unsafe {
rcl_action_client_wait_set_get_entities_ready(
&self.handle.rcl_wait_set,
&*action_client.waitable.handle().lock(),
&mut is_feedback_ready,
&mut is_status_ready,
&mut is_goal_response_ready,
&mut is_cancel_response_ready,
&mut is_result_response_ready,
)
.ok()?;
}
if is_feedback_ready {
ready_entities.action_clients.push((
Arc::clone(&action_client.waitable),
ActionClientReadyMode::Feedback,
));
}
if is_status_ready {
ready_entities.action_clients.push((
Arc::clone(&action_client.waitable),
ActionClientReadyMode::Status,
));
}
if is_goal_response_ready {
ready_entities.action_clients.push((
Arc::clone(&action_client.waitable),
ActionClientReadyMode::GoalResponse,
));
}
if is_cancel_response_ready {
ready_entities.action_clients.push((
Arc::clone(&action_client.waitable),
ActionClientReadyMode::CancelResponse,
));
}
if is_result_response_ready {
ready_entities.action_clients.push((
Arc::clone(&action_client.waitable),
ActionClientReadyMode::ResultResponse,
));
}
}

for action_server in &self.action_servers {
let mut is_goal_request_ready = false;
let mut is_cancel_request_ready = false;
let mut is_result_request_ready = false;
let mut is_goal_expired = false;
// SAFETY: The wait set is exclusively owned by this function, which guarantees thread
// safety.
unsafe {
rcl_action_server_wait_set_get_entities_ready(
&self.handle.rcl_wait_set,
&*action_server.waitable.handle().lock(),
&mut is_goal_request_ready,
&mut is_cancel_request_ready,
&mut is_result_request_ready,
&mut is_goal_expired,
)
.ok()?;
}
if is_goal_request_ready {
ready_entities.action_servers.push((
Arc::clone(&action_server.waitable),
ActionServerReadyMode::GoalRequest,
));
}
if is_cancel_request_ready {
ready_entities.action_servers.push((
Arc::clone(&action_server.waitable),
ActionServerReadyMode::CancelRequest,
));
}
if is_result_request_ready {
ready_entities.action_servers.push((
Arc::clone(&action_server.waitable),
ActionServerReadyMode::ResultRequest,
));
}
if is_goal_expired {
ready_entities.action_servers.push((
Arc::clone(&action_server.waitable),
ActionServerReadyMode::GoalExpired,
));
}
}

Ok(ready_entities)
}
}
Expand Down

0 comments on commit 453c208

Please sign in to comment.