From fada8a06effc8f0b1cdc6e21232bddd81548d905 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Tue, 21 Apr 2020 14:56:30 +0200 Subject: [PATCH] added support for SerializedMessage in Intraprocessmanger, so: * serialized publisher uses correct method to forward content to subscriber * non-serialized publisher can serialize message for serialized subscriber Signed-off-by: Joshua Hampp --- .../experimental/intra_process_manager.hpp | 97 +++++++++++++++---- 1 file changed, 78 insertions(+), 19 deletions(-) diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index d671de701e..d420248f93 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -37,6 +37,8 @@ #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/publisher_base.hpp" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp @@ -356,6 +358,8 @@ class IntraProcessManager std::shared_ptr message, std::vector subscription_ids) { + constexpr bool is_serialized_publisher = rclcpp::is_serialized_message_class::value; + for (auto id : subscription_ids) { auto subscription_it = subscriptions_.find(id); if (subscription_it == subscriptions_.end()) { @@ -363,11 +367,28 @@ class IntraProcessManager } auto subscription_base = subscription_it->second.subscription; - auto subscription = std::static_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcess - >(subscription_base); + // same message type, so it can just be passed through + if (subscription_base->is_serialized() == is_serialized_publisher) { + auto subscription = std::static_pointer_cast< + rclcpp::experimental::SubscriptionIntraProcess + >(subscription_base); + + subscription->provide_intra_process_message(message); + } else if (is_serialized_publisher) { + // publisher provides a serialized message, while subscriber expects a ROS message + provide_serialized_intra_process_message(subscription_base, *message); + } else { + // publisher provides a ROS message, while subscriber expects a serialized message + rclcpp::Serialization serialization; + auto subscription = std::static_pointer_cast< + rclcpp::experimental::SubscriptionIntraProcess + >(subscription_base); + + auto serialized_message = std::make_unique(); + serialization.serialize_message(*message, *serialized_message); - subscription->provide_intra_process_message(message); + subscription->provide_intra_process_message(std::move(serialized_message)); + } } } @@ -384,6 +405,8 @@ class IntraProcessManager using MessageAllocTraits = allocator::AllocRebind; using MessageUniquePtr = std::unique_ptr; + constexpr bool is_serialized_publisher = rclcpp::is_serialized_message_class::value; + for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) { auto subscription_it = subscriptions_.find(*it); if (subscription_it == subscriptions_.end()) { @@ -391,26 +414,62 @@ class IntraProcessManager } auto subscription_base = subscription_it->second.subscription; - auto subscription = std::static_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcess - >(subscription_base); - - if (std::next(it) == subscription_ids.end()) { - // If this is the last subscription, give up ownership - subscription->provide_intra_process_message(std::move(message)); + // same message type, so it can just be passed through + if (subscription_base->is_serialized() == is_serialized_publisher) { + auto subscription = std::static_pointer_cast< + rclcpp::experimental::SubscriptionIntraProcess + >(subscription_base); + + if (std::next(it) == subscription_ids.end()) { + // If this is the last subscription, give up ownership + subscription->provide_intra_process_message(std::move(message)); + } else { + // Copy the message since we have additional subscriptions to serve + MessageUniquePtr copy_message; + Deleter deleter = message.get_deleter(); + auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1); + MessageAllocTraits::construct(*allocator.get(), ptr, *message); + copy_message = MessageUniquePtr(ptr, deleter); + + subscription->provide_intra_process_message(std::move(copy_message)); + } + } else if (is_serialized_publisher) { + // publisher provides a serialized message, while subscriber expects a ROS message + provide_serialized_intra_process_message(subscription_base, *message); } else { - // Copy the message since we have additional subscriptions to serve - MessageUniquePtr copy_message; - Deleter deleter = message.get_deleter(); - auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1); - MessageAllocTraits::construct(*allocator.get(), ptr, *message); - copy_message = MessageUniquePtr(ptr, deleter); - - subscription->provide_intra_process_message(std::move(copy_message)); + // publisher provides a ROS message, while subscriber expects a serialized message + rclcpp::Serialization serialization; + auto subscription = std::static_pointer_cast< + rclcpp::experimental::SubscriptionIntraProcess + >(subscription_base); + + auto serialized_message = std::make_unique(); + serialization.serialize_message(*message, *serialized_message); + + subscription->provide_intra_process_message(std::move(serialized_message)); } } } + template + static std::enable_if_t::value> + provide_serialized_intra_process_message( + rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr & subscription, + const T & serialized_message) + { + (void)subscription; + (void)serialized_message; + // prevent call if actual message type does not match + } + + static void + provide_serialized_intra_process_message( + rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr & subscription, + const SerializedMessage & serialized_message) + { + subscription->provide_serialized_intra_process_message(serialized_message); + } + PublisherToSubscriptionIdsMap pub_to_subs_; SubscriptionMap subscriptions_; PublisherMap publishers_;