From 2981fb830193a6a22d84ad8633a71d6a08d5229e Mon Sep 17 00:00:00 2001 From: Prajakta Gokhale Date: Thu, 23 Apr 2020 20:41:35 -0700 Subject: [PATCH] Add received message age metric to topic statistics (#1080) * Add received message age metric to topic statistics Signed-off-by: Prajakta Gokhale * Add unit tests Signed-off-by: Prajakta Gokhale * Add IMU messages in unit test Signed-off-by: Prajakta Gokhale * Use system time instead of steady time Test received message age stats values are greater than 0 Signed-off-by: Devin Bonnie * Fix test warnings Signed-off-by: Devin Bonnie * Replace IMU messages with new dummy messages Signed-off-by: Prajakta Gokhale * Remove outdated TODO and unused test variables Signed-off-by: Prajakta Gokhale * Address review comments Signed-off-by: Devin Bonnie * Address review comments Signed-off-by: Prajakta Gokhale * Re-add message with header for unit testing Signed-off-by: Prajakta Gokhale * Address message review feedback Signed-off-by: Devin Bonnie * Remove extra newline Signed-off-by: Prajakta Gokhale * Address more review feedback Signed-off-by: Devin Bonnie * Fix Windows failure Signed-off-by: Devin Bonnie * Only set append_library_dirs once Signed-off-by: Devin Bonnie --- rclcpp/CMakeLists.txt | 15 +- rclcpp/include/rclcpp/subscription.hpp | 4 +- .../subscription_topic_statistics.hpp | 8 + rclcpp/package.xml | 1 + rclcpp/test/msg/Header.msg | 3 + rclcpp/test/msg/MessageWithHeader.msg | 3 + .../test_subscription_topic_statistics.cpp | 234 ++++++++++++++---- .../test_topic_stats_utils.hpp | 13 +- 8 files changed, 228 insertions(+), 53 deletions(-) create mode 100644 rclcpp/test/msg/Header.msg create mode 100644 rclcpp/test/msg/MessageWithHeader.msg diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 518951017b..ef2f106c34 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -12,6 +12,7 @@ find_package(rcpputils REQUIRED) find_package(rcutils REQUIRED) find_package(rmw REQUIRED) find_package(rosgraph_msgs REQUIRED) +find_package(rosidl_default_generators REQUIRED) find_package(rosidl_runtime_cpp REQUIRED) find_package(rosidl_typesupport_c REQUIRED) find_package(rosidl_typesupport_cpp REQUIRED) @@ -223,6 +224,14 @@ if(BUILD_TESTING) add_definitions(-DTEST_RESOURCES_DIRECTORY="${CMAKE_CURRENT_BINARY_DIR}/test/resources") + rosidl_generate_interfaces(${PROJECT_NAME}_test_msgs + test/msg/Header.msg + test/msg/MessageWithHeader.msg + DEPENDENCIES builtin_interfaces + LIBRARY_NAME ${PROJECT_NAME} + SKIP_INSTALL + ) + ament_add_gtest(test_client test/test_client.cpp) if(TARGET test_client) ament_target_dependencies(test_client @@ -610,9 +619,12 @@ if(BUILD_TESTING) target_link_libraries(test_wait_set ${PROJECT_NAME}) endif() - ament_add_gtest(test_subscription_topic_statistics test/topic_statistics/test_subscription_topic_statistics.cpp) + ament_add_gtest(test_subscription_topic_statistics test/topic_statistics/test_subscription_topic_statistics.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}" + ) if(TARGET test_subscription_topic_statistics) ament_target_dependencies(test_subscription_topic_statistics + "builtin_interfaces" "libstatistics_collector" "rcl_interfaces" "rcutils" @@ -621,6 +633,7 @@ if(BUILD_TESTING) "rosidl_typesupport_cpp" "statistics_msgs" "test_msgs") + rosidl_target_interfaces(test_subscription_topic_statistics ${PROJECT_NAME}_test_msgs "rosidl_typesupport_cpp") target_link_libraries(test_subscription_topic_statistics ${PROJECT_NAME}) endif() diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 91343aaee0..2b8108fc94 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -271,8 +271,8 @@ class Subscription : public SubscriptionBase if (subscription_topic_statistics_) { const auto nanos = std::chrono::time_point_cast( - std::chrono::steady_clock::now()); - const auto time = rclcpp::Time(nanos.time_since_epoch().count(), RCL_STEADY_TIME); + std::chrono::system_clock::now()); + const auto time = rclcpp::Time(nanos.time_since_epoch().count()); subscription_topic_statistics_->handle_message(*typed_message, time); } } diff --git a/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp b/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp index 058ed45818..37a2804cbb 100644 --- a/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp +++ b/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp @@ -23,6 +23,7 @@ #include "libstatistics_collector/collector/generate_statistics_message.hpp" #include "libstatistics_collector/moving_average_statistics/types.hpp" #include "libstatistics_collector/topic_statistics_collector/constants.hpp" +#include "libstatistics_collector/topic_statistics_collector/received_message_age.hpp" #include "libstatistics_collector/topic_statistics_collector/received_message_period.hpp" #include "rcl/time.h" @@ -56,6 +57,9 @@ class SubscriptionTopicStatistics using TopicStatsCollector = libstatistics_collector::topic_statistics_collector::TopicStatisticsCollector< CallbackMessageT>; + using ReceivedMessageAge = + libstatistics_collector::topic_statistics_collector::ReceivedMessageAgeCollector< + CallbackMessageT>; using ReceivedMessagePeriod = libstatistics_collector::topic_statistics_collector::ReceivedMessagePeriodCollector< CallbackMessageT>; @@ -173,6 +177,10 @@ class SubscriptionTopicStatistics */ void bring_up() { + auto received_message_age = std::make_unique(); + received_message_age->Start(); + subscriber_statistics_collectors_.emplace_back(std::move(received_message_age)); + auto received_message_period = std::make_unique(); received_message_period->Start(); { diff --git a/rclcpp/package.xml b/rclcpp/package.xml index a5e09c0f18..4e7f15b511 100644 --- a/rclcpp/package.xml +++ b/rclcpp/package.xml @@ -37,6 +37,7 @@ ament_lint_common rmw rmw_implementation_cmake + rosidl_default_generators test_msgs diff --git a/rclcpp/test/msg/Header.msg b/rclcpp/test/msg/Header.msg new file mode 100644 index 0000000000..bfe5dc5366 --- /dev/null +++ b/rclcpp/test/msg/Header.msg @@ -0,0 +1,3 @@ +# Simple Header message with a timestamp field. + +builtin_interfaces/Time stamp diff --git a/rclcpp/test/msg/MessageWithHeader.msg b/rclcpp/test/msg/MessageWithHeader.msg new file mode 100644 index 0000000000..3bd1b45534 --- /dev/null +++ b/rclcpp/test/msg/MessageWithHeader.msg @@ -0,0 +1,3 @@ +# Message containing a simple Header field. + +Header header diff --git a/rclcpp/test/topic_statistics/test_subscription_topic_statistics.cpp b/rclcpp/test/topic_statistics/test_subscription_topic_statistics.cpp index 37433f0f2f..3a7e90c025 100644 --- a/rclcpp/test/topic_statistics/test_subscription_topic_statistics.cpp +++ b/rclcpp/test/topic_statistics/test_subscription_topic_statistics.cpp @@ -18,12 +18,14 @@ #include #include #include +#include #include #include #include "libstatistics_collector/moving_average_statistics/types.hpp" #include "rclcpp/create_publisher.hpp" +#include "rclcpp/msg/message_with_header.hpp" #include "rclcpp/node.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/rclcpp.hpp" @@ -49,6 +51,7 @@ constexpr const uint64_t kNoSamples{0}; constexpr const std::chrono::seconds kTestDuration{10}; } // namespace +using rclcpp::msg::MessageWithHeader; using test_msgs::msg::Empty; using rclcpp::topic_statistics::SubscriptionTopicStatistics; using statistics_msgs::msg::MetricsMessage; @@ -96,21 +99,47 @@ class EmptyPublisher : public rclcpp::Node virtual ~EmptyPublisher() = default; - size_t get_number_published() +private: + void publish_message() { - return number_published_.load(); + auto msg = Empty{}; + publisher_->publish(msg); + } + + rclcpp::Publisher::SharedPtr publisher_; + rclcpp::TimerBase::SharedPtr publish_timer_; +}; + +/** + * MessageWithHeader publisher node: used to publish MessageWithHeader with `header` value set + */ +class MessageWithHeaderPublisher : public rclcpp::Node +{ +public: + MessageWithHeaderPublisher( + const std::string & name, const std::string & topic, + const std::chrono::milliseconds & publish_period = std::chrono::milliseconds{100}) + : Node(name) + { + publisher_ = create_publisher(topic, 10); + publish_timer_ = this->create_wall_timer( + publish_period, [this]() { + this->publish_message(); + }); } + virtual ~MessageWithHeaderPublisher() = default; + private: void publish_message() { - ++number_published_; - auto msg = Empty{}; + auto msg = MessageWithHeader{}; + // Subtract 1 sec from current time so the received message age is always > 0 + msg.header.stamp = this->now() - rclcpp::Duration{1, 0}; publisher_->publish(msg); } - rclcpp::Publisher::SharedPtr publisher_; - std::atomic number_published_{0}; + rclcpp::Publisher::SharedPtr publisher_; rclcpp::TimerBase::SharedPtr publish_timer_; }; @@ -127,8 +156,8 @@ class EmptySubscriber : public rclcpp::Node auto options = rclcpp::SubscriptionOptions(); options.topic_stats_options.state = rclcpp::TopicStatisticsState::Enable; - auto callback = [this](Empty::UniquePtr msg) { - this->receive_message(*msg); + auto callback = [](Empty::UniquePtr msg) { + (void) msg; }; subscription_ = create_subscription>( @@ -140,10 +169,36 @@ class EmptySubscriber : public rclcpp::Node virtual ~EmptySubscriber() = default; private: - void receive_message(const Empty &) const + rclcpp::Subscription::SharedPtr subscription_; +}; + +/** + * MessageWithHeader subscriber node: used to create subscriber topic statistics requirements + */ +class MessageWithHeaderSubscriber : public rclcpp::Node +{ +public: + MessageWithHeaderSubscriber(const std::string & name, const std::string & topic) + : Node(name) { + // manually enable topic statistics via options + auto options = rclcpp::SubscriptionOptions(); + options.topic_stats_options.state = rclcpp::TopicStatisticsState::Enable; + + auto callback = [](MessageWithHeader::UniquePtr msg) { + (void) msg; + }; + subscription_ = create_subscription>( + topic, + rclcpp::QoS(rclcpp::KeepAll()), + callback, + options); } - rclcpp::Subscription::SharedPtr subscription_; + virtual ~MessageWithHeaderSubscriber() = default; + +private: + rclcpp::Subscription::SharedPtr subscription_; }; /** @@ -170,18 +225,18 @@ class TestSubscriptionTopicStatisticsFixture : public ::testing::Test TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction) { - // manually create publisher tied to the node + // Manually create publisher tied to the node auto topic_stats_publisher = empty_subscriber->create_publisher( kTestTopicStatisticsTopic, 10); - // construct a separate instance + // Construct a separate instance auto sub_topic_stats = std::make_unique>( empty_subscriber->get_name(), topic_stats_publisher); - // expect no data has been collected / no samples received + // Expect no data has been collected / no samples received for (const auto & data : sub_topic_stats->get_current_collector_data()) { EXPECT_TRUE(std::isnan(data.average)); EXPECT_TRUE(std::isnan(data.min)); @@ -191,56 +246,147 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction) } } -TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_single_empty_stats_message) +TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_no_header) { - // create an empty publisher + // Create an empty publisher auto empty_publisher = std::make_shared( kTestPubNodeName, kTestSubStatsTopic); // empty_subscriber has a topic statistics instance as part of its subscription // this will listen to and generate statistics for the empty message - // create a listener for topic statistics messages + // Create a listener for topic statistics messages auto statistics_listener = std::make_shared( "test_receive_single_empty_stats_message_listener", - "/statistics"); + "/statistics", + 2); rclcpp::executors::SingleThreadedExecutor ex; ex.add_node(empty_publisher); ex.add_node(statistics_listener); ex.add_node(empty_subscriber); - // spin and get future + // Spin and get future ex.spin_until_future_complete( statistics_listener->GetFuture(), kTestDuration); - // compare message counts, sample count should be the same as published and received count - EXPECT_EQ(1, statistics_listener->GetNumberOfMessagesReceived()); - - // check the received message and the data types - const auto received_message = statistics_listener->GetLastReceivedMessage(); - for (const auto & stats_point : received_message.statistics) { - const auto type = stats_point.data_type; - switch (type) { - case StatisticDataType::STATISTICS_DATA_TYPE_SAMPLE_COUNT: - EXPECT_LT(0, stats_point.data) << "unexpected sample count"; - break; - case StatisticDataType::STATISTICS_DATA_TYPE_AVERAGE: - EXPECT_LT(0, stats_point.data) << "unexpected avg"; - break; - case StatisticDataType::STATISTICS_DATA_TYPE_MINIMUM: - EXPECT_LT(0, stats_point.data) << "unexpected min"; - break; - case StatisticDataType::STATISTICS_DATA_TYPE_MAXIMUM: - EXPECT_LT(0, stats_point.data) << "unexpected max"; - break; - case StatisticDataType::STATISTICS_DATA_TYPE_STDDEV: - EXPECT_LT(0, stats_point.data) << "unexpected stddev"; - break; - default: - FAIL() << "received unknown statistics type: " << std::dec << - static_cast(type); + // Compare message counts, sample count should be the same as published and received count + EXPECT_EQ(2, statistics_listener->GetNumberOfMessagesReceived()); + + // Check the received message and the data types + const auto received_messages = statistics_listener->GetReceivedMessages(); + + EXPECT_EQ(2u, received_messages.size()); + + std::set received_metrics; + for (const auto & msg : received_messages) { + received_metrics.insert(msg.metrics_source); + } + EXPECT_TRUE(received_metrics.find("message_age") != received_metrics.end()); + EXPECT_TRUE(received_metrics.find("message_period") != received_metrics.end()); + + // Check the collected statistics for message period. + // Message age statistics will not be calculated because Empty messages + // don't have a `header` with timestamp. + for (const auto & msg : received_messages) { + if (msg.metrics_source != "message_period") { + continue; + } + for (const auto & stats_point : msg.statistics) { + const auto type = stats_point.data_type; + switch (type) { + case StatisticDataType::STATISTICS_DATA_TYPE_SAMPLE_COUNT: + EXPECT_LT(0, stats_point.data) << "unexpected sample count"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_AVERAGE: + EXPECT_LT(0, stats_point.data) << "unexpected avg"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_MINIMUM: + EXPECT_LT(0, stats_point.data) << "unexpected min"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_MAXIMUM: + EXPECT_LT(0, stats_point.data) << "unexpected max"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_STDDEV: + EXPECT_LT(0, stats_point.data) << "unexpected stddev"; + break; + default: + FAIL() << "received unknown statistics type: " << std::dec << + static_cast(type); + } + } + } +} + +TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_with_header) +{ + // Create a MessageWithHeader publisher + auto msg_with_header_publisher = std::make_shared( + kTestPubNodeName, + kTestSubStatsTopic); + // empty_subscriber has a topic statistics instance as part of its subscription + // this will listen to and generate statistics for the empty message + + // Create a listener for topic statistics messages + auto statistics_listener = std::make_shared( + "test_receive_stats_for_message_with_header", + "/statistics", + 2); + + auto msg_with_header_subscriber = std::make_shared( + kTestSubNodeName, + kTestSubStatsTopic); + + rclcpp::executors::SingleThreadedExecutor ex; + ex.add_node(msg_with_header_publisher); + ex.add_node(statistics_listener); + ex.add_node(msg_with_header_subscriber); + + // Spin and get future + ex.spin_until_future_complete( + statistics_listener->GetFuture(), + kTestDuration); + + // Compare message counts, sample count should be the same as published and received count + EXPECT_EQ(2, statistics_listener->GetNumberOfMessagesReceived()); + + // Check the received message and the data types + const auto received_messages = statistics_listener->GetReceivedMessages(); + + EXPECT_EQ(2u, received_messages.size()); + + std::set received_metrics; + for (const auto & msg : received_messages) { + received_metrics.insert(msg.metrics_source); + } + EXPECT_TRUE(received_metrics.find("message_age") != received_metrics.end()); + EXPECT_TRUE(received_metrics.find("message_period") != received_metrics.end()); + + // Check the collected statistics for message period. + for (const auto & msg : received_messages) { + for (const auto & stats_point : msg.statistics) { + const auto type = stats_point.data_type; + switch (type) { + case StatisticDataType::STATISTICS_DATA_TYPE_SAMPLE_COUNT: + EXPECT_LT(0, stats_point.data) << "unexpected sample count"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_AVERAGE: + EXPECT_LT(0, stats_point.data) << "unexpected avg"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_MINIMUM: + EXPECT_LT(0, stats_point.data) << "unexpected min"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_MAXIMUM: + EXPECT_LT(0, stats_point.data) << "unexpected max"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_STDDEV: + EXPECT_LE(0, stats_point.data) << "unexpected stddev"; + break; + default: + FAIL() << "received unknown statistics type: " << std::dec << + static_cast(type); + } } } } diff --git a/rclcpp/test/topic_statistics/test_topic_stats_utils.hpp b/rclcpp/test/topic_statistics/test_topic_stats_utils.hpp index 0140f919e8..1399abbaff 100644 --- a/rclcpp/test/topic_statistics/test_topic_stats_utils.hpp +++ b/rclcpp/test/topic_statistics/test_topic_stats_utils.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include "statistics_msgs/msg/metrics_message.hpp" @@ -89,7 +90,7 @@ class MetricsMessageSubscriber : public rclcpp::Node, public PromiseSetter MetricsMessageSubscriber( const std::string & name, const std::string & topic_name, - const int number_of_messages_to_receive = 1) + const int number_of_messages_to_receive = 2) : rclcpp::Node(name), number_of_messages_to_receive_(number_of_messages_to_receive) { @@ -99,7 +100,7 @@ class MetricsMessageSubscriber : public rclcpp::Node, public PromiseSetter subscription_ = create_subscription>( topic_name, - 0 /*history_depth*/, + 10 /*history_depth*/, callback); } @@ -107,10 +108,10 @@ class MetricsMessageSubscriber : public rclcpp::Node, public PromiseSetter * Acquires a mutex in order to get the last message received member. * \return the last message received */ - MetricsMessage GetLastReceivedMessage() const + std::vector GetReceivedMessages() const { std::unique_lock ulock{mutex_}; - return last_received_message_; + return received_messages_; } /** @@ -132,13 +133,13 @@ class MetricsMessageSubscriber : public rclcpp::Node, public PromiseSetter { std::unique_lock ulock{mutex_}; ++num_messages_received_; - last_received_message_ = msg; + received_messages_.push_back(msg); if (num_messages_received_ >= number_of_messages_to_receive_) { PromiseSetter::SetPromise(); } } - MetricsMessage last_received_message_; + std::vector received_messages_; rclcpp::Subscription::SharedPtr subscription_; mutable std::mutex mutex_; std::atomic num_messages_received_{0};