From 7901bb9da4e1addf71a307230afa60bd8d72f4f6 Mon Sep 17 00:00:00 2001 From: Jeffery Hsu Date: Thu, 18 Jan 2024 10:39:59 -0600 Subject: [PATCH] Add transient local durability support to publisher and subscriptions when using intra-process communication (#2303) * Add intra process transient local durability support to publisher and subscription Signed-off-by: Jeffery Hsu * Remove durability_is_transient_local_ from publisher_base Signed-off-by: Jeffery Hsu * Design changes that move most transient local publish functionalities out of intra process manager into intra process manager Signed-off-by: Jeffery Hsu * Move transient local publish to a separate function Signed-off-by: Jeffery Hsu * Remove publisher buffer weak ptr from intra process manager when it associated publisher is removed. Signed-off-by: Jeffery Hsu * Remove incorrectly placed RCLCPP_PUBLIC Signed-off-by: Jeffery Hsu * Add missing RCLCPP_PUBLIC Signed-off-by: Jeffery Hsu * Expand RingBufferImplementation beyond shared_ptr and unique_ptr Signed-off-by: Jeffery Hsu * Comment and format fix Signed-off-by: Jeffery Hsu --------- Signed-off-by: Jeffery Hsu --- rclcpp/CMakeLists.txt | 1 + .../resolve_intra_process_buffer_type.hpp | 5 + .../buffers/buffer_implementation_base.hpp | 4 + .../buffers/intra_process_buffer.hpp | 79 +++ .../buffers/ring_buffer_implementation.hpp | 77 +++ .../experimental/intra_process_manager.hpp | 125 ++++- .../subscription_intra_process_base.hpp | 4 + rclcpp/include/rclcpp/publisher.hpp | 47 +- rclcpp/include/rclcpp/publisher_base.hpp | 6 + rclcpp/include/rclcpp/publisher_options.hpp | 4 + rclcpp/include/rclcpp/subscription.hpp | 7 +- .../resolve_intra_process_buffer_type.cpp | 37 ++ rclcpp/src/rclcpp/intra_process_manager.cpp | 38 +- rclcpp/src/rclcpp/publisher_base.cpp | 7 + .../subscription_intra_process_base.cpp | 6 + .../test/rclcpp/test_intra_process_buffer.cpp | 54 +++ .../rclcpp/test_intra_process_manager.cpp | 456 ++++++++++++------ rclcpp/test/rclcpp/test_publisher.cpp | 99 +++- .../test_ring_buffer_implementation.cpp | 70 ++- rclcpp/test/rclcpp/test_subscription.cpp | 6 +- 20 files changed, 921 insertions(+), 211 deletions(-) create mode 100644 rclcpp/src/rclcpp/detail/resolve_intra_process_buffer_type.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index ffae9cc7ee..bf3651df81 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -46,6 +46,7 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/context.cpp src/rclcpp/contexts/default_context.cpp src/rclcpp/detail/add_guard_condition_to_rcl_wait_set.cpp + src/rclcpp/detail/resolve_intra_process_buffer_type.cpp src/rclcpp/detail/resolve_parameter_overrides.cpp src/rclcpp/detail/rmw_implementation_specific_payload.cpp src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp diff --git a/rclcpp/include/rclcpp/detail/resolve_intra_process_buffer_type.hpp b/rclcpp/include/rclcpp/detail/resolve_intra_process_buffer_type.hpp index e7196a1e11..234316a8f0 100644 --- a/rclcpp/include/rclcpp/detail/resolve_intra_process_buffer_type.hpp +++ b/rclcpp/include/rclcpp/detail/resolve_intra_process_buffer_type.hpp @@ -47,6 +47,11 @@ resolve_intra_process_buffer_type( return resolved_buffer_type; } +RCLCPP_PUBLIC +rclcpp::IntraProcessBufferType +resolve_intra_process_buffer_type( + const rclcpp::IntraProcessBufferType buffer_type); + } // namespace detail } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp b/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp index 83a6c7462c..1d50b1659e 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp @@ -15,6 +15,8 @@ #ifndef RCLCPP__EXPERIMENTAL__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_ #define RCLCPP__EXPERIMENTAL__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_ +#include + namespace rclcpp { namespace experimental @@ -31,6 +33,8 @@ class BufferImplementationBase virtual BufferT dequeue() = 0; virtual void enqueue(BufferT request) = 0; + virtual std::vector get_all_data() = 0; + virtual void clear() = 0; virtual bool has_data() const = 0; virtual size_t available_capacity() const = 0; diff --git a/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp index 114e9196d2..268c3f6649 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" @@ -66,6 +67,9 @@ class IntraProcessBuffer : public IntraProcessBufferBase virtual MessageSharedPtr consume_shared() = 0; virtual MessageUniquePtr consume_unique() = 0; + + virtual std::vector get_all_data_shared() = 0; + virtual std::vector get_all_data_unique() = 0; }; template< @@ -129,6 +133,16 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer(); } + std::vector get_all_data_shared() override + { + return get_all_data_shared_impl(); + } + + std::vector get_all_data_unique() override + { + return get_all_data_unique_impl(); + } + bool has_data() const override { return buffer_->has_data(); @@ -243,6 +257,71 @@ class TypedIntraProcessBuffer : public IntraProcessBufferdequeue(); } + + // MessageSharedPtr to MessageSharedPtr + template + typename std::enable_if< + std::is_same::value, + std::vector + >::type + get_all_data_shared_impl() + { + return buffer_->get_all_data(); + } + + // MessageUniquePtr to MessageSharedPtr + template + typename std::enable_if< + std::is_same::value, + std::vector + >::type + get_all_data_shared_impl() + { + std::vector result; + auto uni_ptr_vec = buffer_->get_all_data(); + result.reserve(uni_ptr_vec.size()); + for (MessageUniquePtr & uni_ptr : uni_ptr_vec) { + result.emplace_back(std::move(uni_ptr)); + } + return result; + } + + // MessageSharedPtr to MessageUniquePtr + template + typename std::enable_if< + std::is_same::value, + std::vector + >::type + get_all_data_unique_impl() + { + std::vector result; + auto shared_ptr_vec = buffer_->get_all_data(); + result.reserve(shared_ptr_vec.size()); + for (MessageSharedPtr shared_msg : shared_ptr_vec) { + MessageUniquePtr unique_msg; + MessageDeleter * deleter = std::get_deleter(shared_msg); + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr, *shared_msg); + if (deleter) { + unique_msg = MessageUniquePtr(ptr, *deleter); + } else { + unique_msg = MessageUniquePtr(ptr); + } + result.push_back(std::move(unique_msg)); + } + return result; + } + + // MessageUniquePtr to MessageUniquePtr + template + typename std::enable_if< + std::is_same::value, + std::vector + >::type + get_all_data_unique_impl() + { + return buffer_->get_all_data(); + } }; } // namespace buffers diff --git a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp index e67aae973a..f476fa556a 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp @@ -15,6 +15,7 @@ #ifndef RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_ #define RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_ +#include #include #include #include @@ -113,6 +114,17 @@ class RingBufferImplementation : public BufferImplementationBase return request; } + /// Get all the elements from the ring buffer + /** + * This member function is thread-safe. + * + * \return a vector containing all the elements from the ring buffer + */ + std::vector get_all_data() override + { + return get_all_data_impl(); + } + /// Get the next index value for the ring buffer /** * This member function is thread-safe. @@ -215,6 +227,71 @@ class RingBufferImplementation : public BufferImplementationBase return capacity_ - size_; } + /// Traits for checking if a type is std::unique_ptr + template + struct is_std_unique_ptr final : std::false_type {}; + template + struct is_std_unique_ptr> final : std::true_type + { + typedef T Ptr_type; + }; + + /// Get all the elements from the ring buffer + /** + * This member function is thread-safe. + * Two versions for the implementation of the function. + * One for buffer containing unique_ptr and the other for other types + * + * \return a vector containing all the elements from the ring buffer + */ + template::value && + std::is_copy_constructible< + typename is_std_unique_ptr::Ptr_type + >::value, + void> * = nullptr> + std::vector get_all_data_impl() + { + std::lock_guard lock(mutex_); + std::vector result_vtr; + result_vtr.reserve(size_); + for (size_t id = 0; id < size_; ++id) { + result_vtr.emplace_back( + new typename is_std_unique_ptr::Ptr_type( + *(ring_buffer_[(read_index_ + id) % capacity_]))); + } + return result_vtr; + } + + template::value, void> * = nullptr> + std::vector get_all_data_impl() + { + std::lock_guard lock(mutex_); + std::vector result_vtr; + result_vtr.reserve(size_); + for (size_t id = 0; id < size_; ++id) { + result_vtr.emplace_back(ring_buffer_[(read_index_ + id) % capacity_]); + } + return result_vtr; + } + + template::value && + !std::is_copy_constructible::value, void> * = nullptr> + std::vector get_all_data_impl() + { + throw std::logic_error("Underlined type results in invalid get_all_data_impl()"); + return {}; + } + + template::value && + !std::is_copy_constructible::Ptr_type>::value, + void> * = nullptr> + std::vector get_all_data_impl() + { + throw std::logic_error("Underlined type in unique_ptr results in invalid get_all_data_impl()"); + return {}; + } + size_t capacity_; std::vector ring_buffer_; diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index 54ff0c0dee..ffd77cb99c 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -28,6 +28,7 @@ #include #include "rclcpp/allocator/allocator_deleter.hpp" +#include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" @@ -112,9 +113,40 @@ class IntraProcessManager * \param subscription the SubscriptionIntraProcess to register. * \return an unsigned 64-bit integer which is the subscription's unique id. */ - RCLCPP_PUBLIC + template< + typename ROSMessageType, + typename Alloc = std::allocator + > uint64_t - add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription); + add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription) + { + std::unique_lock lock(mutex_); + + uint64_t sub_id = IntraProcessManager::get_next_unique_id(); + + subscriptions_[sub_id] = subscription; + + // adds the subscription id to all the matchable publishers + for (auto & pair : publishers_) { + auto publisher = pair.second.lock(); + if (!publisher) { + continue; + } + if (can_communicate(publisher, subscription)) { + uint64_t pub_id = pair.first; + insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method()); + if (publisher->is_durability_transient_local() && + subscription->is_durability_transient_local()) + { + do_transient_local_publish( + pub_id, sub_id, + subscription->use_take_shared_method()); + } + } + } + + return sub_id; + } /// Unregister a subscription using the subscription's unique id. /** @@ -131,14 +163,21 @@ class IntraProcessManager * This method stores the publisher intra process object, together with * the information of its wrapped publisher (i.e. topic name and QoS). * + * If the publisher's durability is transient local, its buffer pointer should + * be passed and the method will store it as well. + * * In addition this generates a unique intra process id for the publisher. * * \param publisher publisher to be registered with the manager. + * \param buffer publisher's buffer to be stored if its duability is transient local. * \return an unsigned 64-bit integer which is the publisher's unique id. */ RCLCPP_PUBLIC uint64_t - add_publisher(rclcpp::PublisherBase::SharedPtr publisher); + add_publisher( + rclcpp::PublisherBase::SharedPtr publisher, + rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer = + rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr()); /// Unregister a publisher using the publisher's unique id. /** @@ -292,6 +331,34 @@ class IntraProcessManager } } + template< + typename MessageT, + typename Alloc, + typename Deleter, + typename ROSMessageType> + void + add_shared_msg_to_buffer( + std::shared_ptr message, + uint64_t subscription_id) + { + add_shared_msg_to_buffers(message, {subscription_id}); + } + + template< + typename MessageT, + typename Alloc, + typename Deleter, + typename ROSMessageType> + void + add_owned_msg_to_buffer( + std::unique_ptr message, + uint64_t subscription_id, + typename allocator::AllocRebind::allocator_type & allocator) + { + add_owned_msg_to_buffers( + std::move(message), {subscription_id}, allocator); + } + /// Return true if the given rmw_gid_t matches any stored Publishers. RCLCPP_PUBLIC bool @@ -324,6 +391,9 @@ class IntraProcessManager using PublisherMap = std::unordered_map; + using PublisherBufferMap = + std::unordered_map; + using PublisherToSubscriptionIdsMap = std::unordered_map; @@ -342,6 +412,54 @@ class IntraProcessManager rclcpp::PublisherBase::SharedPtr pub, rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr sub) const; + template< + typename ROSMessageType, + typename Alloc = std::allocator + > + void do_transient_local_publish( + const uint64_t pub_id, const uint64_t sub_id, + const bool use_take_shared_method) + { + using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; + using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; + using ROSMessageTypeDeleter = allocator::Deleter; + + auto publisher_buffer = publisher_buffers_[pub_id].lock(); + if (!publisher_buffer) { + throw std::runtime_error("publisher buffer has unexpectedly gone out of scope"); + } + auto buffer = std::dynamic_pointer_cast< + rclcpp::experimental::buffers::IntraProcessBuffer< + ROSMessageType, + ROSMessageTypeAllocator, + ROSMessageTypeDeleter + > + >(publisher_buffer); + if (!buffer) { + throw std::runtime_error( + "failed to dynamic cast publisher's IntraProcessBufferBase to " + "IntraProcessBuffer which can happen when the publisher and " + "subscription use different allocator types, which is not supported"); + } + if (use_take_shared_method) { + auto data_vec = buffer->get_all_data_shared(); + for (auto shared_data : data_vec) { + this->template add_shared_msg_to_buffer< + ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>( + shared_data, sub_id); + } + } else { + auto data_vec = buffer->get_all_data_unique(); + for (auto & owned_data : data_vec) { + auto allocator = ROSMessageTypeAllocator(); + this->template add_owned_msg_to_buffer< + ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>( + std::move(owned_data), sub_id, allocator); + } + } + } + template< typename MessageT, typename Alloc, @@ -520,6 +638,7 @@ class IntraProcessManager PublisherToSubscriptionIdsMap pub_to_subs_; SubscriptionMap subscriptions_; PublisherMap publishers_; + PublisherBufferMap publisher_buffers_; mutable std::shared_timed_mutex mutex_; }; diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index c590207b90..6a14aac0c7 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -67,6 +67,10 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable size_t available_capacity() const = 0; + RCLCPP_PUBLIC + bool + is_durability_transient_local() const; + bool is_ready(rcl_wait_set_t * wait_set) override = 0; diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index c03220f38b..08a98bdf1a 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -32,6 +32,9 @@ #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/detail/resolve_use_intra_process.hpp" +#include "rclcpp/detail/resolve_intra_process_buffer_type.hpp" +#include "rclcpp/experimental/buffers/intra_process_buffer.hpp" +#include "rclcpp/experimental/create_intra_process_buffer.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" #include "rclcpp/get_message_type_support_handle.hpp" #include "rclcpp/is_ros_compatible_type.hpp" @@ -109,6 +112,12 @@ class Publisher : public PublisherBase [[deprecated("use std::shared_ptr")]] = std::shared_ptr; + using BufferSharedPtr = typename rclcpp::experimental::buffers::IntraProcessBuffer< + ROSMessageType, + ROSMessageTypeAllocator, + ROSMessageTypeDeleter + >::SharedPtr; + RCLCPP_SMART_PTR_DEFINITIONS(Publisher) /// Default constructor. @@ -171,11 +180,14 @@ class Publisher : public PublisherBase throw std::invalid_argument( "intraprocess communication is not allowed with a zero qos history depth value"); } - if (qos.durability() != rclcpp::DurabilityPolicy::Volatile) { - throw std::invalid_argument( - "intraprocess communication allowed only with volatile durability"); + if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) { + buffer_ = rclcpp::experimental::create_intra_process_buffer< + ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter>( + rclcpp::detail::resolve_intra_process_buffer_type(options_.intra_process_buffer_type), + qos, + std::make_shared(ros_message_type_allocator_)); } - uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this()); + uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this(), buffer_); this->setup_intra_process( intra_process_publisher_id, ipm); @@ -242,9 +254,18 @@ class Publisher : public PublisherBase if (inter_process_publish_needed) { auto shared_msg = this->do_intra_process_ros_message_publish_and_return_shared(std::move(msg)); + if (buffer_) { + buffer_->add_shared(shared_msg); + } this->do_inter_process_publish(*shared_msg); } else { - this->do_intra_process_ros_message_publish(std::move(msg)); + if (buffer_) { + auto shared_msg = + this->do_intra_process_ros_message_publish_and_return_shared(std::move(msg)); + buffer_->add_shared(shared_msg); + } else { + this->do_intra_process_ros_message_publish(std::move(msg)); + } } } @@ -309,14 +330,22 @@ class Publisher : public PublisherBase get_subscription_count() > get_intra_process_subscription_count(); if (inter_process_publish_needed) { - ROSMessageType ros_msg; + auto ros_msg_ptr = std::make_shared(); // TODO(clalancette): This is unnecessarily doing an additional conversion // that may have already been done in do_intra_process_publish_and_return_shared(). // We should just reuse that effort. - rclcpp::TypeAdapter::convert_to_ros_message(*msg, ros_msg); + rclcpp::TypeAdapter::convert_to_ros_message(*msg, *ros_msg_ptr); this->do_intra_process_publish(std::move(msg)); - this->do_inter_process_publish(ros_msg); + this->do_inter_process_publish(*ros_msg_ptr); + if (buffer_) { + buffer_->add_shared(ros_msg_ptr); + } } else { + if (buffer_) { + auto ros_msg_ptr = std::make_shared(); + rclcpp::TypeAdapter::convert_to_ros_message(*msg, *ros_msg_ptr); + buffer_->add_shared(ros_msg_ptr); + } this->do_intra_process_publish(std::move(msg)); } } @@ -581,6 +610,8 @@ class Publisher : public PublisherBase PublishedTypeDeleter published_type_deleter_; ROSMessageTypeAllocator ros_message_type_allocator_; ROSMessageTypeDeleter ros_message_type_deleter_; + + BufferSharedPtr buffer_{nullptr}; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index 3253595361..9a6c398eeb 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -139,6 +139,12 @@ class PublisherBase : public std::enable_shared_from_this size_t get_intra_process_subscription_count() const; + /// Get if durability is transient local + /** \return If durability is transient local*/ + RCLCPP_PUBLIC + bool + is_durability_transient_local() const; + /// Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC). /** * If the rmw Liveliness policy is set to RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC, the creator diff --git a/rclcpp/include/rclcpp/publisher_options.hpp b/rclcpp/include/rclcpp/publisher_options.hpp index 3501834dd1..01fd314f49 100644 --- a/rclcpp/include/rclcpp/publisher_options.hpp +++ b/rclcpp/include/rclcpp/publisher_options.hpp @@ -24,6 +24,7 @@ #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/detail/rmw_implementation_specific_publisher_payload.hpp" +#include "rclcpp/intra_process_buffer_type.hpp" #include "rclcpp/intra_process_setting.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/event_handler.hpp" @@ -40,6 +41,9 @@ struct PublisherOptionsBase /// Setting to explicitly set intraprocess communications. IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault; + /// Setting the data-type stored in the intraprocess buffer + IntraProcessBufferType intra_process_buffer_type = IntraProcessBufferType::SharedPtr; + /// Callbacks for various events related to publishers. PublisherEventCallbacks event_callbacks; diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index d6e0f536ed..58f9a8ce6d 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -163,10 +163,6 @@ class Subscription : public SubscriptionBase throw std::invalid_argument( "intraprocess communication is not allowed with 0 depth qos policy"); } - if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) { - throw std::invalid_argument( - "intraprocess communication allowed only with volatile durability"); - } using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess< MessageT, @@ -193,7 +189,8 @@ class Subscription : public SubscriptionBase // Add it to the intra process manager. using rclcpp::experimental::IntraProcessManager; auto ipm = context->get_sub_context(); - uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process_); + uint64_t intra_process_subscription_id = ipm->template add_subscription< + ROSMessageType, ROSMessageTypeAllocator>(subscription_intra_process_); this->setup_intra_process(intra_process_subscription_id, ipm); } diff --git a/rclcpp/src/rclcpp/detail/resolve_intra_process_buffer_type.cpp b/rclcpp/src/rclcpp/detail/resolve_intra_process_buffer_type.cpp new file mode 100644 index 0000000000..1ca9892ac4 --- /dev/null +++ b/rclcpp/src/rclcpp/detail/resolve_intra_process_buffer_type.cpp @@ -0,0 +1,37 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace rclcpp +{ + +namespace detail +{ +rclcpp::IntraProcessBufferType +resolve_intra_process_buffer_type( + const rclcpp::IntraProcessBufferType buffer_type) +{ + if (buffer_type == IntraProcessBufferType::CallbackDefault) { + throw std::invalid_argument( + "IntraProcessBufferType::CallbackDefault is not allowed " + "when there is no callback function"); + } + + return buffer_type; +} + +} // namespace detail + +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index ea068f03ae..831ffdf2ca 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -32,13 +32,24 @@ IntraProcessManager::~IntraProcessManager() {} uint64_t -IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) +IntraProcessManager::add_publisher( + rclcpp::PublisherBase::SharedPtr publisher, + rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer) { std::unique_lock lock(mutex_); uint64_t pub_id = IntraProcessManager::get_next_unique_id(); publishers_[pub_id] = publisher; + if (publisher->is_durability_transient_local()) { + if (buffer) { + publisher_buffers_[pub_id] = buffer; + } else { + throw std::runtime_error( + "transient_local publisher needs to pass" + "a valid publisher buffer ptr when calling add_publisher()"); + } + } // Initialize the subscriptions storage for this publisher. pub_to_subs_[pub_id] = SplittedSubscriptions(); @@ -58,30 +69,6 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) return pub_id; } -uint64_t -IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr subscription) -{ - std::unique_lock lock(mutex_); - - uint64_t sub_id = IntraProcessManager::get_next_unique_id(); - - subscriptions_[sub_id] = subscription; - - // adds the subscription id to all the matchable publishers - for (auto & pair : publishers_) { - auto publisher = pair.second.lock(); - if (!publisher) { - continue; - } - if (can_communicate(publisher, subscription)) { - uint64_t pub_id = pair.first; - insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method()); - } - } - - return sub_id; -} - void IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id) { @@ -112,6 +99,7 @@ IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id) std::unique_lock lock(mutex_); publishers_.erase(intra_process_publisher_id); + publisher_buffers_.erase(intra_process_publisher_id); pub_to_subs_.erase(intra_process_publisher_id); } diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index 0e37dcf3fa..111246ed96 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -270,6 +270,13 @@ PublisherBase::get_intra_process_subscription_count() const return ipm->get_subscription_count(intra_process_publisher_id_); } +bool +PublisherBase::is_durability_transient_local() const +{ + return rcl_publisher_get_actual_qos(publisher_handle_.get())->durability == + RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL; +} + rclcpp::QoS PublisherBase::get_actual_qos() const { diff --git a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp index b13123b65b..7ecafda36c 100644 --- a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp +++ b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp @@ -34,3 +34,9 @@ SubscriptionIntraProcessBase::get_actual_qos() const { return qos_profile_; } + +bool +SubscriptionIntraProcessBase::is_durability_transient_local() const +{ + return qos_profile_.durability() == rclcpp::DurabilityPolicy::TransientLocal; +} diff --git a/rclcpp/test/rclcpp/test_intra_process_buffer.cpp b/rclcpp/test/rclcpp/test_intra_process_buffer.cpp index eace6445a9..fa427de74b 100644 --- a/rclcpp/test/rclcpp/test_intra_process_buffer.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_buffer.cpp @@ -190,6 +190,33 @@ TEST(TestIntraProcessBuffer, shared_buffer_consume) { EXPECT_EQ(1L, original_shared_msg.use_count()); EXPECT_EQ(*original_shared_msg, *popped_unique_msg); EXPECT_NE(original_message_pointer, popped_message_pointer); + + original_shared_msg = std::make_shared('c'); + original_message_pointer = reinterpret_cast(original_shared_msg.get()); + auto original_shared_msg_2 = std::make_shared('d'); + auto original_message_pointer_2 = reinterpret_cast(original_shared_msg_2.get()); + intra_process_buffer.add_shared(original_shared_msg); + intra_process_buffer.add_shared(original_shared_msg_2); + + auto shared_data_vec = intra_process_buffer.get_all_data_shared(); + EXPECT_EQ(2L, shared_data_vec.size()); + EXPECT_EQ(3L, original_shared_msg.use_count()); + EXPECT_EQ(original_shared_msg.use_count(), shared_data_vec[0].use_count()); + EXPECT_EQ(*original_shared_msg, *shared_data_vec[0]); + EXPECT_EQ(original_message_pointer, reinterpret_cast(shared_data_vec[0].get())); + EXPECT_EQ(3L, original_shared_msg_2.use_count()); + EXPECT_EQ(original_shared_msg_2.use_count(), shared_data_vec[1].use_count()); + EXPECT_EQ(*original_shared_msg_2, *shared_data_vec[1]); + EXPECT_EQ(original_message_pointer_2, reinterpret_cast(shared_data_vec[1].get())); + + auto unique_data_vec = intra_process_buffer.get_all_data_unique(); + EXPECT_EQ(2L, unique_data_vec.size()); + EXPECT_EQ(3L, original_shared_msg.use_count()); + EXPECT_EQ(*original_shared_msg, *unique_data_vec[0]); + EXPECT_NE(original_message_pointer, reinterpret_cast(unique_data_vec[0].get())); + EXPECT_EQ(3L, original_shared_msg_2.use_count()); + EXPECT_EQ(*original_shared_msg_2, *unique_data_vec[1]); + EXPECT_NE(original_message_pointer_2, reinterpret_cast(unique_data_vec[1].get())); } /* @@ -237,6 +264,33 @@ TEST(TestIntraProcessBuffer, unique_buffer_consume) { EXPECT_EQ(original_value, *popped_unique_msg); EXPECT_EQ(original_message_pointer, popped_message_pointer); + + original_unique_msg = std::make_unique('c'); + original_message_pointer = reinterpret_cast(original_unique_msg.get()); + original_value = *original_unique_msg; + auto original_unique_msg_2 = std::make_unique('d'); + auto original_message_pointer_2 = reinterpret_cast(original_unique_msg.get()); + auto original_value_2 = *original_unique_msg_2; + intra_process_buffer.add_unique(std::move(original_unique_msg)); + intra_process_buffer.add_unique(std::move(original_unique_msg_2)); + + auto shared_data_vec = intra_process_buffer.get_all_data_shared(); + EXPECT_EQ(2L, shared_data_vec.size()); + EXPECT_EQ(1L, shared_data_vec[0].use_count()); + EXPECT_EQ(original_value, *shared_data_vec[0]); + EXPECT_NE(original_message_pointer, reinterpret_cast(shared_data_vec[0].get())); + EXPECT_EQ(1L, shared_data_vec[1].use_count()); + EXPECT_EQ(original_value_2, *shared_data_vec[1]); + EXPECT_NE(original_message_pointer_2, reinterpret_cast(shared_data_vec[1].get())); + + auto unique_data_vec = intra_process_buffer.get_all_data_unique(); + EXPECT_EQ(2L, unique_data_vec.size()); + EXPECT_EQ(1L, shared_data_vec[0].use_count()); + EXPECT_EQ(original_value, *unique_data_vec[0]); + EXPECT_NE(original_message_pointer, reinterpret_cast(unique_data_vec[0].get())); + EXPECT_EQ(1L, shared_data_vec[1].use_count()); + EXPECT_EQ(original_value_2, *unique_data_vec[1]); + EXPECT_NE(original_message_pointer_2, reinterpret_cast(unique_data_vec[1].get())); } /* diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index da863f3e3c..046a5228da 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -31,6 +31,111 @@ // NOLINTNEXTLINE(build/include_order) #include +namespace rclcpp +{ +namespace experimental +{ +namespace buffers +{ +namespace mock +{ + +class IntraProcessBufferBase +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(IntraProcessBufferBase) + + virtual ~IntraProcessBufferBase() {} +}; + +} // namespace mock +} // namespace buffers +} // namespace experimental +} // namespace rclcpp + +namespace rclcpp +{ +namespace experimental +{ +namespace buffers +{ +namespace mock +{ +template< + typename MessageT, + typename Alloc = std::allocator, + typename MessageDeleter = std::default_delete> +class IntraProcessBuffer : public IntraProcessBufferBase +{ +public: + using ConstMessageSharedPtr = std::shared_ptr; + using MessageUniquePtr = std::unique_ptr; + + RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessBuffer) + + IntraProcessBuffer() + {} + + void add(ConstMessageSharedPtr msg) + { + message_ptr = reinterpret_cast(msg.get()); + shared_msg = msg; + ++num_msgs; + } + + void add(MessageUniquePtr msg) + { + message_ptr = reinterpret_cast(msg.get()); + unique_msg = std::move(msg); + ++num_msgs; + } + + void pop(std::uintptr_t & msg_ptr) + { + msg_ptr = message_ptr; + message_ptr = 0; + --num_msgs; + } + + size_t size() const + { + return num_msgs; + } + + std::vector get_all_data_shared() + { + if (shared_msg) { + return {shared_msg}; + } else if (unique_msg) { + return {std::make_shared(*unique_msg)}; + } + return {}; + } + + std::vector get_all_data_unique() + { + std::vector result; + if (shared_msg) { + result.push_back(std::make_unique(*shared_msg)); + } else if (unique_msg) { + result.push_back(std::make_unique(*unique_msg)); + } + return result; + } + + // need to store the messages somewhere otherwise the memory address will be reused + ConstMessageSharedPtr shared_msg; + MessageUniquePtr unique_msg; + + std::uintptr_t message_ptr; + // count add and pop + size_t num_msgs = 0u; +}; + +} // namespace mock +} // namespace buffers +} // namespace experimental +} // namespace rclcpp namespace rclcpp { // forward declaration @@ -80,6 +185,12 @@ class PublisherBase return qos_profile; } + bool + is_durability_transient_local() const + { + return qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal; + } + bool operator==(const rmw_gid_t & gid) const { @@ -117,6 +228,9 @@ class Publisher : public PublisherBase { auto allocator = std::make_shared(); message_allocator_ = std::make_shared(*allocator.get()); + if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) { + buffer = std::make_shared>(); + } } // The following functions use the IntraProcessManager @@ -124,75 +238,12 @@ class Publisher : public PublisherBase void publish(MessageUniquePtr msg); std::shared_ptr message_allocator_; + typename rclcpp::experimental::buffers::mock::IntraProcessBuffer::SharedPtr buffer{nullptr}; }; } // namespace mock } // namespace rclcpp -namespace rclcpp -{ -namespace experimental -{ -namespace buffers -{ -namespace mock -{ -template< - typename MessageT, - typename Alloc = std::allocator, - typename MessageDeleter = std::default_delete> -class IntraProcessBuffer -{ -public: - using ConstMessageSharedPtr = std::shared_ptr; - using MessageUniquePtr = std::unique_ptr; - - RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessBuffer) - - IntraProcessBuffer() - {} - - void add(ConstMessageSharedPtr msg) - { - message_ptr = reinterpret_cast(msg.get()); - shared_msg = msg; - ++num_msgs; - } - - void add(MessageUniquePtr msg) - { - message_ptr = reinterpret_cast(msg.get()); - unique_msg = std::move(msg); - ++num_msgs; - } - - void pop(std::uintptr_t & msg_ptr) - { - msg_ptr = message_ptr; - message_ptr = 0; - --num_msgs; - } - - size_t size() const - { - return num_msgs; - } - - // need to store the messages somewhere otherwise the memory address will be reused - ConstMessageSharedPtr shared_msg; - MessageUniquePtr unique_msg; - - std::uintptr_t message_ptr; - // count add and pop - size_t num_msgs = 0u; -}; - -} // namespace mock -} // namespace buffers -} // namespace experimental -} // namespace rclcpp - - namespace rclcpp { namespace experimental @@ -231,6 +282,12 @@ class SubscriptionIntraProcessBase return topic_name.c_str(); } + bool + is_durability_transient_local() const + { + return qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal; + } + virtual size_t available_capacity() const = 0; @@ -331,12 +388,14 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< // Prevent the header files of the mocked classes to be included #define RCLCPP__PUBLISHER_HPP_ #define RCLCPP__PUBLISHER_BASE_HPP_ +#define RCLCPP__EXPERIMENTAL__BUFFERS__INTRA_PROCESS_BUFFER_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_ // Force ipm to use our mock publisher class. #define Publisher mock::Publisher #define PublisherBase mock::PublisherBase +#define IntraProcessBufferBase mock::IntraProcessBufferBase #define IntraProcessBuffer mock::IntraProcessBuffer #define SubscriptionIntraProcessBase mock::SubscriptionIntraProcessBase #define SubscriptionIntraProcessBuffer mock::SubscriptionIntraProcessBuffer @@ -368,28 +427,36 @@ void Publisher::publish(MessageUniquePtr msg) throw std::runtime_error("cannot publish msg which is a null pointer"); } - ipm->template do_intra_process_publish( - intra_process_publisher_id_, - std::move(msg), - *message_allocator_); + if (buffer) { + auto shared_msg = ipm->template do_intra_process_publish_and_return_shared( + intra_process_publisher_id_, + std::move(msg), + *message_allocator_); + buffer->add(shared_msg); + } else { + ipm->template do_intra_process_publish( + intra_process_publisher_id_, + std::move(msg), + *message_allocator_); + } } } // namespace mock } // namespace rclcpp /* - This tests how the class connects and disconnects publishers and subscriptions: - - Creates 2 publishers on different topics and a subscription to one of them. - Add everything to the intra-process manager. - - All the entities are expected to have different ids. - - Check the subscriptions count for each publisher. - - One of the publishers is expected to have 1 subscription, while the other 0. - - Check the subscription count for a non existing publisher id, should return 0. - - Add a new publisher and a new subscription both with reliable QoS. - - The subscriptions count of the previous publisher is expected to remain unchanged, - while the new publisher is expected to have 2 subscriptions (it's compatible with both QoS). - - Remove the just added subscriptions. - - The count for the last publisher is expected to decrease to 1. + * This tests how the class connects and disconnects publishers and subscriptions: + * - Creates 2 publishers on different topics and a subscription to one of them. + * Add everything to the intra-process manager. + * - All the entities are expected to have different ids. + * - Check the subscriptions count for each publisher. + * - One of the publishers is expected to have 1 subscription, while the other 0. + * - Check the subscription count for a non existing publisher id, should return 0. + * - Add a new publisher and a new subscription both with reliable QoS. + * - The subscriptions count of the previous publisher is expected to remain unchanged, + * while the new publisher is expected to have 2 subscriptions (it's compatible with both QoS). + * - Remove the just added subscriptions. + * - The count for the last publisher is expected to decrease to 1. */ TEST(TestIntraProcessManager, add_pub_sub) { using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; @@ -408,7 +475,7 @@ TEST(TestIntraProcessManager, add_pub_sub) { auto p1_id = ipm->add_publisher(p1); auto p2_id = ipm->add_publisher(p2); - auto s1_id = ipm->add_subscription(s1); + auto s1_id = ipm->template add_subscription(s1); bool unique_ids = p1_id != p2_id && p2_id != s1_id; ASSERT_TRUE(unique_ids); @@ -424,7 +491,7 @@ TEST(TestIntraProcessManager, add_pub_sub) { auto s2 = std::make_shared(rclcpp::QoS(10).reliable()); - auto s2_id = ipm->add_subscription(s2); + auto s2_id = ipm->template add_subscription(s2); auto p3_id = ipm->add_publisher(p3); p1_subs = ipm->get_subscription_count(p1_id); @@ -444,14 +511,14 @@ TEST(TestIntraProcessManager, add_pub_sub) { } /* - This tests the minimal usage of the class where there is a single subscription per publisher: - - Publishes a unique_ptr message with a subscription requesting ownership. - - The received message is expected to be the same. - - Remove the first subscription from ipm and add a new one. - - Publishes a unique_ptr message with a subscription not requesting ownership. - - The received message is expected to be the same, the first subscription do not receive it. - - Publishes a shared_ptr message with a subscription not requesting ownership. - - The received message is expected to be the same. + * This tests the minimal usage of the class where there is a single subscription per publisher: + * - Publishes a unique_ptr message with a subscription requesting ownership. + * - The received message is expected to be the same. + * - Remove the first subscription from ipm and add a new one. + * - Publishes a unique_ptr message with a subscription not requesting ownership. + * - The received message is expected to be the same, the first subscription do not receive it. + * - Publishes a shared_ptr message with a subscription not requesting ownership. + * - The received message is expected to be the same. */ TEST(TestIntraProcessManager, single_subscription) { using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; @@ -467,7 +534,7 @@ TEST(TestIntraProcessManager, single_subscription) { auto s1 = std::make_shared(); s1->take_shared_method = false; - auto s1_id = ipm->add_subscription(s1); + auto s1_id = ipm->template add_subscription(s1); auto unique_msg = std::make_unique(); auto original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -478,7 +545,7 @@ TEST(TestIntraProcessManager, single_subscription) { ipm->remove_subscription(s1_id); auto s2 = std::make_shared(); s2->take_shared_method = true; - auto s2_id = ipm->add_subscription(s2); + auto s2_id = ipm->template add_subscription(s2); (void)s2_id; unique_msg = std::make_unique(); @@ -497,15 +564,15 @@ TEST(TestIntraProcessManager, single_subscription) { } /* - This tests the usage of the class where there are multiple subscriptions of the same type: - - Publishes a unique_ptr message with 2 subscriptions requesting ownership. - - One is expected to receive the published message, while the other will receive a copy. - - Publishes a unique_ptr message with 2 subscriptions not requesting ownership. - - Both received messages are expected to be the same as the published one. - - Publishes a shared_ptr message with 2 subscriptions requesting ownership. - - Both received messages are expected to be a copy of the published one. - - Publishes a shared_ptr message with 2 subscriptions not requesting ownership. - - Both received messages are expected to be the same as the published one. + * This tests the usage of the class where there are multiple subscriptions of the same type: + * - Publishes a unique_ptr message with 2 subscriptions requesting ownership. + * - One is expected to receive the published message, while the other will receive a copy. + * - Publishes a unique_ptr message with 2 subscriptions not requesting ownership. + * - Both received messages are expected to be the same as the published one. + * - Publishes a shared_ptr message with 2 subscriptions requesting ownership. + * - Both received messages are expected to be a copy of the published one. + * - Publishes a shared_ptr message with 2 subscriptions not requesting ownership. + * - Both received messages are expected to be the same as the published one. */ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; @@ -521,11 +588,11 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { auto s1 = std::make_shared(); s1->take_shared_method = false; - auto s1_id = ipm->add_subscription(s1); + auto s1_id = ipm->template add_subscription(s1); auto s2 = std::make_shared(); s2->take_shared_method = false; - auto s2_id = ipm->add_subscription(s2); + auto s2_id = ipm->template add_subscription(s2); auto unique_msg = std::make_unique(); auto original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -541,11 +608,11 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { auto s3 = std::make_shared(); s3->take_shared_method = true; - auto s3_id = ipm->add_subscription(s3); + auto s3_id = ipm->template add_subscription(s3); auto s4 = std::make_shared(); s4->take_shared_method = true; - auto s4_id = ipm->add_subscription(s4); + auto s4_id = ipm->template add_subscription(s4); unique_msg = std::make_unique(); original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -560,11 +627,11 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { auto s5 = std::make_shared(); s5->take_shared_method = false; - auto s5_id = ipm->add_subscription(s5); + auto s5_id = ipm->template add_subscription(s5); auto s6 = std::make_shared(); s6->take_shared_method = false; - auto s6_id = ipm->add_subscription(s6); + auto s6_id = ipm->template add_subscription(s6); unique_msg = std::make_unique(); original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -580,12 +647,12 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { auto s7 = std::make_shared(); s7->take_shared_method = true; - auto s7_id = ipm->add_subscription(s7); + auto s7_id = ipm->template add_subscription(s7); (void)s7_id; auto s8 = std::make_shared(); s8->take_shared_method = true; - auto s8_id = ipm->add_subscription(s8); + auto s8_id = ipm->template add_subscription(s8); (void)s8_id; unique_msg = std::make_unique(); @@ -598,20 +665,20 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { } /* - This tests the usage of the class where there are multiple subscriptions of different types: - - Publishes a unique_ptr message with 1 subscription requesting ownership and 1 not. - - The one requesting ownership is expected to receive the published message, - while the other is expected to receive a copy. - - Publishes a unique_ptr message with 2 subscriptions requesting ownership and 1 not. - - One of the subscriptions requesting ownership is expected to receive the published message, - while both other subscriptions are expected to receive different copies. - - Publishes a unique_ptr message with 2 subscriptions requesting ownership and 2 not. - - The 2 subscriptions not requesting ownership are expected to both receive the same copy - of the message, one of the subscription requesting ownership is expected to receive a - different copy, while the last is expected to receive the published message. - - Publishes a shared_ptr message with 1 subscription requesting ownership and 1 not. - - The subscription requesting ownership is expected to receive a copy of the message, while - the other is expected to receive the published message + * This tests the usage of the class where there are multiple subscriptions of different types: + * - Publishes a unique_ptr message with 1 subscription requesting ownership and 1 not. + * - The one requesting ownership is expected to receive the published message, + * while the other is expected to receive a copy. + * - Publishes a unique_ptr message with 2 subscriptions requesting ownership and 1 not. + * - One of the subscriptions requesting ownership is expected to receive the published message, + * while both other subscriptions are expected to receive different copies. + * - Publishes a unique_ptr message with 2 subscriptions requesting ownership and 2 not. + * - The 2 subscriptions not requesting ownership are expected to both receive the same copy + * of the message, one of the subscription requesting ownership is expected to receive a + * different copy, while the last is expected to receive the published message. + * - Publishes a shared_ptr message with 1 subscription requesting ownership and 1 not. + * - The subscription requesting ownership is expected to receive a copy of the message, while + * the other is expected to receive the published message */ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; @@ -627,11 +694,11 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { auto s1 = std::make_shared(); s1->take_shared_method = true; - auto s1_id = ipm->add_subscription(s1); + auto s1_id = ipm->template add_subscription(s1); auto s2 = std::make_shared(); s2->take_shared_method = false; - auto s2_id = ipm->add_subscription(s2); + auto s2_id = ipm->template add_subscription(s2); auto unique_msg = std::make_unique(); auto original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -646,15 +713,15 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { auto s3 = std::make_shared(); s3->take_shared_method = false; - auto s3_id = ipm->add_subscription(s3); + auto s3_id = ipm->template add_subscription(s3); auto s4 = std::make_shared(); s4->take_shared_method = false; - auto s4_id = ipm->add_subscription(s4); + auto s4_id = ipm->template add_subscription(s4); auto s5 = std::make_shared(); s5->take_shared_method = true; - auto s5_id = ipm->add_subscription(s5); + auto s5_id = ipm->template add_subscription(s5); unique_msg = std::make_unique(); original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -678,19 +745,19 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { auto s6 = std::make_shared(); s6->take_shared_method = true; - auto s6_id = ipm->add_subscription(s6); + auto s6_id = ipm->template add_subscription(s6); auto s7 = std::make_shared(); s7->take_shared_method = true; - auto s7_id = ipm->add_subscription(s7); + auto s7_id = ipm->template add_subscription(s7); auto s8 = std::make_shared(); s8->take_shared_method = false; - auto s8_id = ipm->add_subscription(s8); + auto s8_id = ipm->template add_subscription(s8); auto s9 = std::make_shared(); s9->take_shared_method = false; - auto s9_id = ipm->add_subscription(s9); + auto s9_id = ipm->template add_subscription(s9); unique_msg = std::make_unique(); original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -716,12 +783,12 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { auto s10 = std::make_shared(); s10->take_shared_method = false; - auto s10_id = ipm->add_subscription(s10); + auto s10_id = ipm->template add_subscription(s10); (void)s10_id; auto s11 = std::make_shared(); s11->take_shared_method = true; - auto s11_id = ipm->add_subscription(s11); + auto s11_id = ipm->template add_subscription(s11); (void)s11_id; unique_msg = std::make_unique(); @@ -734,25 +801,25 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { } /* - This tests the method "lowest_available_capacity": - - Creates 1 publisher. - - The available buffer capacity should be at least history size. - - Add 2 subscribers. - - Add everything to the intra-process manager. - - All the entities are expected to have different ids. - - Check the subscriptions count for the publisher. - - The available buffer capacity should be the history size. - - Publish one message (without receiving it). - - The available buffer capacity should decrease by 1. - - Publish another message (without receiving it). - - The available buffer capacity should decrease by 1. - - One subscriber receives one message. - - The available buffer capacity should stay the same, - as the other subscriber still has not freed its buffer. - - The other subscriber receives one message. - - The available buffer capacity should increase by 1. - - One subscription goes out of scope. - - The available buffer capacity should not change. + * This tests the method "lowest_available_capacity": + * - Creates 1 publisher. + * - The available buffer capacity should be at least history size. + * - Add 2 subscribers. + * - Add everything to the intra-process manager. + * - All the entities are expected to have different ids. + * - Check the subscriptions count for the publisher. + * - The available buffer capacity should be the history size. + * - Publish one message (without receiving it). + * - The available buffer capacity should decrease by 1. + * - Publish another message (without receiving it). + * - The available buffer capacity should decrease by 1. + * - One subscriber receives one message. + * - The available buffer capacity should stay the same, + * as the other subscriber still has not freed its buffer. + * - The other subscriber receives one message. + * - The available buffer capacity should increase by 1. + * - One subscription goes out of scope. + * - The available buffer capacity should not change. */ TEST(TestIntraProcessManager, lowest_available_capacity) { using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; @@ -776,8 +843,8 @@ TEST(TestIntraProcessManager, lowest_available_capacity) { ASSERT_LE(0u, c1); - auto s1_id = ipm->add_subscription(s1); - auto s2_id = ipm->add_subscription(s2); + auto s1_id = ipm->template add_subscription(s1); + auto s2_id = ipm->template add_subscription(s2); bool unique_ids = s1_id != s2_id && p1_id != s1_id; ASSERT_TRUE(unique_ids); @@ -820,3 +887,82 @@ TEST(TestIntraProcessManager, lowest_available_capacity) { c1 = ipm->lowest_available_capacity(p1_id); ASSERT_EQ(history_depth - 1u, c1); } + +/* + * This tests the check inside add_publisher for transient_local + * durability publishers + * - add_publisher should throw runtime_error when no valid buffer ptr + * is passed with a transient_local publisher + */ +TEST(TestIntraProcessManager, transient_local_invalid_buffer) { + using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; + using MessageT = rcl_interfaces::msg::Log; + using PublisherT = rclcpp::mock::Publisher; + constexpr auto history_depth = 10u; + + auto ipm = std::make_shared(); + + auto p1 = std::make_shared(rclcpp::QoS(history_depth).transient_local()); + + ASSERT_THROW( + { + ipm->add_publisher(p1, nullptr); + }, std::runtime_error); +} + +/* + * This tests publishing function for transient_local durability publihers + * - A message is published before three transient_local subscriptions are added to + * ipm. Two of the subscriptions use take_shared method. Delivery of the message is verified + * along with the contents and pointer addresses from the subscriptions. + */ +TEST(TestIntraProcessManager, transient_local) { + using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; + using MessageT = rcl_interfaces::msg::Log; + using PublisherT = rclcpp::mock::Publisher; + using SubscriptionIntraProcessT = rclcpp::experimental::mock::SubscriptionIntraProcess; + + constexpr auto history_depth = 10u; + + auto ipm = std::make_shared(); + + auto p1 = std::make_shared(rclcpp::QoS(history_depth).transient_local()); + + auto s1 = + std::make_shared(rclcpp::QoS(history_depth).transient_local()); + auto s2 = + std::make_shared(rclcpp::QoS(history_depth).transient_local()); + auto s3 = + std::make_shared(rclcpp::QoS(history_depth).transient_local()); + + s1->take_shared_method = false; + s2->take_shared_method = true; + s3->take_shared_method = true; + + auto p1_id = ipm->add_publisher(p1, p1->buffer); + + p1->set_intra_process_manager(p1_id, ipm); + + auto unique_msg = std::make_unique(); + unique_msg->msg = "Test"; + p1->publish(std::move(unique_msg)); + + ipm->template add_subscription(s1); + ipm->template add_subscription(s2); + ipm->template add_subscription(s3); + + auto received_message_pointer_1 = s1->pop(); + auto received_message_pointer_2 = s2->pop(); + auto received_message_pointer_3 = s3->pop(); + ASSERT_NE(0u, received_message_pointer_1); + ASSERT_NE(0u, received_message_pointer_2); + ASSERT_NE(0u, received_message_pointer_3); + ASSERT_EQ(received_message_pointer_3, received_message_pointer_2); + ASSERT_EQ( + reinterpret_cast(received_message_pointer_1)->msg, + reinterpret_cast(received_message_pointer_2)->msg); + ASSERT_EQ( + reinterpret_cast(received_message_pointer_1)->msg, + reinterpret_cast(received_message_pointer_3)->msg); + ASSERT_EQ("Test", reinterpret_cast(received_message_pointer_1)->msg); +} diff --git a/rclcpp/test/rclcpp/test_publisher.cpp b/rclcpp/test/rclcpp/test_publisher.cpp index 3edb24c0dd..949e006997 100644 --- a/rclcpp/test/rclcpp/test_publisher.cpp +++ b/rclcpp/test/rclcpp/test_publisher.cpp @@ -196,11 +196,7 @@ static std::vector invalid_qos_profiles() { std::vector parameters; - parameters.reserve(2); - parameters.push_back( - TestParameters( - rclcpp::QoS(rclcpp::KeepLast(10)).transient_local(), - "transient_local_qos")); + parameters.reserve(1); parameters.push_back( TestParameters( rclcpp::QoS(rclcpp::KeepAll()), @@ -674,3 +670,96 @@ INSTANTIATE_TEST_SUITE_P( rclcpp::QoS(1).best_effort(), rclcpp::QoS(1).best_effort()), std::pair( rclcpp::QoS(1).reliable(), rclcpp::QoS(1).best_effort()))); + +TEST_F(TestPublisher, intra_process_transient_local) { + constexpr auto history_depth = 10u; + initialize(rclcpp::NodeOptions().use_intra_process_comms(true)); + rclcpp::PublisherOptionsWithAllocator> pub_options_ipm_disabled; + pub_options_ipm_disabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable; + + rclcpp::PublisherOptionsWithAllocator> pub_options_ipm_enabled; + pub_options_ipm_enabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + + auto pub_ipm_enabled_transient_local_enabled = node->create_publisher( + "topic1", + rclcpp::QoS(rclcpp::KeepLast(history_depth)).transient_local(), pub_options_ipm_enabled); + auto pub_ipm_disabled_transient_local_enabled = node->create_publisher( + "topic2", + rclcpp::QoS(rclcpp::KeepLast(history_depth)).transient_local(), pub_options_ipm_disabled); + auto pub_ipm_enabled_transient_local_disabled = node->create_publisher( + "topic3", + rclcpp::QoS(rclcpp::KeepLast(history_depth)), pub_options_ipm_enabled); + auto pub_ipm_disabled_transient_local_disabled = node->create_publisher( + "topic4", + rclcpp::QoS(rclcpp::KeepLast(history_depth)), pub_options_ipm_disabled); + + test_msgs::msg::Empty msg; + pub_ipm_enabled_transient_local_enabled->publish(msg); + pub_ipm_disabled_transient_local_enabled->publish(msg); + pub_ipm_enabled_transient_local_disabled->publish(msg); + pub_ipm_disabled_transient_local_disabled->publish(msg); + + auto do_nothing = [](std::shared_ptr) {}; + struct IntraProcessCallback + { + void callback_fun(size_t s) + { + (void) s; + called = true; + } + bool called = false; + }; + rclcpp::SubscriptionOptions sub_options_ipm_disabled; + sub_options_ipm_disabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable; + rclcpp::SubscriptionOptions sub_options_ipm_enabled; + sub_options_ipm_enabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + IntraProcessCallback callback1, callback2, callback3, callback4; + auto sub_ipm_enabled_transient_local_enabled = node->create_subscription( + "topic1", + rclcpp::QoS(rclcpp::KeepLast(history_depth)).transient_local(), + do_nothing, sub_options_ipm_enabled); + sub_ipm_enabled_transient_local_enabled->set_on_new_intra_process_message_callback( + std::bind(&IntraProcessCallback::callback_fun, &callback1, std::placeholders::_1)); + auto sub_ipm_disabled_transient_local_enabled = node->create_subscription( + "topic2", + rclcpp::QoS(rclcpp::KeepLast(history_depth)).transient_local(), + do_nothing, sub_options_ipm_disabled); + sub_ipm_disabled_transient_local_enabled->set_on_new_intra_process_message_callback( + std::bind(&IntraProcessCallback::callback_fun, &callback2, std::placeholders::_1)); + auto sub_ipm_enabled_transient_local_disabled = node->create_subscription( + "topic3", + rclcpp::QoS(rclcpp::KeepLast(history_depth)), + do_nothing, sub_options_ipm_enabled); + sub_ipm_enabled_transient_local_disabled->set_on_new_intra_process_message_callback( + std::bind(&IntraProcessCallback::callback_fun, &callback3, std::placeholders::_1)); + auto sub_ipm_disabled_transient_local_disabled = node->create_subscription( + "topic4", + rclcpp::QoS(rclcpp::KeepLast(history_depth)), + do_nothing, sub_options_ipm_disabled); + sub_ipm_disabled_transient_local_disabled->set_on_new_intra_process_message_callback( + std::bind(&IntraProcessCallback::callback_fun, &callback4, std::placeholders::_1)); + + EXPECT_TRUE(pub_ipm_enabled_transient_local_enabled->is_durability_transient_local()); + EXPECT_TRUE(pub_ipm_disabled_transient_local_enabled->is_durability_transient_local()); + EXPECT_FALSE(pub_ipm_enabled_transient_local_disabled->is_durability_transient_local()); + EXPECT_FALSE(pub_ipm_disabled_transient_local_disabled->is_durability_transient_local()); + + EXPECT_EQ(1, pub_ipm_enabled_transient_local_enabled->get_intra_process_subscription_count()); + EXPECT_EQ(0, pub_ipm_disabled_transient_local_enabled->get_intra_process_subscription_count()); + EXPECT_EQ(1, pub_ipm_enabled_transient_local_disabled->get_intra_process_subscription_count()); + EXPECT_EQ(0, pub_ipm_disabled_transient_local_disabled->get_intra_process_subscription_count()); + + EXPECT_EQ( + history_depth - 1u, + pub_ipm_enabled_transient_local_enabled->lowest_available_ipm_capacity()); + EXPECT_EQ(0, pub_ipm_disabled_transient_local_enabled->lowest_available_ipm_capacity()); + EXPECT_EQ( + history_depth, + pub_ipm_enabled_transient_local_disabled->lowest_available_ipm_capacity()); + EXPECT_EQ(0, pub_ipm_disabled_transient_local_disabled->lowest_available_ipm_capacity()); + + EXPECT_TRUE(callback1.called); + EXPECT_FALSE(callback2.called); + EXPECT_FALSE(callback3.called); + EXPECT_FALSE(callback4.called); +} diff --git a/rclcpp/test/rclcpp/test_ring_buffer_implementation.cpp b/rclcpp/test/rclcpp/test_ring_buffer_implementation.cpp index 07dfd8d584..0abd9b1a89 100644 --- a/rclcpp/test/rclcpp/test_ring_buffer_implementation.cpp +++ b/rclcpp/test/rclcpp/test_ring_buffer_implementation.cpp @@ -22,7 +22,7 @@ #include "rclcpp/experimental/buffers/ring_buffer_implementation.hpp" /* - Construtctor + * Construtctor */ TEST(TestRingBufferImplementation, constructor) { // Cannot create a buffer of size zero. @@ -37,10 +37,11 @@ TEST(TestRingBufferImplementation, constructor) { } /* - Basic usage - - insert data and check that it has data - - extract data - - overwrite old data writing over the buffer capacity + * Basic usage + * - insert data and check that it has data + * - get all data + * - extract data + * - overwrite old data writing over the buffer capacity */ TEST(TestRingBufferImplementation, basic_usage) { rclcpp::experimental::buffers::RingBufferImplementation rb(2); @@ -64,6 +65,12 @@ TEST(TestRingBufferImplementation, basic_usage) { rb.enqueue('d'); + const auto all_data_vec = rb.get_all_data(); + + EXPECT_EQ(2u, all_data_vec.size()); + EXPECT_EQ('c', all_data_vec[0]); + EXPECT_EQ('d', all_data_vec[1]); + EXPECT_EQ(true, rb.has_data()); EXPECT_EQ(true, rb.is_full()); @@ -79,3 +86,56 @@ TEST(TestRingBufferImplementation, basic_usage) { EXPECT_EQ(false, rb.has_data()); EXPECT_EQ(false, rb.is_full()); } + +/* + * Basic usage with unique_ptr + * - insert unique_ptr data and check that it has data + * - get all data + * - extract data + * - overwrite old data writing over the buffer capacity + */ +TEST(TestRingBufferImplementation, basic_usage_unique_ptr) { + rclcpp::experimental::buffers::RingBufferImplementation> rb(2); + + auto a = std::make_unique('a'); + auto b = std::make_unique('b'); + auto original_b_pointer = reinterpret_cast(b.get()); + auto c = std::make_unique('c'); + auto original_c_pointer = reinterpret_cast(c.get()); + + rb.enqueue(std::move(a)); + + EXPECT_EQ(true, rb.has_data()); + EXPECT_EQ(false, rb.is_full()); + + rb.enqueue(std::move(b)); + rb.enqueue(std::move(c)); + + EXPECT_EQ(true, rb.has_data()); + EXPECT_EQ(true, rb.is_full()); + + const auto all_data_vec = rb.get_all_data(); + + EXPECT_EQ(2u, all_data_vec.size()); + EXPECT_EQ('b', *all_data_vec[0]); + EXPECT_EQ('c', *all_data_vec[1]); + EXPECT_NE(original_b_pointer, reinterpret_cast(all_data_vec[0].get())); + EXPECT_NE(original_c_pointer, reinterpret_cast(all_data_vec[1].get())); + + EXPECT_EQ(true, rb.has_data()); + EXPECT_EQ(true, rb.is_full()); + + auto uni_ptr = rb.dequeue(); + + EXPECT_EQ('b', *uni_ptr); + EXPECT_EQ(original_b_pointer, reinterpret_cast(uni_ptr.get())); + EXPECT_EQ(true, rb.has_data()); + EXPECT_EQ(false, rb.is_full()); + + uni_ptr = rb.dequeue(); + + EXPECT_EQ('c', *uni_ptr); + EXPECT_EQ(original_c_pointer, reinterpret_cast(uni_ptr.get())); + EXPECT_EQ(false, rb.has_data()); + EXPECT_EQ(false, rb.is_full()); +} diff --git a/rclcpp/test/rclcpp/test_subscription.cpp b/rclcpp/test/rclcpp/test_subscription.cpp index 867171848c..06f6f9785b 100644 --- a/rclcpp/test/rclcpp/test_subscription.cpp +++ b/rclcpp/test/rclcpp/test_subscription.cpp @@ -611,11 +611,7 @@ static std::vector invalid_qos_profiles() { std::vector parameters; - parameters.reserve(3); - parameters.push_back( - TestParameters( - rclcpp::QoS(rclcpp::KeepLast(10)).transient_local(), - "transient_local_qos")); + parameters.reserve(1); parameters.push_back( TestParameters( rclcpp::QoS(rclcpp::KeepAll()),