diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index d671de701e..dcbb3ca20f 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -37,8 +37,12 @@ #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/publisher_base.hpp" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/visibility_control.hpp" +#include "rcpputils/asserts.hpp" + namespace rclcpp { @@ -183,8 +187,9 @@ class IntraProcessManager std::unique_ptr message, std::shared_ptr::allocator_type> allocator) { - using MessageAllocTraits = allocator::AllocRebind; - using MessageAllocatorT = typename MessageAllocTraits::allocator_type; + constexpr bool is_serialized_publisher = + serialization_traits::is_serialized_message_class::value; + using Indicies = SplittedSubscriptionsIndicies; std::shared_lock lock(mutex_); @@ -198,40 +203,22 @@ class IntraProcessManager } const auto & sub_ids = publisher_it->second; - if (sub_ids.take_ownership_subscriptions.empty()) { - // None of the buffers require ownership, so we promote the pointer - std::shared_ptr msg = std::move(message); - - this->template add_shared_msg_to_buffers(msg, sub_ids.take_shared_subscriptions); - } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT - sub_ids.take_shared_subscriptions.size() <= 1) - { - // There is at maximum 1 buffer that does not require ownership. - // So we this case is equivalent to all the buffers requiring ownership - - // Merge the two vector of ids into a unique one - std::vector concatenated_vector(sub_ids.take_shared_subscriptions); - concatenated_vector.insert( - concatenated_vector.end(), - sub_ids.take_ownership_subscriptions.begin(), - sub_ids.take_ownership_subscriptions.end()); - - this->template add_owned_msg_to_buffers( - std::move(message), - concatenated_vector, - allocator); - } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT - sub_ids.take_shared_subscriptions.size() > 1) + // check if (de)serialization is needed + if (sub_ids.take_subscriptions[Indicies::ownership_other].size() + + sub_ids.take_subscriptions[Indicies::shared_other].size() > 0) { - // Construct a new shared pointer from the message - // for the buffers that do not require ownership - auto shared_msg = std::allocate_shared(*allocator, *message); - - this->template add_shared_msg_to_buffers( - shared_msg, sub_ids.take_shared_subscriptions); - this->template add_owned_msg_to_buffers( - std::move(message), sub_ids.take_ownership_subscriptions, allocator); + do_intra_process_publish_other_type( + message.get(), + sub_ids.take_subscriptions[Indicies::ownership_other], + sub_ids.take_subscriptions[Indicies::shared_other] + ); } + + do_intra_process_publish_same_type( + std::move(message), allocator, + sub_ids.take_subscriptions[Indicies::ownership_same], + sub_ids.take_subscriptions[Indicies::shared_same] + ); } template< @@ -244,8 +231,9 @@ class IntraProcessManager std::unique_ptr message, std::shared_ptr::allocator_type> allocator) { - using MessageAllocTraits = allocator::AllocRebind; - using MessageAllocatorT = typename MessageAllocTraits::allocator_type; + constexpr bool is_serialized_publisher = + serialization_traits::is_serialized_message_class::value; + using Indicies = SplittedSubscriptionsIndicies; std::shared_lock lock(mutex_); @@ -259,33 +247,22 @@ class IntraProcessManager } const auto & sub_ids = publisher_it->second; - if (sub_ids.take_ownership_subscriptions.empty()) { - // If there are no owning, just convert to shared. - std::shared_ptr shared_msg = std::move(message); - if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( - shared_msg, sub_ids.take_shared_subscriptions); - } - return shared_msg; - } else { - // Construct a new shared pointer from the message for the buffers that - // do not require ownership and to return. - auto shared_msg = std::allocate_shared(*allocator, *message); - - if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( - shared_msg, - sub_ids.take_shared_subscriptions); - } - if (!sub_ids.take_ownership_subscriptions.empty()) { - this->template add_owned_msg_to_buffers( - std::move(message), - sub_ids.take_ownership_subscriptions, - allocator); - } - - return shared_msg; + // check if (de)serialization is needed + if (sub_ids.take_subscriptions[Indicies::ownership_other].size() + + sub_ids.take_subscriptions[Indicies::shared_other].size() > 0) + { + do_intra_process_publish_other_type( + message.get(), + sub_ids.take_subscriptions[Indicies::ownership_other], + sub_ids.take_subscriptions[Indicies::shared_other] + ); } + + return do_intra_process_publish_and_return_shared_same_type( + std::move(message), allocator, + sub_ids.take_subscriptions[Indicies::ownership_same], + sub_ids.take_subscriptions[Indicies::shared_same] + ); } /// Return true if the given rmw_gid_t matches any stored Publishers. @@ -324,8 +301,39 @@ class IntraProcessManager struct SplittedSubscriptions { - std::vector take_shared_subscriptions; - std::vector take_ownership_subscriptions; + enum + { + IndexSharedTyped = 0, IndexSharedSerialized = 1, + IndexOwnershipTyped = 2, IndexOwnershipSerialized = 3, + IndexNum = 4 + }; + + /// get the index for "take_subscriptions" depending on shared/serialized + constexpr static size_t + get_index(const bool is_shared, const bool is_serialized) + { + return (is_serialized ? IndexSharedTyped : IndexSharedSerialized) + + (is_shared ? IndexSharedTyped : IndexOwnershipTyped); + } + + std::vector take_subscriptions[IndexNum]; + }; + + template + struct SplittedSubscriptionsIndicies + { + constexpr static auto ownership_same = SplittedSubscriptions::get_index( + false, + is_serialized); + constexpr static auto shared_same = SplittedSubscriptions::get_index( + true, + is_serialized); + constexpr static auto ownership_other = SplittedSubscriptions::get_index( + false, + !is_serialized); + constexpr static auto shared_other = SplittedSubscriptions::get_index( + true, + !is_serialized); }; using SubscriptionMap = @@ -344,12 +352,162 @@ class IntraProcessManager RCLCPP_PUBLIC void - insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method); + insert_sub_id_for_pub( + uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method, + bool is_serialized_subscriber); RCLCPP_PUBLIC bool can_communicate(PublisherInfo pub_info, SubscriptionInfo sub_info) const; + template< + typename MessageT, + typename Alloc = std::allocator, + typename Deleter = std::default_delete> + void + do_intra_process_publish_same_type( + std::unique_ptr message, + std::shared_ptr::allocator_type> allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) + { + // subsriber and publisher have same message types + using MessageAllocTraits = allocator::AllocRebind; + using MessageAllocatorT = typename MessageAllocTraits::allocator_type; + + if (take_ownership_subscriptions.empty()) { + // None of the buffers require ownership, so we promote the pointer + std::shared_ptr msg = std::move(message); + + this->template add_shared_msg_to_buffers(msg, take_shared_subscriptions); + } else if (!take_ownership_subscriptions.empty() && // NOLINT + take_shared_subscriptions.size() <= 1) + { + // There is at maximum 1 buffer that does not require ownership. + // So this case is equivalent to all the buffers requiring ownership + + // Merge the two vectors of ids into a unique one + std::vector concatenated_vector(take_shared_subscriptions); + concatenated_vector.insert( + concatenated_vector.end(), + take_ownership_subscriptions.begin(), + take_ownership_subscriptions.end()); + + this->template add_owned_msg_to_buffers( + std::move(message), + concatenated_vector, + allocator); + } else if (!take_ownership_subscriptions.empty() && // NOLINT + take_shared_subscriptions.size() > 1) + { + // Construct a new shared pointer from the message + // for the buffers that do not require ownership + auto shared_msg = std::allocate_shared(*allocator, *message); + + this->template add_shared_msg_to_buffers( + shared_msg, take_shared_subscriptions); + this->template add_owned_msg_to_buffers( + std::move(message), take_ownership_subscriptions, allocator); + } + } + + template + void + do_intra_process_publish_other_type( + const MessageT * unsupported_message, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) + { + // subsriber and publisher have different message types + // get first subscription + const auto subscription_id = + take_ownership_subscriptions.empty() ? + take_shared_subscriptions.front() : + take_ownership_subscriptions.front(); + + auto subscription_it = subscriptions_.find(subscription_id); + if (subscription_it == subscriptions_.end()) { + throw std::runtime_error("subscription has unexpectedly gone out of scope"); + } + auto subscription_base = subscription_it->second.subscription; + + // convert published message to the supported type + auto message = convert_message(unsupported_message, subscription_base); + + if (take_ownership_subscriptions.empty()) { + // None of the buffers require ownership, so we promote the pointer + this->template add_shared_msg_to_buffers(std::move(message), take_shared_subscriptions); + } else if (!take_ownership_subscriptions.empty() && // NOLINT + take_shared_subscriptions.size() <= 1) + { + // There is at maximum 1 buffer that does not require ownership. + // So we this case is equivalent to all the buffers requiring ownership + + // Merge the two vector of ids into a unique one + std::vector concatenated_vector(take_shared_subscriptions); + concatenated_vector.insert( + concatenated_vector.end(), + take_ownership_subscriptions.begin(), + take_ownership_subscriptions.end()); + + add_owned_msg_to_buffers(std::move(message), concatenated_vector); + } else if (!take_ownership_subscriptions.empty() && // NOLINT + take_shared_subscriptions.size() > 1) + { + // Construct a new shared pointer from the message + // for the buffers that do not require ownership + auto shared_msg = subscription_base->create_shared_message(message.get()); + + this->template add_shared_msg_to_buffers( + shared_msg, take_shared_subscriptions); + add_owned_msg_to_buffers(std::move(message), take_ownership_subscriptions); + } + } + + template< + typename MessageT, + typename Alloc = std::allocator, + typename Deleter = std::default_delete> + std::shared_ptr + do_intra_process_publish_and_return_shared_same_type( + std::unique_ptr message, + std::shared_ptr::allocator_type> allocator, + const std::vector & take_ownership_subscriptions, + const std::vector & take_shared_subscriptions) + { + // subsriber and publisher have same message types + using MessageAllocTraits = allocator::AllocRebind; + using MessageAllocatorT = typename MessageAllocTraits::allocator_type; + + if (take_ownership_subscriptions.empty()) { + // If there are no owning, just convert to shared. + std::shared_ptr shared_msg = std::move(message); + if (!take_shared_subscriptions.empty()) { + this->template add_shared_msg_to_buffers( + shared_msg, take_shared_subscriptions); + } + return shared_msg; + } else { + // Construct a new shared pointer from the message for the buffers that + // do not require ownership and to return. + auto shared_msg = std::allocate_shared(*allocator, *message); + + if (!take_shared_subscriptions.empty()) { + this->template add_shared_msg_to_buffers( + shared_msg, + take_shared_subscriptions); + } + if (!take_ownership_subscriptions.empty()) { + this->template add_owned_msg_to_buffers( + std::move(message), + take_ownership_subscriptions, + allocator); + } + + return shared_msg; + } + } + template void add_shared_msg_to_buffers( @@ -363,11 +521,7 @@ class IntraProcessManager } auto subscription_base = subscription_it->second.subscription; - auto subscription = std::static_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcess - >(subscription_base); - - subscription->provide_intra_process_message(message); + subscription_base->provide_intra_process_message(message); } } @@ -411,6 +565,84 @@ class IntraProcessManager } } + /// Method for unknown allocator using subscription for allocation + void + add_owned_msg_to_buffers( + std::shared_ptr message, + std::vector subscription_ids) + { + for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) { + auto subscription_it = subscriptions_.find(*it); + if (subscription_it == subscriptions_.end()) { + throw std::runtime_error("subscription has unexpectedly gone out of scope"); + } + auto subscription_base = subscription_it->second.subscription; + + if (std::next(it) == subscription_ids.end()) { + // If this is the last subscription, give up ownership + subscription_base->provide_intra_process_message(std::move(message)); + } else { + // Copy the message since we have additional subscriptions to serve + std::shared_ptr copy_message = + subscription_base->create_shared_message(message.get()); + + subscription_base->provide_intra_process_message(std::move(copy_message)); + } + } + } + + /// Convert received message to message type of subscriber + /** + * Method to serialize a ros message to rclcpp::SerializedMessage. + * The publisher has a template type while the subscribers is serialized. + * + * \param message the ros message from publisher. + * \param subscription a subscriber used for creating the serialized message. + * \return a shared pointer (containing rclcpp::SerializedMessage) with deleter. + */ + template + std::shared_ptr convert_message( + const MessageT * message, + rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription) + { + // serialize + auto serialized_message = subscription->create_shared_message(); + rclcpp::Serialization serialization; + + rcpputils::check_true(nullptr != serialized_message, "Serialized message is nullpointer."); + + serialization.serialize_message( + message, + reinterpret_cast(serialized_message.get())); + + return serialized_message; + } + + /// Convert received message to message type of subscriber + /** + * Overloaded method for rclcpp::SerializedMessage for deserialization. + * The publisher is serialized while the subscribers have a templated message type. + * + * \param serialized_message the serialized message from publisher. + * \param subscription a subscriber used for creating ros message and serialization. + * \return a shared pointer (containing a ros message) with deleter. + */ + std::shared_ptr convert_message( + const SerializedMessage * serialized_message, + rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription) + { + // deserialize + auto converted_message = subscription->create_shared_message(); + auto serialization = subscription->get_serialization(); + + rcpputils::check_true(nullptr != converted_message, "Converted message is nullpointer."); + rcpputils::check_true(nullptr != serialization, "Serialization is nullpointer."); + + serialization->deserialize_message(serialized_message, converted_message.get()); + + return converted_message; + } + PublisherToSubscriptionIdsMap pub_to_subs_; SubscriptionMap subscriptions_; PublisherMap publishers_; diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 618db3cac1..4ce8e48da5 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -28,6 +28,8 @@ #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/create_intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" #include "tracetools/tracetools.h" @@ -72,6 +74,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase throw std::runtime_error("SubscriptionIntraProcess wrong callback type"); } + if (!allocator) { + message_allocator_ = std::make_shared(); + } else { + message_allocator_ = std::make_shared(*allocator.get()); + } + // Create the intra-process buffer. buffer_ = rclcpp::experimental::create_intra_process_buffer( buffer_type, @@ -102,6 +110,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase #endif } + bool + is_serialized() const + { + return serialization_traits::is_serialized_message_class::value; + } + bool is_ready(rcl_wait_set_t * wait_set) { @@ -115,10 +129,9 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase } void - provide_intra_process_message(ConstMessageSharedPtr message) + provide_intra_process_message(std::shared_ptr message) { - buffer_->add_shared(std::move(message)); - trigger_guard_condition(); + provide_intra_process_message_impl(std::static_pointer_cast(message)); } void @@ -134,6 +147,24 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase return buffer_->use_take_shared_method(); } + std::shared_ptr + create_shared_message(const void * message_to_copy) + { + if (nullptr != message_to_copy) { + return std::allocate_shared( + *message_allocator_, + *reinterpret_cast(message_to_copy)); + } + + return std::allocate_shared(*message_allocator_); + } + + std::unique_ptr + get_serialization() + { + return std::make_unique>(); + } + private: void trigger_guard_condition() @@ -142,6 +173,13 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase (void)ret; } + void + provide_intra_process_message_impl(ConstMessageSharedPtr message) + { + buffer_->add_shared(std::move(message)); + trigger_guard_condition(); + } + template typename std::enable_if::value, void>::type execute_impl() @@ -168,6 +206,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase AnySubscriptionCallback any_callback_; BufferUniquePtr buffer_; + std::shared_ptr message_allocator_; }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index 7afd68abef..a7fae5bac2 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -25,6 +25,8 @@ #include "rcl/error_handling.h" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" @@ -70,6 +72,22 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable rmw_qos_profile_t get_actual_qos() const; + RCLCPP_PUBLIC + virtual bool + is_serialized() const = 0; + + RCLCPP_PUBLIC + virtual void + provide_intra_process_message(std::shared_ptr message) = 0; + + RCLCPP_PUBLIC + virtual std::shared_ptr + create_shared_message(const void * message_to_copy = nullptr) = 0; + + RCLCPP_PUBLIC + virtual std::unique_ptr + get_serialization() = 0; + protected: std::recursive_mutex reentrant_mutex_; rcl_guard_condition_t gc_; diff --git a/rclcpp/include/rclcpp/serialization.hpp b/rclcpp/include/rclcpp/serialization.hpp index 862c625217..09e92fa70a 100644 --- a/rclcpp/include/rclcpp/serialization.hpp +++ b/rclcpp/include/rclcpp/serialization.hpp @@ -44,6 +44,32 @@ struct is_serialized_message_class: std::true_type {}; } // namespace serialization_traits +namespace serialization +{ +template +inline const rosidl_message_type_support_t * +get_type_support_handle_impl() +{ + return rosidl_typesupport_cpp::get_message_type_support_handle(); +} + +// no message type support for rclcpp::SerializedMessage +template<> +inline const rosidl_message_type_support_t * +get_type_support_handle_impl() +{ + return nullptr; +} + +// no message type support for rcl_serialized_message_t +template<> +inline const rosidl_message_type_support_t * +get_type_support_handle_impl() +{ + return nullptr; +} +} // namespace serialization + /// Interface to (de)serialize a message class RCLCPP_PUBLIC_TYPE SerializationBase { @@ -74,6 +100,13 @@ class RCLCPP_PUBLIC_TYPE SerializationBase void deserialize_message( const SerializedMessage * serialized_message, void * ros_message) const; + /// Get the message type support for the serialized message + /** + * \return The message type support. + */ + virtual const rosidl_message_type_support_t * + get_type_support_handle() const = 0; + private: const rosidl_message_type_support_t * type_support_; }; @@ -85,11 +118,13 @@ class Serialization : public SerializationBase public: /// Constructor of Serialization Serialization() - : SerializationBase(rosidl_typesupport_cpp::get_message_type_support_handle()) + : SerializationBase(get_type_support_handle()) + {} + + const rosidl_message_type_support_t * + get_type_support_handle() const override { - static_assert( - !serialization_traits::is_serialized_message_class::value, - "Serialization of serialized message to serialized message is not possible."); + return serialization::get_type_support_handle_impl(); } }; diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index 0b9c9d6670..876badc069 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -48,7 +48,9 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) // create an entry for the publisher id and populate with already existing subscriptions for (auto & pair : subscriptions_) { if (can_communicate(publishers_[id], pair.second)) { - insert_sub_id_for_pub(pair.first, id, pair.second.use_take_shared_method); + insert_sub_id_for_pub( + pair.first, id, pair.second.use_take_shared_method, + pair.second.subscription->is_serialized()); } } @@ -70,7 +72,9 @@ IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr su // adds the subscription id to all the matchable publishers for (auto & pair : publishers_) { if (can_communicate(pair.second, subscriptions_[id])) { - insert_sub_id_for_pub(id, pair.first, subscriptions_[id].use_take_shared_method); + insert_sub_id_for_pub( + id, pair.first, subscriptions_[id].use_take_shared_method, + subscription->is_serialized()); } } @@ -85,19 +89,14 @@ IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id) subscriptions_.erase(intra_process_subscription_id); for (auto & pair : pub_to_subs_) { - pair.second.take_shared_subscriptions.erase( - std::remove( - pair.second.take_shared_subscriptions.begin(), - pair.second.take_shared_subscriptions.end(), - intra_process_subscription_id), - pair.second.take_shared_subscriptions.end()); - - pair.second.take_ownership_subscriptions.erase( - std::remove( - pair.second.take_ownership_subscriptions.begin(), - pair.second.take_ownership_subscriptions.end(), - intra_process_subscription_id), - pair.second.take_ownership_subscriptions.end()); + for (auto i = 0u; i < SplittedSubscriptions::IndexNum; ++i) { + pair.second.take_subscriptions[i].erase( + std::remove( + pair.second.take_subscriptions[i].begin(), + pair.second.take_subscriptions[i].end(), + intra_process_subscription_id), + pair.second.take_subscriptions[i].end()); + } } } @@ -141,9 +140,10 @@ IntraProcessManager::get_subscription_count(uint64_t intra_process_publisher_id) return 0; } - auto count = - publisher_it->second.take_shared_subscriptions.size() + - publisher_it->second.take_ownership_subscriptions.size(); + auto count = 0u; + for (auto i = 0u; i < SplittedSubscriptions::IndexNum; ++i) { + count += publisher_it->second.take_subscriptions[i].size(); + } return count; } @@ -187,13 +187,12 @@ void IntraProcessManager::insert_sub_id_for_pub( uint64_t sub_id, uint64_t pub_id, - bool use_take_shared_method) + bool use_take_shared_method, + bool is_serialized_subscriber) { - if (use_take_shared_method) { - pub_to_subs_[pub_id].take_shared_subscriptions.push_back(sub_id); - } else { - pub_to_subs_[pub_id].take_ownership_subscriptions.push_back(sub_id); - } + pub_to_subs_[pub_id].take_subscriptions[SplittedSubscriptions::get_index( + use_take_shared_method, + is_serialized_subscriber)].push_back(sub_id); } bool diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index b71fb1d061..369df01b7f 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -23,6 +23,8 @@ #define RCLCPP_BUILDING_LIBRARY 1 #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/qos.hpp" #include "rmw/types.h" #include "rmw/qos_profiles.h" @@ -212,6 +214,21 @@ class SubscriptionIntraProcessBase return topic_name; } + bool + is_serialized() const + { + return false; + } + + virtual void + provide_intra_process_message(std::shared_ptr message) = 0; + + virtual std::shared_ptr + create_shared_message(const void * message_to_copy = nullptr) = 0; + + virtual std::unique_ptr + get_serialization() = 0; + rmw_qos_profile_t qos_profile; const char * topic_name; }; @@ -229,9 +246,9 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase } void - provide_intra_process_message(std::shared_ptr msg) + provide_intra_process_message(std::shared_ptr message) { - buffer->add(msg); + buffer->add(std::static_pointer_cast(message)); } void @@ -240,6 +257,21 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase buffer->add(std::move(msg)); } + virtual std::shared_ptr + create_shared_message(const void * message_to_copy = nullptr) + { + if (nullptr != message_to_copy) { + return std::make_shared(*reinterpret_cast(message_to_copy)); + } + return std::make_shared(); + } + + virtual std::unique_ptr + get_serialization() + { + return std::make_unique>(); + } + std::uintptr_t pop() {