diff --git a/rclcpp/wait_set/CMakeLists.txt b/rclcpp/wait_set/CMakeLists.txt index b62352c9..9dab43ca 100644 --- a/rclcpp/wait_set/CMakeLists.txt +++ b/rclcpp/wait_set/CMakeLists.txt @@ -30,6 +30,9 @@ ament_target_dependencies(wait_set_example rclcpp std_msgs) add_executable(wait_set_random_order wait_set_random_order.cpp) ament_target_dependencies(wait_set_random_order rclcpp std_msgs) +add_executable(executor_random_order executor_random_order.cpp) +ament_target_dependencies(executor_random_order rclcpp std_msgs) + add_executable(wait_set_different_rates_topics wait_set_different_rates_topics.cpp) ament_target_dependencies(wait_set_different_rates_topics rclcpp std_msgs) @@ -42,6 +45,7 @@ install(TARGETS thread_safe_wait_set wait_set_example wait_set_random_order + executor_random_order wait_set_different_rates_topics wait_set_composition DESTINATION lib/${PROJECT_NAME} diff --git a/rclcpp/wait_set/executor_random_order.cpp b/rclcpp/wait_set/executor_random_order.cpp new file mode 100644 index 00000000..746c9a98 --- /dev/null +++ b/rclcpp/wait_set/executor_random_order.cpp @@ -0,0 +1,43 @@ +// Copyright 2021, Apex.AI Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include "random_listener.hpp" +#include "random_talker.hpp" + +/* For this example, we will be creating a talker node with three publishers which will + * publish the topics A, B, C in random order each time. In this case the messages are handled + * using an executor. The order in which the messages are handled will depend on the message + * arrival and the type of messages available at the moment. + */ + +int32_t main(const int32_t argc, char ** const argv) +{ + rclcpp::init(argc, argv); + + // We run the talker and the listener in different threads to simulate inter-process + // communications. + // Note the order of execution will be different if both nodes are spun in the same executor. + // This is because the executor handles the messages in the order in which the + // subscriptions were created. Using difference threads the handling order depends on the + // message arrival and type of messages available. + auto thread = std::thread([]() {rclcpp::spin(std::make_shared());}); + rclcpp::spin(std::make_shared()); + + rclcpp::shutdown(); + thread.join(); + + return 0; +} diff --git a/rclcpp/wait_set/random_listener.hpp b/rclcpp/wait_set/random_listener.hpp new file mode 100644 index 00000000..c8db29e9 --- /dev/null +++ b/rclcpp/wait_set/random_listener.hpp @@ -0,0 +1,44 @@ +// Copyright 2021, Apex.AI Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +class Listener : public rclcpp::Node +{ + using subscription_list = std::vector::SharedPtr>; + +public: + Listener() + : Node("random_listener") + { + auto print_msg = [this](std_msgs::msg::String::UniquePtr msg) { + RCLCPP_INFO(this->get_logger(), "I heard: %s", msg->data.c_str()); + }; + sub1_ = this->create_subscription("topicA", 10, print_msg); + sub2_ = this->create_subscription("topicB", 10, print_msg); + sub3_ = this->create_subscription("topicC", 10, print_msg); + } + + subscription_list get_subscriptions() const + { + return {sub1_, sub2_, sub3_}; + } + +private: + rclcpp::Subscription::SharedPtr sub1_; + rclcpp::Subscription::SharedPtr sub2_; + rclcpp::Subscription::SharedPtr sub3_; +}; diff --git a/rclcpp/wait_set/random_talker.hpp b/rclcpp/wait_set/random_talker.hpp new file mode 100644 index 00000000..8e93b987 --- /dev/null +++ b/rclcpp/wait_set/random_talker.hpp @@ -0,0 +1,67 @@ +// Copyright 2021, Apex.AI Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +using namespace std::chrono_literals; + +class RandomTalker : public rclcpp::Node +{ +public: + RandomTalker() + : Node("random_talker"), + pub1_(this->create_publisher("topicA", 10)), + pub2_(this->create_publisher("topicB", 10)), + pub3_(this->create_publisher("topicC", 10)), + rand_engine_(std::chrono::system_clock::now().time_since_epoch().count()) + { + publish_functions_.emplace_back( + ([this]() { + std_msgs::msg::String msg; + msg.data = "A"; + RCLCPP_INFO(this->get_logger(), "Publishing: %s", msg.data.c_str()); + pub1_->publish(msg); + })); + publish_functions_.emplace_back( + ([this]() { + std_msgs::msg::String msg; + msg.data = "B"; + RCLCPP_INFO(this->get_logger(), "Publishing: %s", msg.data.c_str()); + pub2_->publish(msg); + })); + publish_functions_.emplace_back( + ([this]() { + std_msgs::msg::String msg; + msg.data = "C"; + RCLCPP_INFO(this->get_logger(), "Publishing: %s", msg.data.c_str()); + pub3_->publish(msg); + })); + auto timer_callback = + [this]() -> void { + std::shuffle(publish_functions_.begin(), publish_functions_.end(), rand_engine_); + for (const auto & f : publish_functions_) {f();} + }; + timer_ = this->create_wall_timer(1s, timer_callback); + } + +private: + rclcpp::TimerBase::SharedPtr timer_; + rclcpp::Publisher::SharedPtr pub1_; + rclcpp::Publisher::SharedPtr pub2_; + rclcpp::Publisher::SharedPtr pub3_; + std::vector> publish_functions_; + std::default_random_engine rand_engine_; +}; diff --git a/rclcpp/wait_set/wait_set_composition.cpp b/rclcpp/wait_set/wait_set_composition.cpp index 54a01dc2..1310113e 100644 --- a/rclcpp/wait_set/wait_set_composition.cpp +++ b/rclcpp/wait_set/wait_set_composition.cpp @@ -116,13 +116,13 @@ int main(int argc, char * argv[]) std_msgs::msg::String msg; rclcpp::MessageInfo msg_info; if (listener->get_subscription()->take(msg, msg_info)) { - RCLCPP_INFO(listener->get_logger(), "I heard: '%s' (waitset)", msg.data.c_str()); + RCLCPP_INFO(listener->get_logger(), "I heard: '%s' (wait-set)", msg.data.c_str()); } } } else if (wait_result.kind() == rclcpp::WaitResultKind::Timeout) { - RCLCPP_ERROR(listener->get_logger(), "No message received after 5s."); - } else { - RCLCPP_ERROR(listener->get_logger(), "Wait-set failed."); + if (rclcpp::ok()) { + RCLCPP_ERROR(listener->get_logger(), "Wait-set failed with timeout"); + } } } diff --git a/rclcpp/wait_set/wait_set_different_rates_topics.cpp b/rclcpp/wait_set/wait_set_different_rates_topics.cpp index 066cac81..513bcc5e 100644 --- a/rclcpp/wait_set/wait_set_different_rates_topics.cpp +++ b/rclcpp/wait_set/wait_set_different_rates_topics.cpp @@ -53,7 +53,7 @@ int32_t main(const int32_t argc, char ** const argv) { rclcpp::init(argc, argv); - auto node = std::make_shared("waitset_node"); + auto node = std::make_shared("wait_set_listener"); auto do_nothing = [](std_msgs::msg::String::UniquePtr) {assert(false);}; auto sub1 = node->create_subscription("topicA", 10, do_nothing); @@ -75,9 +75,7 @@ int32_t main(const int32_t argc, char ** const argv) auto publisher_thread = std::thread([&exec]() {exec.spin();}); while (rclcpp::ok()) { - // Waiting up to 5s for a message to arrive - const auto wait_result = wait_set.wait(std::chrono::seconds(5)); - + const auto wait_result = wait_set.wait(3s); if (wait_result.kind() == rclcpp::WaitResultKind::Ready) { bool sub2_has_data = wait_result.get_wait_set().get_rcl_wait_set().subscriptions[1U]; bool sub3_has_data = wait_result.get_wait_set().get_rcl_wait_set().subscriptions[2U]; @@ -98,7 +96,7 @@ int32_t main(const int32_t argc, char ** const argv) handled_data.append(msg1.data); } handled_data.append(msg2.data); - RCLCPP_INFO(node->get_logger(), "Handle topic A and B: %s", handled_data.c_str()); + RCLCPP_INFO(node->get_logger(), "I heard: %s", handled_data.c_str()); } else { RCLCPP_ERROR(node->get_logger(), "An invalid message from topic B was received."); } @@ -109,15 +107,15 @@ int32_t main(const int32_t argc, char ** const argv) std_msgs::msg::String msg; rclcpp::MessageInfo msg_info; if (sub3->take(msg, msg_info)) { - RCLCPP_INFO(node->get_logger(), "Handle topic C: %s", msg.data.c_str()); + RCLCPP_INFO(node->get_logger(), "I heard: %s", msg.data.c_str()); } else { RCLCPP_ERROR(node->get_logger(), "An invalid message from topic C was received."); } } } else if (wait_result.kind() == rclcpp::WaitResultKind::Timeout) { - RCLCPP_ERROR(node->get_logger(), "No message received after 5s."); - } else { - RCLCPP_ERROR(node->get_logger(), "Wait-set failed."); + if (rclcpp::ok()) { + RCLCPP_ERROR(node->get_logger(), "Wait-set failed with timeout"); + } } } diff --git a/rclcpp/wait_set/wait_set_example.cpp b/rclcpp/wait_set/wait_set_example.cpp index 2091effb..ba10d094 100644 --- a/rclcpp/wait_set/wait_set_example.cpp +++ b/rclcpp/wait_set/wait_set_example.cpp @@ -25,7 +25,7 @@ int32_t main(const int32_t argc, char ** const argv) { rclcpp::init(argc, argv); - auto node = std::make_shared("wait_set_example_node"); + auto node = std::make_shared("wait_set_listener"); auto do_nothing = [](std_msgs::msg::String::UniquePtr) {assert(false);}; auto sub1 = node->create_subscription("topicA", 1, do_nothing); @@ -74,9 +74,7 @@ int32_t main(const int32_t argc, char ** const argv) auto num_recv = std::size_t(); while (num_recv < 3U) { - // Waiting up to 5s for a sample to arrive. - const auto wait_result = wait_set.wait(std::chrono::seconds(5)); - + const auto wait_result = wait_set.wait(2s); if (wait_result.kind() == rclcpp::WaitResultKind::Ready) { if (wait_result.get_wait_set().get_rcl_wait_set().timers[0U]) { // we execute manually the timer callback @@ -111,9 +109,9 @@ int32_t main(const int32_t argc, char ** const argv) RCLCPP_INFO(node->get_logger(), "Number of messages already got: %zu of 3", num_recv); } } else if (wait_result.kind() == rclcpp::WaitResultKind::Timeout) { - RCLCPP_ERROR(node->get_logger(), "No message received after 5s."); - } else { - RCLCPP_ERROR(node->get_logger(), "Wait-set failed."); + if (rclcpp::ok()) { + RCLCPP_ERROR(node->get_logger(), "Wait-set failed with timeout"); + } } } RCLCPP_INFO(node->get_logger(), "Got all messages!"); diff --git a/rclcpp/wait_set/wait_set_random_order.cpp b/rclcpp/wait_set/wait_set_random_order.cpp index 30fd8c61..f9772ff6 100644 --- a/rclcpp/wait_set/wait_set_random_order.cpp +++ b/rclcpp/wait_set/wait_set_random_order.cpp @@ -14,122 +14,68 @@ #include #include -#include +#include "random_listener.hpp" +#include "random_talker.hpp" -/* For this example, we will be creating a publishing node with three publishers which will +using namespace std::chrono_literals; + +/* For this example, we will be creating a talker node with three publishers which will * publish the topics A, B, C in random order each time. The order in which the messages are - * handled is defined deterministically directly by the user in the code. That is, in this example - * we always take and process the data in the same order A, B, C regardless of the arrival order. + * handled is defined deterministically directly by the user in the code using a wait-set based + * loop. That is, in this example we always take and process the data in the same order A, B, C + * regardless of the arrival order. */ -class RandomPublisher : public rclcpp::Node -{ -public: - RandomPublisher() - : Node("random_publisher"), - pub1_(this->create_publisher("topicA", 10)), - pub2_(this->create_publisher("topicB", 10)), - pub3_(this->create_publisher("topicC", 10)), - count_(0U) - { - } - - void run() - { - std_msgs::msg::String msg1, msg2, msg3; - std::vector> publisher_functions; - std::vector msgs_data{"A", "B", "C"}; - std::vector publish_order{0, 1, 2}; - unsigned seed = std::chrono::system_clock::now().time_since_epoch().count(); - std::default_random_engine e(seed); - - msg1.data = msgs_data.at(0U); - msg2.data = msgs_data.at(1U); - msg3.data = msgs_data.at(2U); - publisher_functions.emplace_back(([this, msg1]() {pub1_->publish(msg1);})); - publisher_functions.emplace_back(([this, msg2]() {pub2_->publish(msg2);})); - publisher_functions.emplace_back(([this, msg3]() {pub3_->publish(msg3);})); - - while (rclcpp::ok()) { - std::shuffle(publish_order.begin(), publish_order.end(), e); - RCLCPP_INFO( - this->get_logger(), "Publishing: %s%s%s", - msgs_data[publish_order.at(0)].c_str(), - msgs_data[publish_order.at(1)].c_str(), - msgs_data[publish_order.at(2)].c_str()); - publisher_functions.at(publish_order.at(0))(); - publisher_functions.at(publish_order.at(1))(); - publisher_functions.at(publish_order.at(2))(); - ++count_; - std::this_thread::sleep_for(std::chrono::milliseconds(2500)); - } - } - -private: - rclcpp::Publisher::SharedPtr pub1_; - rclcpp::Publisher::SharedPtr pub2_; - rclcpp::Publisher::SharedPtr pub3_; - size_t count_; -}; int32_t main(const int32_t argc, char ** const argv) { rclcpp::init(argc, argv); - auto node = std::make_shared("waitset_node"); - auto print_msg = [&node](std_msgs::msg::String::UniquePtr msg) { - RCLCPP_INFO(node->get_logger(), "I heard: %s", msg->data.c_str()); - }; + auto listener = std::make_shared(); + auto subscriptions = listener->get_subscriptions(); - auto sub1 = node->create_subscription("topicA", 10, print_msg); - auto sub2 = node->create_subscription("topicB", 10, print_msg); - auto sub3 = node->create_subscription("topicC", 10, print_msg); - - rclcpp::WaitSet wait_set({{sub1}, {sub2}, {sub3}}); + // Create a wait-set and add the subscriptions + rclcpp::WaitSet wait_set; + for (const auto & subscription : subscriptions) { + wait_set.add_subscription(subscription); + } - // Creates a random publisher and starts publishing in another thread - RandomPublisher publisher_node; - auto publisher_thread = std::thread([&publisher_node]() {publisher_node.run();}); + // Create a random talker and start publishing in another thread + auto thread = std::thread([]() {rclcpp::spin(std::make_shared());}); while (rclcpp::ok()) { - // Waiting up to 5s for a message to arrive - const auto wait_result = wait_set.wait(std::chrono::seconds(5)); - + const auto wait_result = wait_set.wait(2s); if (wait_result.kind() == rclcpp::WaitResultKind::Ready) { bool sub1_has_data = wait_result.get_wait_set().get_rcl_wait_set().subscriptions[0U]; bool sub2_has_data = wait_result.get_wait_set().get_rcl_wait_set().subscriptions[1U]; bool sub3_has_data = wait_result.get_wait_set().get_rcl_wait_set().subscriptions[2U]; - // We will handle all the messages when all subscriptions have received data. - // Note we could use just one subscription or a combination of conditions to trigger data - // handling. - bool handle_condition = sub1_has_data && sub2_has_data && sub3_has_data; - - if (handle_condition) { - std_msgs::msg::String msg1, msg2, msg3; + // Handle all the messages when all subscriptions have data + if (sub1_has_data && sub2_has_data && sub3_has_data) { + std_msgs::msg::String msg; rclcpp::MessageInfo msg_info; // the messages are taken and handled in a user-defined order - if (sub1->take(msg1, msg_info)) { - std::shared_ptr type_erased_msg = std::make_shared(msg1); - sub1->handle_message(type_erased_msg, msg_info); + if (subscriptions.at(0)->take(msg, msg_info)) { + std::shared_ptr type_erased_msg = std::make_shared(msg); + subscriptions.at(0)->handle_message(type_erased_msg, msg_info); } - if (sub2->take(msg2, msg_info)) { - std::shared_ptr type_erased_msg = std::make_shared(msg2); - sub2->handle_message(type_erased_msg, msg_info); + if (subscriptions.at(1)->take(msg, msg_info)) { + std::shared_ptr type_erased_msg = std::make_shared(msg); + subscriptions.at(1)->handle_message(type_erased_msg, msg_info); } - if (sub3->take(msg3, msg_info)) { - std::shared_ptr type_erased_msg = std::make_shared(msg3); - sub3->handle_message(type_erased_msg, msg_info); + if (subscriptions.at(2)->take(msg, msg_info)) { + std::shared_ptr type_erased_msg = std::make_shared(msg); + subscriptions.at(2)->handle_message(type_erased_msg, msg_info); } } } else if (wait_result.kind() == rclcpp::WaitResultKind::Timeout) { - RCLCPP_ERROR(node->get_logger(), "No message received after 5s."); - } else { - RCLCPP_ERROR(node->get_logger(), "Wait-set failed."); + if (rclcpp::ok()) { + RCLCPP_ERROR(listener->get_logger(), "Wait-set failed with timeout"); + } } } rclcpp::shutdown(); - publisher_thread.join(); + thread.join(); return 0; }