-
Notifications
You must be signed in to change notification settings - Fork 136
/
executor.rs
84 lines (72 loc) · 2.52 KB
/
executor.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
use crate::{rcl_bindings::rcl_context_is_valid, Node, RclReturnCode, RclrsError, WaitSet};
use std::{
sync::{Arc, Mutex, Weak},
time::Duration,
};
/// Single-threaded executor implementation.
pub struct SingleThreadedExecutor {
nodes_mtx: Mutex<Vec<Weak<Node>>>,
}
impl Default for SingleThreadedExecutor {
fn default() -> Self {
Self::new()
}
}
impl SingleThreadedExecutor {
/// Creates a new executor.
pub fn new() -> Self {
SingleThreadedExecutor {
nodes_mtx: Mutex::new(Vec::new()),
}
}
/// Add a node to the executor.
pub fn add_node(&self, node: &Arc<Node>) -> Result<(), RclrsError> {
{ self.nodes_mtx.lock().unwrap() }.push(Arc::downgrade(node));
Ok(())
}
/// Remove a node from the executor.
pub fn remove_node(&self, node: Arc<Node>) -> Result<(), RclrsError> {
{ self.nodes_mtx.lock().unwrap() }
.retain(|n| !n.upgrade().map(|n| Arc::ptr_eq(&n, &node)).unwrap_or(false));
Ok(())
}
/// Polls the nodes for new messages and executes the corresponding callbacks.
///
/// This function additionally checks that the context is still valid.
pub fn spin_once(&self, timeout: Option<Duration>) -> Result<(), RclrsError> {
for node in { self.nodes_mtx.lock().unwrap() }
.iter()
.filter_map(Weak::upgrade)
.filter(|node| unsafe {
rcl_context_is_valid(&*node.handle.context_handle.rcl_context.lock().unwrap())
})
{
let wait_set = WaitSet::new_for_node(&node)?;
let ready_entities = wait_set.wait(timeout)?;
for ready_subscription in ready_entities.subscriptions {
ready_subscription.execute()?;
}
for ready_client in ready_entities.clients {
ready_client.execute()?;
}
for ready_service in ready_entities.services {
ready_service.execute()?;
}
}
Ok(())
}
/// Convenience function for calling [`SingleThreadedExecutor::spin_once`] in a loop.
pub fn spin(&self) -> Result<(), RclrsError> {
while !{ self.nodes_mtx.lock().unwrap() }.is_empty() {
match self.spin_once(None) {
Ok(_)
| Err(RclrsError::RclError {
code: RclReturnCode::Timeout,
..
}) => std::thread::yield_now(),
error => return error,
}
}
Ok(())
}
}