diff --git a/plugins/DataStreamROS2/datastream_ros2.cpp b/plugins/DataStreamROS2/datastream_ros2.cpp index 8c545f7c..331d94b2 100644 --- a/plugins/DataStreamROS2/datastream_ros2.cpp +++ b/plugins/DataStreamROS2/datastream_ros2.cpp @@ -6,7 +6,7 @@ #include #include #include -#include "ros2_parsers/generic_subscription.hpp" +#include "rclcpp/generic_subscription.hpp" DataStreamROS2::DataStreamROS2() : DataStreamer(), @@ -198,9 +198,11 @@ void DataStreamROS2::subscribeToTopic(const std::string& topic_name, const std:: // double subscription, latching or not for (bool transient : { true, false }) { - auto subscription = std::make_shared(_node->get_node_base_interface().get(), - *_parser->typeSupport(topic_name), - topic_name, transient, bound_callback); + auto subscription = _node->create_generic_subscription(topic_name, + topic_type, + transient, + bound_callback); + _subscriptions[topic_name + (transient ? "/transient_" : "")] = subscription; _node->get_node_topics_interface()->add_subscription(subscription, nullptr); } diff --git a/plugins/DataStreamROS2/datastream_ros2.h b/plugins/DataStreamROS2/datastream_ros2.h index a4c6ec26..6017f906 100644 --- a/plugins/DataStreamROS2/datastream_ros2.h +++ b/plugins/DataStreamROS2/datastream_ros2.h @@ -8,7 +8,7 @@ #include "dialog_select_ros_topics.h" #include "ros2_parsers/ros2_parser.h" -#include "ros2_parsers/generic_subscription.hpp" +#include "rclcpp/generic_subscription.hpp" class DataStreamROS2 : public PJ::DataStreamer { @@ -64,7 +64,7 @@ class DataStreamROS2 : public PJ::DataStreamer void loadDefaultSettings(); - std::unordered_map _subscriptions; + std::unordered_map _subscriptions; void subscribeToTopic(const std::string& topic_name, const std::string& topic_type); void waitOneSecond(); diff --git a/plugins/ros2_parsers/generic_subscription.hpp b/plugins/ros2_parsers/generic_subscription.hpp deleted file mode 100644 index b3470f9f..00000000 --- a/plugins/ros2_parsers/generic_subscription.hpp +++ /dev/null @@ -1,160 +0,0 @@ -// Copyright 2018, Bosch Software Innovations GmbH. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef ROSBAG2_TRANSPORT__GENERIC_SUBSCRIPTION_HPP_ -#define ROSBAG2_TRANSPORT__GENERIC_SUBSCRIPTION_HPP_ - -#include -#include - -#include "rclcpp/any_subscription_callback.hpp" -#include "rclcpp/macros.hpp" -#include "rclcpp/subscription.hpp" -#include "rclcpp/any_subscription_callback.hpp" -#include "rclcpp/subscription.hpp" -#include "rosbag2_cpp/logging.hpp" - -namespace rosbag2_transport -{ - -/** - * This class is an implementation of an rclcpp::Subscription for serialized messages whose topic - * is not known at compile time (hence templating does not work). - * - * It does not support intra-process handling - */ -class GenericSubscription : public rclcpp::SubscriptionBase -{ - public: - RCLCPP_SMART_PTR_DEFINITIONS(GenericSubscription) - - using Ptr = std::shared_ptr; - - /** - * Constructor. In order to properly subscribe to a topic, this subscription needs to be added to - * the node_topic_interface of the node passed into this constructor. - * - * \param node_base NodeBaseInterface pointer used in parts of the setup. - * \param ts Type support handle - * \param topic_name Topic name - * \param transient if true, subscribe with transient_local - * \param callback Callback for new messages of serialized form - */ - GenericSubscription( - rclcpp::node_interfaces::NodeBaseInterface * node_base, - const rosidl_message_type_support_t & ts, - const std::string & topic_name, - bool transient, - std::function)> callback); - - // Same as create_serialized_message() as the subscription is to serialized_messages only - std::shared_ptr create_message() override; - - std::shared_ptr create_serialized_message() override; - - void handle_message( std::shared_ptr & message, const rclcpp::MessageInfo & message_info) override; - - void handle_loaned_message( - void * loaned_message, const rclcpp::MessageInfo & message_info) override; - - // Same as return_serialized_message() as the subscription is to serialized_messages only - void return_message(std::shared_ptr & message) override; - - void return_serialized_message(std::shared_ptr & message) override; - - private: - RCLCPP_DISABLE_COPY(GenericSubscription) - - std::shared_ptr borrow_serialized_message(size_t capacity); - rcutils_allocator_t default_allocator_; - std::function)> callback_; -}; - -//---------- implementation ----------- - -inline rcl_subscription_options_t PermissiveOptions() -{ - rcl_subscription_options_t options = rcl_subscription_get_default_options(); - options.qos.reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; - options.qos.durability = RMW_QOS_POLICY_DURABILITY_VOLATILE; - options.qos.liveliness = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; - return options; -} - -inline rcl_subscription_options_t LatchingOptions() -{ - rcl_subscription_options_t options = rcl_subscription_get_default_options(); - options.qos.reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; - options.qos.durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL; - options.qos.liveliness = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; - return options; -} - - -inline -GenericSubscription::GenericSubscription( - rclcpp::node_interfaces::NodeBaseInterface * node_base, - const rosidl_message_type_support_t & ts, - const std::string & topic_name, - bool transient, - std::function)> callback) -: SubscriptionBase( - node_base, - ts, - topic_name, - transient ? LatchingOptions() : PermissiveOptions(), - true), - default_allocator_(rcutils_get_default_allocator()), - callback_(callback) -{} - -inline std::shared_ptr GenericSubscription::create_message() -{ - return create_serialized_message(); -} - -inline std::shared_ptr GenericSubscription::create_serialized_message() -{ - return borrow_serialized_message(0); -} - -inline void GenericSubscription::handle_message( - std::shared_ptr & message, const rclcpp::MessageInfo & message_info) -{ - (void) message_info; - auto typed_message = std::static_pointer_cast(message); - callback_(typed_message); -} - -inline void GenericSubscription::return_message(std::shared_ptr & message) -{ - auto typed_message = std::static_pointer_cast(message); - return_serialized_message(typed_message); -} - -inline void GenericSubscription::return_serialized_message( - std::shared_ptr & message) -{ - message.reset(); -} - -inline std::shared_ptr -GenericSubscription::borrow_serialized_message(size_t capacity) -{ - return std::make_shared(capacity); -} - -} // namespace rosbag2_transport - -#endif // ROSBAG2_TRANSPORT__GENERIC_SUBSCRIPTION_HPP_