diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 3918120e5a..c3dc4fed3f 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -253,6 +253,18 @@ if(BUILD_TESTING) "test_msgs" ) target_link_libraries(test_loaned_message ${PROJECT_NAME}) + ament_add_gtest(test_intra_process_communication test/test_intra_process_communication.cpp + TIMEOUT 120) + if(TARGET test_intra_process_communication) + ament_target_dependencies(test_intra_process_communication + "rcl" + "rcl_interfaces" + "rmw" + "rosidl_typesupport_cpp" + "test_msgs" + ) + endif() + target_link_libraries(test_intra_process_communication ${PROJECT_NAME}) ament_add_gtest(test_node test/test_node.cpp TIMEOUT 240) if(TARGET test_node) diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index 811c18b69f..9e72e2a5cb 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -29,6 +29,46 @@ namespace rclcpp { +/// Create and return a publisher of the given MessageT type. +/** + * The NodeT type only needs to have a method called get_node_topics_interface() + * which returns a shared_ptr to a NodeTopicsInterface. + */ +template< + typename MessageT = SerializedMessage, + typename AllocatorT = std::allocator, + typename PublisherT = rclcpp::Publisher, + typename NodeT> +std::shared_ptr +create_publisher( + NodeT & node, + const std::string & topic_name, + const rosidl_message_type_support_t & type_support, + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options = ( + rclcpp::PublisherOptionsWithAllocator() + ) +) +{ + // Extract the NodeTopicsInterface from the NodeT. + using rclcpp::node_interfaces::get_node_topics_interface; + auto node_topics = get_node_topics_interface(node); + + // Create the publisher. + auto pub = node_topics->create_publisher( + topic_name, + rclcpp::create_publisher_factory( + options, + type_support), + qos + ); + + // Add the publisher to the node topics interface. + node_topics->add_publisher(pub, options.callback_group); + + return std::dynamic_pointer_cast(pub); +} + /// Create and return a publisher of the given MessageT type. /** * The NodeT type only needs to have a method called get_node_topics_interface() diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index 9248254047..98f5ecd071 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -29,6 +29,52 @@ namespace rclcpp { +/// Create and return a subscription of the given MessageT type. +/** + * The NodeT type only needs to have a method called get_node_topics_interface() + * which returns a shared_ptr to a NodeTopicsInterface, or be a + * NodeTopicsInterface pointer itself. + */ +template< + typename CallbackT, + typename AllocatorT = std::allocator, + typename CallbackMessageT = + typename rclcpp::subscription_traits::has_message_type::type, + typename SubscriptionT = rclcpp::Subscription, + typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< + CallbackMessageT, + AllocatorT + >, + typename NodeT> +typename std::shared_ptr +create_subscription( + NodeT && node, + const std::string & topic_name, + const rosidl_message_type_support_t & type_support, + const rclcpp::QoS & qos, + CallbackT && callback, + const rclcpp::SubscriptionOptionsWithAllocator & options = + rclcpp::SubscriptionOptionsWithAllocator(), + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = + MessageMemoryStrategyT::create_default() +) +{ + using rclcpp::node_interfaces::get_node_topics_interface; + auto node_topics = get_node_topics_interface(std::forward(node)); + + auto factory = rclcpp::create_subscription_factory( + std::forward(callback), + options, + msg_mem_strat, + type_support + ); + + auto sub = node_topics->create_subscription(topic_name, factory, qos); + node_topics->add_subscription(sub, options.callback_group); + + return std::dynamic_pointer_cast(sub); +} + /// Create and return a subscription of the given MessageT type. /** * The NodeT type only needs to have a method called get_node_topics_interface() diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index d671de701e..39a5d16aa1 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -37,8 +37,12 @@ #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/publisher_base.hpp" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/visibility_control.hpp" +#include "rcpputils/asserts.hpp" + namespace rclcpp { @@ -183,8 +187,9 @@ class IntraProcessManager std::unique_ptr message, std::shared_ptr::allocator_type> allocator) { - using MessageAllocTraits = allocator::AllocRebind; - using MessageAllocatorT = typename MessageAllocTraits::allocator_type; + constexpr bool is_serialized_publisher = + serialization_traits::is_serialized_message_class::value; + using Indicies = SplittedSubscriptionsIndicies; std::shared_lock lock(mutex_); @@ -198,40 +203,22 @@ class IntraProcessManager } const auto & sub_ids = publisher_it->second; - if (sub_ids.take_ownership_subscriptions.empty()) { - // None of the buffers require ownership, so we promote the pointer - std::shared_ptr msg = std::move(message); - - this->template add_shared_msg_to_buffers(msg, sub_ids.take_shared_subscriptions); - } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT - sub_ids.take_shared_subscriptions.size() <= 1) - { - // There is at maximum 1 buffer that does not require ownership. - // So we this case is equivalent to all the buffers requiring ownership - - // Merge the two vector of ids into a unique one - std::vector concatenated_vector(sub_ids.take_shared_subscriptions); - concatenated_vector.insert( - concatenated_vector.end(), - sub_ids.take_ownership_subscriptions.begin(), - sub_ids.take_ownership_subscriptions.end()); - - this->template add_owned_msg_to_buffers( - std::move(message), - concatenated_vector, - allocator); - } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT - sub_ids.take_shared_subscriptions.size() > 1) + // check if (de)serialization is needed + if (sub_ids.take_subscriptions[Indicies::ownership_other].size() + + sub_ids.take_subscriptions[Indicies::shared_other].size() > 0) { - // Construct a new shared pointer from the message - // for the buffers that do not require ownership - auto shared_msg = std::allocate_shared(*allocator, *message); - - this->template add_shared_msg_to_buffers( - shared_msg, sub_ids.take_shared_subscriptions); - this->template add_owned_msg_to_buffers( - std::move(message), sub_ids.take_ownership_subscriptions, allocator); + do_intra_process_publish_other_type( + message.get(), + sub_ids.take_subscriptions[Indicies::ownership_other], + sub_ids.take_subscriptions[Indicies::shared_other] + ); } + + do_intra_process_publish_same_type( + std::move(message), allocator, + sub_ids.take_subscriptions[Indicies::ownership_same], + sub_ids.take_subscriptions[Indicies::shared_same] + ); } template< @@ -244,8 +231,9 @@ class IntraProcessManager std::unique_ptr message, std::shared_ptr::allocator_type> allocator) { - using MessageAllocTraits = allocator::AllocRebind; - using MessageAllocatorT = typename MessageAllocTraits::allocator_type; + constexpr bool is_serialized_publisher = + serialization_traits::is_serialized_message_class::value; + using Indicies = SplittedSubscriptionsIndicies; std::shared_lock lock(mutex_); @@ -259,33 +247,22 @@ class IntraProcessManager } const auto & sub_ids = publisher_it->second; - if (sub_ids.take_ownership_subscriptions.empty()) { - // If there are no owning, just convert to shared. - std::shared_ptr shared_msg = std::move(message); - if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( - shared_msg, sub_ids.take_shared_subscriptions); - } - return shared_msg; - } else { - // Construct a new shared pointer from the message for the buffers that - // do not require ownership and to return. - auto shared_msg = std::allocate_shared(*allocator, *message); - - if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( - shared_msg, - sub_ids.take_shared_subscriptions); - } - if (!sub_ids.take_ownership_subscriptions.empty()) { - this->template add_owned_msg_to_buffers( - std::move(message), - sub_ids.take_ownership_subscriptions, - allocator); - } - - return shared_msg; + // check if (de)serialization is needed + if (sub_ids.take_subscriptions[Indicies::ownership_other].size() + + sub_ids.take_subscriptions[Indicies::shared_other].size() > 0) + { + do_intra_process_publish_other_type( + message.get(), + sub_ids.take_subscriptions[Indicies::ownership_other], + sub_ids.take_subscriptions[Indicies::shared_other] + ); } + + return do_intra_process_publish_and_return_shared_same_type( + std::move(message), allocator, + sub_ids.take_subscriptions[Indicies::ownership_same], + sub_ids.take_subscriptions[Indicies::shared_same] + ); } /// Return true if the given rmw_gid_t matches any stored Publishers. @@ -324,8 +301,39 @@ class IntraProcessManager struct SplittedSubscriptions { - std::vector take_shared_subscriptions; - std::vector take_ownership_subscriptions; + enum + { + IndexSharedTyped = 0, IndexSharedSerialized = 1, + IndexOwnershipTyped = 2, IndexOwnershipSerialized = 3, + IndexNum = 4 + }; + + /// get the index for "take_subscriptions" depending on shared/serialized + constexpr static auto + get_index(const bool is_shared, const bool is_serialized) + { + return (is_serialized ? IndexSharedTyped : IndexSharedSerialized) + + (is_shared ? IndexSharedTyped : IndexOwnershipTyped); + } + + std::vector take_subscriptions[IndexNum]; + }; + + template + struct SplittedSubscriptionsIndicies + { + constexpr static auto ownership_same = SplittedSubscriptions::get_index( + false, + is_serialized); + constexpr static auto shared_same = SplittedSubscriptions::get_index( + true, + is_serialized); + constexpr static auto ownership_other = SplittedSubscriptions::get_index( + false, + !is_serialized); + constexpr static auto shared_other = SplittedSubscriptions::get_index( + true, + !is_serialized); }; using SubscriptionMap = @@ -344,12 +352,162 @@ class IntraProcessManager RCLCPP_PUBLIC void - insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method); + insert_sub_id_for_pub( + uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method, + bool is_serialized_subscriber); RCLCPP_PUBLIC bool can_communicate(PublisherInfo pub_info, SubscriptionInfo sub_info) const; + template< + typename MessageT, + typename Alloc = std::allocator, + typename Deleter = std::default_delete> + void + do_intra_process_publish_same_type( + std::unique_ptr message, + std::shared_ptr::allocator_type> allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) + { + // subsriber and publisher have same message types + using MessageAllocTraits = allocator::AllocRebind; + using MessageAllocatorT = typename MessageAllocTraits::allocator_type; + + if (take_ownership_subscriptions.empty()) { + // None of the buffers require ownership, so we promote the pointer + std::shared_ptr msg = std::move(message); + + this->template add_shared_msg_to_buffers(msg, take_shared_subscriptions); + } else if (!take_ownership_subscriptions.empty() && // NOLINT + take_shared_subscriptions.size() <= 1) + { + // There is at maximum 1 buffer that does not require ownership. + // So this case is equivalent to all the buffers requiring ownership + + // Merge the two vectors of ids into a unique one + std::vector concatenated_vector(take_shared_subscriptions); + concatenated_vector.insert( + concatenated_vector.end(), + take_ownership_subscriptions.begin(), + take_ownership_subscriptions.end()); + + this->template add_owned_msg_to_buffers( + std::move(message), + concatenated_vector, + allocator); + } else if (!take_ownership_subscriptions.empty() && // NOLINT + take_shared_subscriptions.size() > 1) + { + // Construct a new shared pointer from the message + // for the buffers that do not require ownership + auto shared_msg = std::allocate_shared(*allocator, *message); + + this->template add_shared_msg_to_buffers( + shared_msg, take_shared_subscriptions); + this->template add_owned_msg_to_buffers( + std::move(message), take_ownership_subscriptions, allocator); + } + } + + template + void + do_intra_process_publish_other_type( + const MessageT * unsupported_message, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) + { + // subsriber and publisher have different message types + // get first subscription + const auto subscription_id = + take_ownership_subscriptions.empty() ? + take_shared_subscriptions.front() : + take_ownership_subscriptions.front(); + + auto subscription_it = subscriptions_.find(subscription_id); + if (subscription_it == subscriptions_.end()) { + throw std::runtime_error("subscription has unexpectedly gone out of scope"); + } + auto subscription_base = subscription_it->second.subscription; + + // convert published message to the supported type + auto message = convert_message(unsupported_message, subscription_base); + + if (take_ownership_subscriptions.empty()) { + // None of the buffers require ownership, so we promote the pointer + this->template add_shared_msg_to_buffers(std::move(message), take_shared_subscriptions); + } else if (!take_ownership_subscriptions.empty() && // NOLINT + take_shared_subscriptions.size() <= 1) + { + // There is at maximum 1 buffer that does not require ownership. + // So we this case is equivalent to all the buffers requiring ownership + + // Merge the two vector of ids into a unique one + std::vector concatenated_vector(take_shared_subscriptions); + concatenated_vector.insert( + concatenated_vector.end(), + take_ownership_subscriptions.begin(), + take_ownership_subscriptions.end()); + + add_owned_msg_to_buffers(std::move(message), concatenated_vector); + } else if (!take_ownership_subscriptions.empty() && // NOLINT + take_shared_subscriptions.size() > 1) + { + // Construct a new shared pointer from the message + // for the buffers that do not require ownership + auto shared_msg = subscription_base->create_shared_message(message.get()); + + this->template add_shared_msg_to_buffers( + shared_msg, take_shared_subscriptions); + add_owned_msg_to_buffers(std::move(message), take_ownership_subscriptions); + } + } + + template< + typename MessageT, + typename Alloc = std::allocator, + typename Deleter = std::default_delete> + std::shared_ptr + do_intra_process_publish_and_return_shared_same_type( + std::unique_ptr message, + std::shared_ptr::allocator_type> allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) + { + // subsriber and publisher have same message types + using MessageAllocTraits = allocator::AllocRebind; + using MessageAllocatorT = typename MessageAllocTraits::allocator_type; + + if (take_ownership_subscriptions.empty()) { + // If there are no owning, just convert to shared. + std::shared_ptr shared_msg = std::move(message); + if (!take_shared_subscriptions.empty()) { + this->template add_shared_msg_to_buffers( + shared_msg, take_shared_subscriptions); + } + return shared_msg; + } else { + // Construct a new shared pointer from the message for the buffers that + // do not require ownership and to return. + auto shared_msg = std::allocate_shared(*allocator, *message); + + if (!take_shared_subscriptions.empty()) { + this->template add_shared_msg_to_buffers( + shared_msg, + take_shared_subscriptions); + } + if (!take_ownership_subscriptions.empty()) { + this->template add_owned_msg_to_buffers( + std::move(message), + take_ownership_subscriptions, + allocator); + } + + return shared_msg; + } + } + template void add_shared_msg_to_buffers( @@ -363,11 +521,7 @@ class IntraProcessManager } auto subscription_base = subscription_it->second.subscription; - auto subscription = std::static_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcess - >(subscription_base); - - subscription->provide_intra_process_message(message); + subscription_base->provide_intra_process_message(message); } } @@ -411,6 +565,84 @@ class IntraProcessManager } } + /// Method for unknown allocator using subscription for allocation + void + add_owned_msg_to_buffers( + std::shared_ptr message, + std::vector subscription_ids) + { + for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) { + auto subscription_it = subscriptions_.find(*it); + if (subscription_it == subscriptions_.end()) { + throw std::runtime_error("subscription has unexpectedly gone out of scope"); + } + auto subscription_base = subscription_it->second.subscription; + + if (std::next(it) == subscription_ids.end()) { + // If this is the last subscription, give up ownership + subscription_base->provide_intra_process_message(std::move(message)); + } else { + // Copy the message since we have additional subscriptions to serve + std::shared_ptr copy_message = + subscription_base->create_shared_message(message.get()); + + subscription_base->provide_intra_process_message(std::move(copy_message)); + } + } + } + + /// Convert received message to message type of subscriber + /** + * Method to serialize a ros message to rclcpp::SerializedMessage. + * The publisher has a template type while the subscribers is serialized. + * + * \param message the ros message from publisher. + * \param subscription a subscriber used for creating the serialized message. + * \return a shared pointer (containing rclcpp::SerializedMessage) with deleter. + */ + template + std::shared_ptr convert_message( + const MessageT * message, + rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription) + { + // serialize + auto serialized_message = subscription->create_shared_message(); + rclcpp::Serialization serialization; + + rcpputils::check_true(nullptr != serialized_message, "Serialized message is nullpointer."); + + serialization.serialize_message( + message, + reinterpret_cast(serialized_message.get())); + + return serialized_message; + } + + /// Convert received message to message type of subscriber + /** + * Overloaded method for rclcpp::SerializedMessage for deserialization. + * The publisher is serialized while the subscribers have a templated message type. + * + * \param serialized_message the serialized message from publisher. + * \param subscription a subscriber used for creating ros message and serialization. + * \return a shared pointer (containing a ros message) with deleter. + */ + std::shared_ptr convert_message( + const SerializedMessage * serialized_message, + rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription) + { + // deserialize + auto converted_message = subscription->create_shared_message(); + auto serialization = subscription->get_serialization(); + + rcpputils::check_true(nullptr != converted_message, "Converted message is nullpointer."); + rcpputils::check_true(nullptr != serialization, "Serialization is nullpointer."); + + serialization->deserialize_message(serialized_message, converted_message.get()); + + return converted_message; + } + PublisherToSubscriptionIdsMap pub_to_subs_; SubscriptionMap subscriptions_; PublisherMap publishers_; diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 618db3cac1..4ce8e48da5 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -28,6 +28,8 @@ #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/create_intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" #include "tracetools/tracetools.h" @@ -72,6 +74,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase throw std::runtime_error("SubscriptionIntraProcess wrong callback type"); } + if (!allocator) { + message_allocator_ = std::make_shared(); + } else { + message_allocator_ = std::make_shared(*allocator.get()); + } + // Create the intra-process buffer. buffer_ = rclcpp::experimental::create_intra_process_buffer( buffer_type, @@ -102,6 +110,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase #endif } + bool + is_serialized() const + { + return serialization_traits::is_serialized_message_class::value; + } + bool is_ready(rcl_wait_set_t * wait_set) { @@ -115,10 +129,9 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase } void - provide_intra_process_message(ConstMessageSharedPtr message) + provide_intra_process_message(std::shared_ptr message) { - buffer_->add_shared(std::move(message)); - trigger_guard_condition(); + provide_intra_process_message_impl(std::static_pointer_cast(message)); } void @@ -134,6 +147,24 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase return buffer_->use_take_shared_method(); } + std::shared_ptr + create_shared_message(const void * message_to_copy) + { + if (nullptr != message_to_copy) { + return std::allocate_shared( + *message_allocator_, + *reinterpret_cast(message_to_copy)); + } + + return std::allocate_shared(*message_allocator_); + } + + std::unique_ptr + get_serialization() + { + return std::make_unique>(); + } + private: void trigger_guard_condition() @@ -142,6 +173,13 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase (void)ret; } + void + provide_intra_process_message_impl(ConstMessageSharedPtr message) + { + buffer_->add_shared(std::move(message)); + trigger_guard_condition(); + } + template typename std::enable_if::value, void>::type execute_impl() @@ -168,6 +206,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase AnySubscriptionCallback any_callback_; BufferUniquePtr buffer_; + std::shared_ptr message_allocator_; }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index 7afd68abef..a7fae5bac2 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -25,6 +25,8 @@ #include "rcl/error_handling.h" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" @@ -70,6 +72,22 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable rmw_qos_profile_t get_actual_qos() const; + RCLCPP_PUBLIC + virtual bool + is_serialized() const = 0; + + RCLCPP_PUBLIC + virtual void + provide_intra_process_message(std::shared_ptr message) = 0; + + RCLCPP_PUBLIC + virtual std::shared_ptr + create_shared_message(const void * message_to_copy = nullptr) = 0; + + RCLCPP_PUBLIC + virtual std::unique_ptr + get_serialization() = 0; + protected: std::recursive_mutex reentrant_mutex_; rcl_guard_condition_t gc_; diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index aa614e3a0c..4b5cdd121d 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -63,11 +63,12 @@ class Publisher : public PublisherBase rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, const rclcpp::QoS & qos, - const rclcpp::PublisherOptionsWithAllocator & options) + const rclcpp::PublisherOptionsWithAllocator & options, + const rosidl_message_type_support_t & type_support) : PublisherBase( node_base, topic, - *rosidl_typesupport_cpp::get_message_type_support_handle(), + type_support, options.template to_rcl_publisher_options(qos)), options_(options), message_allocator_(new MessageAllocator(*options.get_allocator().get())) @@ -103,6 +104,19 @@ class Publisher : public PublisherBase // Setup continues in the post construction method, post_init_setup(). } + Publisher( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic, + const rclcpp::QoS & qos, + const rclcpp::PublisherOptionsWithAllocator & options) + : Publisher( + node_base, + topic, + qos, + options, + *rosidl_typesupport_cpp::get_message_type_support_handle()) + {} + /// Called post construction, so that construction may continue after shared_from_this() works. virtual void @@ -259,8 +273,9 @@ class Publisher : public PublisherBase } protected: - void - do_inter_process_publish(const MessageT & msg) + template + typename std::enable_if::value, void>::type + do_inter_process_publish(const T & msg) { auto status = rcl_publish(&publisher_handle_, &msg, nullptr); @@ -279,6 +294,14 @@ class Publisher : public PublisherBase } } + template + typename std::enable_if::value, void>::type + do_inter_process_publish(const T & msg) + { + // for serialized messages the serialized method is needed + do_serialized_publish(&msg.get_rcl_serialized_message()); + } + void do_serialized_publish(const rcl_serialized_message_t * serialized_msg) { diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp index 87def3cc17..f9ca967e4d 100644 --- a/rclcpp/include/rclcpp/publisher_factory.hpp +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -60,6 +60,35 @@ struct PublisherFactory const PublisherFactoryFunction create_typed_publisher; }; +/// Return a PublisherFactory with functions setup for creating a PublisherT. +template +PublisherFactory +create_publisher_factory( + const rclcpp::PublisherOptionsWithAllocator & options, + const rosidl_message_type_support_t & type_support) +{ + PublisherFactory factory { + // factory function that creates a specific PublisherT + [options, type_support]( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic_name, + const rclcpp::QoS & qos + ) -> std::shared_ptr + { + auto publisher = std::make_shared( + node_base, topic_name, qos, options, type_support); + // This is used for setting up things like intra process comms which + // require this->shared_from_this() which cannot be called from + // the constructor. + publisher->post_init_setup(node_base, topic_name, qos, options); + return publisher; + } + }; + + // return the factory now that it is populated + return factory; +} + /// Return a PublisherFactory with functions setup for creating a PublisherT. template PublisherFactory diff --git a/rclcpp/include/rclcpp/serialization.hpp b/rclcpp/include/rclcpp/serialization.hpp index 53fe59dc8b..bc9063a9b8 100644 --- a/rclcpp/include/rclcpp/serialization.hpp +++ b/rclcpp/include/rclcpp/serialization.hpp @@ -48,6 +48,32 @@ struct is_serialized_message_class: std::true_type {}; } // namespace serialization_traits +namespace serialization +{ +template +inline const rosidl_message_type_support_t * +get_type_support_handle_impl() +{ + return rosidl_typesupport_cpp::get_message_type_support_handle(); +} + +// no message type support for rclcpp::SerializedMessage +template<> +inline const rosidl_message_type_support_t * +get_type_support_handle_impl() +{ + return nullptr; +} + +// no message type support for rcl_serialized_message_t +template<> +inline const rosidl_message_type_support_t * +get_type_support_handle_impl() +{ + return nullptr; +} +} // namespace serialization + /// Interface to (de)serialize a message class RCLCPP_PUBLIC_TYPE SerializationBase { @@ -78,6 +104,13 @@ class RCLCPP_PUBLIC_TYPE SerializationBase void deserialize_message( const SerializedMessage * serialized_message, void * ros_message) const; + /// Get the message type support for the serialized message + /** + * \return The message type support. + */ + virtual const rosidl_message_type_support_t * + get_type_support_handle() const = 0; + private: const rosidl_message_type_support_t * type_support_; }; @@ -89,11 +122,13 @@ class Serialization : public SerializationBase public: /// Constructor of Serialization Serialization() - : SerializationBase(rosidl_typesupport_cpp::get_message_type_support_handle()) + : SerializationBase(get_type_support_handle()) + {} + + const rosidl_message_type_support_t * + get_type_support_handle() const override { - static_assert( - !serialization_traits::is_serialized_message_class::value, - "Serialization of serialized message to serialized message is not possible."); + return serialization::get_type_support_handle_impl(); } }; diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index a0f265c803..a8663286b1 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -117,6 +117,61 @@ create_subscription_factory( // return the factory now that it is populated return factory; } +/// Return a SubscriptionFactory setup to create a SubscriptionT. +template< + typename CallbackT, + typename AllocatorT, + typename CallbackMessageT = + typename rclcpp::subscription_traits::has_message_type::type, + typename SubscriptionT = rclcpp::Subscription, + typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy< + CallbackMessageT, + AllocatorT + >> +SubscriptionFactory +create_subscription_factory( + CallbackT && callback, + const rclcpp::SubscriptionOptionsWithAllocator & options, + typename MessageMemoryStrategyT::SharedPtr msg_mem_strat, + const rosidl_message_type_support_t & type_support) +{ + auto allocator = options.get_allocator(); + + using rclcpp::AnySubscriptionCallback; + AnySubscriptionCallback any_subscription_callback(allocator); + any_subscription_callback.set(std::forward(callback)); + + SubscriptionFactory factory { + // factory function that creates a MessageT specific SubscriptionT + [options, msg_mem_strat, any_subscription_callback, type_support]( + rclcpp::node_interfaces::NodeBaseInterface * node_base, + const std::string & topic_name, + const rclcpp::QoS & qos + ) -> rclcpp::SubscriptionBase::SharedPtr + { + using rclcpp::Subscription; + using rclcpp::SubscriptionBase; + + auto sub = Subscription::make_shared( + node_base, + type_support, + topic_name, + qos, + any_subscription_callback, + options, + msg_mem_strat); + // This is used for setting up things like intra process comms which + // require this->shared_from_this() which cannot be called from + // the constructor. + sub->post_init_setup(node_base, qos, options); + auto sub_base_ptr = std::dynamic_pointer_cast(sub); + return sub_base_ptr; + } + }; + + // return the factory now that it is populated + return factory; +} } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index 0b9c9d6670..876badc069 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -48,7 +48,9 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) // create an entry for the publisher id and populate with already existing subscriptions for (auto & pair : subscriptions_) { if (can_communicate(publishers_[id], pair.second)) { - insert_sub_id_for_pub(pair.first, id, pair.second.use_take_shared_method); + insert_sub_id_for_pub( + pair.first, id, pair.second.use_take_shared_method, + pair.second.subscription->is_serialized()); } } @@ -70,7 +72,9 @@ IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr su // adds the subscription id to all the matchable publishers for (auto & pair : publishers_) { if (can_communicate(pair.second, subscriptions_[id])) { - insert_sub_id_for_pub(id, pair.first, subscriptions_[id].use_take_shared_method); + insert_sub_id_for_pub( + id, pair.first, subscriptions_[id].use_take_shared_method, + subscription->is_serialized()); } } @@ -85,19 +89,14 @@ IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id) subscriptions_.erase(intra_process_subscription_id); for (auto & pair : pub_to_subs_) { - pair.second.take_shared_subscriptions.erase( - std::remove( - pair.second.take_shared_subscriptions.begin(), - pair.second.take_shared_subscriptions.end(), - intra_process_subscription_id), - pair.second.take_shared_subscriptions.end()); - - pair.second.take_ownership_subscriptions.erase( - std::remove( - pair.second.take_ownership_subscriptions.begin(), - pair.second.take_ownership_subscriptions.end(), - intra_process_subscription_id), - pair.second.take_ownership_subscriptions.end()); + for (auto i = 0u; i < SplittedSubscriptions::IndexNum; ++i) { + pair.second.take_subscriptions[i].erase( + std::remove( + pair.second.take_subscriptions[i].begin(), + pair.second.take_subscriptions[i].end(), + intra_process_subscription_id), + pair.second.take_subscriptions[i].end()); + } } } @@ -141,9 +140,10 @@ IntraProcessManager::get_subscription_count(uint64_t intra_process_publisher_id) return 0; } - auto count = - publisher_it->second.take_shared_subscriptions.size() + - publisher_it->second.take_ownership_subscriptions.size(); + auto count = 0u; + for (auto i = 0u; i < SplittedSubscriptions::IndexNum; ++i) { + count += publisher_it->second.take_subscriptions[i].size(); + } return count; } @@ -187,13 +187,12 @@ void IntraProcessManager::insert_sub_id_for_pub( uint64_t sub_id, uint64_t pub_id, - bool use_take_shared_method) + bool use_take_shared_method, + bool is_serialized_subscriber) { - if (use_take_shared_method) { - pub_to_subs_[pub_id].take_shared_subscriptions.push_back(sub_id); - } else { - pub_to_subs_[pub_id].take_ownership_subscriptions.push_back(sub_id); - } + pub_to_subs_[pub_id].take_subscriptions[SplittedSubscriptions::get_index( + use_take_shared_method, + is_serialized_subscriber)].push_back(sub_id); } bool diff --git a/rclcpp/test/test_intra_process_communication.cpp b/rclcpp/test/test_intra_process_communication.cpp new file mode 100644 index 0000000000..f90fa805a5 --- /dev/null +++ b/rclcpp/test/test_intra_process_communication.cpp @@ -0,0 +1,367 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include +#include +#include +#include +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/publisher.hpp" +#include "rclcpp/rclcpp.hpp" + +#include "rclcpp/serialized_message.hpp" + +int32_t & get_test_allocation_counter() +{ + static int32_t counter = 0; + return counter; +} + +void * custom_allocate(size_t size, void * state) +{ + static auto m_allocator = rcutils_get_default_allocator(); + + ++get_test_allocation_counter(); + auto r = m_allocator.allocate(size, state); + return r; +} + +void * custom_zero_allocate(size_t number_of_elements, size_t size_of_element, void * state) +{ + static auto m_allocator = rcutils_get_default_allocator(); + + ++get_test_allocation_counter(); + + auto r = m_allocator.zero_allocate(number_of_elements, size_of_element, state); + return r; +} + +void * custom_reallocate(void * pointer, size_t size, void * state) +{ + static auto m_allocator = rcutils_get_default_allocator(); + + if (pointer == nullptr) { + ++get_test_allocation_counter(); + } + + auto r = m_allocator.reallocate(pointer, size, state); + return r; +} + +void custom_deallocate(void * pointer, void * state) +{ + static auto m_allocator = rcutils_get_default_allocator(); + + --get_test_allocation_counter(); + m_allocator.deallocate(pointer, state); +} + +rcl_serialized_message_t make_serialized_string_msg( + const std::shared_ptr & stringMsg) +{ + auto m_allocator = rcutils_get_default_allocator(); + + // add custom (de)allocator to count the references to the object + m_allocator.allocate = &custom_allocate; + m_allocator.deallocate = &custom_deallocate; + m_allocator.reallocate = &custom_reallocate; + m_allocator.zero_allocate = &custom_zero_allocate; + + rcl_serialized_message_t msg = rmw_get_zero_initialized_serialized_message(); + auto ret = rmw_serialized_message_init(&msg, 0, &m_allocator); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + + static auto type = + rosidl_typesupport_cpp::get_message_type_support_handle + (); + auto error = rmw_serialize(stringMsg.get(), type, &msg); + if (error != RMW_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "test_intra_process_communication", + "Something went wrong preparing the serialized message"); + } + + return msg; +} + +/** + * Parameterized test. + * The first param are the NodeOptions used to create the nodes. + * The second param are the expected intraprocess count results. + */ +struct TestParameters +{ + rclcpp::NodeOptions node_options[2]; + uint64_t intraprocess_count_results[2]; + size_t runs; + std::string description; +}; + +std::ostream & operator<<(std::ostream & out, const TestParameters & params) +{ + out << params.description; + return out; +} + +class TestPublisherSubscriptionSerialized : public ::testing::TestWithParam +{ +public: + static void SetUpTestCase() + { + if (!rclcpp::ok()) { + rclcpp::init(0, nullptr); + } + } + +protected: + static std::chrono::milliseconds offset; +}; + +std::chrono::milliseconds TestPublisherSubscriptionSerialized::offset = std::chrono::milliseconds( + 250); +std::array counts; + +void OnMessageSerialized(const std::shared_ptr msg) +{ + EXPECT_NE(msg->get_rcl_serialized_message().buffer, nullptr); + EXPECT_GT(msg->get_rcl_serialized_message().buffer_capacity, 0u); + + ++counts[0]; +} + +void OnMessageConst(std::shared_ptr msg) +{ + EXPECT_EQ(msg->string_value.back(), '9'); + + ++counts[1]; +} + +void OnMessageUniquePtr(std::unique_ptr msg) +{ + EXPECT_EQ(msg->string_value.back(), '9'); + + ++counts[1]; +} + +void OnMessage(std::shared_ptr msg) +{ + EXPECT_EQ(msg->string_value.back(), '9'); + + ++counts[1]; +} + +TEST_P(TestPublisherSubscriptionSerialized, publish_serialized) +{ + get_test_allocation_counter() = 0; + + TestParameters parameters = GetParam(); + { + rclcpp::Node::SharedPtr node = std::make_shared( + "my_node", + "/ns", + parameters.node_options[0]); + auto publisher = node->create_publisher("/topic", 10); + + auto sub_shared = node->create_subscription( + "/topic", 10, + &OnMessage); + auto sub_unique = node->create_subscription( + "/topic", 10, + &OnMessageUniquePtr); + auto sub_const_shared = node->create_subscription( + "/topic", 10, + &OnMessageConst); + auto sub_serialized = node->create_subscription( + "/topic", 10, + &OnMessageSerialized); + + rclcpp::sleep_for(offset); + + counts.fill(0); + auto stringMsg = get_messages_strings()[3]; + + for (size_t i = 0; i < parameters.runs; i++) { + std::unique_ptr stringMsgU( + new test_msgs::msg::Strings( + *stringMsg)); + + publisher->publish(*stringMsg); + publisher->publish(std::move(stringMsgU)); + } + for (uint32_t i = 0; i < 3; ++i) { + rclcpp::spin_some(node); + rclcpp::sleep_for(offset); + } + + rclcpp::spin_some(node); + } + + if (parameters.runs == 1) { + EXPECT_EQ(counts[0], 2u); + EXPECT_EQ(counts[1], 6u); + } + + EXPECT_EQ(get_test_allocation_counter(), 0); +} + +TEST_P(TestPublisherSubscriptionSerialized, publish_serialized_generic) +{ + get_test_allocation_counter() = 0; + + TestParameters parameters = GetParam(); + { + rclcpp::Node::SharedPtr node = std::make_shared( + "my_node", + "/ns", + parameters.node_options[0]); + // create a generic publisher + auto publisher = rclcpp::create_publisher( + node, + "/topic", + *rosidl_typesupport_cpp::get_message_type_support_handle(), + rclcpp::QoS(10)); + + // create a generic subscriber + auto sub_gen_serialized = rclcpp::create_subscription( + node, + "/topic", + *rosidl_typesupport_cpp::get_message_type_support_handle(), + rclcpp::QoS(10), + &OnMessageSerialized); + + auto sub_shared = node->create_subscription( + "/topic", 10, + &OnMessage); + auto sub_unique = node->create_subscription( + "/topic", 10, + &OnMessageUniquePtr); + auto sub_const_shared = node->create_subscription( + "/topic", 10, + &OnMessageConst); + auto sub_serialized = node->create_subscription( + "/topic", 10, + &OnMessageSerialized); + + rclcpp::sleep_for(offset); + + counts.fill(0); + auto stringMsg = get_messages_strings()[3]; + + for (size_t i = 0; i < parameters.runs; i++) { + auto msg0 = make_serialized_string_msg(stringMsg); + + publisher->publish(std::make_unique(std::move(msg0))); + } + rclcpp::spin_some(node); + rclcpp::sleep_for(offset); + + rclcpp::spin_some(node); + } + + if (parameters.runs == 1) { + EXPECT_EQ(counts[0], 2u); + EXPECT_EQ(counts[1], 3u); + } + + EXPECT_EQ(get_test_allocation_counter(), 0); +} + +auto get_new_context() +{ + auto context = rclcpp::Context::make_shared(); + context->init(0, nullptr); + return context; +} + +std::vector parameters = { + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions in the same topic, both using intraprocess comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().use_intra_process_comms(true) + }, + {1u, 2u}, + 1, + "two_subscriptions_intraprocess_comm" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two subscriptions, one using intra-process comm and the other not using it. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().use_intra_process_comms(false) + }, + {1u, 1u}, + 1, + "two_subscriptions_one_intraprocess_one_not" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two contexts, both using intra-process. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(true), + rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(true) + }, + {1u, 1u}, + 1, + "two_subscriptions_in_two_contexts_with_intraprocess_comm" + }, + /* + Testing publisher subscription count api and internal process subscription count. + Two contexts, both of them not using intra-process comm. + */ + { + { + rclcpp::NodeOptions().use_intra_process_comms(false), + rclcpp::NodeOptions().context(get_new_context()).use_intra_process_comms(false) + }, + {0u, 0u}, + 1, + "two_subscriptions_in_two_contexts_without_intraprocess_comm" + } +}; + +std::vector setRuns(const std::vector & in, const size_t runs) +{ + std::vector out = in; + for (auto & p : out) { + p.runs = runs; + } + return out; +} + +INSTANTIATE_TEST_CASE_P( + TestWithDifferentNodeOptions, TestPublisherSubscriptionSerialized, + ::testing::ValuesIn(parameters), + ::testing::PrintToStringParamName()); + +INSTANTIATE_TEST_CASE_P( + TestWithDifferentNodeOptions1000Runs, TestPublisherSubscriptionSerialized, + ::testing::ValuesIn(setRuns(parameters, 1000)), + ::testing::PrintToStringParamName()); diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index b71fb1d061..369df01b7f 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -23,6 +23,8 @@ #define RCLCPP_BUILDING_LIBRARY 1 #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/qos.hpp" #include "rmw/types.h" #include "rmw/qos_profiles.h" @@ -212,6 +214,21 @@ class SubscriptionIntraProcessBase return topic_name; } + bool + is_serialized() const + { + return false; + } + + virtual void + provide_intra_process_message(std::shared_ptr message) = 0; + + virtual std::shared_ptr + create_shared_message(const void * message_to_copy = nullptr) = 0; + + virtual std::unique_ptr + get_serialization() = 0; + rmw_qos_profile_t qos_profile; const char * topic_name; }; @@ -229,9 +246,9 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase } void - provide_intra_process_message(std::shared_ptr msg) + provide_intra_process_message(std::shared_ptr message) { - buffer->add(msg); + buffer->add(std::static_pointer_cast(message)); } void @@ -240,6 +257,21 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase buffer->add(std::move(msg)); } + virtual std::shared_ptr + create_shared_message(const void * message_to_copy = nullptr) + { + if (nullptr != message_to_copy) { + return std::make_shared(*reinterpret_cast(message_to_copy)); + } + return std::make_shared(); + } + + virtual std::unique_ptr + get_serialization() + { + return std::make_unique>(); + } + std::uintptr_t pop() {