Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate topic statistics #1072

Merged
merged 25 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
538fbcb
Add SubscriberTopicStatistics class
dabonnie Apr 6, 2020
a61ad41
Add SubscriberTopicStatistics Test
dabonnie Apr 7, 2020
fe4fad9
Address review comments
dabonnie Apr 8, 2020
fdb6c64
Modify constructor to allow a node to create necessary components
dabonnie Apr 13, 2020
043b9a9
Fix docstring style
dabonnie Apr 13, 2020
019c86d
Remove SetPublisherTimer method
dabonnie Apr 13, 2020
8b8a31c
Change naming style to match rclcpp
dabonnie Apr 13, 2020
ade51ca
Address style issues
dabonnie Apr 14, 2020
34a89ce
Use rclcpp:Time
dabonnie Apr 14, 2020
c9a6eb7
Address review comments
dabonnie Apr 15, 2020
6f7060a
Remove unnecessary check for null publisher timer
dabonnie Apr 15, 2020
9c19234
Update message dependency
dabonnie Apr 17, 2020
9554784
Initial integration of Subscriber Topic Statistics
dabonnie Apr 18, 2020
c4519eb
Fix nanoseconds used for Topic Stats
dabonnie Apr 18, 2020
598d045
Add simple publishing test
dabonnie Apr 18, 2020
ebc753e
Add test utils header
dabonnie Apr 18, 2020
7c6bff4
Integrate with Topic Statistics options
dabonnie Apr 21, 2020
d88ec30
Update after rebasing
dabonnie Apr 21, 2020
6ae2e93
Address minor review comments
dabonnie Apr 21, 2020
80086d0
Move Topic Statistics instantiation to create_subscription
dabonnie Apr 22, 2020
6b9eaa5
Fix rebase issue
dabonnie Apr 22, 2020
5e69742
Move new timer creation method to relevant header
dabonnie Apr 22, 2020
942d7e0
Add timers interface to topic interface
dabonnie Apr 22, 2020
4c73fcf
Use new create timer method
dabonnie Apr 22, 2020
ec9bb73
Address review comments
dabonnie Apr 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 69 additions & 2 deletions rclcpp/include/rclcpp/create_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,21 @@
#ifndef RCLCPP__CREATE_SUBSCRIPTION_HPP_
#define RCLCPP__CREATE_SUBSCRIPTION_HPP_

#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <utility>

#include "rclcpp/node_interfaces/get_node_topics_interface.hpp"
#include "rclcpp/node_interfaces/node_topics_interface.hpp"
#include "rclcpp/node_interfaces/node_timers_interface.hpp"

#include "rclcpp/subscription_factory.hpp"
#include "rclcpp/subscription_options.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
#include "rclcpp/timer.hpp"
#include "rmw/qos_profiles.h"

namespace rclcpp
Expand Down Expand Up @@ -58,16 +64,49 @@ create_subscription(
),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = (
MessageMemoryStrategyT::create_default()
)
),
rclcpp::node_interfaces::NodeTimersInterface::SharedPtr node_timer_interface = nullptr
wjwwood marked this conversation as resolved.
Show resolved Hide resolved
)
{
using rclcpp::node_interfaces::get_node_topics_interface;
auto node_topics = get_node_topics_interface(std::forward<NodeT>(node));

std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics<CallbackMessageT>>
subscription_topic_stats = nullptr;

if (options.topic_stats_options.state == rclcpp::TopicStatisticsState::Enable) {
dabonnie marked this conversation as resolved.
Show resolved Hide resolved
std::shared_ptr<Publisher<statistics_msgs::msg::MetricsMessage>> publisher =
create_publisher<statistics_msgs::msg::MetricsMessage>(
node,
options.topic_stats_options.publish_topic,
qos);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qq: why does the topics statistics publisher use the same qos as the subscription being created?

I haven't checked the topics statistics demo, but it seems that if different entities are publishing statistics using different QoS, a variety of problems can happen (e.g.: pub/sub not matching).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question. It's possible we could provide publishing QoS options as part of the topic statistics options. What do you think is the best path forward?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible we could provide publishing QoS options as part of the topic statistics options. What do you think is the best path forward?

I think that adding an extra option for that (like options.topic_stats_options.publish_topic) is a good idea.


subscription_topic_stats = std::make_shared<
rclcpp::topic_statistics::SubscriptionTopicStatistics<CallbackMessageT>
>(node_topics->get_node_base_interface()->get_name(), publisher);

auto sub_call_back = [subscription_topic_stats]() {
subscription_topic_stats->publish_message();
};

auto timer = create_timer(
std::chrono::duration_cast<std::chrono::nanoseconds>(
options.topic_stats_options.
publish_period),
dabonnie marked this conversation as resolved.
Show resolved Hide resolved
sub_call_back,
options.callback_group,
node_topics->get_node_base_interface(),
node_timer_interface
);

subscription_topic_stats->set_publisher_timer(timer);
}

auto factory = rclcpp::create_subscription_factory<MessageT>(
std::forward<CallbackT>(callback),
options,
msg_mem_strat
msg_mem_strat,
subscription_topic_stats
);

auto sub = node_topics->create_subscription(topic_name, factory, qos);
Expand All @@ -76,6 +115,34 @@ create_subscription(
return std::dynamic_pointer_cast<SubscriptionT>(sub);
}

/// Convenience method to create a timer
/**
*
* \tparam CallbackT callback template
* \param nanos period to exectute callback
* \param callback
* \param group
* \param node_base_
* \param node_timers_
* \return
*/
template<typename CallbackT>
typename rclcpp::WallTimer<CallbackT>::SharedPtr
create_timer(
std::chrono::nanoseconds nanos,
CallbackT callback,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
rclcpp::node_interfaces::NodeBaseInterface * node_base_,
const rclcpp::node_interfaces::NodeTimersInterface::SharedPtr & node_timers_)
{
auto timer = rclcpp::WallTimer<CallbackT>::make_shared(
nanos,
std::move(callback),
node_base_->get_context());
node_timers_->add_timer(timer, group);
return timer;
}
dabonnie marked this conversation as resolved.
Show resolved Hide resolved

} // namespace rclcpp

#endif // RCLCPP__CREATE_SUBSCRIPTION_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ resolve_enable_topic_statistics(const OptionsT & options, const NodeBaseT & node
break;
default:
throw std::runtime_error("Unrecognized EnableTopicStatistics value");
break;
}

return topic_stats_enabled;
Expand Down
4 changes: 3 additions & 1 deletion rclcpp/include/rclcpp/node_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/timer.hpp"
dabonnie marked this conversation as resolved.
Show resolved Hide resolved

#ifndef RCLCPP__NODE_HPP_
#include "node.hpp"
Expand Down Expand Up @@ -98,7 +99,8 @@ Node::create_subscription(
qos,
std::forward<CallbackT>(callback),
options,
msg_mem_strat);
msg_mem_strat,
this->get_node_timers_interface());
}

template<typename DurationRepT, typename DurationT, typename CallbackT>
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/node_interfaces/node_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class NodeBase : public NodeBaseInterface
rclcpp::Context::SharedPtr context,
const rcl_node_options_t & rcl_node_options,
bool use_intra_process_default,
bool enable_topic_statistics_default);
bool enable_topic_statistics_default = false);
dabonnie marked this conversation as resolved.
Show resolved Hide resolved

RCLCPP_PUBLIC
virtual
Expand Down
21 changes: 19 additions & 2 deletions rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
#include <rmw/error_handling.h>
#include <rmw/rmw.h>

#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <utility>


#include "rcl/error_handling.h"
#include "rcl/subscription.h"

Expand All @@ -47,6 +47,7 @@
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/waitable.hpp"
#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"
#include "tracetools/tracetools.h"

namespace rclcpp
Expand Down Expand Up @@ -75,6 +76,8 @@ class Subscription : public SubscriptionBase
using MessageDeleter = allocator::Deleter<MessageAllocator, CallbackMessageT>;
using ConstMessageSharedPtr = std::shared_ptr<const CallbackMessageT>;
using MessageUniquePtr = std::unique_ptr<CallbackMessageT, MessageDeleter>;
using SubscriptionTopicStatisticsSharedPtr =
std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics<CallbackMessageT>>;

RCLCPP_SMART_PTR_DEFINITIONS(Subscription)

Expand All @@ -98,7 +101,8 @@ class Subscription : public SubscriptionBase
const rclcpp::QoS & qos,
AnySubscriptionCallback<CallbackMessageT, AllocatorT> callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
typename MessageMemoryStrategyT::SharedPtr message_memory_strategy)
typename MessageMemoryStrategyT::SharedPtr message_memory_strategy,
SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr)
: SubscriptionBase(
node_base,
type_support_handle,
Expand Down Expand Up @@ -180,6 +184,10 @@ class Subscription : public SubscriptionBase
this->setup_intra_process(intra_process_subscription_id, ipm);
}

if (subscription_topic_statistics != nullptr) {
this->subscription_topic_statistics_ = std::move(subscription_topic_statistics);
}

TRACEPOINT(
rclcpp_subscription_init,
(const void *)get_subscription_handle().get(),
Expand Down Expand Up @@ -260,6 +268,13 @@ class Subscription : public SubscriptionBase
}
auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
any_callback_.dispatch(typed_message, message_info);

if (subscription_topic_statistics_) {
const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now());
const auto time = rclcpp::Time(nanos.time_since_epoch().count(), RCL_STEADY_TIME);
subscription_topic_statistics_->handle_message(*typed_message, time);
}
}

void
Expand Down Expand Up @@ -307,6 +322,8 @@ class Subscription : public SubscriptionBase
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> options_;
typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT, AllocatorT>::SharedPtr
message_memory_strategy_;
/// Component which computes and publishes topic statistics for this subscriber
SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{nullptr};
};

} // namespace rclcpp
Expand Down
11 changes: 8 additions & 3 deletions rclcpp/include/rclcpp/subscription_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "rclcpp/subscription_options.hpp"
#include "rclcpp/subscription_traits.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rclcpp/topic_statistics/subscription_topic_statistics.hpp"

namespace rclcpp
{
Expand Down Expand Up @@ -78,7 +79,10 @@ SubscriptionFactory
create_subscription_factory(
CallbackT && callback,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat)
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat,
std::shared_ptr<rclcpp::topic_statistics::SubscriptionTopicStatistics<CallbackMessageT>>
subscription_topic_stats = nullptr
)
{
auto allocator = options.get_allocator();

Expand All @@ -88,7 +92,7 @@ create_subscription_factory(

SubscriptionFactory factory {
// factory function that creates a MessageT specific SubscriptionT
[options, msg_mem_strat, any_subscription_callback](
[options, msg_mem_strat, any_subscription_callback, subscription_topic_stats](
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic_name,
const rclcpp::QoS & qos
Expand All @@ -104,7 +108,8 @@ create_subscription_factory(
qos,
any_subscription_callback,
options,
msg_mem_strat);
msg_mem_strat,
subscription_topic_stats);
// This is used for setting up things like intra process comms which
// require this->shared_from_this() which cannot be called from
// the constructor.
Expand Down
Loading