From b1e4abad0f1fb7732dcf7281ff1d989b7c559129 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Thu, 1 Jul 2021 17:29:19 +0800 Subject: [PATCH 1/5] Add an example about how to use wait_for_all_acked Signed-off-by: Barry Xu --- .../topics/minimal_publisher/CMakeLists.txt | 4 + ...ember_function_with_wait_for_all_acked.cpp | 77 +++++++++++++++++++ 2 files changed, 81 insertions(+) create mode 100644 rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp diff --git a/rclcpp/topics/minimal_publisher/CMakeLists.txt b/rclcpp/topics/minimal_publisher/CMakeLists.txt index 7f689544..86a00778 100644 --- a/rclcpp/topics/minimal_publisher/CMakeLists.txt +++ b/rclcpp/topics/minimal_publisher/CMakeLists.txt @@ -26,6 +26,9 @@ ament_target_dependencies(publisher_member_function_with_type_adapter rclcpp std add_executable(publisher_member_function_with_unique_network_flow_endpoints member_function_with_unique_network_flow_endpoints.cpp) ament_target_dependencies(publisher_member_function_with_unique_network_flow_endpoints rclcpp std_msgs) +add_executable(publisher_member_function_with_wait_for_all_acked member_function_with_wait_for_all_acked.cpp) +ament_target_dependencies(publisher_member_function_with_wait_for_all_acked rclcpp std_msgs) + add_executable(publisher_not_composable not_composable.cpp) ament_target_dependencies(publisher_not_composable rclcpp std_msgs) @@ -34,6 +37,7 @@ install(TARGETS publisher_member_function publisher_member_function_with_type_adapter publisher_member_function_with_unique_network_flow_endpoints + publisher_member_function_with_wait_for_all_acked publisher_not_composable DESTINATION lib/${PROJECT_NAME} ) diff --git a/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp b/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp new file mode 100644 index 00000000..e54ce6c8 --- /dev/null +++ b/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp @@ -0,0 +1,77 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "std_msgs/msg/string.hpp" + +using namespace std::chrono_literals; + +/* This example shows how to use wait_for_all_acked for the publisher */ + +class MinimalPublisher : public rclcpp::Node +{ +public: + MinimalPublisher() + : Node("minimal_publisher_with_wait_for_all_acked"), count_(0), timeout_(500) + { + // publisher must set reliable mode + publisher_ = this->create_publisher( + "topic", + rclcpp::QoS(10).reliable()); + timer_ = this->create_wall_timer( + 1s, std::bind(&MinimalPublisher::timer_callback, this)); + } + +private: + void timer_callback() + { + auto message = std_msgs::msg::String(); + message.data = "Hello, world! " + std::to_string(count_++); + RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", message.data.c_str()); + publisher_->publish(message); + + // If no subscription is connected, wait_for_all_acked() always return true. + if (publisher_->get_subscription_count() != 0) { + if (publisher_->wait_for_all_acked(timeout_)) { + RCLCPP_INFO( + this->get_logger(), + "Subscriber acknowledges message '%s'", + message.data.c_str()); + } else { + RCLCPP_INFO( + this->get_logger(), + "Subscriber doesn't acknowledge message '%s' during %ld ms", + message.data.c_str(), + timeout_.count()); + } + } + } + rclcpp::TimerBase::SharedPtr timer_; + rclcpp::Publisher::SharedPtr publisher_; + size_t count_; + std::chrono::milliseconds timeout_; +}; + +int main(int argc, char * argv[]) +{ + rclcpp::init(argc, argv); + rclcpp::spin(std::make_shared()); + rclcpp::shutdown(); + return 0; +} From 3fa4bfefd751d338e44b577b4dbf711e121f844b Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Fri, 2 Jul 2021 17:46:25 +0800 Subject: [PATCH 2/5] Update codes for reasonable case Signed-off-by: Barry Xu --- ...ember_function_with_wait_for_all_acked.cpp | 81 ++++++++++++++----- 1 file changed, 61 insertions(+), 20 deletions(-) diff --git a/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp b/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp index e54ce6c8..77322d83 100644 --- a/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp +++ b/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp @@ -24,21 +24,69 @@ using namespace std::chrono_literals; /* This example shows how to use wait_for_all_acked for the publisher */ +class CustomContext : public rclcpp::Context +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(CustomContext) + + bool shutdown(const std::string & reason) override + { + // Before shutdown, confirm all subscribers acknowledge sent messages + if (custom_shutdown_func_) { + custom_shutdown_func_(); + } + + return rclcpp::Context::shutdown(reason); + } + + void set_custom_shutdown_func(std::function custom_shutdown_func) + { + custom_shutdown_func_ = custom_shutdown_func; + } + +private: + std::function custom_shutdown_func_; +}; + class MinimalPublisher : public rclcpp::Node { public: - MinimalPublisher() - : Node("minimal_publisher_with_wait_for_all_acked"), count_(0), timeout_(500) + explicit MinimalPublisher(const CustomContext::SharedPtr context) + : Node("minimal_publisher_with_wait_for_all_acked", rclcpp::NodeOptions().context(context)), + count_(0), + wait_timeout_(300) { // publisher must set reliable mode publisher_ = this->create_publisher( "topic", rclcpp::QoS(10).reliable()); + + context->set_custom_shutdown_func( + [this]() { + this->wait_for_all_acked(); + }); + timer_ = this->create_wall_timer( - 1s, std::bind(&MinimalPublisher::timer_callback, this)); + 500ms, std::bind(&MinimalPublisher::timer_callback, this)); } private: + void wait_for_all_acked() + { + // Confirm all subscribers receive sent messages. + // Note that if no subscription is connected, wait_for_all_acked() always return true. + if (publisher_->wait_for_all_acked(wait_timeout_)) { + RCLCPP_INFO( + this->get_logger(), + "All Subscribers acknowledges messages"); + } else { + RCLCPP_INFO( + this->get_logger(), + "Not all subscribers acknowledge messages during %ld ms", + wait_timeout_.count()); + } + } + void timer_callback() { auto message = std_msgs::msg::String(); @@ -46,32 +94,25 @@ class MinimalPublisher : public rclcpp::Node RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", message.data.c_str()); publisher_->publish(message); - // If no subscription is connected, wait_for_all_acked() always return true. - if (publisher_->get_subscription_count() != 0) { - if (publisher_->wait_for_all_acked(timeout_)) { - RCLCPP_INFO( - this->get_logger(), - "Subscriber acknowledges message '%s'", - message.data.c_str()); - } else { - RCLCPP_INFO( - this->get_logger(), - "Subscriber doesn't acknowledge message '%s' during %ld ms", - message.data.c_str(), - timeout_.count()); - } - } + // After sending some messages, you can call wait_for_all_acked() to confirm all subscribers + // acknowledge message. } rclcpp::TimerBase::SharedPtr timer_; rclcpp::Publisher::SharedPtr publisher_; size_t count_; - std::chrono::milliseconds timeout_; + std::chrono::milliseconds wait_timeout_; }; int main(int argc, char * argv[]) { rclcpp::init(argc, argv); - rclcpp::spin(std::make_shared()); + + auto custom_context = std::make_shared(); + custom_context->init(argc, argv); + + auto publisher = std::make_shared(custom_context); + rclcpp::spin(publisher); rclcpp::shutdown(); + return 0; } From 706c63f199b1d76e9f69a1562940f3a1062324e0 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Tue, 30 Nov 2021 13:46:23 +0800 Subject: [PATCH 3/5] Update code based on new shutdown interface Signed-off-by: Barry Xu --- ...ember_function_with_wait_for_all_acked.cpp | 41 ++++--------------- 1 file changed, 8 insertions(+), 33 deletions(-) diff --git a/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp b/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp index 77322d83..3033005d 100644 --- a/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp +++ b/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp @@ -24,35 +24,11 @@ using namespace std::chrono_literals; /* This example shows how to use wait_for_all_acked for the publisher */ -class CustomContext : public rclcpp::Context -{ -public: - RCLCPP_SMART_PTR_DEFINITIONS(CustomContext) - - bool shutdown(const std::string & reason) override - { - // Before shutdown, confirm all subscribers acknowledge sent messages - if (custom_shutdown_func_) { - custom_shutdown_func_(); - } - - return rclcpp::Context::shutdown(reason); - } - - void set_custom_shutdown_func(std::function custom_shutdown_func) - { - custom_shutdown_func_ = custom_shutdown_func; - } - -private: - std::function custom_shutdown_func_; -}; - class MinimalPublisher : public rclcpp::Node { public: - explicit MinimalPublisher(const CustomContext::SharedPtr context) - : Node("minimal_publisher_with_wait_for_all_acked", rclcpp::NodeOptions().context(context)), + MinimalPublisher() + : Node("minimal_publisher_with_wait_for_all_acked"), count_(0), wait_timeout_(300) { @@ -61,7 +37,9 @@ class MinimalPublisher : public rclcpp::Node "topic", rclcpp::QoS(10).reliable()); - context->set_custom_shutdown_func( + // call wait_for_all_acked before shutdown + using rclcpp::contexts::get_global_default_context; + get_global_default_context()->add_pre_shutdown_callback( [this]() { this->wait_for_all_acked(); }); @@ -78,7 +56,7 @@ class MinimalPublisher : public rclcpp::Node if (publisher_->wait_for_all_acked(wait_timeout_)) { RCLCPP_INFO( this->get_logger(), - "All Subscribers acknowledges messages"); + "All subscribers acknowledge messages"); } else { RCLCPP_INFO( this->get_logger(), @@ -95,7 +73,7 @@ class MinimalPublisher : public rclcpp::Node publisher_->publish(message); // After sending some messages, you can call wait_for_all_acked() to confirm all subscribers - // acknowledge message. + // acknowledge messages. } rclcpp::TimerBase::SharedPtr timer_; rclcpp::Publisher::SharedPtr publisher_; @@ -107,10 +85,7 @@ int main(int argc, char * argv[]) { rclcpp::init(argc, argv); - auto custom_context = std::make_shared(); - custom_context->init(argc, argv); - - auto publisher = std::make_shared(custom_context); + auto publisher = std::make_shared(); rclcpp::spin(publisher); rclcpp::shutdown(); From eef6f313440d2f04091209eb6b0a38cdc96cf8c1 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Wed, 22 Dec 2021 11:11:04 +0800 Subject: [PATCH 4/5] Update code to avoid build warning on macOS Signed-off-by: Barry Xu --- .../member_function_with_wait_for_all_acked.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp b/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp index 3033005d..7e878d2f 100644 --- a/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp +++ b/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp @@ -11,6 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -60,8 +61,8 @@ class MinimalPublisher : public rclcpp::Node } else { RCLCPP_INFO( this->get_logger(), - "Not all subscribers acknowledge messages during %ld ms", - wait_timeout_.count()); + "Not all subscribers acknowledge messages during %" PRId64 " ms", + static_cast(wait_timeout_.count())); } } From ce5e34fd13205dd0a30519347fd06d03e2d9fdec Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Thu, 13 Jan 2022 13:11:08 +0800 Subject: [PATCH 5/5] Address review comments Signed-off-by: Barry Xu --- rclcpp/topics/minimal_publisher/CMakeLists.txt | 6 +++--- .../member_function_with_wait_for_all_acked.cpp | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/rclcpp/topics/minimal_publisher/CMakeLists.txt b/rclcpp/topics/minimal_publisher/CMakeLists.txt index 86a00778..504411d5 100644 --- a/rclcpp/topics/minimal_publisher/CMakeLists.txt +++ b/rclcpp/topics/minimal_publisher/CMakeLists.txt @@ -26,8 +26,8 @@ ament_target_dependencies(publisher_member_function_with_type_adapter rclcpp std add_executable(publisher_member_function_with_unique_network_flow_endpoints member_function_with_unique_network_flow_endpoints.cpp) ament_target_dependencies(publisher_member_function_with_unique_network_flow_endpoints rclcpp std_msgs) -add_executable(publisher_member_function_with_wait_for_all_acked member_function_with_wait_for_all_acked.cpp) -ament_target_dependencies(publisher_member_function_with_wait_for_all_acked rclcpp std_msgs) +add_executable(publisher_wait_for_all_acked member_function_with_wait_for_all_acked.cpp) +ament_target_dependencies(publisher_wait_for_all_acked rclcpp std_msgs) add_executable(publisher_not_composable not_composable.cpp) ament_target_dependencies(publisher_not_composable rclcpp std_msgs) @@ -37,7 +37,7 @@ install(TARGETS publisher_member_function publisher_member_function_with_type_adapter publisher_member_function_with_unique_network_flow_endpoints - publisher_member_function_with_wait_for_all_acked + publisher_wait_for_all_acked publisher_not_composable DESTINATION lib/${PROJECT_NAME} ) diff --git a/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp b/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp index 7e878d2f..bf5a5c92 100644 --- a/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp +++ b/rclcpp/topics/minimal_publisher/member_function_with_wait_for_all_acked.cpp @@ -11,10 +11,9 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -#include - #include #include +#include #include #include @@ -42,6 +41,7 @@ class MinimalPublisher : public rclcpp::Node using rclcpp::contexts::get_global_default_context; get_global_default_context()->add_pre_shutdown_callback( [this]() { + this->timer_->cancel(); this->wait_for_all_acked(); });