diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index 5b84930ff7..016c966b62 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -50,8 +50,8 @@ template< typename SubscriptionT, typename MessageMemoryStrategyT, typename NodeParametersT, - typename NodeTopicsT, - typename ROSMessageType = typename SubscriptionT::ROSMessageType> + typename NodeTopicsT +> typename std::shared_ptr create_subscription( NodeParametersT & node_parameters, @@ -70,7 +70,7 @@ create_subscription( using rclcpp::node_interfaces::get_node_topics_interface; auto node_topics_interface = get_node_topics_interface(node_topics); - std::shared_ptr> + std::shared_ptr subscription_topic_stats = nullptr; if (rclcpp::detail::resolve_enable_topic_statistics( @@ -80,8 +80,7 @@ create_subscription( if (options.topic_stats_options.publish_period <= std::chrono::milliseconds(0)) { throw std::invalid_argument( "topic_stats_options.publish_period must be greater than 0, specified value of " + - std::to_string(options.topic_stats_options.publish_period.count()) + - " ms"); + std::to_string(options.topic_stats_options.publish_period.count()) + " ms"); } std::shared_ptr> @@ -91,12 +90,12 @@ create_subscription( options.topic_stats_options.publish_topic, qos); - subscription_topic_stats = std::make_shared< - rclcpp::topic_statistics::SubscriptionTopicStatistics - >(node_topics_interface->get_node_base_interface()->get_name(), publisher); + subscription_topic_stats = + std::make_shared( + node_topics_interface->get_node_base_interface()->get_name(), publisher); std::weak_ptr< - rclcpp::topic_statistics::SubscriptionTopicStatistics + rclcpp::topic_statistics::SubscriptionTopicStatistics > weak_subscription_topic_stats(subscription_topic_stats); auto sub_call_back = [weak_subscription_topic_stats]() { auto subscription_topic_stats = weak_subscription_topic_stats.lock(); diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index b0857fb102..8e1aac2aa7 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -104,7 +104,7 @@ class Subscription : public SubscriptionBase private: using SubscriptionTopicStatisticsSharedPtr = - std::shared_ptr>; + std::shared_ptr; public: RCLCPP_SMART_PTR_DEFINITIONS(Subscription) @@ -316,7 +316,7 @@ class Subscription : public SubscriptionBase if (subscription_topic_statistics_) { const auto nanos = std::chrono::time_point_cast(now); const auto time = rclcpp::Time(nanos.time_since_epoch().count()); - subscription_topic_statistics_->handle_message(*typed_message, time); + subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time); } } @@ -357,7 +357,7 @@ class Subscription : public SubscriptionBase if (subscription_topic_statistics_) { const auto nanos = std::chrono::time_point_cast(now); const auto time = rclcpp::Time(nanos.time_since_epoch().count()); - subscription_topic_statistics_->handle_message(*typed_message, time); + subscription_topic_statistics_->handle_message(message_info.get_rmw_message_info(), time); } } diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index a1727eab5a..0e9d9fefe5 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -75,15 +75,14 @@ template< typename CallbackT, typename AllocatorT, typename SubscriptionT = rclcpp::Subscription, - typename MessageMemoryStrategyT = typename SubscriptionT::MessageMemoryStrategyType, - typename ROSMessageType = typename SubscriptionT::ROSMessageType + typename MessageMemoryStrategyT = typename SubscriptionT::MessageMemoryStrategyType > SubscriptionFactory create_subscription_factory( CallbackT && callback, const rclcpp::SubscriptionOptionsWithAllocator & options, typename MessageMemoryStrategyT::SharedPtr msg_mem_strat, - std::shared_ptr> + std::shared_ptr subscription_topic_stats = nullptr ) { diff --git a/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp b/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp index 4b9221406f..781e2c86fc 100644 --- a/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp +++ b/rclcpp/include/rclcpp/topic_statistics/subscription_topic_statistics.hpp @@ -48,21 +48,12 @@ using libstatistics_collector::moving_average_statistics::StatisticData; /** * Class used to collect, measure, and publish topic statistics data. Current statistics * supported for subscribers are received message age and received message period. - * - * \tparam CallbackMessageT the subscribed message type - */ -template + */ class SubscriptionTopicStatistics { - using TopicStatsCollector = - libstatistics_collector::topic_statistics_collector::TopicStatisticsCollector< - CallbackMessageT>; - using ReceivedMessageAge = - libstatistics_collector::topic_statistics_collector::ReceivedMessageAgeCollector< - CallbackMessageT>; - using ReceivedMessagePeriod = - libstatistics_collector::topic_statistics_collector::ReceivedMessagePeriodCollector< - CallbackMessageT>; + using TopicStatsCollector = libstatistics_collector::TopicStatisticsCollector; + using ReceivedMessageAge = libstatistics_collector::ReceivedMessageAgeCollector; + using ReceivedMessagePeriod = libstatistics_collector::ReceivedMessagePeriodCollector; public: /// Construct a SubscriptionTopicStatistics object. @@ -101,16 +92,16 @@ class SubscriptionTopicStatistics /** * This method acquires a lock to prevent race conditions to collectors list. * - * \param received_message the message received by the subscription + * \param message_info the message info corresponding to the received message * \param now_nanoseconds current time in nanoseconds */ virtual void handle_message( - const CallbackMessageT & received_message, + const rmw_message_info_t & message_info, const rclcpp::Time now_nanoseconds) const { std::lock_guard lock(mutex_); for (const auto & collector : subscriber_statistics_collectors_) { - collector->OnMessageReceived(received_message, now_nanoseconds.nanoseconds()); + collector->OnMessageReceived(message_info, now_nanoseconds.nanoseconds()); } } diff --git a/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp b/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp index ce6887c631..9166272207 100644 --- a/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp +++ b/rclcpp/test/rclcpp/topic_statistics/test_subscription_topic_statistics.cpp @@ -14,7 +14,6 @@ #include -#include #include #include #include @@ -22,12 +21,12 @@ #include #include #include +#include #include #include "libstatistics_collector/moving_average_statistics/types.hpp" #include "rclcpp/create_publisher.hpp" -#include "rclcpp/msg/message_with_header.hpp" #include "rclcpp/node.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/rclcpp.hpp" @@ -36,10 +35,10 @@ #include "rclcpp/topic_statistics/subscription_topic_statistics.hpp" #include "statistics_msgs/msg/metrics_message.hpp" -#include "statistics_msgs/msg/statistic_data_point.hpp" #include "statistics_msgs/msg/statistic_data_type.hpp" #include "test_msgs/msg/empty.hpp" +#include "test_msgs/msg/strings.hpp" #include "test_topic_stats_utils.hpp" @@ -67,7 +66,6 @@ constexpr const std::chrono::seconds kUnstableMessageAgeWindowDuration{ constexpr const std::chrono::seconds kUnstableMessageAgeOffset{std::chrono::seconds{1}}; } // namespace -using rclcpp::msg::MessageWithHeader; using test_msgs::msg::Empty; using rclcpp::topic_statistics::SubscriptionTopicStatistics; using statistics_msgs::msg::MetricsMessage; @@ -76,114 +74,73 @@ using statistics_msgs::msg::StatisticDataType; using libstatistics_collector::moving_average_statistics::StatisticData; /** - * Wrapper class to test and expose parts of the SubscriptionTopicStatistics class. - * \tparam CallbackMessageT + * Wrapper class to test and expose parts of the SubscriptionTopicStatistics class. */ -template -class TestSubscriptionTopicStatistics : public SubscriptionTopicStatistics +class TestSubscriptionTopicStatistics : public SubscriptionTopicStatistics { public: TestSubscriptionTopicStatistics( const std::string & node_name, rclcpp::Publisher::SharedPtr publisher) - : SubscriptionTopicStatistics(node_name, publisher) + : SubscriptionTopicStatistics(node_name, std::move(publisher)) { } - virtual ~TestSubscriptionTopicStatistics() = default; + ~TestSubscriptionTopicStatistics() override = default; /// Exposed for testing - std::vector get_current_collector_data() const - { - return SubscriptionTopicStatistics::get_current_collector_data(); - } + using SubscriptionTopicStatistics::get_current_collector_data; }; /** - * Empty publisher node: used to publish empty messages + * PublisherNode wrapper: used to create publisher node */ -class EmptyPublisher : public rclcpp::Node +template +class PublisherNode : public rclcpp::Node { public: - EmptyPublisher( + PublisherNode( const std::string & name, const std::string & topic, const std::chrono::milliseconds & publish_period = std::chrono::milliseconds{100}) : Node(name) { - publisher_ = create_publisher(topic, 10); + publisher_ = create_publisher(topic, 10); publish_timer_ = this->create_wall_timer( publish_period, [this]() { this->publish_message(); }); } - virtual ~EmptyPublisher() = default; + ~PublisherNode() override = default; private: void publish_message() { - auto msg = Empty{}; + auto msg = MessageT{}; publisher_->publish(msg); } - rclcpp::Publisher::SharedPtr publisher_; + typename rclcpp::Publisher::SharedPtr publisher_; rclcpp::TimerBase::SharedPtr publish_timer_; }; /** - * MessageWithHeader publisher node: used to publish MessageWithHeader with `header` value set - */ -class MessageWithHeaderPublisher : public rclcpp::Node -{ -public: - MessageWithHeaderPublisher( - const std::string & name, const std::string & topic, - const std::chrono::milliseconds & publish_period = std::chrono::milliseconds{100}) - : Node(name) - { - publisher_ = create_publisher(topic, 10); - publish_timer_ = this->create_wall_timer( - publish_period, [this]() { - this->publish_message(); - }); - uniform_dist_ = std::uniform_int_distribution{1000000, 100000000}; - } - - virtual ~MessageWithHeaderPublisher() = default; - -private: - void publish_message() - { - std::random_device rd; - std::mt19937 gen{rd()}; - uint32_t d = uniform_dist_(gen); - auto msg = MessageWithHeader{}; - // Subtract ~1 second (add some noise for a non-zero standard deviation) - // so the received message age calculation is always > 0 - msg.header.stamp = this->now() - rclcpp::Duration{1, d}; - publisher_->publish(msg); - } - - rclcpp::Publisher::SharedPtr publisher_; - rclcpp::TimerBase::SharedPtr publish_timer_; - std::uniform_int_distribution uniform_dist_; -}; - -/** - * TransitionMessageStamp publisher node : used to publish MessageWithHeader with `header` value set + * TransitionMessageStamp publisher emulator node : used to emulate publishing messages by + * directly calling rclcpp::Subscription::handle_message(msg_shared_ptr, message_info). * The message age results change during the test. */ - -class TransitionMessageStampPublisher : public rclcpp::Node +template +class TransitionMessageStampPublisherEmulator : public rclcpp::Node { public: - TransitionMessageStampPublisher( - const std::string & name, const std::string & topic, + TransitionMessageStampPublisherEmulator( + const std::string & name, const std::chrono::seconds transition_duration, const std::chrono::seconds message_age_offset, + typename rclcpp::Subscription::SharedPtr subscription, const std::chrono::milliseconds & publish_period = std::chrono::milliseconds{100}) - : Node(name), transition_duration_(transition_duration), message_age_offset_(message_age_offset) + : Node(name), transition_duration_(transition_duration), message_age_offset_(message_age_offset), + subscription_(std::move(subscription)) { - publisher_ = create_publisher(topic, 10); publish_timer_ = this->create_wall_timer(publish_period, [this]() {this->publish_message();}); start_time_ = this->now(); } @@ -191,84 +148,66 @@ class TransitionMessageStampPublisher : public rclcpp::Node private: void publish_message() { - auto msg = MessageWithHeader{}; + std::shared_ptr msg_shared_ptr = std::make_shared(); + rmw_message_info_t rmw_message_info = rmw_get_zero_initialized_message_info(); + auto now = this->now(); auto elapsed_time = now - start_time_; if (elapsed_time < transition_duration_) { // Apply only to the topic statistics in the first half // Subtract offset so message_age is always >= offset. - msg.header.stamp = now - message_age_offset_; + rmw_message_info.source_timestamp = (now - message_age_offset_).nanoseconds(); } else { - msg.header.stamp = now; + rmw_message_info.source_timestamp = now.nanoseconds(); } - publisher_->publish(msg); + rclcpp::MessageInfo message_info{rmw_message_info}; + subscription_->handle_message(msg_shared_ptr, message_info); } std::chrono::seconds transition_duration_; std::chrono::seconds message_age_offset_; + typename rclcpp::Subscription::SharedPtr subscription_; rclcpp::Time start_time_; - - rclcpp::Publisher::SharedPtr publisher_; rclcpp::TimerBase::SharedPtr publish_timer_; }; /** - * Empty subscriber node: used to create subscriber topic statistics requirements + * Message subscriber node: used to create subscriber with enabled topic statistics collectors + * */ -class EmptySubscriber : public rclcpp::Node +template +class SubscriberWithTopicStatistics : public rclcpp::Node { public: - EmptySubscriber(const std::string & name, const std::string & topic) + SubscriberWithTopicStatistics( + const std::string & name, const std::string & topic, + std::chrono::milliseconds publish_period = defaultStatisticsPublishPeriod) : Node(name) { - // manually enable topic statistics via options + // Manually enable topic statistics via options auto options = rclcpp::SubscriptionOptions(); options.topic_stats_options.state = rclcpp::TopicStatisticsState::Enable; + options.topic_stats_options.publish_period = publish_period; - auto callback = [](Empty::UniquePtr msg) { + auto callback = [](typename MessageT::UniquePtr msg) { (void) msg; }; - subscription_ = create_subscription>( + subscription_ = create_subscription>( topic, rclcpp::QoS(rclcpp::KeepAll()), callback, options); } - virtual ~EmptySubscriber() = default; - -private: - rclcpp::Subscription::SharedPtr subscription_; -}; + ~SubscriberWithTopicStatistics() override = default; -/** - * MessageWithHeader subscriber node: used to create subscriber topic statistics requirements - */ -class MessageWithHeaderSubscriber : public rclcpp::Node -{ -public: - MessageWithHeaderSubscriber(const std::string & name, const std::string & topic) - : Node(name) + typename rclcpp::Subscription::SharedPtr get_subscription() { - // manually enable topic statistics via options - auto options = rclcpp::SubscriptionOptions(); - options.topic_stats_options.state = rclcpp::TopicStatisticsState::Enable; - options.topic_stats_options.publish_period = defaultStatisticsPublishPeriod; - - auto callback = [](MessageWithHeader::UniquePtr msg) { - (void) msg; - }; - subscription_ = create_subscription>( - topic, - rclcpp::QoS(rclcpp::KeepAll()), - callback, - options); + return subscription_; } - virtual ~MessageWithHeaderSubscriber() = default; private: - rclcpp::Subscription::SharedPtr subscription_; + typename rclcpp::Subscription::SharedPtr subscription_; }; /** @@ -277,43 +216,17 @@ class MessageWithHeaderSubscriber : public rclcpp::Node class TestSubscriptionTopicStatisticsFixture : public ::testing::Test { protected: - void SetUp() + void SetUp() override { rclcpp::init(0 /* argc */, nullptr /* argv */); } - void TearDown() + void TearDown() override { rclcpp::shutdown(); } }; -/** - * Check if a received statistics message is empty (no data was observed) - * \param message_to_check - */ -void check_if_statistics_message_is_empty(const MetricsMessage & message_to_check) -{ - for (const auto & stats_point : message_to_check.statistics) { - const auto type = stats_point.data_type; - switch (type) { - case StatisticDataType::STATISTICS_DATA_TYPE_SAMPLE_COUNT: - EXPECT_EQ(0, stats_point.data) << "unexpected sample count" << stats_point.data; - break; - case StatisticDataType::STATISTICS_DATA_TYPE_AVERAGE: - case StatisticDataType::STATISTICS_DATA_TYPE_MINIMUM: - case StatisticDataType::STATISTICS_DATA_TYPE_MAXIMUM: - case StatisticDataType::STATISTICS_DATA_TYPE_STDDEV: - EXPECT_TRUE(std::isnan(stats_point.data)) << "unexpected value" << stats_point.data << - " for type:" << type; - break; - default: - FAIL() << "received unknown statistics type: " << std::dec << - static_cast(type); - } - } -} - /** * Check if a received statistics message observed data and contains some calculation * \param message_to_check @@ -348,28 +261,13 @@ void check_if_statistic_message_is_populated(const MetricsMessage & message_to_c /** * Test an invalid argument is thrown for a bad input publish period. */ -TEST(TestSubscriptionTopicStatistics, test_invalid_publish_period) +TEST_F(TestSubscriptionTopicStatisticsFixture, test_invalid_publish_period) { - rclcpp::init(0 /* argc */, nullptr /* argv */); - - auto node = std::make_shared("test_period_node"); - - auto options = rclcpp::SubscriptionOptions(); - options.topic_stats_options.state = rclcpp::TopicStatisticsState::Enable; - options.topic_stats_options.publish_period = std::chrono::milliseconds(0); - - auto callback = [](Empty::UniquePtr msg) { - (void) msg; - }; - ASSERT_THROW( - (node->create_subscription>( - "should_throw_invalid_arg", - rclcpp::QoS(rclcpp::KeepAll()), - callback, - options)), std::invalid_argument); - - rclcpp::shutdown(); + SubscriberWithTopicStatistics( + "test_period_node", "should_throw_invalid_arg", std::chrono::milliseconds(0) + ), + std::invalid_argument); } /** @@ -378,7 +276,7 @@ TEST(TestSubscriptionTopicStatistics, test_invalid_publish_period) */ TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction) { - auto empty_subscriber = std::make_shared( + auto empty_subscriber = std::make_shared>( kTestSubNodeName, kTestSubStatsEmptyTopic); @@ -389,7 +287,7 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction) 10); // Construct a separate instance - auto sub_topic_stats = std::make_unique>( + auto sub_topic_stats = std::make_unique( empty_subscriber->get_name(), topic_stats_publisher); @@ -410,7 +308,7 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_manual_construction) TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_no_header) { // Create an empty publisher - auto empty_publisher = std::make_shared( + auto empty_publisher = std::make_shared>( kTestPubNodeName, kTestSubStatsEmptyTopic); // empty_subscriber has a topic statistics instance as part of its subscription @@ -422,7 +320,7 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_no "/statistics", kNumExpectedMessages); - auto empty_subscriber = std::make_shared( + auto empty_subscriber = std::make_shared>( kTestSubNodeName, kTestSubStatsEmptyTopic); @@ -432,74 +330,7 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_no ex.add_node(empty_subscriber); // Spin and get future - ex.spin_until_future_complete( - statistics_listener->GetFuture(), - kTestTimeout); - - // Compare message counts, sample count should be the same as published and received count - EXPECT_EQ(kNumExpectedMessages, statistics_listener->GetNumberOfMessagesReceived()); - - // Check the received message total count - const auto received_messages = statistics_listener->GetReceivedMessages(); - EXPECT_EQ(kNumExpectedMessages, received_messages.size()); - - // check the type of statistics that were received and their counts - uint64_t message_age_count{0}; - uint64_t message_period_count{0}; - - std::set received_metrics; - for (const auto & msg : received_messages) { - if (msg.metrics_source == "message_age") { - message_age_count++; - } - if (msg.metrics_source == "message_period") { - message_period_count++; - } - } - EXPECT_EQ(kNumExpectedMessageAgeMessages, message_age_count); - EXPECT_EQ(kNumExpectedMessagePeriodMessages, message_period_count); - - // Check the collected statistics for message period. - // Message age statistics will not be calculated because Empty messages - // don't have a `header` with timestamp. This means that we expect to receive a `message_age` - // and `message_period` message for each empty message published. - for (const auto & msg : received_messages) { - if (msg.metrics_source == kMessageAgeSourceLabel) { - check_if_statistics_message_is_empty(msg); - } else if (msg.metrics_source == kMessagePeriodSourceLabel) { - check_if_statistic_message_is_populated(msg); - } - } -} - -TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_with_header) -{ - // Create a MessageWithHeader publisher - auto msg_with_header_publisher = std::make_shared( - kTestPubNodeName, - kTestSubStatsTopic); - // empty_subscriber has a topic statistics instance as part of its subscription - // this will listen to and generate statistics for the empty message - - // Create a listener for topic statistics messages - auto statistics_listener = std::make_shared( - "test_receive_stats_for_message_with_header", - "/statistics", - kNumExpectedMessages); - - auto msg_with_header_subscriber = std::make_shared( - kTestSubNodeName, - kTestSubStatsTopic); - - rclcpp::executors::SingleThreadedExecutor ex; - ex.add_node(msg_with_header_publisher); - ex.add_node(statistics_listener); - ex.add_node(msg_with_header_subscriber); - - // Spin and get future - ex.spin_until_future_complete( - statistics_listener->GetFuture(), - kTestTimeout); + ex.spin_until_future_complete(statistics_listener->GetFuture(), kTestTimeout); // Compare message counts, sample count should be the same as published and received count EXPECT_EQ(kNumExpectedMessages, statistics_listener->GetNumberOfMessagesReceived()); @@ -524,6 +355,7 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_wi EXPECT_EQ(kNumExpectedMessageAgeMessages, message_age_count); EXPECT_EQ(kNumExpectedMessagePeriodMessages, message_period_count); + // Check the collected statistics for message period. for (const auto & msg : received_messages) { check_if_statistic_message_is_populated(msg); } @@ -531,23 +363,27 @@ TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_for_message_wi TEST_F(TestSubscriptionTopicStatisticsFixture, test_receive_stats_include_window_reset) { - // Create a MessageWithHeader publisher - auto msg_with_header_publisher = std::make_shared( - kTestPubNodeName, kTestSubStatsTopic, kUnstableMessageAgeWindowDuration, - kUnstableMessageAgeOffset); - - // msg_with_header_subscriber has a topic statistics instance as part of its + // msg_subscriber_with_topic_statistics has a topic statistics instance as part of its // subscription this will listen to and generate statistics - auto msg_with_header_subscriber = - std::make_shared(kTestSubNodeName, kTestSubStatsTopic); + auto msg_subscriber_with_topic_statistics = + std::make_shared>( + kTestSubNodeName, + kTestSubStatsTopic); + + // Create a message publisher + auto msg_publisher = + std::make_shared>( + kTestPubNodeName, kUnstableMessageAgeWindowDuration, + kUnstableMessageAgeOffset, msg_subscriber_with_topic_statistics->get_subscription()); + // Create a listener for topic statistics messages auto statistics_listener = std::make_shared( "test_receive_stats_include_window_reset", "/statistics", kNumExpectedMessages); rclcpp::executors::SingleThreadedExecutor ex; - ex.add_node(msg_with_header_publisher); + ex.add_node(msg_publisher); ex.add_node(statistics_listener); - ex.add_node(msg_with_header_subscriber); + ex.add_node(msg_subscriber_with_topic_statistics); // Spin and get future ex.spin_until_future_complete(statistics_listener->GetFuture(), kTestTimeout);