From 50619d1914871423b85ede53af5ee8f71d159ee5 Mon Sep 17 00:00:00 2001 From: Jeffery Hsu Date: Thu, 18 Jan 2024 10:39:59 -0600 Subject: [PATCH] Add transient local durability support to publisher and subscriptions when using intra-process communication (#2303) * Add intra process transient local durability support to publisher and subscription Signed-off-by: Jeffery Hsu * Remove durability_is_transient_local_ from publisher_base Signed-off-by: Jeffery Hsu * Design changes that move most transient local publish functionalities out of intra process manager into intra process manager Signed-off-by: Jeffery Hsu * Move transient local publish to a separate function Signed-off-by: Jeffery Hsu * Remove publisher buffer weak ptr from intra process manager when it associated publisher is removed. Signed-off-by: Jeffery Hsu * Remove incorrectly placed RCLCPP_PUBLIC Signed-off-by: Jeffery Hsu * Add missing RCLCPP_PUBLIC Signed-off-by: Jeffery Hsu * Expand RingBufferImplementation beyond shared_ptr and unique_ptr Signed-off-by: Jeffery Hsu * Comment and format fix Signed-off-by: Jeffery Hsu --------- Signed-off-by: Jeffery Hsu --- rclcpp/CMakeLists.txt | 1 + .../resolve_intra_process_buffer_type.hpp | 5 + .../buffers/buffer_implementation_base.hpp | 4 + .../buffers/intra_process_buffer.hpp | 79 ++++ .../buffers/ring_buffer_implementation.hpp | 77 ++++ .../experimental/intra_process_manager.hpp | 126 +++++- .../subscription_intra_process_base.hpp | 4 + rclcpp/include/rclcpp/publisher.hpp | 47 +- rclcpp/include/rclcpp/publisher_base.hpp | 6 + rclcpp/include/rclcpp/publisher_options.hpp | 4 + rclcpp/include/rclcpp/subscription.hpp | 7 +- .../resolve_intra_process_buffer_type.cpp | 37 ++ rclcpp/src/rclcpp/intra_process_manager.cpp | 38 +- rclcpp/src/rclcpp/publisher_base.cpp | 7 + .../subscription_intra_process_base.cpp | 6 + .../test/rclcpp/test_intra_process_buffer.cpp | 54 +++ .../rclcpp/test_intra_process_manager.cpp | 404 ++++++++++++------ rclcpp/test/rclcpp/test_publisher.cpp | 99 ++++- .../test_ring_buffer_implementation.cpp | 70 ++- rclcpp/test/rclcpp/test_service.cpp | 10 +- rclcpp/test/rclcpp/test_subscription.cpp | 333 +++++++-------- 21 files changed, 1062 insertions(+), 356 deletions(-) create mode 100644 rclcpp/src/rclcpp/detail/resolve_intra_process_buffer_type.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 5c4d8691ce..b1fd257d5d 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -46,6 +46,7 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/context.cpp src/rclcpp/contexts/default_context.cpp src/rclcpp/detail/add_guard_condition_to_rcl_wait_set.cpp + src/rclcpp/detail/resolve_intra_process_buffer_type.cpp src/rclcpp/detail/resolve_parameter_overrides.cpp src/rclcpp/detail/rmw_implementation_specific_payload.cpp src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp diff --git a/rclcpp/include/rclcpp/detail/resolve_intra_process_buffer_type.hpp b/rclcpp/include/rclcpp/detail/resolve_intra_process_buffer_type.hpp index e7196a1e11..234316a8f0 100644 --- a/rclcpp/include/rclcpp/detail/resolve_intra_process_buffer_type.hpp +++ b/rclcpp/include/rclcpp/detail/resolve_intra_process_buffer_type.hpp @@ -47,6 +47,11 @@ resolve_intra_process_buffer_type( return resolved_buffer_type; } +RCLCPP_PUBLIC +rclcpp::IntraProcessBufferType +resolve_intra_process_buffer_type( + const rclcpp::IntraProcessBufferType buffer_type); + } // namespace detail } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp b/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp index 1e5346116a..2d3c0a02f5 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp @@ -15,6 +15,8 @@ #ifndef RCLCPP__EXPERIMENTAL__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_ #define RCLCPP__EXPERIMENTAL__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_ +#include + namespace rclcpp { namespace experimental @@ -31,6 +33,8 @@ class BufferImplementationBase virtual BufferT dequeue() = 0; virtual void enqueue(BufferT request) = 0; + virtual std::vector get_all_data() = 0; + virtual void clear() = 0; virtual bool has_data() const = 0; }; diff --git a/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp index b6205743c6..3d2f5efec8 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" @@ -65,6 +66,9 @@ class IntraProcessBuffer : public IntraProcessBufferBase virtual MessageSharedPtr consume_shared() = 0; virtual MessageUniquePtr consume_unique() = 0; + + virtual std::vector get_all_data_shared() = 0; + virtual std::vector get_all_data_unique() = 0; }; template< @@ -128,6 +132,16 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer(); } + std::vector get_all_data_shared() override + { + return get_all_data_shared_impl(); + } + + std::vector get_all_data_unique() override + { + return get_all_data_unique_impl(); + } + bool has_data() const override { return buffer_->has_data(); @@ -237,6 +251,71 @@ class TypedIntraProcessBuffer : public IntraProcessBufferdequeue(); } + + // MessageSharedPtr to MessageSharedPtr + template + typename std::enable_if< + std::is_same::value, + std::vector + >::type + get_all_data_shared_impl() + { + return buffer_->get_all_data(); + } + + // MessageUniquePtr to MessageSharedPtr + template + typename std::enable_if< + std::is_same::value, + std::vector + >::type + get_all_data_shared_impl() + { + std::vector result; + auto uni_ptr_vec = buffer_->get_all_data(); + result.reserve(uni_ptr_vec.size()); + for (MessageUniquePtr & uni_ptr : uni_ptr_vec) { + result.emplace_back(std::move(uni_ptr)); + } + return result; + } + + // MessageSharedPtr to MessageUniquePtr + template + typename std::enable_if< + std::is_same::value, + std::vector + >::type + get_all_data_unique_impl() + { + std::vector result; + auto shared_ptr_vec = buffer_->get_all_data(); + result.reserve(shared_ptr_vec.size()); + for (MessageSharedPtr shared_msg : shared_ptr_vec) { + MessageUniquePtr unique_msg; + MessageDeleter * deleter = std::get_deleter(shared_msg); + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr, *shared_msg); + if (deleter) { + unique_msg = MessageUniquePtr(ptr, *deleter); + } else { + unique_msg = MessageUniquePtr(ptr); + } + result.push_back(std::move(unique_msg)); + } + return result; + } + + // MessageUniquePtr to MessageUniquePtr + template + typename std::enable_if< + std::is_same::value, + std::vector + >::type + get_all_data_unique_impl() + { + return buffer_->get_all_data(); + } }; template diff --git a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp index 54d25d596e..fbca3ec89e 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp @@ -15,6 +15,7 @@ #ifndef RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_ #define RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_ +#include #include #include #include @@ -110,6 +111,17 @@ class RingBufferImplementation : public BufferImplementationBase return request; } + /// Get all the elements from the ring buffer + /** + * This member function is thread-safe. + * + * \return a vector containing all the elements from the ring buffer + */ + std::vector get_all_data() override + { + return get_all_data_impl(); + } + /// Get the next index value for the ring buffer /** * This member function is thread-safe. @@ -189,6 +201,71 @@ class RingBufferImplementation : public BufferImplementationBase return size_ == capacity_; } + /// Traits for checking if a type is std::unique_ptr + template + struct is_std_unique_ptr final : std::false_type {}; + template + struct is_std_unique_ptr> final : std::true_type + { + typedef T Ptr_type; + }; + + /// Get all the elements from the ring buffer + /** + * This member function is thread-safe. + * Two versions for the implementation of the function. + * One for buffer containing unique_ptr and the other for other types + * + * \return a vector containing all the elements from the ring buffer + */ + template::value && + std::is_copy_constructible< + typename is_std_unique_ptr::Ptr_type + >::value, + void> * = nullptr> + std::vector get_all_data_impl() + { + std::lock_guard lock(mutex_); + std::vector result_vtr; + result_vtr.reserve(size_); + for (size_t id = 0; id < size_; ++id) { + result_vtr.emplace_back( + new typename is_std_unique_ptr::Ptr_type( + *(ring_buffer_[(read_index_ + id) % capacity_]))); + } + return result_vtr; + } + + template::value, void> * = nullptr> + std::vector get_all_data_impl() + { + std::lock_guard lock(mutex_); + std::vector result_vtr; + result_vtr.reserve(size_); + for (size_t id = 0; id < size_; ++id) { + result_vtr.emplace_back(ring_buffer_[(read_index_ + id) % capacity_]); + } + return result_vtr; + } + + template::value && + !std::is_copy_constructible::value, void> * = nullptr> + std::vector get_all_data_impl() + { + throw std::logic_error("Underlined type results in invalid get_all_data_impl()"); + return {}; + } + + template::value && + !std::is_copy_constructible::Ptr_type>::value, + void> * = nullptr> + std::vector get_all_data_impl() + { + throw std::logic_error("Underlined type in unique_ptr results in invalid get_all_data_impl()"); + return {}; + } + size_t capacity_; std::vector ring_buffer_; diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index 3feada97d6..d6c4281f34 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -32,6 +32,8 @@ #include "rclcpp/experimental/action_client_intra_process_base.hpp" #include "rclcpp/experimental/action_server_intra_process.hpp" #include "rclcpp/experimental/action_server_intra_process_base.hpp" +#include "rclcpp/experimental/buffers/intra_process_buffer.hpp" +#include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/client_intra_process.hpp" #include "rclcpp/experimental/client_intra_process_base.hpp" #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp" @@ -120,9 +122,40 @@ class IntraProcessManager * \param subscription the SubscriptionIntraProcess to register. * \return an unsigned 64-bit integer which is the subscription's unique id. */ - RCLCPP_PUBLIC + template< + typename ROSMessageType, + typename Alloc = std::allocator + > uint64_t - add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription); + add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription) + { + std::unique_lock lock(mutex_); + + uint64_t sub_id = IntraProcessManager::get_next_unique_id(); + + subscriptions_[sub_id] = subscription; + + // adds the subscription id to all the matchable publishers + for (auto & pair : publishers_) { + auto publisher = pair.second.lock(); + if (!publisher) { + continue; + } + if (can_communicate(publisher, subscription)) { + uint64_t pub_id = pair.first; + insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method()); + if (publisher->is_durability_transient_local() && + subscription->is_durability_transient_local()) + { + do_transient_local_publish( + pub_id, sub_id, + subscription->use_take_shared_method()); + } + } + } + + return sub_id; + } /// Register an intra-process client with the manager, returns the client unique id. /** @@ -211,14 +244,21 @@ class IntraProcessManager * This method stores the publisher intra process object, together with * the information of its wrapped publisher (i.e. topic name and QoS). * + * If the publisher's durability is transient local, its buffer pointer should + * be passed and the method will store it as well. + * * In addition this generates a unique intra process id for the publisher. * * \param publisher publisher to be registered with the manager. + * \param buffer publisher's buffer to be stored if its duability is transient local. * \return an unsigned 64-bit integer which is the publisher's unique id. */ RCLCPP_PUBLIC uint64_t - add_publisher(rclcpp::PublisherBase::SharedPtr publisher); + add_publisher( + rclcpp::PublisherBase::SharedPtr publisher, + rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer = + rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr()); /// Unregister a publisher using the publisher's unique id. /** @@ -713,6 +753,34 @@ class IntraProcessManager } } + template< + typename MessageT, + typename Alloc, + typename Deleter, + typename ROSMessageType> + void + add_shared_msg_to_buffer( + std::shared_ptr message, + uint64_t subscription_id) + { + add_shared_msg_to_buffers(message, {subscription_id}); + } + + template< + typename MessageT, + typename Alloc, + typename Deleter, + typename ROSMessageType> + void + add_owned_msg_to_buffer( + std::unique_ptr message, + uint64_t subscription_id, + typename allocator::AllocRebind::allocator_type & allocator) + { + add_owned_msg_to_buffers( + std::move(message), {subscription_id}, allocator); + } + /// Return true if the given rmw_gid_t matches any stored Publishers. RCLCPP_PUBLIC bool @@ -764,6 +832,9 @@ class IntraProcessManager using PublisherMap = std::unordered_map; + using PublisherBufferMap = + std::unordered_map; + using PublisherToSubscriptionIdsMap = std::unordered_map; @@ -812,6 +883,54 @@ class IntraProcessManager rclcpp::experimental::ActionClientIntraProcessBase::SharedPtr client, rclcpp::experimental::ActionServerIntraProcessBase::SharedPtr service) const; + template< + typename ROSMessageType, + typename Alloc = std::allocator + > + void do_transient_local_publish( + const uint64_t pub_id, const uint64_t sub_id, + const bool use_take_shared_method) + { + using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; + using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; + using ROSMessageTypeDeleter = allocator::Deleter; + + auto publisher_buffer = publisher_buffers_[pub_id].lock(); + if (!publisher_buffer) { + throw std::runtime_error("publisher buffer has unexpectedly gone out of scope"); + } + auto buffer = std::dynamic_pointer_cast< + rclcpp::experimental::buffers::IntraProcessBuffer< + ROSMessageType, + ROSMessageTypeAllocator, + ROSMessageTypeDeleter + > + >(publisher_buffer); + if (!buffer) { + throw std::runtime_error( + "failed to dynamic cast publisher's IntraProcessBufferBase to " + "IntraProcessBuffer which can happen when the publisher and " + "subscription use different allocator types, which is not supported"); + } + if (use_take_shared_method) { + auto data_vec = buffer->get_all_data_shared(); + for (auto shared_data : data_vec) { + this->template add_shared_msg_to_buffer< + ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>( + shared_data, sub_id); + } + } else { + auto data_vec = buffer->get_all_data_unique(); + for (auto & owned_data : data_vec) { + auto allocator = ROSMessageTypeAllocator(); + this->template add_owned_msg_to_buffer< + ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter, ROSMessageType>( + std::move(owned_data), sub_id, allocator); + } + } + } + template< typename MessageT, typename Alloc, @@ -998,6 +1117,7 @@ class IntraProcessManager ActionClientToServerIdsMap action_clients_to_servers_; std::unordered_map clients_uuid_to_id_; + PublisherBufferMap publisher_buffers_; mutable std::shared_timed_mutex mutex_; }; diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index 6583e74ae7..e60797e78e 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -62,6 +62,10 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable void add_to_wait_set(rcl_wait_set_t * wait_set) override; + RCLCPP_PUBLIC + bool + is_durability_transient_local() const; + bool is_ready(rcl_wait_set_t * wait_set) override = 0; diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 40f6e79c17..72d93c3658 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -32,6 +32,9 @@ #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/detail/resolve_use_intra_process.hpp" +#include "rclcpp/detail/resolve_intra_process_buffer_type.hpp" +#include "rclcpp/experimental/buffers/intra_process_buffer.hpp" +#include "rclcpp/experimental/create_intra_process_buffer.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" #include "rclcpp/get_message_type_support_handle.hpp" #include "rclcpp/is_ros_compatible_type.hpp" @@ -109,6 +112,12 @@ class Publisher : public PublisherBase [[deprecated("use std::shared_ptr")]] = std::shared_ptr; + using BufferSharedPtr = typename rclcpp::experimental::buffers::IntraProcessBuffer< + ROSMessageType, + ROSMessageTypeAllocator, + ROSMessageTypeDeleter + >::SharedPtr; + RCLCPP_SMART_PTR_DEFINITIONS(Publisher) /// Default constructor. @@ -171,11 +180,14 @@ class Publisher : public PublisherBase throw std::invalid_argument( "intraprocess communication is not allowed with a zero qos history depth value"); } - if (qos.durability() != rclcpp::DurabilityPolicy::Volatile) { - throw std::invalid_argument( - "intraprocess communication allowed only with volatile durability"); + if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) { + buffer_ = rclcpp::experimental::create_intra_process_buffer< + ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter>( + rclcpp::detail::resolve_intra_process_buffer_type(options_.intra_process_buffer_type), + qos, + std::make_shared(ros_message_type_allocator_)); } - uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this()); + uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this(), buffer_); this->setup_intra_process( intra_process_publisher_id, ipm); @@ -242,9 +254,18 @@ class Publisher : public PublisherBase if (inter_process_publish_needed) { auto shared_msg = this->do_intra_process_ros_message_publish_and_return_shared(std::move(msg)); + if (buffer_) { + buffer_->add_shared(shared_msg); + } this->do_inter_process_publish(*shared_msg); } else { - this->do_intra_process_ros_message_publish(std::move(msg)); + if (buffer_) { + auto shared_msg = + this->do_intra_process_ros_message_publish_and_return_shared(std::move(msg)); + buffer_->add_shared(shared_msg); + } else { + this->do_intra_process_ros_message_publish(std::move(msg)); + } } } @@ -309,14 +330,22 @@ class Publisher : public PublisherBase get_subscription_count() > get_intra_process_subscription_count(); if (inter_process_publish_needed) { - ROSMessageType ros_msg; + auto ros_msg_ptr = std::make_shared(); // TODO(clalancette): This is unnecessarily doing an additional conversion // that may have already been done in do_intra_process_publish_and_return_shared(). // We should just reuse that effort. - rclcpp::TypeAdapter::convert_to_ros_message(*msg, ros_msg); + rclcpp::TypeAdapter::convert_to_ros_message(*msg, *ros_msg_ptr); this->do_intra_process_publish(std::move(msg)); - this->do_inter_process_publish(ros_msg); + this->do_inter_process_publish(*ros_msg_ptr); + if (buffer_) { + buffer_->add_shared(ros_msg_ptr); + } } else { + if (buffer_) { + auto ros_msg_ptr = std::make_shared(); + rclcpp::TypeAdapter::convert_to_ros_message(*msg, *ros_msg_ptr); + buffer_->add_shared(ros_msg_ptr); + } this->do_intra_process_publish(std::move(msg)); } } @@ -580,6 +609,8 @@ class Publisher : public PublisherBase PublishedTypeDeleter published_type_deleter_; ROSMessageTypeAllocator ros_message_type_allocator_; ROSMessageTypeDeleter ros_message_type_deleter_; + + BufferSharedPtr buffer_{nullptr}; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index 702cbbf7c6..7c78c4dbb9 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -145,6 +145,12 @@ class PublisherBase : public std::enable_shared_from_this size_t get_intra_process_subscription_count() const; + /// Get if durability is transient local + /** \return If durability is transient local*/ + RCLCPP_PUBLIC + bool + is_durability_transient_local() const; + /// Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC). /** * If the rmw Liveliness policy is set to RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC, the creator diff --git a/rclcpp/include/rclcpp/publisher_options.hpp b/rclcpp/include/rclcpp/publisher_options.hpp index 3501834dd1..01fd314f49 100644 --- a/rclcpp/include/rclcpp/publisher_options.hpp +++ b/rclcpp/include/rclcpp/publisher_options.hpp @@ -24,6 +24,7 @@ #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/detail/rmw_implementation_specific_publisher_payload.hpp" +#include "rclcpp/intra_process_buffer_type.hpp" #include "rclcpp/intra_process_setting.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/event_handler.hpp" @@ -40,6 +41,9 @@ struct PublisherOptionsBase /// Setting to explicitly set intraprocess communications. IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault; + /// Setting the data-type stored in the intraprocess buffer + IntraProcessBufferType intra_process_buffer_type = IntraProcessBufferType::SharedPtr; + /// Callbacks for various events related to publishers. PublisherEventCallbacks event_callbacks; diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 400941a9aa..97ac3e6953 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -163,10 +163,6 @@ class Subscription : public SubscriptionBase throw std::invalid_argument( "intraprocess communication is not allowed with 0 depth qos policy"); } - if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) { - throw std::invalid_argument( - "intraprocess communication allowed only with volatile durability"); - } using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess< MessageT, @@ -193,7 +189,8 @@ class Subscription : public SubscriptionBase // Add it to the intra process manager. using rclcpp::experimental::IntraProcessManager; auto ipm = context->get_sub_context(); - uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process_); + uint64_t intra_process_subscription_id = ipm->template add_subscription< + ROSMessageType, ROSMessageTypeAllocator>(subscription_intra_process_); this->setup_intra_process(intra_process_subscription_id, ipm); } diff --git a/rclcpp/src/rclcpp/detail/resolve_intra_process_buffer_type.cpp b/rclcpp/src/rclcpp/detail/resolve_intra_process_buffer_type.cpp new file mode 100644 index 0000000000..1ca9892ac4 --- /dev/null +++ b/rclcpp/src/rclcpp/detail/resolve_intra_process_buffer_type.cpp @@ -0,0 +1,37 @@ +// Copyright 2023 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace rclcpp +{ + +namespace detail +{ +rclcpp::IntraProcessBufferType +resolve_intra_process_buffer_type( + const rclcpp::IntraProcessBufferType buffer_type) +{ + if (buffer_type == IntraProcessBufferType::CallbackDefault) { + throw std::invalid_argument( + "IntraProcessBufferType::CallbackDefault is not allowed " + "when there is no callback function"); + } + + return buffer_type; +} + +} // namespace detail + +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index 0e75eaf3e4..ea5052dc99 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -26,13 +26,24 @@ namespace experimental static std::atomic _next_unique_id {1}; uint64_t -IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) +IntraProcessManager::add_publisher( + rclcpp::PublisherBase::SharedPtr publisher, + rclcpp::experimental::buffers::IntraProcessBufferBase::SharedPtr buffer) { std::unique_lock lock(mutex_); uint64_t pub_id = IntraProcessManager::get_next_unique_id(); publishers_[pub_id] = publisher; + if (publisher->is_durability_transient_local()) { + if (buffer) { + publisher_buffers_[pub_id] = buffer; + } else { + throw std::runtime_error( + "transient_local publisher needs to pass" + "a valid publisher buffer ptr when calling add_publisher()"); + } + } // Initialize the subscriptions storage for this publisher. pub_to_subs_[pub_id] = SplittedSubscriptions(); @@ -52,30 +63,6 @@ IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher) return pub_id; } -uint64_t -IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr subscription) -{ - std::unique_lock lock(mutex_); - - uint64_t sub_id = IntraProcessManager::get_next_unique_id(); - - subscriptions_[sub_id] = subscription; - - // adds the subscription id to all the matchable publishers - for (auto & pair : publishers_) { - auto publisher = pair.second.lock(); - if (!publisher) { - continue; - } - if (can_communicate(publisher, subscription)) { - uint64_t pub_id = pair.first; - insert_sub_id_for_pub(sub_id, pub_id, subscription->use_take_shared_method()); - } - } - - return sub_id; -} - uint64_t IntraProcessManager::add_intra_process_client(ClientIntraProcessBase::SharedPtr client) { @@ -267,6 +254,7 @@ IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id) std::unique_lock lock(mutex_); publishers_.erase(intra_process_publisher_id); + publisher_buffers_.erase(intra_process_publisher_id); pub_to_subs_.erase(intra_process_publisher_id); } diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index 44a125b05d..7d1b917d78 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -295,6 +295,13 @@ PublisherBase::get_intra_process_subscription_count() const return ipm->get_subscription_count(intra_process_publisher_id_); } +bool +PublisherBase::is_durability_transient_local() const +{ + return rcl_publisher_get_actual_qos(publisher_handle_.get())->durability == + RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL; +} + rclcpp::QoS PublisherBase::get_actual_qos() const { diff --git a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp index b13123b65b..7ecafda36c 100644 --- a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp +++ b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp @@ -34,3 +34,9 @@ SubscriptionIntraProcessBase::get_actual_qos() const { return qos_profile_; } + +bool +SubscriptionIntraProcessBase::is_durability_transient_local() const +{ + return qos_profile_.durability() == rclcpp::DurabilityPolicy::TransientLocal; +} diff --git a/rclcpp/test/rclcpp/test_intra_process_buffer.cpp b/rclcpp/test/rclcpp/test_intra_process_buffer.cpp index 16c457c96f..be6d1fc35e 100644 --- a/rclcpp/test/rclcpp/test_intra_process_buffer.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_buffer.cpp @@ -190,6 +190,33 @@ TEST(TestIntraProcessBuffer, shared_buffer_consume) { EXPECT_EQ(1L, original_shared_msg.use_count()); EXPECT_EQ(*original_shared_msg, *popped_unique_msg); EXPECT_NE(original_message_pointer, popped_message_pointer); + + original_shared_msg = std::make_shared('c'); + original_message_pointer = reinterpret_cast(original_shared_msg.get()); + auto original_shared_msg_2 = std::make_shared('d'); + auto original_message_pointer_2 = reinterpret_cast(original_shared_msg_2.get()); + intra_process_buffer.add_shared(original_shared_msg); + intra_process_buffer.add_shared(original_shared_msg_2); + + auto shared_data_vec = intra_process_buffer.get_all_data_shared(); + EXPECT_EQ(2L, shared_data_vec.size()); + EXPECT_EQ(3L, original_shared_msg.use_count()); + EXPECT_EQ(original_shared_msg.use_count(), shared_data_vec[0].use_count()); + EXPECT_EQ(*original_shared_msg, *shared_data_vec[0]); + EXPECT_EQ(original_message_pointer, reinterpret_cast(shared_data_vec[0].get())); + EXPECT_EQ(3L, original_shared_msg_2.use_count()); + EXPECT_EQ(original_shared_msg_2.use_count(), shared_data_vec[1].use_count()); + EXPECT_EQ(*original_shared_msg_2, *shared_data_vec[1]); + EXPECT_EQ(original_message_pointer_2, reinterpret_cast(shared_data_vec[1].get())); + + auto unique_data_vec = intra_process_buffer.get_all_data_unique(); + EXPECT_EQ(2L, unique_data_vec.size()); + EXPECT_EQ(3L, original_shared_msg.use_count()); + EXPECT_EQ(*original_shared_msg, *unique_data_vec[0]); + EXPECT_NE(original_message_pointer, reinterpret_cast(unique_data_vec[0].get())); + EXPECT_EQ(3L, original_shared_msg_2.use_count()); + EXPECT_EQ(*original_shared_msg_2, *unique_data_vec[1]); + EXPECT_NE(original_message_pointer_2, reinterpret_cast(unique_data_vec[1].get())); } /* @@ -237,4 +264,31 @@ TEST(TestIntraProcessBuffer, unique_buffer_consume) { EXPECT_EQ(original_value, *popped_unique_msg); EXPECT_EQ(original_message_pointer, popped_message_pointer); + + original_unique_msg = std::make_unique('c'); + original_message_pointer = reinterpret_cast(original_unique_msg.get()); + original_value = *original_unique_msg; + auto original_unique_msg_2 = std::make_unique('d'); + auto original_message_pointer_2 = reinterpret_cast(original_unique_msg.get()); + auto original_value_2 = *original_unique_msg_2; + intra_process_buffer.add_unique(std::move(original_unique_msg)); + intra_process_buffer.add_unique(std::move(original_unique_msg_2)); + + auto shared_data_vec = intra_process_buffer.get_all_data_shared(); + EXPECT_EQ(2L, shared_data_vec.size()); + EXPECT_EQ(1L, shared_data_vec[0].use_count()); + EXPECT_EQ(original_value, *shared_data_vec[0]); + EXPECT_NE(original_message_pointer, reinterpret_cast(shared_data_vec[0].get())); + EXPECT_EQ(1L, shared_data_vec[1].use_count()); + EXPECT_EQ(original_value_2, *shared_data_vec[1]); + EXPECT_NE(original_message_pointer_2, reinterpret_cast(shared_data_vec[1].get())); + + auto unique_data_vec = intra_process_buffer.get_all_data_unique(); + EXPECT_EQ(2L, unique_data_vec.size()); + EXPECT_EQ(1L, shared_data_vec[0].use_count()); + EXPECT_EQ(original_value, *unique_data_vec[0]); + EXPECT_NE(original_message_pointer, reinterpret_cast(unique_data_vec[0].get())); + EXPECT_EQ(1L, shared_data_vec[1].use_count()); + EXPECT_EQ(original_value_2, *unique_data_vec[1]); + EXPECT_NE(original_message_pointer_2, reinterpret_cast(unique_data_vec[1].get())); } diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index 345b0cab9d..01169717e6 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -34,6 +34,111 @@ // NOLINTNEXTLINE(build/include_order) #include +namespace rclcpp +{ +namespace experimental +{ +namespace buffers +{ +namespace mock +{ + +class IntraProcessBufferBase +{ +public: + RCLCPP_SMART_PTR_ALIASES_ONLY(IntraProcessBufferBase) + + virtual ~IntraProcessBufferBase() {} +}; + +} // namespace mock +} // namespace buffers +} // namespace experimental +} // namespace rclcpp + +namespace rclcpp +{ +namespace experimental +{ +namespace buffers +{ +namespace mock +{ +template< + typename MessageT, + typename Alloc = std::allocator, + typename MessageDeleter = std::default_delete> +class IntraProcessBuffer : public IntraProcessBufferBase +{ +public: + using ConstMessageSharedPtr = std::shared_ptr; + using MessageUniquePtr = std::unique_ptr; + + RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessBuffer) + + IntraProcessBuffer() + {} + + void add(ConstMessageSharedPtr msg) + { + message_ptr = reinterpret_cast(msg.get()); + shared_msg = msg; + ++num_msgs; + } + + void add(MessageUniquePtr msg) + { + message_ptr = reinterpret_cast(msg.get()); + unique_msg = std::move(msg); + ++num_msgs; + } + + void pop(std::uintptr_t & msg_ptr) + { + msg_ptr = message_ptr; + message_ptr = 0; + --num_msgs; + } + + size_t size() const + { + return num_msgs; + } + + std::vector get_all_data_shared() + { + if (shared_msg) { + return {shared_msg}; + } else if (unique_msg) { + return {std::make_shared(*unique_msg)}; + } + return {}; + } + + std::vector get_all_data_unique() + { + std::vector result; + if (shared_msg) { + result.push_back(std::make_unique(*shared_msg)); + } else if (unique_msg) { + result.push_back(std::make_unique(*unique_msg)); + } + return result; + } + + // need to store the messages somewhere otherwise the memory address will be reused + ConstMessageSharedPtr shared_msg; + MessageUniquePtr unique_msg; + + std::uintptr_t message_ptr; + // count add and pop + size_t num_msgs = 0u; +}; + +} // namespace mock +} // namespace buffers +} // namespace experimental +} // namespace rclcpp namespace rclcpp { // forward declaration @@ -83,6 +188,12 @@ class PublisherBase return qos_profile; } + bool + is_durability_transient_local() const + { + return qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal; + } + bool operator==(const rmw_gid_t & gid) const { @@ -120,6 +231,9 @@ class Publisher : public PublisherBase { auto allocator = std::make_shared(); message_allocator_ = std::make_shared(*allocator.get()); + if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) { + buffer = std::make_shared>(); + } } // The following functions use the IntraProcessManager @@ -127,65 +241,12 @@ class Publisher : public PublisherBase void publish(MessageUniquePtr msg); std::shared_ptr message_allocator_; + typename rclcpp::experimental::buffers::mock::IntraProcessBuffer::SharedPtr buffer{nullptr}; }; } // namespace mock } // namespace rclcpp -namespace rclcpp -{ -namespace experimental -{ -namespace buffers -{ -namespace mock -{ -template< - typename MessageT, - typename Alloc = std::allocator, - typename MessageDeleter = std::default_delete> -class IntraProcessBuffer -{ -public: - using ConstMessageSharedPtr = std::shared_ptr; - using MessageUniquePtr = std::unique_ptr; - - RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessBuffer) - - IntraProcessBuffer() - {} - - void add(ConstMessageSharedPtr msg) - { - message_ptr = reinterpret_cast(msg.get()); - shared_msg = msg; - } - - void add(MessageUniquePtr msg) - { - message_ptr = reinterpret_cast(msg.get()); - unique_msg = std::move(msg); - } - - void pop(std::uintptr_t & msg_ptr) - { - msg_ptr = message_ptr; - message_ptr = 0; - } - - // need to store the messages somewhere otherwise the memory address will be reused - ConstMessageSharedPtr shared_msg; - MessageUniquePtr unique_msg; - - std::uintptr_t message_ptr; -}; - -} // namespace mock -} // namespace buffers -} // namespace experimental -} // namespace rclcpp - - namespace rclcpp { namespace experimental @@ -224,6 +285,12 @@ class SubscriptionIntraProcessBase return topic_name.c_str(); } + bool + is_durability_transient_local() const + { + return qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal; + } + rclcpp::QoS qos_profile; std::string topic_name; }; @@ -452,6 +519,7 @@ class ActionServerIntraProcess : public ActionServerIntraProcessBase // Prevent the header files of the mocked classes to be included #define RCLCPP__PUBLISHER_HPP_ #define RCLCPP__PUBLISHER_BASE_HPP_ +#define RCLCPP__EXPERIMENTAL__BUFFERS__INTRA_PROCESS_BUFFER_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_ @@ -466,6 +534,7 @@ class ActionServerIntraProcess : public ActionServerIntraProcessBase // Force ipm to use our mock publisher class. #define Publisher mock::Publisher #define PublisherBase mock::PublisherBase +#define IntraProcessBufferBase mock::IntraProcessBufferBase #define IntraProcessBuffer mock::IntraProcessBuffer #define SubscriptionIntraProcessBase mock::SubscriptionIntraProcessBase #define SubscriptionIntraProcessBuffer mock::SubscriptionIntraProcessBuffer @@ -513,28 +582,36 @@ void Publisher::publish(MessageUniquePtr msg) throw std::runtime_error("cannot publish msg which is a null pointer"); } - ipm->template do_intra_process_publish( - intra_process_publisher_id_, - std::move(msg), - *message_allocator_); + if (buffer) { + auto shared_msg = ipm->template do_intra_process_publish_and_return_shared( + intra_process_publisher_id_, + std::move(msg), + *message_allocator_); + buffer->add(shared_msg); + } else { + ipm->template do_intra_process_publish( + intra_process_publisher_id_, + std::move(msg), + *message_allocator_); + } } } // namespace mock } // namespace rclcpp /* - This tests how the class connects and disconnects publishers and subscriptions: - - Creates 2 publishers on different topics and a subscription to one of them. - Add everything to the intra-process manager. - - All the entities are expected to have different ids. - - Check the subscriptions count for each publisher. - - One of the publishers is expected to have 1 subscription, while the other 0. - - Check the subscription count for a non existing publisher id, should return 0. - - Add a new publisher and a new subscription both with reliable QoS. - - The subscriptions count of the previous publisher is expected to remain unchanged, - while the new publisher is expected to have 2 subscriptions (it's compatible with both QoS). - - Remove the just added subscriptions. - - The count for the last publisher is expected to decrease to 1. + * This tests how the class connects and disconnects publishers and subscriptions: + * - Creates 2 publishers on different topics and a subscription to one of them. + * Add everything to the intra-process manager. + * - All the entities are expected to have different ids. + * - Check the subscriptions count for each publisher. + * - One of the publishers is expected to have 1 subscription, while the other 0. + * - Check the subscription count for a non existing publisher id, should return 0. + * - Add a new publisher and a new subscription both with reliable QoS. + * - The subscriptions count of the previous publisher is expected to remain unchanged, + * while the new publisher is expected to have 2 subscriptions (it's compatible with both QoS). + * - Remove the just added subscriptions. + * - The count for the last publisher is expected to decrease to 1. */ TEST(TestIntraProcessManager, add_pub_sub) { using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; @@ -553,7 +630,7 @@ TEST(TestIntraProcessManager, add_pub_sub) { auto p1_id = ipm->add_publisher(p1); auto p2_id = ipm->add_publisher(p2); - auto s1_id = ipm->add_subscription(s1); + auto s1_id = ipm->template add_subscription(s1); bool unique_ids = p1_id != p2_id && p2_id != s1_id; ASSERT_TRUE(unique_ids); @@ -569,7 +646,7 @@ TEST(TestIntraProcessManager, add_pub_sub) { auto s2 = std::make_shared(rclcpp::QoS(10).reliable()); - auto s2_id = ipm->add_subscription(s2); + auto s2_id = ipm->template add_subscription(s2); auto p3_id = ipm->add_publisher(p3); p1_subs = ipm->get_subscription_count(p1_id); @@ -589,14 +666,14 @@ TEST(TestIntraProcessManager, add_pub_sub) { } /* - This tests the minimal usage of the class where there is a single subscription per publisher: - - Publishes a unique_ptr message with a subscription requesting ownership. - - The received message is expected to be the same. - - Remove the first subscription from ipm and add a new one. - - Publishes a unique_ptr message with a subscription not requesting ownership. - - The received message is expected to be the same, the first subscription do not receive it. - - Publishes a shared_ptr message with a subscription not requesting ownership. - - The received message is expected to be the same. + * This tests the minimal usage of the class where there is a single subscription per publisher: + * - Publishes a unique_ptr message with a subscription requesting ownership. + * - The received message is expected to be the same. + * - Remove the first subscription from ipm and add a new one. + * - Publishes a unique_ptr message with a subscription not requesting ownership. + * - The received message is expected to be the same, the first subscription do not receive it. + * - Publishes a shared_ptr message with a subscription not requesting ownership. + * - The received message is expected to be the same. */ TEST(TestIntraProcessManager, single_subscription) { using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; @@ -612,7 +689,7 @@ TEST(TestIntraProcessManager, single_subscription) { auto s1 = std::make_shared(); s1->take_shared_method = false; - auto s1_id = ipm->add_subscription(s1); + auto s1_id = ipm->template add_subscription(s1); auto unique_msg = std::make_unique(); auto original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -623,7 +700,7 @@ TEST(TestIntraProcessManager, single_subscription) { ipm->remove_subscription(s1_id); auto s2 = std::make_shared(); s2->take_shared_method = true; - auto s2_id = ipm->add_subscription(s2); + auto s2_id = ipm->template add_subscription(s2); (void)s2_id; unique_msg = std::make_unique(); @@ -642,15 +719,15 @@ TEST(TestIntraProcessManager, single_subscription) { } /* - This tests the usage of the class where there are multiple subscriptions of the same type: - - Publishes a unique_ptr message with 2 subscriptions requesting ownership. - - One is expected to receive the published message, while the other will receive a copy. - - Publishes a unique_ptr message with 2 subscriptions not requesting ownership. - - Both received messages are expected to be the same as the published one. - - Publishes a shared_ptr message with 2 subscriptions requesting ownership. - - Both received messages are expected to be a copy of the published one. - - Publishes a shared_ptr message with 2 subscriptions not requesting ownership. - - Both received messages are expected to be the same as the published one. + * This tests the usage of the class where there are multiple subscriptions of the same type: + * - Publishes a unique_ptr message with 2 subscriptions requesting ownership. + * - One is expected to receive the published message, while the other will receive a copy. + * - Publishes a unique_ptr message with 2 subscriptions not requesting ownership. + * - Both received messages are expected to be the same as the published one. + * - Publishes a shared_ptr message with 2 subscriptions requesting ownership. + * - Both received messages are expected to be a copy of the published one. + * - Publishes a shared_ptr message with 2 subscriptions not requesting ownership. + * - Both received messages are expected to be the same as the published one. */ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; @@ -666,11 +743,11 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { auto s1 = std::make_shared(); s1->take_shared_method = false; - auto s1_id = ipm->add_subscription(s1); + auto s1_id = ipm->template add_subscription(s1); auto s2 = std::make_shared(); s2->take_shared_method = false; - auto s2_id = ipm->add_subscription(s2); + auto s2_id = ipm->template add_subscription(s2); auto unique_msg = std::make_unique(); auto original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -686,11 +763,11 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { auto s3 = std::make_shared(); s3->take_shared_method = true; - auto s3_id = ipm->add_subscription(s3); + auto s3_id = ipm->template add_subscription(s3); auto s4 = std::make_shared(); s4->take_shared_method = true; - auto s4_id = ipm->add_subscription(s4); + auto s4_id = ipm->template add_subscription(s4); unique_msg = std::make_unique(); original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -705,11 +782,11 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { auto s5 = std::make_shared(); s5->take_shared_method = false; - auto s5_id = ipm->add_subscription(s5); + auto s5_id = ipm->template add_subscription(s5); auto s6 = std::make_shared(); s6->take_shared_method = false; - auto s6_id = ipm->add_subscription(s6); + auto s6_id = ipm->template add_subscription(s6); unique_msg = std::make_unique(); original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -725,12 +802,12 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { auto s7 = std::make_shared(); s7->take_shared_method = true; - auto s7_id = ipm->add_subscription(s7); + auto s7_id = ipm->template add_subscription(s7); (void)s7_id; auto s8 = std::make_shared(); s8->take_shared_method = true; - auto s8_id = ipm->add_subscription(s8); + auto s8_id = ipm->template add_subscription(s8); (void)s8_id; unique_msg = std::make_unique(); @@ -743,20 +820,20 @@ TEST(TestIntraProcessManager, multiple_subscriptions_same_type) { } /* - This tests the usage of the class where there are multiple subscriptions of different types: - - Publishes a unique_ptr message with 1 subscription requesting ownership and 1 not. - - The one requesting ownership is expected to receive the published message, - while the other is expected to receive a copy. - - Publishes a unique_ptr message with 2 subscriptions requesting ownership and 1 not. - - One of the subscriptions requesting ownership is expected to receive the published message, - while both other subscriptions are expected to receive different copies. - - Publishes a unique_ptr message with 2 subscriptions requesting ownership and 2 not. - - The 2 subscriptions not requesting ownership are expected to both receive the same copy - of the message, one of the subscription requesting ownership is expected to receive a - different copy, while the last is expected to receive the published message. - - Publishes a shared_ptr message with 1 subscription requesting ownership and 1 not. - - The subscription requesting ownership is expected to receive a copy of the message, while - the other is expected to receive the published message + * This tests the usage of the class where there are multiple subscriptions of different types: + * - Publishes a unique_ptr message with 1 subscription requesting ownership and 1 not. + * - The one requesting ownership is expected to receive the published message, + * while the other is expected to receive a copy. + * - Publishes a unique_ptr message with 2 subscriptions requesting ownership and 1 not. + * - One of the subscriptions requesting ownership is expected to receive the published message, + * while both other subscriptions are expected to receive different copies. + * - Publishes a unique_ptr message with 2 subscriptions requesting ownership and 2 not. + * - The 2 subscriptions not requesting ownership are expected to both receive the same copy + * of the message, one of the subscription requesting ownership is expected to receive a + * different copy, while the last is expected to receive the published message. + * - Publishes a shared_ptr message with 1 subscription requesting ownership and 1 not. + * - The subscription requesting ownership is expected to receive a copy of the message, while + * the other is expected to receive the published message */ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; @@ -772,11 +849,11 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { auto s1 = std::make_shared(); s1->take_shared_method = true; - auto s1_id = ipm->add_subscription(s1); + auto s1_id = ipm->template add_subscription(s1); auto s2 = std::make_shared(); s2->take_shared_method = false; - auto s2_id = ipm->add_subscription(s2); + auto s2_id = ipm->template add_subscription(s2); auto unique_msg = std::make_unique(); auto original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -791,15 +868,15 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { auto s3 = std::make_shared(); s3->take_shared_method = false; - auto s3_id = ipm->add_subscription(s3); + auto s3_id = ipm->template add_subscription(s3); auto s4 = std::make_shared(); s4->take_shared_method = false; - auto s4_id = ipm->add_subscription(s4); + auto s4_id = ipm->template add_subscription(s4); auto s5 = std::make_shared(); s5->take_shared_method = true; - auto s5_id = ipm->add_subscription(s5); + auto s5_id = ipm->template add_subscription(s5); unique_msg = std::make_unique(); original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -823,19 +900,19 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { auto s6 = std::make_shared(); s6->take_shared_method = true; - auto s6_id = ipm->add_subscription(s6); + auto s6_id = ipm->template add_subscription(s6); auto s7 = std::make_shared(); s7->take_shared_method = true; - auto s7_id = ipm->add_subscription(s7); + auto s7_id = ipm->template add_subscription(s7); auto s8 = std::make_shared(); s8->take_shared_method = false; - auto s8_id = ipm->add_subscription(s8); + auto s8_id = ipm->template add_subscription(s8); auto s9 = std::make_shared(); s9->take_shared_method = false; - auto s9_id = ipm->add_subscription(s9); + auto s9_id = ipm->template add_subscription(s9); unique_msg = std::make_unique(); original_message_pointer = reinterpret_cast(unique_msg.get()); @@ -861,12 +938,12 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { auto s10 = std::make_shared(); s10->take_shared_method = false; - auto s10_id = ipm->add_subscription(s10); + auto s10_id = ipm->template add_subscription(s10); (void)s10_id; auto s11 = std::make_shared(); s11->take_shared_method = true; - auto s11_id = ipm->add_subscription(s11); + auto s11_id = ipm->template add_subscription(s11); (void)s11_id; unique_msg = std::make_unique(); @@ -877,3 +954,82 @@ TEST(TestIntraProcessManager, multiple_subscriptions_different_type) { EXPECT_EQ(original_message_pointer, received_message_pointer_10); EXPECT_NE(original_message_pointer, received_message_pointer_11); } + +/* + * This tests the check inside add_publisher for transient_local + * durability publishers + * - add_publisher should throw runtime_error when no valid buffer ptr + * is passed with a transient_local publisher + */ +TEST(TestIntraProcessManager, transient_local_invalid_buffer) { + using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; + using MessageT = rcl_interfaces::msg::Log; + using PublisherT = rclcpp::mock::Publisher; + constexpr auto history_depth = 10u; + + auto ipm = std::make_shared(); + + auto p1 = std::make_shared(rclcpp::QoS(history_depth).transient_local()); + + ASSERT_THROW( + { + ipm->add_publisher(p1, nullptr); + }, std::runtime_error); +} + +/* + * This tests publishing function for transient_local durability publihers + * - A message is published before three transient_local subscriptions are added to + * ipm. Two of the subscriptions use take_shared method. Delivery of the message is verified + * along with the contents and pointer addresses from the subscriptions. + */ +TEST(TestIntraProcessManager, transient_local) { + using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; + using MessageT = rcl_interfaces::msg::Log; + using PublisherT = rclcpp::mock::Publisher; + using SubscriptionIntraProcessT = rclcpp::experimental::mock::SubscriptionIntraProcess; + + constexpr auto history_depth = 10u; + + auto ipm = std::make_shared(); + + auto p1 = std::make_shared(rclcpp::QoS(history_depth).transient_local()); + + auto s1 = + std::make_shared(rclcpp::QoS(history_depth).transient_local()); + auto s2 = + std::make_shared(rclcpp::QoS(history_depth).transient_local()); + auto s3 = + std::make_shared(rclcpp::QoS(history_depth).transient_local()); + + s1->take_shared_method = false; + s2->take_shared_method = true; + s3->take_shared_method = true; + + auto p1_id = ipm->add_publisher(p1, p1->buffer); + + p1->set_intra_process_manager(p1_id, ipm); + + auto unique_msg = std::make_unique(); + unique_msg->msg = "Test"; + p1->publish(std::move(unique_msg)); + + ipm->template add_subscription(s1); + ipm->template add_subscription(s2); + ipm->template add_subscription(s3); + + auto received_message_pointer_1 = s1->pop(); + auto received_message_pointer_2 = s2->pop(); + auto received_message_pointer_3 = s3->pop(); + ASSERT_NE(0u, received_message_pointer_1); + ASSERT_NE(0u, received_message_pointer_2); + ASSERT_NE(0u, received_message_pointer_3); + ASSERT_EQ(received_message_pointer_3, received_message_pointer_2); + ASSERT_EQ( + reinterpret_cast(received_message_pointer_1)->msg, + reinterpret_cast(received_message_pointer_2)->msg); + ASSERT_EQ( + reinterpret_cast(received_message_pointer_1)->msg, + reinterpret_cast(received_message_pointer_3)->msg); + ASSERT_EQ("Test", reinterpret_cast(received_message_pointer_1)->msg); +} diff --git a/rclcpp/test/rclcpp/test_publisher.cpp b/rclcpp/test/rclcpp/test_publisher.cpp index 20a46194fc..070ff68d9b 100644 --- a/rclcpp/test/rclcpp/test_publisher.cpp +++ b/rclcpp/test/rclcpp/test_publisher.cpp @@ -196,11 +196,7 @@ static std::vector invalid_qos_profiles() { std::vector parameters; - parameters.reserve(2); - parameters.push_back( - TestParameters( - rclcpp::QoS(rclcpp::KeepLast(10)).transient_local(), - "transient_local_qos")); + parameters.reserve(1); parameters.push_back( TestParameters( rclcpp::QoS(rclcpp::KeepAll()), @@ -639,3 +635,96 @@ INSTANTIATE_TEST_SUITE_P( rclcpp::QoS(1).best_effort(), rclcpp::QoS(1).best_effort()), std::pair( rclcpp::QoS(1).reliable(), rclcpp::QoS(1).best_effort()))); + +// TEST_F(TestPublisher, intra_process_transient_local) { +// constexpr auto history_depth = 10u; +// initialize(rclcpp::NodeOptions().use_intra_process_comms(true)); +// rclcpp::PublisherOptionsWithAllocator> pub_options_ipm_disabled; +// pub_options_ipm_disabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable; + +// rclcpp::PublisherOptionsWithAllocator> pub_options_ipm_enabled; +// pub_options_ipm_enabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + +// auto pub_ipm_enabled_transient_local_enabled = node->create_publisher( +// "topic1", +// rclcpp::QoS(rclcpp::KeepLast(history_depth)).transient_local(), pub_options_ipm_enabled); +// auto pub_ipm_disabled_transient_local_enabled = node->create_publisher( +// "topic2", +// rclcpp::QoS(rclcpp::KeepLast(history_depth)).transient_local(), pub_options_ipm_disabled); +// auto pub_ipm_enabled_transient_local_disabled = node->create_publisher( +// "topic3", +// rclcpp::QoS(rclcpp::KeepLast(history_depth)), pub_options_ipm_enabled); +// auto pub_ipm_disabled_transient_local_disabled = node->create_publisher( +// "topic4", +// rclcpp::QoS(rclcpp::KeepLast(history_depth)), pub_options_ipm_disabled); + +// test_msgs::msg::Empty msg; +// pub_ipm_enabled_transient_local_enabled->publish(msg); +// pub_ipm_disabled_transient_local_enabled->publish(msg); +// pub_ipm_enabled_transient_local_disabled->publish(msg); +// pub_ipm_disabled_transient_local_disabled->publish(msg); + +// auto do_nothing = [](std::shared_ptr) {}; +// struct IntraProcessCallback +// { +// void callback_fun(size_t s) +// { +// (void) s; +// called = true; +// } +// bool called = false; +// }; +// rclcpp::SubscriptionOptions sub_options_ipm_disabled; +// sub_options_ipm_disabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable; +// rclcpp::SubscriptionOptions sub_options_ipm_enabled; +// sub_options_ipm_enabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; +// IntraProcessCallback callback1, callback2, callback3, callback4; +// auto sub_ipm_enabled_transient_local_enabled = node->create_subscription( +// "topic1", +// rclcpp::QoS(rclcpp::KeepLast(history_depth)).transient_local(), +// do_nothing, sub_options_ipm_enabled); +// sub_ipm_enabled_transient_local_enabled->set_on_new_intra_process_message_callback( +// std::bind(&IntraProcessCallback::callback_fun, &callback1, std::placeholders::_1)); +// auto sub_ipm_disabled_transient_local_enabled = node->create_subscription( +// "topic2", +// rclcpp::QoS(rclcpp::KeepLast(history_depth)).transient_local(), +// do_nothing, sub_options_ipm_disabled); +// sub_ipm_disabled_transient_local_enabled->set_on_new_intra_process_message_callback( +// std::bind(&IntraProcessCallback::callback_fun, &callback2, std::placeholders::_1)); +// auto sub_ipm_enabled_transient_local_disabled = node->create_subscription( +// "topic3", +// rclcpp::QoS(rclcpp::KeepLast(history_depth)), +// do_nothing, sub_options_ipm_enabled); +// sub_ipm_enabled_transient_local_disabled->set_on_new_intra_process_message_callback( +// std::bind(&IntraProcessCallback::callback_fun, &callback3, std::placeholders::_1)); +// auto sub_ipm_disabled_transient_local_disabled = node->create_subscription( +// "topic4", +// rclcpp::QoS(rclcpp::KeepLast(history_depth)), +// do_nothing, sub_options_ipm_disabled); +// sub_ipm_disabled_transient_local_disabled->set_on_new_intra_process_message_callback( +// std::bind(&IntraProcessCallback::callback_fun, &callback4, std::placeholders::_1)); + +// EXPECT_TRUE(pub_ipm_enabled_transient_local_enabled->is_durability_transient_local()); +// EXPECT_TRUE(pub_ipm_disabled_transient_local_enabled->is_durability_transient_local()); +// EXPECT_FALSE(pub_ipm_enabled_transient_local_disabled->is_durability_transient_local()); +// EXPECT_FALSE(pub_ipm_disabled_transient_local_disabled->is_durability_transient_local()); + +// EXPECT_EQ(1, pub_ipm_enabled_transient_local_enabled->get_intra_process_subscription_count()); +// EXPECT_EQ(0, pub_ipm_disabled_transient_local_enabled->get_intra_process_subscription_count()); +// EXPECT_EQ(1, pub_ipm_enabled_transient_local_disabled->get_intra_process_subscription_count()); +// EXPECT_EQ(0, pub_ipm_disabled_transient_local_disabled->get_intra_process_subscription_count()); + +// EXPECT_EQ( +// history_depth - 1u, +// pub_ipm_enabled_transient_local_enabled->lowest_available_ipm_capacity()); +// EXPECT_EQ(0, pub_ipm_disabled_transient_local_enabled->lowest_available_ipm_capacity()); +// EXPECT_EQ( +// history_depth, +// pub_ipm_enabled_transient_local_disabled->lowest_available_ipm_capacity()); +// EXPECT_EQ(0, pub_ipm_disabled_transient_local_disabled->lowest_available_ipm_capacity()); + +// EXPECT_TRUE(callback1.called); +// EXPECT_FALSE(callback2.called); +// EXPECT_FALSE(callback3.called); +// EXPECT_FALSE(callback4.called); +// } diff --git a/rclcpp/test/rclcpp/test_ring_buffer_implementation.cpp b/rclcpp/test/rclcpp/test_ring_buffer_implementation.cpp index 07dfd8d584..0abd9b1a89 100644 --- a/rclcpp/test/rclcpp/test_ring_buffer_implementation.cpp +++ b/rclcpp/test/rclcpp/test_ring_buffer_implementation.cpp @@ -22,7 +22,7 @@ #include "rclcpp/experimental/buffers/ring_buffer_implementation.hpp" /* - Construtctor + * Construtctor */ TEST(TestRingBufferImplementation, constructor) { // Cannot create a buffer of size zero. @@ -37,10 +37,11 @@ TEST(TestRingBufferImplementation, constructor) { } /* - Basic usage - - insert data and check that it has data - - extract data - - overwrite old data writing over the buffer capacity + * Basic usage + * - insert data and check that it has data + * - get all data + * - extract data + * - overwrite old data writing over the buffer capacity */ TEST(TestRingBufferImplementation, basic_usage) { rclcpp::experimental::buffers::RingBufferImplementation rb(2); @@ -64,6 +65,12 @@ TEST(TestRingBufferImplementation, basic_usage) { rb.enqueue('d'); + const auto all_data_vec = rb.get_all_data(); + + EXPECT_EQ(2u, all_data_vec.size()); + EXPECT_EQ('c', all_data_vec[0]); + EXPECT_EQ('d', all_data_vec[1]); + EXPECT_EQ(true, rb.has_data()); EXPECT_EQ(true, rb.is_full()); @@ -79,3 +86,56 @@ TEST(TestRingBufferImplementation, basic_usage) { EXPECT_EQ(false, rb.has_data()); EXPECT_EQ(false, rb.is_full()); } + +/* + * Basic usage with unique_ptr + * - insert unique_ptr data and check that it has data + * - get all data + * - extract data + * - overwrite old data writing over the buffer capacity + */ +TEST(TestRingBufferImplementation, basic_usage_unique_ptr) { + rclcpp::experimental::buffers::RingBufferImplementation> rb(2); + + auto a = std::make_unique('a'); + auto b = std::make_unique('b'); + auto original_b_pointer = reinterpret_cast(b.get()); + auto c = std::make_unique('c'); + auto original_c_pointer = reinterpret_cast(c.get()); + + rb.enqueue(std::move(a)); + + EXPECT_EQ(true, rb.has_data()); + EXPECT_EQ(false, rb.is_full()); + + rb.enqueue(std::move(b)); + rb.enqueue(std::move(c)); + + EXPECT_EQ(true, rb.has_data()); + EXPECT_EQ(true, rb.is_full()); + + const auto all_data_vec = rb.get_all_data(); + + EXPECT_EQ(2u, all_data_vec.size()); + EXPECT_EQ('b', *all_data_vec[0]); + EXPECT_EQ('c', *all_data_vec[1]); + EXPECT_NE(original_b_pointer, reinterpret_cast(all_data_vec[0].get())); + EXPECT_NE(original_c_pointer, reinterpret_cast(all_data_vec[1].get())); + + EXPECT_EQ(true, rb.has_data()); + EXPECT_EQ(true, rb.is_full()); + + auto uni_ptr = rb.dequeue(); + + EXPECT_EQ('b', *uni_ptr); + EXPECT_EQ(original_b_pointer, reinterpret_cast(uni_ptr.get())); + EXPECT_EQ(true, rb.has_data()); + EXPECT_EQ(false, rb.is_full()); + + uni_ptr = rb.dequeue(); + + EXPECT_EQ('c', *uni_ptr); + EXPECT_EQ(original_c_pointer, reinterpret_cast(uni_ptr.get())); + EXPECT_EQ(false, rb.has_data()); + EXPECT_EQ(false, rb.is_full()); +} diff --git a/rclcpp/test/rclcpp/test_service.cpp b/rclcpp/test/rclcpp/test_service.cpp index b9ab010e0f..10bb84d840 100644 --- a/rclcpp/test/rclcpp/test_service.cpp +++ b/rclcpp/test/rclcpp/test_service.cpp @@ -177,17 +177,9 @@ TEST_F(TestService, basic_public_getters) { } rclcpp::AnyServiceCallback cb; - - rclcpp::IntraProcessSetting ipc_setting; - if (node_base_interface->get_use_intra_process_default()) { - ipc_setting = rclcpp::IntraProcessSetting::Enable; - } else { - ipc_setting = rclcpp::IntraProcessSetting::Disable; - } - const rclcpp::Service base( node_handle_int->get_node_base_interface(), - &service_handle, cb, ipc_setting); + &service_handle, cb); // Use get_service_handle specific to const service std::shared_ptr const_service_handle = base.get_service_handle(); EXPECT_NE(nullptr, const_service_handle); diff --git a/rclcpp/test/rclcpp/test_subscription.cpp b/rclcpp/test/rclcpp/test_subscription.cpp index 6facada4dd..a077839dfe 100644 --- a/rclcpp/test/rclcpp/test_subscription.cpp +++ b/rclcpp/test/rclcpp/test_subscription.cpp @@ -47,62 +47,15 @@ class TestSubscription : public ::testing::Test protected: void initialize(const rclcpp::NodeOptions & node_options = rclcpp::NodeOptions()) { - node = std::make_shared("test_subscription", "/ns", node_options); + node_ = std::make_shared("test_subscription", "/ns", node_options); } void TearDown() { - node.reset(); + node_.reset(); } - rclcpp::Node::SharedPtr node; -}; - -struct TestParameters -{ - TestParameters(rclcpp::QoS qos, std::string description) - : qos(qos), description(description) {} - rclcpp::QoS qos; - std::string description; -}; - -std::ostream & operator<<(std::ostream & out, const TestParameters & params) -{ - out << params.description; - return out; -} - -class TestSubscriptionInvalidIntraprocessQos - : public TestSubscription, - public ::testing::WithParamInterface -{}; - -class TestSubscriptionSub : public ::testing::Test -{ -public: - void OnMessage(test_msgs::msg::Empty::ConstSharedPtr msg) - { - (void)msg; - } - -protected: - static void SetUpTestCase() - { - } - - void SetUp() - { - node = std::make_shared("test_subscription", "/ns"); - subnode = node->create_sub_node("sub_ns"); - } - - void TearDown() - { - node.reset(); - } - - rclcpp::Node::SharedPtr node; - rclcpp::Node::SharedPtr subnode; + rclcpp::Node::SharedPtr node_; }; class SubscriptionClassNodeInheritance : public rclcpp::Node @@ -137,10 +90,10 @@ class SubscriptionClass void CreateSubscription() { - auto node = std::make_shared("test_subscription_member_callback", "/ns"); + auto node_ = std::make_shared("test_subscription_member_callback", "/ns"); auto callback = std::bind(&SubscriptionClass::OnMessage, this, std::placeholders::_1); using test_msgs::msg::Empty; - auto sub = node->create_subscription("topic", 10, callback); + auto sub = node_->create_subscription("topic", 10, callback); } }; @@ -155,7 +108,7 @@ TEST_F(TestSubscription, construction_and_destruction) { }; { constexpr size_t depth = 10u; - auto sub = node->create_subscription("topic", depth, callback); + auto sub = node_->create_subscription("topic", depth, callback); EXPECT_NE(nullptr, sub->get_subscription_handle()); // Converting to base class was necessary for the compiler to choose the const version of @@ -172,40 +125,7 @@ TEST_F(TestSubscription, construction_and_destruction) { { ASSERT_THROW( { - auto sub = node->create_subscription("invalid_topic?", 10, callback); - }, rclcpp::exceptions::InvalidTopicNameError); - } -} - -/* - Testing subscription construction and destruction for subnodes. - */ -TEST_F(TestSubscriptionSub, construction_and_destruction) { - using test_msgs::msg::Empty; - auto callback = [](Empty::ConstSharedPtr msg) { - (void)msg; - }; - { - auto sub = subnode->create_subscription("topic", 1, callback); - EXPECT_STREQ(sub->get_topic_name(), "/ns/sub_ns/topic"); - } - - { - auto sub = subnode->create_subscription("/topic", 1, callback); - EXPECT_STREQ(sub->get_topic_name(), "/topic"); - } - - { - auto sub = subnode->create_subscription("~/topic", 1, callback); - std::string expected_topic_name = - std::string(node->get_namespace()) + "/" + node->get_name() + "/topic"; - EXPECT_STREQ(sub->get_topic_name(), expected_topic_name.c_str()); - } - - { - ASSERT_THROW( - { - auto sub = node->create_subscription("invalid_topic?", 1, callback); + auto sub = node_->create_subscription("invalid_topic?", 10, callback); }, rclcpp::exceptions::InvalidTopicNameError); } } @@ -218,31 +138,31 @@ TEST_F(TestSubscription, various_creation_signatures) { using test_msgs::msg::Empty; auto cb = [](test_msgs::msg::Empty::ConstSharedPtr) {}; { - auto sub = node->create_subscription("topic", 1, cb); + auto sub = node_->create_subscription("topic", 1, cb); (void)sub; } { - auto sub = node->create_subscription("topic", rclcpp::QoS(1), cb); + auto sub = node_->create_subscription("topic", rclcpp::QoS(1), cb); (void)sub; } { auto sub = - node->create_subscription("topic", rclcpp::QoS(rclcpp::KeepLast(1)), cb); + node_->create_subscription("topic", rclcpp::QoS(rclcpp::KeepLast(1)), cb); (void)sub; } { auto sub = - node->create_subscription("topic", rclcpp::QoS(rclcpp::KeepAll()), cb); + node_->create_subscription("topic", rclcpp::QoS(rclcpp::KeepAll()), cb); (void)sub; } { - auto sub = node->create_subscription( + auto sub = node_->create_subscription( "topic", 42, cb, rclcpp::SubscriptionOptions()); (void)sub; } { auto sub = rclcpp::create_subscription( - node, "topic", 42, cb, rclcpp::SubscriptionOptions()); + node_, "topic", 42, cb, rclcpp::SubscriptionOptions()); (void)sub; } { @@ -250,14 +170,14 @@ TEST_F(TestSubscription, various_creation_signatures) { options.allocator = std::make_shared>(); EXPECT_NE(nullptr, options.get_allocator()); auto sub = rclcpp::create_subscription( - node, "topic", 42, cb, options); + node_, "topic", 42, cb, options); (void)sub; } { rclcpp::SubscriptionOptionsBase options_base; rclcpp::SubscriptionOptionsWithAllocator> options(options_base); auto sub = rclcpp::create_subscription( - node, "topic", 42, cb, options); + node_, "topic", 42, cb, options); (void)sub; } } @@ -283,7 +203,7 @@ TEST_F(TestSubscription, callback_bind) { // Regression test for https://github.com/ros2/rclcpp/issues/479 where the TEST_F GTest macro // was interfering with rclcpp's `function_traits`. auto callback = std::bind(&TestSubscription::OnMessage, this, std::placeholders::_1); - auto sub = node->create_subscription("topic", 1, callback); + auto sub = node_->create_subscription("topic", 1, callback); } } @@ -295,7 +215,7 @@ TEST_F(TestSubscription, take) { using test_msgs::msg::Empty; auto do_nothing = [](std::shared_ptr) {FAIL();}; { - auto sub = node->create_subscription("~/test_take", 1, do_nothing); + auto sub = node_->create_subscription("~/test_take", 1, do_nothing); test_msgs::msg::Empty msg; rclcpp::MessageInfo msg_info; EXPECT_FALSE(sub->take(msg, msg_info)); @@ -303,10 +223,10 @@ TEST_F(TestSubscription, take) { { rclcpp::SubscriptionOptions so; so.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable; - auto sub = node->create_subscription("~/test_take", 1, do_nothing, so); + auto sub = node_->create_subscription("~/test_take", 1, do_nothing, so); rclcpp::PublisherOptions po; po.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable; - auto pub = node->create_publisher("~/test_take", 1, po); + auto pub = node_->create_publisher("~/test_take", 1, po); { test_msgs::msg::Empty msg; pub->publish(msg); @@ -332,7 +252,7 @@ TEST_F(TestSubscription, take_serialized) { using test_msgs::msg::Empty; auto do_nothing = [](std::shared_ptr) {FAIL();}; { - auto sub = node->create_subscription("~/test_take", 1, do_nothing); + auto sub = node_->create_subscription("~/test_take", 1, do_nothing); std::shared_ptr msg = sub->create_serialized_message(); rclcpp::MessageInfo msg_info; EXPECT_FALSE(sub->take_serialized(*msg, msg_info)); @@ -340,10 +260,10 @@ TEST_F(TestSubscription, take_serialized) { { rclcpp::SubscriptionOptions so; so.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable; - auto sub = node->create_subscription("~/test_take", 1, do_nothing, so); + auto sub = node_->create_subscription("~/test_take", 1, do_nothing, so); rclcpp::PublisherOptions po; po.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable; - auto pub = node->create_publisher("~/test_take", 1, po); + auto pub = node_->create_publisher("~/test_take", 1, po); { test_msgs::msg::Empty msg; pub->publish(msg); @@ -368,7 +288,7 @@ TEST_F(TestSubscription, rcl_subscription_init_error) { // reset() is not needed for triggering exception, just to avoid an unused return value warning EXPECT_THROW( - node->create_subscription("topic", 10, callback).reset(), + node_->create_subscription("topic", 10, callback).reset(), rclcpp::exceptions::RCLError); } @@ -380,7 +300,7 @@ TEST_F(TestSubscription, rcl_subscription_fini_error) { // Cleanup just fails, no exception expected EXPECT_NO_THROW( - node->create_subscription("topic", 10, callback).reset()); + node_->create_subscription("topic", 10, callback).reset()); } TEST_F(TestSubscription, rcl_subscription_get_actual_qos_error) { @@ -389,7 +309,7 @@ TEST_F(TestSubscription, rcl_subscription_get_actual_qos_error) { auto mock = mocking_utils::patch_and_return( "lib:rclcpp", rcl_subscription_get_actual_qos, nullptr); - auto sub = node->create_subscription("topic", 10, callback); + auto sub = node_->create_subscription("topic", 10, callback); RCLCPP_EXPECT_THROW_EQ( sub->get_actual_qos(), std::runtime_error("failed to get qos settings: error not set")); } @@ -400,7 +320,7 @@ TEST_F(TestSubscription, rcl_take_type_erased_error) { auto mock = mocking_utils::patch_and_return( "lib:rclcpp", rcl_take, RCL_RET_ERROR); - auto sub = node->create_subscription("topic", 10, callback); + auto sub = node_->create_subscription("topic", 10, callback); test_msgs::msg::Empty msg; rclcpp::MessageInfo message_info; @@ -413,7 +333,7 @@ TEST_F(TestSubscription, rcl_take_serialized_message_error) { auto mock = mocking_utils::patch_and_return( "lib:rclcpp", rcl_take_serialized_message, RCL_RET_ERROR); - auto sub = node->create_subscription("topic", 10, callback); + auto sub = node_->create_subscription("topic", 10, callback); rclcpp::SerializedMessage msg; rclcpp::MessageInfo message_info; @@ -426,14 +346,14 @@ TEST_F(TestSubscription, rcl_subscription_get_publisher_count_error) { auto mock = mocking_utils::patch_and_return( "lib:rclcpp", rcl_subscription_get_publisher_count, RCL_RET_ERROR); - auto sub = node->create_subscription("topic", 10, callback); + auto sub = node_->create_subscription("topic", 10, callback); EXPECT_THROW(sub->get_publisher_count(), rclcpp::exceptions::RCLError); } TEST_F(TestSubscription, handle_loaned_message) { initialize(); auto callback = [](std::shared_ptr) {}; - auto sub = node->create_subscription("topic", 10, callback); + auto sub = node_->create_subscription("topic", 10, callback); test_msgs::msg::Empty msg; rclcpp::MessageInfo message_info; @@ -448,13 +368,13 @@ TEST_F(TestSubscription, on_new_message_callback) { using test_msgs::msg::Empty; auto do_nothing = [](std::shared_ptr) {FAIL();}; - auto sub = node->create_subscription("~/test_take", 10, do_nothing); + auto sub = node_->create_subscription("~/test_take", 10, do_nothing); std::atomic c1 {0}; auto increase_c1_cb = [&c1](size_t count_msgs) {c1 += count_msgs;}; sub->set_on_new_message_callback(increase_c1_cb); - auto pub = node->create_publisher("~/test_take", 3); + auto pub = node_->create_publisher("~/test_take", 3); { test_msgs::msg::Empty msg; pub->publish(msg); @@ -518,13 +438,13 @@ TEST_F(TestSubscription, on_new_intra_process_message_callback) { using test_msgs::msg::Empty; auto do_nothing = [](std::shared_ptr) {FAIL();}; - auto sub = node->create_subscription("~/test_take", 10, do_nothing); + auto sub = node_->create_subscription("~/test_take", 10, do_nothing); std::atomic c1 {0}; auto increase_c1_cb = [&c1](size_t count_msgs) {c1 += count_msgs;}; sub->set_on_new_intra_process_message_callback(increase_c1_cb); - auto pub = node->create_publisher("~/test_take", 1); + auto pub = node_->create_publisher("~/test_take", 1); { test_msgs::msg::Empty msg; pub->publish(msg); @@ -580,13 +500,138 @@ TEST_F(TestSubscription, on_new_intra_process_message_callback) { EXPECT_THROW(sub->set_on_new_intra_process_message_callback(invalid_cb), std::invalid_argument); } +TEST_F(TestSubscription, get_network_flow_endpoints_errors) { + initialize(); + const rclcpp::QoS subscription_qos(1); + auto subscription_callback = [](test_msgs::msg::Empty::ConstSharedPtr msg) { + (void)msg; + }; + auto subscription = node_->create_subscription( + "topic", subscription_qos, subscription_callback); + + { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_subscription_get_network_flow_endpoints, RCL_RET_ERROR); + auto mock_network_flow_endpoint_array_fini = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_network_flow_endpoint_array_fini, RCL_RET_ERROR); + EXPECT_THROW( + subscription->get_network_flow_endpoints(), + rclcpp::exceptions::RCLError); + } + { + auto mock_network_flow_endpoint_array_fini = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_network_flow_endpoint_array_fini, RCL_RET_ERROR); + EXPECT_THROW( + subscription->get_network_flow_endpoints(), + rclcpp::exceptions::RCLError); + } + { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_subscription_get_network_flow_endpoints, RCL_RET_OK); + auto mock_network_flow_endpoint_array_fini = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_network_flow_endpoint_array_fini, RCL_RET_OK); + EXPECT_NO_THROW(subscription->get_network_flow_endpoints()); + } +} + +class TestSubscriptionSub : public ::testing::Test +{ +protected: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } + + void SetUp() + { + node_ = std::make_shared("test_subscription", "/ns"); + subnode_ = node_->create_sub_node("sub_ns"); + } + + rclcpp::Node::SharedPtr node_; + rclcpp::Node::SharedPtr subnode_; +}; + +/* + Testing subscription construction and destruction for subnodes. + */ +TEST_F(TestSubscriptionSub, construction_and_destruction) { + auto callback = [](test_msgs::msg::Empty::ConstSharedPtr msg) { + (void)msg; + }; + { + auto sub = subnode_->create_subscription("topic", 1, callback); + EXPECT_STREQ(sub->get_topic_name(), "/ns/sub_ns/topic"); + } + + { + auto sub = subnode_->create_subscription("/topic", 1, callback); + EXPECT_STREQ(sub->get_topic_name(), "/topic"); + } + + { + auto sub = subnode_->create_subscription("~/topic", 1, callback); + std::string expected_topic_name = + std::string(node_->get_namespace()) + "/" + node_->get_name() + "/topic"; + EXPECT_STREQ(sub->get_topic_name(), expected_topic_name.c_str()); + } + + { + ASSERT_THROW( + { + auto sub = node_->create_subscription("invalid_topic?", 1, callback); + }, rclcpp::exceptions::InvalidTopicNameError); + } +} + +struct TestParameters final +{ + TestParameters(rclcpp::QoS qos, std::string description) + : qos(qos), description(description) {} + rclcpp::QoS qos; + std::string description; +}; + +std::ostream & operator<<(std::ostream & out, const TestParameters & params) +{ + out << params.description; + return out; +} + +class TestSubscriptionInvalidIntraprocessQos + : public TestSubscription, + public ::testing::WithParamInterface +{}; + +static std::vector invalid_qos_profiles() +{ + std::vector parameters; + + parameters.reserve(1); + parameters.push_back( + TestParameters( + rclcpp::QoS(rclcpp::KeepAll()), + "keep_all_qos")); + + return parameters; +} + +INSTANTIATE_TEST_SUITE_P( + TestSubscriptionThrows, TestSubscriptionInvalidIntraprocessQos, + ::testing::ValuesIn(invalid_qos_profiles()), + ::testing::PrintToStringParamName()); + /* Testing subscription with intraprocess enabled and invalid QoS */ TEST_P(TestSubscriptionInvalidIntraprocessQos, test_subscription_throws) { initialize(rclcpp::NodeOptions().use_intra_process_comms(true)); rclcpp::QoS qos = GetParam().qos; - using test_msgs::msg::Empty; { auto callback = std::bind( &TestSubscriptionInvalidIntraprocessQos::OnMessage, @@ -594,7 +639,7 @@ TEST_P(TestSubscriptionInvalidIntraprocessQos, test_subscription_throws) { std::placeholders::_1); ASSERT_THROW( - {auto subscription = node->create_subscription( + {auto subscription = node_->create_subscription( "topic", qos, callback);}, @@ -617,66 +662,10 @@ TEST_P(TestSubscriptionInvalidIntraprocessQos, test_subscription_throws_intrapro std::placeholders::_1); RCLCPP_EXPECT_THROW_EQ( - {auto subscription = node->create_subscription( + {auto subscription = node_->create_subscription( "topic", qos, callback, options);}, std::runtime_error("Unrecognized IntraProcessSetting value")); } - -static std::vector invalid_qos_profiles() -{ - std::vector parameters; - - parameters.reserve(3); - parameters.push_back( - TestParameters( - rclcpp::QoS(rclcpp::KeepLast(10)).transient_local(), - "transient_local_qos")); - parameters.push_back( - TestParameters( - rclcpp::QoS(rclcpp::KeepAll()), - "keep_all_qos")); - - return parameters; -} - -INSTANTIATE_TEST_SUITE_P( - TestSubscriptionThrows, TestSubscriptionInvalidIntraprocessQos, - ::testing::ValuesIn(invalid_qos_profiles()), - ::testing::PrintToStringParamName()); - -TEST_F(TestSubscription, get_network_flow_endpoints_errors) { - initialize(); - const rclcpp::QoS subscription_qos(1); - auto subscription_callback = [](test_msgs::msg::Empty::ConstSharedPtr msg) { - (void)msg; - }; - auto subscription = node->create_subscription( - "topic", subscription_qos, subscription_callback); - - { - auto mock = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_subscription_get_network_flow_endpoints, RCL_RET_ERROR); - auto mock_network_flow_endpoint_array_fini = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_network_flow_endpoint_array_fini, RCL_RET_ERROR); - EXPECT_THROW( - subscription->get_network_flow_endpoints(), - rclcpp::exceptions::RCLError); - } - { - auto mock_network_flow_endpoint_array_fini = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_network_flow_endpoint_array_fini, RCL_RET_ERROR); - EXPECT_THROW( - subscription->get_network_flow_endpoints(), - rclcpp::exceptions::RCLError); - } - { - auto mock = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_subscription_get_network_flow_endpoints, RCL_RET_OK); - auto mock_network_flow_endpoint_array_fini = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_network_flow_endpoint_array_fini, RCL_RET_OK); - EXPECT_NO_THROW(subscription->get_network_flow_endpoints()); - } -}