Skip to content

Commit

Permalink
Refactor wait-set policies examples
Browse files Browse the repository at this point in the history
Signed-off-by: Carlos San Vicente <carlos.sanvicente@apex.ai>
  • Loading branch information
carlossvg committed Jul 6, 2021
1 parent 9864211 commit aa1f133
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 85 deletions.
81 changes: 64 additions & 17 deletions rclcpp/wait_set/static_wait_set.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -13,6 +13,7 @@
// limitations under the License.

#include <memory>
#include <vector>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
Expand All @@ -21,28 +22,74 @@ int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);

// auto do_nothing = [](std_msgs::msg::String::UniquePtr){};
auto node = std::make_shared<rclcpp::Node>("static_wait_set_example_node");

auto guard_condition = std::make_shared<rclcpp::GuardCondition>();
auto do_nothing = [](std_msgs::msg::String::UniquePtr) {assert(false);};
auto sub1 = node->create_subscription<std_msgs::msg::String>("~/chatter", 10, do_nothing);
auto sub2 = node->create_subscription<std_msgs::msg::String>("~/chatter", 10, do_nothing);
std::vector<decltype(sub1)> sub_vector = {sub1, sub2};
auto guard_condition1 = std::make_shared<rclcpp::GuardCondition>();
auto guard_condition2 = std::make_shared<rclcpp::GuardCondition>();

rclcpp::StaticWaitSet<0, 1, 0, 0, 0, 0> static_wait_set(
std::array<rclcpp::StaticWaitSet<0, 1, 0, 0, 0, 0>::SubscriptionEntry, 0>{},
std::array<rclcpp::GuardCondition::SharedPtr, 1>{{guard_condition}},
rclcpp::StaticWaitSet<2, 2, 0, 0, 0, 0> static_wait_set(
std::array<rclcpp::StaticWaitSet<2, 2, 0, 0, 0, 0>::SubscriptionEntry, 2>{{{sub1}, {sub2}}},
std::array<rclcpp::GuardCondition::SharedPtr, 2>{{{guard_condition1}, {guard_condition2}}},
std::array<rclcpp::TimerBase::SharedPtr, 0>{},
std::array<rclcpp::ClientBase::SharedPtr, 0>{},
std::array<rclcpp::ServiceBase::SharedPtr, 0>{},
std::array<rclcpp::StaticWaitSet<0, 1, 0, 0, 0, 0>::WaitableEntry, 0>{});
// Note: The following line will result in a compiler error, since the
// static storage policy prevents editing after construction.
// static_wait_set.add_guard_condition(guard_condition2);
// static_wait_set.remove_guard_condition(guard_condition2);
(void)guard_condition2;

{
auto wait_result = static_wait_set.wait(std::chrono::seconds(1));
assert(wait_result.kind() == rclcpp::WaitResultKind::Timeout);
}
std::array<rclcpp::StaticWaitSet<2, 2, 0, 0, 0, 0>::WaitableEntry, 0>{});

auto wait_and_print_results = [&]() {
RCLCPP_INFO(node->get_logger(), "Waiting...");
auto wait_result = static_wait_set.wait(std::chrono::seconds(1));
if (wait_result.kind() == rclcpp::WaitResultKind::Ready) {
int guard_conditions_num = static_wait_set.get_rcl_wait_set().size_of_guard_conditions;
int subscriptions_num = static_wait_set.get_rcl_wait_set().size_of_subscriptions;

for (int i = 0; i < guard_conditions_num; i++) {
if (wait_result.get_wait_set().get_rcl_wait_set().guard_conditions[i]) {
RCLCPP_INFO(node->get_logger(), "guard_condition %d triggered", i + 1);
}
}
for (int i = 0; i < subscriptions_num; i++) {
if (wait_result.get_wait_set().get_rcl_wait_set().subscriptions[i]) {
RCLCPP_INFO(node->get_logger(), "subscription %d triggered", i + 1);
std_msgs::msg::String msg;
rclcpp::MessageInfo msg_info;
if (sub_vector.at(i)->take(msg, msg_info)) {
RCLCPP_INFO(
node->get_logger(),
"subscription %d: I heard '%s'", i + 1, msg.data.c_str());
} else {
RCLCPP_INFO(node->get_logger(), "subscription %d: No message", i + 1);
}
}
}
} else if (wait_result.kind() == rclcpp::WaitResultKind::Timeout) {
RCLCPP_INFO(node->get_logger(), "wait-set waiting failed with timeout");
} else if (wait_result.kind() == rclcpp::WaitResultKind::Empty) {
RCLCPP_INFO(node->get_logger(), "wait-set waiting failed because wait-set is empty");
}
};

RCLCPP_INFO(node->get_logger(), "Action: Nothing triggered");
wait_and_print_results();

RCLCPP_INFO(node->get_logger(), "Action: Trigger Guard condition 1");
guard_condition1->trigger();
wait_and_print_results();

RCLCPP_INFO(node->get_logger(), "Action: Trigger Guard condition 2");
guard_condition2->trigger();
wait_and_print_results();

RCLCPP_INFO(node->get_logger(), "Action: Message published");
auto pub = node->create_publisher<std_msgs::msg::String>("~/chatter", 1);
pub->publish(std_msgs::msg::String().set__data("test"));
wait_and_print_results();

// Note the static wait-set does not allow adding or removing entities dynamically.
// It will result in a compilation error.

return 0;
}
137 changes: 69 additions & 68 deletions rclcpp/wait_set/thread_safe_wait_set.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020 Open Source Robotics Foundation, Inc.
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,96 +15,97 @@
#include <cassert>
#include <memory>
#include <vector>
// #include <string>
// #include <thread>

#include "example_interfaces/srv/add_two_ints.hpp"
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);

auto do_nothing = [](std_msgs::msg::String::UniquePtr) {assert(false);};

auto node = std::make_shared<rclcpp::Node>("wait_set_example_node");

auto guard_condition = std::make_shared<rclcpp::GuardCondition>();
auto do_nothing = [](std_msgs::msg::String::UniquePtr) {assert(false);};
auto sub1 = node->create_subscription<std_msgs::msg::String>("~/chatter", 10, do_nothing);
auto sub2 = node->create_subscription<std_msgs::msg::String>("~/chatter", 10, do_nothing);
std::vector<decltype(sub1)> sub_vector = {sub1, sub2};
auto guard_condition1 = std::make_shared<rclcpp::GuardCondition>();
auto guard_condition2 = std::make_shared<rclcpp::GuardCondition>();

auto sub = node->create_subscription<std_msgs::msg::String>("~/chatter", 10, do_nothing);
rclcpp::SubscriptionOptions so;
so.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable;
auto sub2 = node->create_subscription<std_msgs::msg::String>("~/chatter", 10, do_nothing, so);

// FIXME: removing sub if it was added in the ctor leads to a failure
// terminate called after throwing an instance of 'std::runtime_error'
// what(): waitable not in wait set
rclcpp::ThreadSafeWaitSet wait_set(
std::vector<rclcpp::ThreadSafeWaitSet::SubscriptionEntry>{{sub}},
std::vector<rclcpp::GuardCondition::SharedPtr>{guard_condition});
// std::vector<rclcpp::WaitSet::SubscriptionEntry>{{sub}},
{},
std::vector<rclcpp::GuardCondition::SharedPtr>{guard_condition1});

wait_set.add_subscription(sub1); // FIXME: add it in the ctor
wait_set.add_subscription(sub2);
wait_set.add_guard_condition(guard_condition2);

{
auto wait_and_print_results = [&]() {
RCLCPP_INFO(node->get_logger(), "Waiting...");
auto wait_result = wait_set.wait(std::chrono::seconds(1));
assert(wait_result.kind() == rclcpp::WaitResultKind::Timeout);
}

guard_condition->trigger();

{
auto wait_result = wait_set.wait(std::chrono::seconds(1));
assert(wait_result.kind() == rclcpp::WaitResultKind::Ready);
assert(wait_result.get_wait_set().get_rcl_wait_set().guard_conditions[0] != nullptr);
assert(wait_result.get_wait_set().get_rcl_wait_set().guard_conditions[1] == nullptr);
assert(wait_result.get_wait_set().get_rcl_wait_set().subscriptions[0] == nullptr);
assert(wait_result.get_wait_set().get_rcl_wait_set().subscriptions[1] == nullptr);
}
if (wait_result.kind() == rclcpp::WaitResultKind::Ready) {
int guard_conditions_num = wait_set.get_rcl_wait_set().size_of_guard_conditions;
int subscriptions_num = wait_set.get_rcl_wait_set().size_of_subscriptions;

for (int i=0; i<guard_conditions_num; i++) {
if (wait_result.get_wait_set().get_rcl_wait_set().guard_conditions[i]) {
RCLCPP_INFO(node->get_logger(), "guard_condition %d triggered", i+1);
}
}
for (int i=0; i<subscriptions_num; i++) {
if (wait_result.get_wait_set().get_rcl_wait_set().subscriptions[i]) {
RCLCPP_INFO(node->get_logger(), "subscription %d triggered", i+1);
std_msgs::msg::String msg;
rclcpp::MessageInfo msg_info;
if (sub_vector.at(i)->take(msg, msg_info)) {
RCLCPP_INFO(node->get_logger(),
"subscription %d: I heard '%s'", i+1, msg.data.c_str());
} else {
RCLCPP_INFO(node->get_logger(), "subscription %d: No message", i+1);
}
}
}
} else if (wait_result.kind() == rclcpp::WaitResultKind::Timeout) {
RCLCPP_INFO(node->get_logger(), "wait-set waiting failed with timeout");
} else if (wait_result.kind() == rclcpp::WaitResultKind::Empty) {
RCLCPP_INFO(node->get_logger(), "wait-set waiting failed because wait-set is empty");
}
};

RCLCPP_INFO(node->get_logger(), "Action: Nothing triggered");
wait_and_print_results();

RCLCPP_INFO(node->get_logger(), "Action: Trigger Guard condition 1");
guard_condition1->trigger();
wait_and_print_results();

RCLCPP_INFO(node->get_logger(), "Action: Trigger Guard condition 2");
guard_condition2->trigger();
wait_and_print_results();

RCLCPP_INFO(node->get_logger(), "Action: Message published");
auto pub = node->create_publisher<std_msgs::msg::String>("~/chatter", 1);
pub->publish(std_msgs::msg::String().set__data("test"));
wait_and_print_results();

RCLCPP_INFO(node->get_logger(), "Action: Guard condition 1 removed");
RCLCPP_INFO(node->get_logger(), "Action: Guard condition 2 removed");
wait_set.remove_guard_condition(guard_condition1);
wait_set.remove_guard_condition(guard_condition2);
wait_and_print_results();

{
// still fails with timeout
auto wait_result = wait_set.wait(std::chrono::seconds(1));
assert(wait_result.kind() == rclcpp::WaitResultKind::Timeout);
}

wait_set.remove_guard_condition(guard_condition);

{
// still fails with timeout
auto wait_result = wait_set.wait(std::chrono::seconds(1));
assert(wait_result.kind() == rclcpp::WaitResultKind::Timeout);
}

RCLCPP_INFO(node->get_logger(), "Action: Subscription 2 removed");
wait_set.remove_subscription(sub2);
wait_and_print_results();

{
// still fails with timeout
auto wait_result = wait_set.wait(std::chrono::seconds(1));
assert(wait_result.kind() == rclcpp::WaitResultKind::Timeout);
}

auto pub = node->create_publisher<std_msgs::msg::String>("~/chatter", 1);
pub->publish(std_msgs::msg::String().set__data("test"));
RCLCPP_INFO(node->get_logger(), "Action: Subscription 1 removed");
wait_set.remove_subscription(sub1);
wait_and_print_results();

{
auto wait_result = wait_set.wait(std::chrono::seconds(1));
assert(wait_result.kind() == rclcpp::WaitResultKind::Ready);
assert(wait_result.get_wait_set().get_rcl_wait_set().subscriptions[0] != nullptr);
std_msgs::msg::String msg;
rclcpp::MessageInfo msg_info;
assert(sub->take(msg, msg_info));
assert(msg.data == "test");
}

wait_set.remove_subscription(sub);

{
// still will not fail, because thread-safe policy we are using adds its
// own guard condition and therefore it will never be empty
auto wait_result = wait_set.wait(std::chrono::seconds(1));
assert(wait_result.kind() == rclcpp::WaitResultKind::Timeout);
}

return 0;
}
111 changes: 111 additions & 0 deletions rclcpp/wait_set/wait_set.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <cassert>
#include <memory>
#include <vector>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"


int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);

auto node = std::make_shared<rclcpp::Node>("wait_set_example_node");

auto do_nothing = [](std_msgs::msg::String::UniquePtr) {assert(false);};
auto sub1 = node->create_subscription<std_msgs::msg::String>("~/chatter", 10, do_nothing);
auto sub2 = node->create_subscription<std_msgs::msg::String>("~/chatter", 10, do_nothing);
std::vector<decltype(sub1)> sub_vector = {sub1, sub2};
auto guard_condition1 = std::make_shared<rclcpp::GuardCondition>();
auto guard_condition2 = std::make_shared<rclcpp::GuardCondition>();

// FIXME: removing sub if it was added in the ctor leads to a failure
// terminate called after throwing an instance of 'std::runtime_error'
// what(): waitable not in wait set
rclcpp::WaitSet wait_set(
// std::vector<rclcpp::WaitSet::SubscriptionEntry>{{sub}},
{},
std::vector<rclcpp::GuardCondition::SharedPtr>{guard_condition1});

wait_set.add_subscription(sub1); // FIXME: add it in the ctor
wait_set.add_subscription(sub2);
wait_set.add_guard_condition(guard_condition2);

auto wait_and_print_results = [&]() {
RCLCPP_INFO(node->get_logger(), "Waiting...");
auto wait_result = wait_set.wait(std::chrono::seconds(1));
if (wait_result.kind() == rclcpp::WaitResultKind::Ready) {
int guard_conditions_num = wait_set.get_rcl_wait_set().size_of_guard_conditions;
int subscriptions_num = wait_set.get_rcl_wait_set().size_of_subscriptions;

for (int i=0; i<guard_conditions_num; i++) {
if (wait_result.get_wait_set().get_rcl_wait_set().guard_conditions[i]) {
RCLCPP_INFO(node->get_logger(), "guard_condition %d triggered", i+1);
}
}
for (int i=0; i<subscriptions_num; i++) {
if (wait_result.get_wait_set().get_rcl_wait_set().subscriptions[i]) {
RCLCPP_INFO(node->get_logger(), "subscription %d triggered", i+1);
std_msgs::msg::String msg;
rclcpp::MessageInfo msg_info;
if (sub_vector.at(i)->take(msg, msg_info)) {
RCLCPP_INFO(node->get_logger(),
"subscription %d: I heard '%s'", i+1, msg.data.c_str());
} else {
RCLCPP_INFO(node->get_logger(), "subscription %d: No message", i+1);
}
}
}
} else if (wait_result.kind() == rclcpp::WaitResultKind::Timeout) {
RCLCPP_INFO(node->get_logger(), "wait-set waiting failed with timeout");
} else if (wait_result.kind() == rclcpp::WaitResultKind::Empty) {
RCLCPP_INFO(node->get_logger(), "wait-set waiting failed because wait-set is empty");
}
};

RCLCPP_INFO(node->get_logger(), "Action: Nothing triggered");
wait_and_print_results();

RCLCPP_INFO(node->get_logger(), "Action: Trigger Guard condition 1");
guard_condition1->trigger();
wait_and_print_results();

RCLCPP_INFO(node->get_logger(), "Action: Trigger Guard condition 2");
guard_condition2->trigger();
wait_and_print_results();

RCLCPP_INFO(node->get_logger(), "Action: Message published");
auto pub = node->create_publisher<std_msgs::msg::String>("~/chatter", 1);
pub->publish(std_msgs::msg::String().set__data("test"));
wait_and_print_results();

RCLCPP_INFO(node->get_logger(), "Action: Guard condition 1 removed");
RCLCPP_INFO(node->get_logger(), "Action: Guard condition 2 removed");
wait_set.remove_guard_condition(guard_condition1);
wait_set.remove_guard_condition(guard_condition2);
wait_and_print_results();

RCLCPP_INFO(node->get_logger(), "Action: Subscription 2 removed");
wait_set.remove_subscription(sub2);
wait_and_print_results();

RCLCPP_INFO(node->get_logger(), "Action: Subscription 1 removed");
wait_set.remove_subscription(sub1);
wait_and_print_results();

return 0;
}

0 comments on commit aa1f133

Please sign in to comment.