diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index 9248254047..207bee4a71 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -15,15 +15,25 @@ #ifndef RCLCPP__CREATE_SUBSCRIPTION_HPP_ #define RCLCPP__CREATE_SUBSCRIPTION_HPP_ +#include +#include #include #include #include +#include "rclcpp/detail/resolve_enable_topic_statistics.hpp" + +#include "rclcpp/node_interfaces/get_node_timers_interface.hpp" #include "rclcpp/node_interfaces/get_node_topics_interface.hpp" +#include "rclcpp/node_interfaces/node_timers_interface.hpp" #include "rclcpp/node_interfaces/node_topics_interface.hpp" + +#include "rclcpp/create_timer.hpp" +#include "rclcpp/qos.hpp" #include "rclcpp/subscription_factory.hpp" #include "rclcpp/subscription_options.hpp" -#include "rclcpp/qos.hpp" +#include "rclcpp/timer.hpp" +#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp" #include "rmw/qos_profiles.h" namespace rclcpp @@ -64,10 +74,46 @@ create_subscription( using rclcpp::node_interfaces::get_node_topics_interface; auto node_topics = get_node_topics_interface(std::forward(node)); + std::shared_ptr> + subscription_topic_stats = nullptr; + + if (rclcpp::detail::resolve_enable_topic_statistics( + options, + *node_topics->get_node_base_interface())) + { + std::shared_ptr> publisher = + create_publisher( + node, + options.topic_stats_options.publish_topic, + qos); + + subscription_topic_stats = std::make_shared< + rclcpp::topic_statistics::SubscriptionTopicStatistics + >(node_topics->get_node_base_interface()->get_name(), publisher); + + auto sub_call_back = [subscription_topic_stats]() { + subscription_topic_stats->publish_message(); + }; + + auto node_timer_interface = node_topics->get_node_timers_interface(); + + auto timer = create_wall_timer( + std::chrono::duration_cast( + options.topic_stats_options.publish_period), + sub_call_back, + options.callback_group, + node_topics->get_node_base_interface(), + node_timer_interface + ); + + subscription_topic_stats->set_publisher_timer(timer); + } + auto factory = rclcpp::create_subscription_factory( std::forward(callback), options, - msg_mem_strat + msg_mem_strat, + subscription_topic_stats ); auto sub = node_topics->create_subscription(topic_name, factory, qos); diff --git a/rclcpp/include/rclcpp/create_timer.hpp b/rclcpp/include/rclcpp/create_timer.hpp index 208474f345..15c641af37 100644 --- a/rclcpp/include/rclcpp/create_timer.hpp +++ b/rclcpp/include/rclcpp/create_timer.hpp @@ -15,6 +15,8 @@ #ifndef RCLCPP__CREATE_TIMER_HPP_ #define RCLCPP__CREATE_TIMER_HPP_ +#include +#include #include #include #include @@ -68,6 +70,46 @@ create_timer( group); } +/// Convenience method to create a timer with node resources. +/** + * + * \tparam DurationRepT + * \tparam DurationT + * \tparam CallbackT + * \param period period to exectute callback + * \param callback callback to execute via the timer period + * \param group + * \param node_base + * \param node_timers + * \return + * \throws std::invalid argument if either node_base or node_timers + * are null + */ +template +typename rclcpp::WallTimer::SharedPtr +create_wall_timer( + std::chrono::duration period, + CallbackT callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group, + node_interfaces::NodeBaseInterface * node_base, + node_interfaces::NodeTimersInterface * node_timers) +{ + if (node_base == nullptr) { + throw std::invalid_argument{"input node_base cannot be null"}; + } + + if (node_timers == nullptr) { + throw std::invalid_argument{"input node_timers cannot be null"}; + } + + auto timer = rclcpp::WallTimer::make_shared( + std::chrono::duration_cast(period), + std::move(callback), + node_base->get_context()); + node_timers->add_timer(timer, group); + return timer; +} + } // namespace rclcpp #endif // RCLCPP__CREATE_TIMER_HPP_ diff --git a/rclcpp/include/rclcpp/detail/resolve_enable_topic_statistics.hpp b/rclcpp/include/rclcpp/detail/resolve_enable_topic_statistics.hpp index 7c59204ff5..e10a92ef10 100644 --- a/rclcpp/include/rclcpp/detail/resolve_enable_topic_statistics.hpp +++ b/rclcpp/include/rclcpp/detail/resolve_enable_topic_statistics.hpp @@ -42,7 +42,6 @@ resolve_enable_topic_statistics(const OptionsT & options, const NodeBaseT & node break; default: throw std::runtime_error("Unrecognized EnableTopicStatistics value"); - break; } return topic_stats_enabled; diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index e29edeb22e..512f472f79 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -37,10 +38,12 @@ #include "rclcpp/create_client.hpp" #include "rclcpp/create_publisher.hpp" #include "rclcpp/create_service.hpp" +#include "rclcpp/create_timer.hpp" #include "rclcpp/create_subscription.hpp" #include "rclcpp/detail/resolve_enable_topic_statistics.hpp" #include "rclcpp/parameter.hpp" #include "rclcpp/qos.hpp" +#include "rclcpp/timer.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" @@ -108,12 +111,12 @@ Node::create_wall_timer( CallbackT callback, rclcpp::callback_group::CallbackGroup::SharedPtr group) { - auto timer = rclcpp::WallTimer::make_shared( - std::chrono::duration_cast(period), + return rclcpp::create_wall_timer( + period, std::move(callback), - this->node_base_->get_context()); - node_timers_->add_timer(timer, group); - return timer; + group, + this->node_base_.get(), + this->node_timers_.get()); } template diff --git a/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp b/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp index 8209984abc..227d193516 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp @@ -22,6 +22,7 @@ #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" +#include "rclcpp/node_interfaces/node_timers_interface.hpp" #include "rclcpp/node_interfaces/node_topics_interface.hpp" #include "rclcpp/publisher.hpp" #include "rclcpp/publisher_factory.hpp" @@ -39,7 +40,9 @@ class NodeTopics : public NodeTopicsInterface RCLCPP_SMART_PTR_ALIASES_ONLY(NodeTopicsInterface) RCLCPP_PUBLIC - explicit NodeTopics(rclcpp::node_interfaces::NodeBaseInterface * node_base); + NodeTopics( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + rclcpp::node_interfaces::NodeTimersInterface * node_timers); RCLCPP_PUBLIC ~NodeTopics() override; @@ -74,10 +77,15 @@ class NodeTopics : public NodeTopicsInterface rclcpp::node_interfaces::NodeBaseInterface * get_node_base_interface() const override; + RCLCPP_PUBLIC + rclcpp::node_interfaces::NodeTimersInterface * + get_node_timers_interface() const override; + private: RCLCPP_DISABLE_COPY(NodeTopics) rclcpp::node_interfaces::NodeBaseInterface * node_base_; + rclcpp::node_interfaces::NodeTimersInterface * node_timers_; }; } // namespace node_interfaces diff --git a/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp b/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp index 857670c58b..4ba28d5e40 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp @@ -25,6 +25,7 @@ #include "rclcpp/callback_group.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" +#include "rclcpp/node_interfaces/node_timers_interface.hpp" #include "rclcpp/publisher.hpp" #include "rclcpp/publisher_factory.hpp" #include "rclcpp/subscription.hpp" @@ -80,6 +81,11 @@ class NodeTopicsInterface virtual rclcpp::node_interfaces::NodeBaseInterface * get_node_base_interface() const = 0; + + RCLCPP_PUBLIC + virtual + rclcpp::node_interfaces::NodeTimersInterface * + get_node_timers_interface() const = 0; }; } // namespace node_interfaces diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 2b5172dc1b..a1e544c773 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -25,7 +26,6 @@ #include #include - #include "rcl/error_handling.h" #include "rcl/subscription.h" @@ -47,6 +47,7 @@ #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" #include "rclcpp/waitable.hpp" +#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp" #include "tracetools/tracetools.h" namespace rclcpp @@ -75,6 +76,8 @@ class Subscription : public SubscriptionBase using MessageDeleter = allocator::Deleter; using ConstMessageSharedPtr = std::shared_ptr; using MessageUniquePtr = std::unique_ptr; + using SubscriptionTopicStatisticsSharedPtr = + std::shared_ptr>; RCLCPP_SMART_PTR_DEFINITIONS(Subscription) @@ -98,7 +101,8 @@ class Subscription : public SubscriptionBase const rclcpp::QoS & qos, AnySubscriptionCallback callback, const rclcpp::SubscriptionOptionsWithAllocator & options, - typename MessageMemoryStrategyT::SharedPtr message_memory_strategy) + typename MessageMemoryStrategyT::SharedPtr message_memory_strategy, + SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr) : SubscriptionBase( node_base, type_support_handle, @@ -180,6 +184,10 @@ class Subscription : public SubscriptionBase this->setup_intra_process(intra_process_subscription_id, ipm); } + if (subscription_topic_statistics != nullptr) { + this->subscription_topic_statistics_ = std::move(subscription_topic_statistics); + } + TRACEPOINT( rclcpp_subscription_init, (const void *)get_subscription_handle().get(), @@ -260,6 +268,13 @@ class Subscription : public SubscriptionBase } auto typed_message = std::static_pointer_cast(message); any_callback_.dispatch(typed_message, message_info); + + if (subscription_topic_statistics_) { + const auto nanos = std::chrono::time_point_cast( + std::chrono::steady_clock::now()); + const auto time = rclcpp::Time(nanos.time_since_epoch().count(), RCL_STEADY_TIME); + subscription_topic_statistics_->handle_message(*typed_message, time); + } } void @@ -307,6 +322,8 @@ class Subscription : public SubscriptionBase const rclcpp::SubscriptionOptionsWithAllocator options_; typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy_; + /// Component which computes and publishes topic statistics for this subscriber + SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{nullptr}; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index a0f265c803..ffab0724c8 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -32,6 +32,7 @@ #include "rclcpp/subscription_options.hpp" #include "rclcpp/subscription_traits.hpp" #include "rclcpp/visibility_control.hpp" +#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp" namespace rclcpp { @@ -78,7 +79,10 @@ SubscriptionFactory create_subscription_factory( CallbackT && callback, const rclcpp::SubscriptionOptionsWithAllocator & options, - typename MessageMemoryStrategyT::SharedPtr msg_mem_strat) + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat, + std::shared_ptr> + subscription_topic_stats = nullptr +) { auto allocator = options.get_allocator(); @@ -88,7 +92,7 @@ create_subscription_factory( SubscriptionFactory factory { // factory function that creates a MessageT specific SubscriptionT - [options, msg_mem_strat, any_subscription_callback]( + [options, msg_mem_strat, any_subscription_callback, subscription_topic_stats]( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic_name, const rclcpp::QoS & qos @@ -104,7 +108,8 @@ create_subscription_factory( qos, any_subscription_callback, options, - msg_mem_strat); + msg_mem_strat, + subscription_topic_stats); // This is used for setting up things like intra process comms which // require this->shared_from_this() which cannot be called from // the constructor. diff --git a/rclcpp/src/rclcpp/node.cpp b/rclcpp/src/rclcpp/node.cpp index b429b305c2..b73ec6bf2f 100644 --- a/rclcpp/src/rclcpp/node.cpp +++ b/rclcpp/src/rclcpp/node.cpp @@ -108,7 +108,7 @@ Node::Node( node_graph_(new rclcpp::node_interfaces::NodeGraph(node_base_.get())), node_logging_(new rclcpp::node_interfaces::NodeLogging(node_base_.get())), node_timers_(new rclcpp::node_interfaces::NodeTimers(node_base_.get())), - node_topics_(new rclcpp::node_interfaces::NodeTopics(node_base_.get())), + node_topics_(new rclcpp::node_interfaces::NodeTopics(node_base_.get(), node_timers_.get())), node_services_(new rclcpp::node_interfaces::NodeServices(node_base_.get())), node_clock_(new rclcpp::node_interfaces::NodeClock( node_base_, diff --git a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp index 11de4a3278..360b4236c3 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp @@ -22,8 +22,10 @@ using rclcpp::exceptions::throw_from_rcl_error; using rclcpp::node_interfaces::NodeTopics; -NodeTopics::NodeTopics(rclcpp::node_interfaces::NodeBaseInterface * node_base) -: node_base_(node_base) +NodeTopics::NodeTopics( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + rclcpp::node_interfaces::NodeTimersInterface * node_timers) +: node_base_(node_base), node_timers_(node_timers) {} NodeTopics::~NodeTopics() @@ -121,3 +123,9 @@ NodeTopics::get_node_base_interface() const { return node_base_; } + +rclcpp::node_interfaces::NodeTimersInterface * +NodeTopics::get_node_timers_interface() const +{ + return node_timers_; +} diff --git a/rclcpp/test/topic_statistics/test_subscription_topic_statistics.cpp b/rclcpp/test/topic_statistics/test_subscription_topic_statistics.cpp index de90ecb590..416c7fc4ad 100644 --- a/rclcpp/test/topic_statistics/test_subscription_topic_statistics.cpp +++ b/rclcpp/test/topic_statistics/test_subscription_topic_statistics.cpp @@ -14,6 +14,9 @@ #include +#include +#include +#include #include #include #include @@ -24,23 +27,34 @@ #include "rclcpp/node.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/rclcpp.hpp" +#include "rclcpp/subscription_options.hpp" #include "rclcpp/topic_statistics/subscription_topic_statistics.hpp" #include "statistics_msgs/msg/metrics_message.hpp" +#include "statistics_msgs/msg/statistic_data_point.hpp" +#include "statistics_msgs/msg/statistic_data_type.hpp" + #include "test_msgs/msg/empty.hpp" +#include "test_topic_stats_utils.hpp" + namespace { -constexpr const char kTestNodeName[]{"test_sub_stats_node"}; +constexpr const char kTestPubNodeName[]{"test_pub_stats_node"}; +constexpr const char kTestSubNodeName[]{"test_sub_stats_node"}; constexpr const char kTestSubStatsTopic[]{"/test_sub_stats_topic"}; constexpr const char kTestTopicStatisticsTopic[]{"/test_topic_statistics_topic"}; constexpr const uint64_t kNoSamples{0}; +constexpr const std::chrono::milliseconds kTestStatsPublishPeriod{5000}; +constexpr const std::chrono::seconds kTestDuration{10}; } // namespace using test_msgs::msg::Empty; -using statistics_msgs::msg::MetricsMessage; using rclcpp::topic_statistics::SubscriptionTopicStatistics; +using statistics_msgs::msg::MetricsMessage; +using statistics_msgs::msg::StatisticDataPoint; +using statistics_msgs::msg::StatisticDataType; using libstatistics_collector::moving_average_statistics::StatisticData; template @@ -63,6 +77,44 @@ class TestSubscriptionTopicStatistics : public SubscriptionTopicStatistics(topic, 10); + publish_timer_ = this->create_wall_timer( + publish_period, [this]() { + this->publish_message(); + }); + } + + virtual ~EmptyPublisher() = default; + + size_t get_number_published() + { + return number_published_.load(); + } + +private: + void publish_message() + { + ++number_published_; + auto msg = Empty{}; + publisher_->publish(msg); + } + + rclcpp::Publisher::SharedPtr publisher_; + std::atomic number_published_{0}; + rclcpp::TimerBase::SharedPtr publish_timer_; +}; + /** * Empty subscriber node: used to create subscriber topic statistics requirements */ @@ -72,6 +124,10 @@ class EmptySubscriber : public rclcpp::Node EmptySubscriber(const std::string & name, const std::string & topic) : Node(name) { + // manually enable topic statistics via options + auto options = rclcpp::SubscriptionOptions(); + options.topic_stats_options.state = rclcpp::TopicStatisticsState::Enable; + auto callback = [this](Empty::UniquePtr msg) { this->receive_message(*msg); }; @@ -79,16 +135,15 @@ class EmptySubscriber : public rclcpp::Node std::function>( topic, rclcpp::QoS(rclcpp::KeepAll()), - callback); + callback, + options); } - virtual ~EmptySubscriber() = default; private: void receive_message(const Empty &) const { } - rclcpp::Subscription::SharedPtr subscription_; }; @@ -102,7 +157,7 @@ class TestSubscriptionTopicStatisticsFixture : public ::testing::Test { rclcpp::init(0 /* argc */, nullptr /* argv */); empty_subscriber = std::make_shared( - kTestNodeName, + kTestSubNodeName, kTestSubStatsTopic); } @@ -122,13 +177,11 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction) kTestTopicStatisticsTopic, 10); - // construct the instance + // construct a separate instance auto sub_topic_stats = std::make_unique>( empty_subscriber->get_name(), topic_stats_publisher); - using libstatistics_collector::moving_average_statistics::StatisticData; - // expect no data has been collected / no samples received for (const auto & data : sub_topic_stats->get_current_collector_data()) { EXPECT_TRUE(std::isnan(data.average)); @@ -138,3 +191,57 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction) EXPECT_EQ(kNoSamples, data.sample_count); } } + +TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_single_empty_stats_message) +{ + // create an empty publisher + auto empty_publisher = std::make_shared( + kTestPubNodeName, + kTestSubStatsTopic); + // empty_subscriber has a topic statistics instance as part of its subscription + // this will listen to and generate statistics for the empty message + + // create a listener for topic statistics messages + auto statistics_listener = std::make_shared( + "test_receive_single_empty_stats_message_listener", + "/statistics"); + + rclcpp::executors::SingleThreadedExecutor ex; + ex.add_node(empty_publisher); + ex.add_node(statistics_listener); + ex.add_node(empty_subscriber); + + // spin and get future + ex.spin_until_future_complete( + statistics_listener->GetFuture(), + kTestDuration); + + // compare message counts, sample count should be the same as published and received count + EXPECT_EQ(1, statistics_listener->GetNumberOfMessagesReceived()); + + // check the received message and the data types + const auto received_message = statistics_listener->GetLastReceivedMessage(); + for (const auto & stats_point : received_message.statistics) { + const auto type = stats_point.data_type; + switch (type) { + case StatisticDataType::STATISTICS_DATA_TYPE_SAMPLE_COUNT: + EXPECT_LT(0, stats_point.data) << "unexpected sample count"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_AVERAGE: + EXPECT_LT(0, stats_point.data) << "unexpected avg"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_MINIMUM: + EXPECT_LT(0, stats_point.data) << "unexpected min"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_MAXIMUM: + EXPECT_LT(0, stats_point.data) << "unexpected max"; + break; + case StatisticDataType::STATISTICS_DATA_TYPE_STDDEV: + EXPECT_LT(0, stats_point.data) << "unexpected stddev"; + break; + default: + FAIL() << "received unknown statistics type: " << std::dec << + static_cast(type); + } + } +} diff --git a/rclcpp/test/topic_statistics/test_topic_stats_utils.hpp b/rclcpp/test/topic_statistics/test_topic_stats_utils.hpp new file mode 100644 index 0000000000..0140f919e8 --- /dev/null +++ b/rclcpp/test/topic_statistics/test_topic_stats_utils.hpp @@ -0,0 +1,151 @@ +// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include + +#include "statistics_msgs/msg/metrics_message.hpp" + +#ifndef TOPIC_STATISTICS__TEST_TOPIC_STATS_UTILS_HPP_ +#define TOPIC_STATISTICS__TEST_TOPIC_STATS_UTILS_HPP_ + +namespace rclcpp +{ +namespace topic_statistics +{ + +using statistics_msgs::msg::MetricsMessage; + +/** +* Provide an interface to wait for a promise to be satisfied via its future. +*/ +class PromiseSetter +{ +public: + /** + * Reassign the promise member and return it's future. Acquires a mutex in order + * to mutate member variables. + * + * \return the promise member's future, called upon PeriodicMeasurement + */ + std::shared_future GetFuture() + { + std::unique_lock ulock{mutex_}; + use_future_ = true; + promise_ = std::promise(); + return promise_.get_future(); + } + +protected: + /** + * Set the promise to true, which signals the corresponding future. Acquires a mutex and sets + * the promise to true iff GetFuture was invoked before this. + */ + void SetPromise() + { + std::unique_lock ulock{mutex_}; + if (use_future_) { + // only set if GetFuture was called + promise_.set_value(true); + use_future_ = false; // the promise needs to be reassigned to set again + } + } + +private: + mutable std::mutex mutex_; + std::promise promise_; + bool use_future_{false}; +}; + +/** + * Node which listens for published MetricsMessages. This uses the PromiseSetter API + * in order to signal, via a future, that rclcpp should stop spinning upon + * message handling. + */ +class MetricsMessageSubscriber : public rclcpp::Node, public PromiseSetter +{ +public: + /** + * Constructs a MetricsMessageSubscriber. + * \param name the node name + * \param name the topic name + * \param number of messages to receive to trigger the PromiseSetter future + */ + MetricsMessageSubscriber( + const std::string & name, + const std::string & topic_name, + const int number_of_messages_to_receive = 1) + : rclcpp::Node(name), + number_of_messages_to_receive_(number_of_messages_to_receive) + { + auto callback = [this](MetricsMessage::UniquePtr msg) { + this->MetricsMessageCallback(*msg); + }; + subscription_ = create_subscription>( + topic_name, + 0 /*history_depth*/, + callback); + } + + /** + * Acquires a mutex in order to get the last message received member. + * \return the last message received + */ + MetricsMessage GetLastReceivedMessage() const + { + std::unique_lock ulock{mutex_}; + return last_received_message_; + } + + /** + * Return the number of messages received by this subscriber. + * \return the number of messages received by the subscriber callback + */ + int GetNumberOfMessagesReceived() const + { + return num_messages_received_; + } + +private: + /** + * Subscriber callback. Acquires a mutex to set the last message received and + * sets the promise to true. + * \param msg + */ + void MetricsMessageCallback(const MetricsMessage & msg) + { + std::unique_lock ulock{mutex_}; + ++num_messages_received_; + last_received_message_ = msg; + if (num_messages_received_ >= number_of_messages_to_receive_) { + PromiseSetter::SetPromise(); + } + } + + MetricsMessage last_received_message_; + rclcpp::Subscription::SharedPtr subscription_; + mutable std::mutex mutex_; + std::atomic num_messages_received_{0}; + const int number_of_messages_to_receive_; +}; + +} // namespace topic_statistics +} // namespace rclcpp + +#endif // TOPIC_STATISTICS__TEST_TOPIC_STATS_UTILS_HPP_ diff --git a/rclcpp_lifecycle/src/lifecycle_node.cpp b/rclcpp_lifecycle/src/lifecycle_node.cpp index 0d6330b4cd..60debba6f7 100644 --- a/rclcpp_lifecycle/src/lifecycle_node.cpp +++ b/rclcpp_lifecycle/src/lifecycle_node.cpp @@ -67,7 +67,7 @@ LifecycleNode::LifecycleNode( node_graph_(new rclcpp::node_interfaces::NodeGraph(node_base_.get())), node_logging_(new rclcpp::node_interfaces::NodeLogging(node_base_.get())), node_timers_(new rclcpp::node_interfaces::NodeTimers(node_base_.get())), - node_topics_(new rclcpp::node_interfaces::NodeTopics(node_base_.get())), + node_topics_(new rclcpp::node_interfaces::NodeTopics(node_base_.get(), node_timers_.get())), node_services_(new rclcpp::node_interfaces::NodeServices(node_base_.get())), node_clock_(new rclcpp::node_interfaces::NodeClock( node_base_,