From fcaea76c2dad4a7746016a03e7abfc00385e6335 Mon Sep 17 00:00:00 2001 From: Joshua Hampp Date: Tue, 21 Apr 2020 14:54:53 +0200 Subject: [PATCH] * updated interface to SubscriptionIntraProcessBase * implemented interface to deserialize a given serialized message to ros message Signed-off-by: Joshua Hampp --- .../subscription_intra_process.hpp | 29 +++++++++++++++++++ .../subscription_intra_process_base.hpp | 9 ++++++ 2 files changed, 38 insertions(+) diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 618db3cac1..9c7807d12f 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -28,6 +28,8 @@ #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/create_intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" #include "tracetools/tracetools.h" @@ -72,6 +74,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase throw std::runtime_error("SubscriptionIntraProcess wrong callback type"); } + if (!allocator) { + message_allocator_ = std::make_shared(); + } else { + message_allocator_ = std::make_shared(*allocator.get()); + } + // Create the intra-process buffer. buffer_ = rclcpp::experimental::create_intra_process_buffer( buffer_type, @@ -102,6 +110,12 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase #endif } + bool + is_serialized() const + { + return rclcpp::is_serialized_message_class::value; + } + bool is_ready(rcl_wait_set_t * wait_set) { @@ -134,6 +148,20 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase return buffer_->use_take_shared_method(); } + void + provide_serialized_intra_process_message(const SerializedMessage & serialized_message) + { + rclcpp::Serialization serialization; + + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr); + auto message = MessageUniquePtr(ptr); + + serialization.deserialize_message(serialized_message, *message); + + provide_intra_process_message(std::move(message)); + } + private: void trigger_guard_condition() @@ -168,6 +196,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase AnySubscriptionCallback any_callback_; BufferUniquePtr buffer_; + std::shared_ptr message_allocator_; }; } // namespace experimental diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index 7afd68abef..97ac402c83 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -25,6 +25,8 @@ #include "rcl/error_handling.h" +#include "rclcpp/serialization.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" @@ -70,6 +72,13 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable rmw_qos_profile_t get_actual_qos() const; + virtual bool + is_serialized() const = 0; + + virtual void + provide_serialized_intra_process_message( + const SerializedMessage & serialized_message) = 0; + protected: std::recursive_mutex reentrant_mutex_; rcl_guard_condition_t gc_;