diff --git a/quality_of_service_demo/CMakeLists.txt b/quality_of_service_demo/CMakeLists.txt new file mode 100644 index 000000000..ee2e01ca8 --- /dev/null +++ b/quality_of_service_demo/CMakeLists.txt @@ -0,0 +1,42 @@ +cmake_minimum_required(VERSION 3.5) + +project(quality_of_service_demo) + +# Default to C++14 +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 14) +endif() + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +find_package(ament_cmake REQUIRED) +find_package(rclcpp REQUIRED) +find_package(rcutils) +find_package(rmw REQUIRED) +find_package(std_msgs REQUIRED) + +include_directories(include) +ament_export_include_directories(include) + +function(qos_demo_executable target) + add_executable(${target} src/${target}.cpp src/common_nodes.cpp) + ament_target_dependencies(${target} + "rclcpp" + "rcutils" + "std_msgs" + ) + install(TARGETS ${target} DESTINATION lib/${PROJECT_NAME}) +endfunction() + +qos_demo_executable(liveliness) +qos_demo_executable(lifespan) +qos_demo_executable(deadline) + +if(BUILD_TESTING) + find_package(ament_lint_auto REQUIRED) + ament_lint_auto_find_test_dependencies() +endif() + +ament_package() diff --git a/quality_of_service_demo/README.md b/quality_of_service_demo/README.md new file mode 100644 index 000000000..f402f91ed --- /dev/null +++ b/quality_of_service_demo/README.md @@ -0,0 +1,72 @@ +# Quality of Service Demos + +The demo applications in this package show the various Quality of Service settings available in ROS2. +The intent is to provide a dedicated application for each QoS type, in order to highlight that single feature in isolation. + +## History + +No History QoS demo app exists yet in this package. + +## Reliability + +No Reliability QoS demo app exists yet in this package. + +## Durability + +No Durability QoS demo app exists yet in this package. + +## Deadline + +`quality_of_service_demo/deadline` demonstrates messages not meeting their required deadline. + +The application requires an argument a `deadline_duration` - this is the maximum acceptable time (in positive integer milliseconds) between messages published on the topic. +A `deadline_duration` of `0` means that there is no deadline, so the application will act as a standard Talker/Listener pair. + +The Publisher in this demo will pause publishing periodically, which will print deadline expirations from the Subscriber. + +Run `quality_of_service_demo/deadline -h` for more usage information. + +Examples: +* `ros2 run quality_of_service_demo deadline 600 --publish-for 5000 --pause-for 2000` + * The publisher will publish for 5 seconds, then pause for 2 - deadline events will start firing on both participants, until the publisher comes back. +* `ros2 run quality_of_service_demo deadline 600 --publish-for 5000 --pause-for 0` + * The publisher doesn't actually pause, no deadline misses should occur. + +## Lifespan + +`quality_of_service_demo/lifespan` demonstrates messages being deleted from a Publisher's outgoing queue once their configured lifespan expires. + +The application requires an argument `lifespan_duration` - this is the maximum time (in positive integer milliseconds) that a message will sit in the Publisher's outgoing queue. + +The Publisher in this demo will publish a preset number of messages, then stop publishing. +A Subscriber will start subscribing after a preset amount of time and may receive _fewer_ backfilled messages than were originally published, because some message can outlast their lifespan and be removed. + +Run `lifespan -h` for more usage information. + +Examples: +* `ros2 run quality_of_service_demo lifespan 1000 --publish-count 10 --subscribe-after 3000` + * After a few seconds, you should see (approximately) messages 4-9 printed from the Subscriber. + * The first few messages, with 1 second lifespan, were gone by the time the Subscriber joined after 3 seconds. +* `ros2 run quality_of_service_demo lifespan 4000 --publish-count 10 --subscribe-after 3000` + * After a few seconds, you should see all of the messages (0-9) printed from the Subscriber. + * All messages, with their 4 second lifespan, survived until the subscriber joined. + +## Liveliness + +`quality_of_service_demo/liveliness` demonstrates notification of liveliness changes, based on different Liveliness configurations. + +The application requires an argument `lease_duration` that specifies how often (in milliseconds) an entity must "check in" to let the system know it's alive. + +The Publisher in this demo will assert its liveliness based on passed in options, and be stopped after some amount of time. +When using `AUTOMATIC` liveliness policy, the Publisher is deleted at this time, in order to stop all automatic liveliness assertions in the rmw implementation - therefore only the Subscription will receive a Liveliness event for `AUTOMATIC`. +For `MANUAL` liveliness policies, the Publisher's assertions are stopped, as well as its message publishing. +Publishing a messages implicitly counts as asserting liveliness, so publishing is stopped in order to allow the Liveliness lease to lapse. +Therefore in the `MANUAL` policy types, you will see Liveliness events from both Publisher and Subscription (in rmw implementations that implement this feature). + +Run `quality_of_service_demo/liveliness -h` for more usage information. + +Examples: +* `ros2 run quality_of_service_demo liveliness 1000 --kill-publisher-after 2000` + * After 2 seconds, the publisher will be killed, and the subscriber will receive a callback 1 second after that notifying it that the liveliness has changed. +* `ros2 run quality_of_service_demo liveliness 250 --node-assert-period 0 --policy MANUAL_BY_NODE` + * With this configuration, the node never explicitly asserts its liveliness, but it publishes messages (implicitly asserting liveliness) less often (500ms) than the liveliness lease, so you will see alternating alive/not-alive messages as the publisher misses its lease and "becomes alive again" when it publishes, until the talker is stopped. diff --git a/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp b/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp new file mode 100644 index 000000000..002a95f87 --- /dev/null +++ b/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp @@ -0,0 +1,109 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef QUALITY_OF_SERVICE_DEMO__COMMON_NODES_HPP_ +#define QUALITY_OF_SERVICE_DEMO__COMMON_NODES_HPP_ + +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "std_msgs/msg/string.hpp" + +static const uint32_t MILLION = 1000L * 1000L; + +class Talker : public rclcpp::Node +{ +public: + /** + * \param[in] topic_name Topic to publish to. + * \param[in] qos_profile QoS profile for Publisher. + * \param[in] publisher_options Additional options for Publisher. + * \param[in] publish_count (Optional) Number of messages to publish before stopping. + * 0 (default) means publish forever. + * \param[in] assert_node_period (Optional) How often to manually assert Node liveliness. + * 0 (default) means never. + * \param[in] assert_topic_period (Optional) How often to manually assert Publisher liveliness. + * 0 (default) means never. + **/ + Talker( + const std::string & topic_name, + const rclcpp::QoS & qos_profile, + const rclcpp::PublisherOptions & publisher_options, + size_t publish_count = 0, + std::chrono::milliseconds assert_node_period = std::chrono::milliseconds(0), + std::chrono::milliseconds assert_topic_period = std::chrono::milliseconds(0)); + + /// Stop publishing for a while. + /** + * Stops the publisher for the specified amount of time. + * A message will be published immediately on the expiration of pause_duration. + * The regular publishing interval will resume at that point. + * If publishing is already paused, this call will be ignored. + * The remaining pause duration will not be affected. + * \param[in] pause_duration Amount of time to pause for. + **/ + void pause_for(std::chrono::milliseconds pause_duration); + + /// Publish a single message. + /** + * Counts towards the total message count that will be published. + */ + void publish(); + + /// Cancel publishing, and any manual liveliness assertions this node was configured to do. + void stop(); + +private: + rclcpp::TimerBase::SharedPtr assert_node_timer_ = nullptr; + rclcpp::TimerBase::SharedPtr assert_topic_timer_ = nullptr; + rclcpp::TimerBase::SharedPtr publish_timer_ = nullptr; + rclcpp::TimerBase::SharedPtr pause_timer_ = nullptr; + rclcpp::Publisher::SharedPtr publisher_ = nullptr; + size_t publish_count_ = 0; + const size_t stop_at_count_ = 0; +}; + +class Listener : public rclcpp::Node +{ +public: + /// Standard Constructor. + /** + * \param[in] topic_name Topic to subscribe to. + * \param[in] qos_profile QoS profile for Subscription. + * \param[in] sub_options Additional options for Subscription. + * \param[in] defer_subscribe (Optional) don't create Subscription until user calls + * start_listening(). + */ + Listener( + const std::string & topic_name, + const rclcpp::QoS & qos_profile, + const rclcpp::SubscriptionOptions & subscription_options, + bool defer_subscribe = false); + + /// Instantiates Subscription. + /** + * Does nothing if it has already been called. + */ + void start_listening(); + +private: + rclcpp::Subscription::SharedPtr subscription_ = nullptr; + rclcpp::QoS qos_profile_; + rclcpp::SubscriptionOptions subscription_options_; + const std::string topic_name_; +}; + +#endif // QUALITY_OF_SERVICE_DEMO__COMMON_NODES_HPP_ diff --git a/quality_of_service_demo/package.xml b/quality_of_service_demo/package.xml new file mode 100644 index 000000000..a7097e6ff --- /dev/null +++ b/quality_of_service_demo/package.xml @@ -0,0 +1,38 @@ + + + + quality_of_service_demo + 0.1.0 + Demo applications for Quality of Service features + + Amazon ROS Contributions + + Apache License 2.0 + + Emerson Knapp + + ament_cmake + + example_interfaces + rclcpp + rcutils + rmw + rmw_implementation_cmake + std_msgs + + example_interfaces + launch_ros + rclcpp + rcutils + rmw + std_msgs + + ament_lint_auto + ament_lint_common + launch + launch_testing + + + ament_cmake + + diff --git a/quality_of_service_demo/src/common_nodes.cpp b/quality_of_service_demo/src/common_nodes.cpp new file mode 100644 index 000000000..eaf592702 --- /dev/null +++ b/quality_of_service_demo/src/common_nodes.cpp @@ -0,0 +1,127 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "quality_of_service_demo/common_nodes.hpp" + +using namespace std::chrono_literals; + +Talker::Talker( + const std::string & topic_name, + const rclcpp::QoS & qos_profile, + const rclcpp::PublisherOptions & publisher_options, + size_t publish_count, + std::chrono::milliseconds assert_node_period, + std::chrono::milliseconds assert_topic_period) +: Node("talker"), + stop_at_count_(publish_count) +{ + RCLCPP_INFO(get_logger(), "Talker starting up"); + publisher_ = create_publisher( + topic_name, qos_profile, publisher_options); + publish_timer_ = create_wall_timer(500ms, std::bind(&Talker::publish, this)); + // If enabled, create timer to assert liveliness at the node level + if (assert_node_period != 0ms) { + assert_node_timer_ = create_wall_timer( + assert_node_period, + [this]() -> bool { + return this->assert_liveliness(); + }); + } + // If enabled, create timer to assert liveliness on the topic + if (assert_topic_period != 0ms) { + assert_topic_timer_ = create_wall_timer( + assert_topic_period, + [this]() -> bool { + return publisher_->assert_liveliness(); + }); + } +} + +void +Talker::pause_for(std::chrono::milliseconds pause_length) +{ + if (pause_timer_) { + // Already paused - ignoring. + return; + } + publish_timer_->cancel(); + pause_timer_ = create_wall_timer( + pause_length, + [this]() { + // Publishing immediately on pause expiration and resuming regular interval. + publish(); + publish_timer_->reset(); + pause_timer_ = nullptr; + }); +} + +void +Talker::publish() +{ + std_msgs::msg::String msg; + msg.data = "Talker says " + std::to_string(publish_count_); + publish_count_ += 1; + publisher_->publish(msg); + if (stop_at_count_ > 0 && publish_count_ >= stop_at_count_) { + publish_timer_->cancel(); + } +} + +void +Talker::stop() +{ + if (assert_node_timer_) { + assert_node_timer_->cancel(); + assert_node_timer_.reset(); + } + if (assert_topic_timer_) { + assert_topic_timer_->cancel(); + assert_topic_timer_.reset(); + } + publish_timer_->cancel(); +} + +Listener::Listener( + const std::string & topic_name, + const rclcpp::QoS & qos_profile, + const rclcpp::SubscriptionOptions & subscription_options, + bool defer_subscribe) +: Node("listener"), + qos_profile_(qos_profile), + subscription_options_(subscription_options), + topic_name_(topic_name) +{ + RCLCPP_INFO(get_logger(), "Listener starting up"); + if (!defer_subscribe) { + start_listening(); + } +} + +void +Listener::start_listening() +{ + if (!subscription_) { + subscription_ = create_subscription( + topic_name_, + qos_profile_, + [this](const typename std_msgs::msg::String::SharedPtr msg) -> void + { + RCLCPP_INFO(get_logger(), "Listener heard: [%s]", msg->data.c_str()); + }, + subscription_options_); + } +} diff --git a/quality_of_service_demo/src/deadline.cpp b/quality_of_service_demo/src/deadline.cpp new file mode 100644 index 000000000..d5290237a --- /dev/null +++ b/quality_of_service_demo/src/deadline.cpp @@ -0,0 +1,128 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "rcutils/cmdline_parser.h" + +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/executors/single_threaded_executor.hpp" + +#include "std_msgs/msg/string.hpp" +#include "std_msgs/msg/bool.hpp" + +#include "quality_of_service_demo/common_nodes.hpp" + +static const char * OPTION_PUBLISH_FOR = "--publish-for"; +static const size_t DEFAULT_PUBLISH_FOR = 5000; +static const char * OPTION_PAUSE_FOR = "--pause-for"; +static const size_t DEFAULT_PAUSE_FOR = 1000; + +void print_usage() +{ + printf("Usage for deadline:\n"); + printf("deadline " + "deadline_duration " + "[%s publish_for] " + "[%s pause_for] " + "[-h]\n", + OPTION_PUBLISH_FOR, + OPTION_PAUSE_FOR); + printf("required arguments:\n"); + printf("deadline_duration: " + "Duration in positive integer milliseconds of the Deadline QoS setting.\n"); + printf("optional arguments:\n"); + printf("-h : Print this help message.\n"); + printf("%s publish_for : " + "Duration to publish (in positive integer milliseconds) until pausing the talker. " + "Defaults to %zu.\n", + OPTION_PUBLISH_FOR, DEFAULT_PUBLISH_FOR); + printf("%s pause_for : " + "Duration to pause (in positive integer milliseconds) before starting to publish again. " + "Defaults to %zu.\n", + OPTION_PAUSE_FOR, DEFAULT_PAUSE_FOR); +} + +int main(int argc, char * argv[]) +{ + // Force flush of the stdout buffer. + setvbuf(stdout, NULL, _IONBF, BUFSIZ); + + // Argument count and usage + if (argc < 2 || rcutils_cli_option_exist(argv, argv + argc, "-h")) { + print_usage(); + return 0; + } + + // Configuration variables + std::chrono::milliseconds deadline_duration(std::stoul(argv[1])); + std::chrono::milliseconds period_pause_talker(DEFAULT_PUBLISH_FOR); + std::chrono::milliseconds duration_pause_talker(DEFAULT_PAUSE_FOR); + std::string topic("qos_deadline_chatter"); + + // Optional argument parsing + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_PUBLISH_FOR)) { + period_pause_talker = std::chrono::milliseconds( + std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_PUBLISH_FOR))); + } + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_PAUSE_FOR)) { + duration_pause_talker = std::chrono::milliseconds( + std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_PAUSE_FOR))); + } + + // Initialization and configuration + rclcpp::init(argc, argv); + rclcpp::executors::SingleThreadedExecutor executor; + + rclcpp::QoS qos_profile(10); + qos_profile.deadline(deadline_duration); + + rclcpp::SubscriptionOptions sub_options; + sub_options.event_callbacks.deadline_callback = + [](rclcpp::QOSDeadlineRequestedInfo & event) -> void + { + RCLCPP_INFO(rclcpp::get_logger("Listener"), + "Requested deadline missed - total %d delta %d", + event.total_count, event.total_count_change); + }; + + rclcpp::PublisherOptions pub_options; + pub_options.event_callbacks.deadline_callback = + [](rclcpp::QOSDeadlineOfferedInfo & event) -> void + { + RCLCPP_INFO(rclcpp::get_logger("Talker"), + "Offered deadline missed - total %d delta %d", + event.total_count, event.total_count_change); + }; + + auto listener = std::make_shared(topic, qos_profile, sub_options); + auto talker = std::make_shared(topic, qos_profile, pub_options); + + auto pause_timer = talker->create_wall_timer( + period_pause_talker, + [talker, duration_pause_talker]() -> void { + talker->pause_for(duration_pause_talker); + }); + + // Execution + executor.add_node(talker); + executor.add_node(listener); + executor.spin(); + + // Cleanup + rclcpp::shutdown(); + return 0; +} diff --git a/quality_of_service_demo/src/lifespan.cpp b/quality_of_service_demo/src/lifespan.cpp new file mode 100644 index 000000000..f00836964 --- /dev/null +++ b/quality_of_service_demo/src/lifespan.cpp @@ -0,0 +1,127 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include +#include +#include +#include + +#include "rcutils/cmdline_parser.h" + +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/executors/single_threaded_executor.hpp" + +#include "quality_of_service_demo/common_nodes.hpp" + +static const char * OPTION_HISTORY = "--history"; +static const size_t DEFAULT_HISTORY = 10; +static const char * OPTION_PUBLISH_COUNT = "--publish-count"; +static const size_t DEFAULT_PUBLISH_COUNT = 5; +static const char * OPTION_SUBSCRIBE_AFTER = "--subscribe-after"; +static const size_t DEFAULT_SUBSCRIBE_AFTER = 5000; + +void print_usage() +{ + printf("Usage for lifespan:\n"); + printf("lifespan " + "lifespan_duration " + "[%s history_depth] " + "[%s publish_count] " + "[%s subscribe_after] " + "[-h]\n", + OPTION_HISTORY, + OPTION_PUBLISH_COUNT, + OPTION_SUBSCRIBE_AFTER); + printf("required arguments:\n"); + printf("lifespan duration: " + "Duration in positive integer milliseconds of the Lifespan QoS setting.\n"); + printf("optional arguments:\n"); + printf("-h : Print this help message.\n"); + printf("%s history : " + "The depth of the Publisher's history queue - " + "the maximum number of messages it will store for late-joining subscriptions. " + "Defaults to %zu\n", + OPTION_HISTORY, DEFAULT_HISTORY); + printf("%s publish_n_messages : " + "How many messages to publish before stopping. " + "Defaults to %zu\n", + OPTION_PUBLISH_COUNT, DEFAULT_PUBLISH_COUNT); + printf("%s subscribe_after_duration : " + "The Subscriber will be created this long in positive integer milliseconds after " + "application startup. Defaults to %zu\n", + OPTION_SUBSCRIBE_AFTER, DEFAULT_SUBSCRIBE_AFTER); +} + +int main(int argc, char * argv[]) +{ + // Force flush of the stdout buffer. + setvbuf(stdout, NULL, _IONBF, BUFSIZ); + + // Argument parsing + if (argc < 2 || rcutils_cli_option_exist(argv, argv + argc, "-h")) { + print_usage(); + return 0; + } + + // Required arguments + std::chrono::milliseconds lifespan_duration(std::stoul(argv[1])); + + // Optional argument default values + size_t history = DEFAULT_HISTORY; + size_t publish_count = DEFAULT_PUBLISH_COUNT; + std::chrono::milliseconds subscribe_after_duration(DEFAULT_SUBSCRIBE_AFTER); + + // Optional argument parsing + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_HISTORY)) { + history = std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_HISTORY)); + } + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_PUBLISH_COUNT)) { + publish_count = std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_PUBLISH_COUNT)); + } + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_SUBSCRIBE_AFTER)) { + subscribe_after_duration = std::chrono::milliseconds( + std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_SUBSCRIBE_AFTER))); + } + + // Configuration and Initialization + rclcpp::init(argc, argv); + rclcpp::executors::SingleThreadedExecutor exec; + + std::string topic("qos_lifespan_chatter"); + + rclcpp::QoS qos_profile(history); + qos_profile + .transient_local() + .lifespan(lifespan_duration); + + rclcpp::SubscriptionOptions sub_options; + rclcpp::PublisherOptions pub_options; + + auto listener = std::make_shared(topic, qos_profile, sub_options, true); + auto talker = std::make_shared(topic, qos_profile, pub_options, publish_count); + + auto timer = listener->create_wall_timer( + subscribe_after_duration, + [listener]() -> void { + listener->start_listening(); + }); + + // Execution + exec.add_node(talker); + exec.add_node(listener); + exec.spin(); + + // Cleanup + rclcpp::shutdown(); + return 0; +} diff --git a/quality_of_service_demo/src/liveliness.cpp b/quality_of_service_demo/src/liveliness.cpp new file mode 100644 index 000000000..53239a4c5 --- /dev/null +++ b/quality_of_service_demo/src/liveliness.cpp @@ -0,0 +1,176 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "rcutils/cmdline_parser.h" + +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/executors/single_threaded_executor.hpp" + +#include "quality_of_service_demo/common_nodes.hpp" + +static const char * OPTION_POLICY = "--policy"; +static const char * DEFAULT_POLICY = "AUTOMATIC"; +static const char * OPTION_NODE_ASSERT_PERIOD = "--node-assert-period"; +static const char * OPTION_TOPIC_ASSERT_PERIOD = "--topic-assert-period"; +static const char * OPTION_KILL_PUBLISHER_AFTER = "--kill-publisher-after"; +static const size_t DEFAULT_KILL_PUBLISHER_AFTER = 3000; + +void print_usage() +{ + printf("Usage for liveliness:\n"); + printf("liveliness " + "lease_duration " + "[%s liveliness_policy] " + "[%s node_assert_liveliness_period] " + "[%s topic_assert_liveliness_period] " + "[-h]\n", + OPTION_POLICY, + OPTION_NODE_ASSERT_PERIOD, + OPTION_TOPIC_ASSERT_PERIOD); + printf("required arguments:\n"); + printf("lease_duration: " + "Duration in positive integer milliseconds after which an inactive Publisher is considered " + "not-alive. 0 means never.\n"); + printf("optional arguments:\n"); + printf("-h : Print this help message.\n"); + printf("%s liveliness_policy : " + "You may specify AUTOMATIC, MANUAL_BY_NODE, or MANUAL_BY_TOPIC. " + "Defaults to %s\n", + OPTION_POLICY, DEFAULT_POLICY); + printf("%s node_assert_period : " + "How often the Publisher will assert the liveliness of its Node, in positive integer " + "milliseconds. " + "Defaults to 0 (never)\n", + OPTION_NODE_ASSERT_PERIOD); + printf("%s topic_assert_period : " + "How often the Publisher will assert the liveliness of its Publisher " + ", in positive integer milliseconds. " + "Defaults to 0 (never)\n", + OPTION_TOPIC_ASSERT_PERIOD); + printf("%s kill_publisher_after : " + "Kill the publisher after this amount of time (in uint milliseconds). " + "In AUTOMATIC - destroy the whole node. " + "In MANUAL_BY_NODE, stop node liveliness assertion. " + "In MANUAL_BY_TOPIC, stop topic liveliness assertion. " + "Defaults to %zu\n", + OPTION_KILL_PUBLISHER_AFTER, DEFAULT_KILL_PUBLISHER_AFTER); +} + +int main(int argc, char * argv[]) +{ + // Force flush of the stdout buffer. + setvbuf(stdout, NULL, _IONBF, BUFSIZ); + + // Argument count and usage + if (argc < 2 || rcutils_cli_option_exist(argv, argv + argc, "-h")) { + print_usage(); + return 0; + } + + // Configuration variables + std::chrono::milliseconds liveliness_lease_duration(std::stoul(argv[1])); + std::chrono::milliseconds node_assert_period(0); + std::chrono::milliseconds topic_assert_period(0); + std::chrono::milliseconds kill_publisher_after(DEFAULT_KILL_PUBLISHER_AFTER); + const char * policy_name = DEFAULT_POLICY; + rmw_qos_liveliness_policy_t liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; + std::string topic("qos_liveliness_chatter"); + + // Optional argument parsing + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_NODE_ASSERT_PERIOD)) { + node_assert_period = std::chrono::milliseconds( + std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_NODE_ASSERT_PERIOD))); + } + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_TOPIC_ASSERT_PERIOD)) { + topic_assert_period = std::chrono::milliseconds( + std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_TOPIC_ASSERT_PERIOD))); + } + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_KILL_PUBLISHER_AFTER)) { + kill_publisher_after = std::chrono::milliseconds( + std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_KILL_PUBLISHER_AFTER))); + } + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_POLICY)) { + policy_name = rcutils_cli_get_option(argv, argv + argc, OPTION_POLICY); + } + + if (strcmp(policy_name, "AUTOMATIC") == 0) { + liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; + } else if (strcmp(policy_name, "MANUAL_BY_NODE") == 0) { + liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE; + } else if (strcmp(policy_name, "MANUAL_BY_TOPIC") == 0) { + liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC; + } else { + printf("Unknown liveliness policy: %s\n", policy_name); + print_usage(); + return 1; + } + + // Initialization and configuration + rclcpp::init(argc, argv); + rclcpp::executors::SingleThreadedExecutor executor; + + rclcpp::QoS qos_profile(10); + qos_profile + .liveliness(liveliness_policy_kind) + .liveliness_lease_duration(liveliness_lease_duration); + + rclcpp::SubscriptionOptions subscription_options; + subscription_options.event_callbacks.liveliness_callback = + [](rclcpp::QOSLivelinessChangedInfo & event) + { + printf("Liveliness changed event: \n"); + printf(" alive_count: %d\n", event.alive_count); + printf(" not_alive_count: %d\n", event.not_alive_count); + printf(" alive_count_change: %d\n", event.alive_count_change); + printf(" not_alive_count_change: %d\n", event.not_alive_count_change); + }; + + rclcpp::PublisherOptions publisher_options; + + auto listener = std::make_shared(topic, qos_profile, subscription_options); + auto talker = std::make_shared( + topic, qos_profile, publisher_options, 0, node_assert_period, topic_assert_period); + + auto kill_talker_timer = listener->create_wall_timer( + kill_publisher_after, [&talker, &executor, liveliness_policy_kind]() { + if (!talker) { + return; + } + switch (liveliness_policy_kind) { + case RMW_QOS_POLICY_LIVELINESS_AUTOMATIC: + executor.remove_node(talker); + talker.reset(); + break; + case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE: + case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC: + talker->stop(); + break; + default: + break; + } + }); + + // Execution + executor.add_node(listener); + executor.add_node(talker); + executor.spin(); + + // Cleanup + rclcpp::shutdown(); + return 0; +}