diff --git a/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp b/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp index 2d3c0a02f5..1d50b1659e 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/buffer_implementation_base.hpp @@ -37,6 +37,7 @@ class BufferImplementationBase virtual void clear() = 0; virtual bool has_data() const = 0; + virtual size_t available_capacity() const = 0; }; } // namespace buffers diff --git a/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp index 3d2f5efec8..a6ece455c9 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp @@ -45,6 +45,7 @@ class IntraProcessBufferBase virtual bool has_data() const = 0; virtual bool use_take_shared_method() const = 0; + virtual size_t available_capacity() const = 0; }; template< @@ -157,6 +158,11 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer::value; } + size_t available_capacity() const override + { + return buffer_->available_capacity(); + } + private: std::unique_ptr> buffer_; @@ -348,6 +354,11 @@ class ServiceIntraProcessBuffer : public IntraProcessBufferBase buffer_->clear(); } + size_t available_capacity() const override + { + return buffer_->available_capacity(); + } + void add(BufferT && msg) { buffer_->enqueue(std::move(msg)); diff --git a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp index fbca3ec89e..6743f81082 100644 --- a/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp +++ b/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp @@ -160,6 +160,18 @@ class RingBufferImplementation : public BufferImplementationBase return is_full_(); } + /// Get the remaining capacity to store messages + /** + * This member function is thread-safe. + * + * \return the number of free capacity for new messages + */ + size_t available_capacity() const + { + std::lock_guard lock(mutex_); + return available_capacity_(); + } + void clear() override { TRACEPOINT(rclcpp_ring_buffer_clear, static_cast(this)); @@ -266,6 +278,17 @@ class RingBufferImplementation : public BufferImplementationBase return {}; } + /// Get the remaining capacity to store messages + /** + * This member function is not thread-safe. + * + * \return the number of free capacity for new messages + */ + inline size_t available_capacity_() const + { + return capacity_ - size_; + } + size_t capacity_; std::vector ring_buffer_; diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index d6c4281f34..26d3b8ebd5 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -819,6 +819,11 @@ class IntraProcessManager bool action_server_is_available(uint64_t ipc_action_client_id); + /// Return the lowest available capacity for all subscription buffers for a publisher id. + RCLCPP_PUBLIC + size_t + lowest_available_capacity(const uint64_t intra_process_publisher_id) const; + private: struct SplittedSubscriptions { diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index e60797e78e..1d5ea5a530 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -66,6 +66,10 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable bool is_durability_transient_local() const; + virtual + size_t + available_capacity() const = 0; + bool is_ready(rcl_wait_set_t * wait_set) override = 0; diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp index c588efb5b6..ccb687adbf 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp @@ -179,6 +179,11 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff return buffer_->use_take_shared_method(); } + size_t available_capacity() const override + { + return buffer_->available_capacity(); + } + protected: void trigger_guard_condition() override diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index 7c78c4dbb9..6c4b7c0084 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -227,6 +227,17 @@ class PublisherBase : public std::enable_shared_from_this std::vector get_network_flow_endpoints() const; + /// Return the lowest available capacity for all subscription buffers. + /** + * For intraprocess communication return the lowest buffer capacity for all subscriptions. + * If intraprocess is disabled or no intraprocess subscriptions present, return maximum of size_t. + * On failure return 0. + * \return lowest buffer capacity for all subscriptions + */ + RCLCPP_PUBLIC + size_t + lowest_available_ipm_capacity() const; + /// Wait until all published messages are acknowledged or until the specified timeout elapses. /** * This method waits until all published messages are acknowledged by all matching diff --git a/rclcpp/src/rclcpp/intra_process_manager.cpp b/rclcpp/src/rclcpp/intra_process_manager.cpp index ea5052dc99..7b4e848357 100644 --- a/rclcpp/src/rclcpp/intra_process_manager.cpp +++ b/rclcpp/src/rclcpp/intra_process_manager.cpp @@ -571,5 +571,53 @@ IntraProcessManager::can_communicate( return true; } +size_t +IntraProcessManager::lowest_available_capacity(const uint64_t intra_process_publisher_id) const +{ + size_t capacity = std::numeric_limits::max(); + + auto publisher_it = pub_to_subs_.find(intra_process_publisher_id); + if (publisher_it == pub_to_subs_.end()) { + // Publisher is either invalid or no longer exists. + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling lowest_available_capacity for invalid or no longer existing publisher id"); + return 0u; + } + + if (publisher_it->second.take_shared_subscriptions.empty() && + publisher_it->second.take_ownership_subscriptions.empty()) + { + // no subscriptions available + return 0u; + } + + auto available_capacity = [this, &capacity](const uint64_t intra_process_subscription_id) + { + auto subscription_it = subscriptions_.find(intra_process_subscription_id); + if (subscription_it != subscriptions_.end()) { + auto subscription = subscription_it->second.lock(); + if (subscription) { + capacity = std::min(capacity, subscription->available_capacity()); + } + } else { + // Subscription is either invalid or no longer exists. + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling available_capacity for invalid or no longer existing subscription id"); + } + }; + + for (const auto sub_id : publisher_it->second.take_shared_subscriptions) { + available_capacity(sub_id); + } + + for (const auto sub_id : publisher_it->second.take_ownership_subscriptions) { + available_capacity(sub_id); + } + + return capacity; +} + } // namespace experimental } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index 7d1b917d78..3347740d80 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -416,3 +416,22 @@ std::vector PublisherBase::get_network_flow_endpoin return network_flow_endpoint_vector; } + +size_t PublisherBase::lowest_available_ipm_capacity() const +{ + if (!intra_process_is_enabled_) { + return 0u; + } + + auto ipm = weak_ipm_.lock(); + + if (!ipm) { + // TODO(ivanpauno): should this raise an error? + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Intra process manager died for a publisher."); + return 0u; + } + + return ipm->lowest_available_capacity(intra_process_publisher_id_); +} diff --git a/rclcpp/test/rclcpp/test_intra_process_buffer.cpp b/rclcpp/test/rclcpp/test_intra_process_buffer.cpp index be6d1fc35e..fa427de74b 100644 --- a/rclcpp/test/rclcpp/test_intra_process_buffer.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_buffer.cpp @@ -292,3 +292,75 @@ TEST(TestIntraProcessBuffer, unique_buffer_consume) { EXPECT_EQ(original_value_2, *unique_data_vec[1]); EXPECT_NE(original_message_pointer_2, reinterpret_cast(unique_data_vec[1].get())); } + +/* + Check the available buffer capacity while storing and consuming data from an intra-process + buffer. + The initial available buffer capacity should equal the buffer size. + Inserting a message should decrease the available buffer capacity by 1. + Consuming a message should increase the available buffer capacity by 1. + */ +TEST(TestIntraProcessBuffer, available_capacity) { + using MessageT = char; + using Alloc = std::allocator; + using Deleter = std::default_delete; + using SharedMessageT = std::shared_ptr; + using UniqueMessageT = std::unique_ptr; + using UniqueIntraProcessBufferT = rclcpp::experimental::buffers::TypedIntraProcessBuffer< + MessageT, Alloc, Deleter, UniqueMessageT>; + + constexpr auto history_depth = 5u; + + auto buffer_impl = + std::make_unique>( + history_depth); + + UniqueIntraProcessBufferT intra_process_buffer(std::move(buffer_impl)); + + EXPECT_EQ(history_depth, intra_process_buffer.available_capacity()); + + auto original_unique_msg = std::make_unique('a'); + auto original_message_pointer = reinterpret_cast(original_unique_msg.get()); + auto original_value = *original_unique_msg; + + intra_process_buffer.add_unique(std::move(original_unique_msg)); + + EXPECT_EQ(history_depth - 1u, intra_process_buffer.available_capacity()); + + SharedMessageT popped_shared_msg; + popped_shared_msg = intra_process_buffer.consume_shared(); + auto popped_message_pointer = reinterpret_cast(popped_shared_msg.get()); + + EXPECT_EQ(history_depth, intra_process_buffer.available_capacity()); + EXPECT_EQ(original_value, *popped_shared_msg); + EXPECT_EQ(original_message_pointer, popped_message_pointer); + + original_unique_msg = std::make_unique('b'); + original_message_pointer = reinterpret_cast(original_unique_msg.get()); + original_value = *original_unique_msg; + + intra_process_buffer.add_unique(std::move(original_unique_msg)); + + auto second_unique_msg = std::make_unique('c'); + auto second_message_pointer = reinterpret_cast(second_unique_msg.get()); + auto second_value = *second_unique_msg; + + intra_process_buffer.add_unique(std::move(second_unique_msg)); + + EXPECT_EQ(history_depth - 2u, intra_process_buffer.available_capacity()); + + UniqueMessageT popped_unique_msg; + popped_unique_msg = intra_process_buffer.consume_unique(); + popped_message_pointer = reinterpret_cast(popped_unique_msg.get()); + + EXPECT_EQ(history_depth - 1u, intra_process_buffer.available_capacity()); + EXPECT_EQ(original_value, *popped_unique_msg); + EXPECT_EQ(original_message_pointer, popped_message_pointer); + + popped_unique_msg = intra_process_buffer.consume_unique(); + popped_message_pointer = reinterpret_cast(popped_unique_msg.get()); + + EXPECT_EQ(history_depth, intra_process_buffer.available_capacity()); + EXPECT_EQ(second_value, *popped_unique_msg); + EXPECT_EQ(second_message_pointer, popped_message_pointer); +} diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index 01169717e6..02b27cd00c 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -291,6 +291,10 @@ class SubscriptionIntraProcessBase return qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal; } + virtual + size_t + available_capacity() const = 0; + rclcpp::QoS qos_profile; std::string topic_name; }; @@ -350,6 +354,12 @@ class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase return take_shared_method; } + size_t + available_capacity() const override + { + return qos_profile.depth() - buffer->size(); + } + bool take_shared_method; typename rclcpp::experimental::buffers::mock::IntraProcessBuffer::UniquePtr buffer; @@ -1033,3 +1043,75 @@ TEST(TestIntraProcessManager, transient_local) { reinterpret_cast(received_message_pointer_3)->msg); ASSERT_EQ("Test", reinterpret_cast(received_message_pointer_1)->msg); } + +/* + * This tests the method "lowest_available_capacity": + * - Creates 1 publisher. + * - The available buffer capacity should be at least history size. + * - Add 2 subscribers. + * - Add everything to the intra-process manager. + * - All the entities are expected to have different ids. + * - Check the subscriptions count for the publisher. + * - The available buffer capacity should be the history size. + * - Publish one message (without receiving it). + * - The available buffer capacity should decrease by 1. + * - Publish another message (without receiving it). + * - The available buffer capacity should decrease by 1. + * - One subscriber receives one message. + * - The available buffer capacity should stay the same, + * as the other subscriber still has not freed its buffer. + * - The other subscriber receives one message. + * - The available buffer capacity should increase by 1. + * - One subscription goes out of scope. + * - The available buffer capacity should not change. + */ +TEST(TestIntraProcessManager, lowest_available_capacity) { + using IntraProcessManagerT = rclcpp::experimental::IntraProcessManager; + using MessageT = rcl_interfaces::msg::Log; + using PublisherT = rclcpp::mock::Publisher; + using SubscriptionIntraProcessT = rclcpp::experimental::mock::SubscriptionIntraProcess; + + constexpr auto history_depth = 10u; + + auto ipm = std::make_shared(); + + auto p1 = std::make_shared(rclcpp::QoS(history_depth).transient_local()); + + auto s1 = + std::make_shared(rclcpp::QoS(history_depth).transient_local()); + auto s2 = + std::make_shared(rclcpp::QoS(history_depth).transient_local()); + auto s3 = + std::make_shared(rclcpp::QoS(history_depth).transient_local()); + + s1->take_shared_method = false; + s2->take_shared_method = true; + s3->take_shared_method = true; + + auto p1_id = ipm->add_publisher(p1, p1->buffer); + + p1->set_intra_process_manager(p1_id, ipm); + + auto unique_msg = std::make_unique(); + unique_msg->msg = "Test"; + p1->publish(std::move(unique_msg)); + + ipm->template add_subscription(s1); + ipm->template add_subscription(s2); + ipm->template add_subscription(s3); + + auto received_message_pointer_1 = s1->pop(); + auto received_message_pointer_2 = s2->pop(); + auto received_message_pointer_3 = s3->pop(); + ASSERT_NE(0u, received_message_pointer_1); + ASSERT_NE(0u, received_message_pointer_2); + ASSERT_NE(0u, received_message_pointer_3); + ASSERT_EQ(received_message_pointer_3, received_message_pointer_2); + ASSERT_EQ( + reinterpret_cast(received_message_pointer_1)->msg, + reinterpret_cast(received_message_pointer_2)->msg); + ASSERT_EQ( + reinterpret_cast(received_message_pointer_1)->msg, + reinterpret_cast(received_message_pointer_3)->msg); + ASSERT_EQ("Test", reinterpret_cast(received_message_pointer_1)->msg); +} diff --git a/rclcpp/test/rclcpp/test_publisher.cpp b/rclcpp/test/rclcpp/test_publisher.cpp index 070ff68d9b..ab49f92b14 100644 --- a/rclcpp/test/rclcpp/test_publisher.cpp +++ b/rclcpp/test/rclcpp/test_publisher.cpp @@ -625,6 +625,41 @@ TEST_P(TestPublisherWaitForAllAcked, check_wait_for_all_acked_with_QosPolicy) { EXPECT_TRUE(pub->wait_for_all_acked(std::chrono::milliseconds(6000))); } +TEST_F(TestPublisher, lowest_available_ipm_capacity) { + constexpr auto history_depth = 10u; + + initialize(rclcpp::NodeOptions().use_intra_process_comms(true)); + + rclcpp::PublisherOptionsWithAllocator> options_ipm_disabled; + options_ipm_disabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Disable; + + rclcpp::PublisherOptionsWithAllocator> options_ipm_enabled; + options_ipm_enabled.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + + auto do_nothing = [](std::shared_ptr) {}; + auto pub_ipm_disabled = node->create_publisher( + "topic", history_depth, + options_ipm_disabled); + auto pub_ipm_enabled = node->create_publisher( + "topic", history_depth, + options_ipm_enabled); + auto sub = node->create_subscription( + "topic", + history_depth, + do_nothing); + + ASSERT_EQ(1, pub_ipm_enabled->get_intra_process_subscription_count()); + ASSERT_EQ(0, pub_ipm_disabled->lowest_available_ipm_capacity()); + ASSERT_EQ(history_depth, pub_ipm_enabled->lowest_available_ipm_capacity()); + + auto msg = std::make_shared(); + ASSERT_NO_THROW(pub_ipm_disabled->publish(*msg)); + ASSERT_NO_THROW(pub_ipm_enabled->publish(*msg)); + + ASSERT_EQ(0, pub_ipm_disabled->lowest_available_ipm_capacity()); + ASSERT_EQ(history_depth - 1u, pub_ipm_enabled->lowest_available_ipm_capacity()); +} + INSTANTIATE_TEST_SUITE_P( TestWaitForAllAckedWithParm, TestPublisherWaitForAllAcked,