diff --git a/rclcpp/include/rclcpp/message_memory_strategy.hpp b/rclcpp/include/rclcpp/message_memory_strategy.hpp index 80654d79e7..6e4c0be065 100644 --- a/rclcpp/include/rclcpp/message_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/message_memory_strategy.hpp @@ -46,10 +46,10 @@ class MessageMemoryStrategy using MessageAlloc = typename MessageAllocTraits::allocator_type; using MessageDeleter = allocator::Deleter; - using SerializedMessageAllocTraits = allocator::AllocRebind; + using SerializedMessageAllocTraits = allocator::AllocRebind; using SerializedMessageAlloc = typename SerializedMessageAllocTraits::allocator_type; using SerializedMessageDeleter = - allocator::Deleter; + allocator::Deleter; using BufferAllocTraits = allocator::AllocRebind; using BufferAlloc = typename BufferAllocTraits::allocator_type; @@ -86,31 +86,12 @@ class MessageMemoryStrategy return std::allocate_shared(*message_allocator_.get()); } - virtual std::shared_ptr borrow_serialized_message(size_t capacity) + virtual std::shared_ptr borrow_serialized_message(size_t capacity) { - auto msg = new rcl_serialized_message_t; - *msg = rmw_get_zero_initialized_serialized_message(); - auto ret = rmw_serialized_message_init(msg, capacity, &rcutils_allocator_); - if (ret != RCL_RET_OK) { - rclcpp::exceptions::throw_from_rcl_error(ret); - } - - auto serialized_msg = std::shared_ptr( - msg, - [](rmw_serialized_message_t * msg) { - auto fini_ret = rmw_serialized_message_fini(msg); - delete msg; - if (fini_ret != RCL_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "failed to destroy serialized message: %s", rcl_get_error_string().str); - } - }); - - return serialized_msg; + return std::make_shared(capacity); } - virtual std::shared_ptr borrow_serialized_message() + virtual std::shared_ptr borrow_serialized_message() { return borrow_serialized_message(default_buffer_capacity_); } @@ -127,7 +108,8 @@ class MessageMemoryStrategy msg.reset(); } - virtual void return_serialized_message(std::shared_ptr & serialized_msg) + virtual void return_serialized_message( + std::shared_ptr & serialized_msg) { serialized_msg.reset(); } diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index aa614e3a0c..abe259b31d 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -217,6 +217,12 @@ class Publisher : public PublisherBase return this->do_serialized_publish(&serialized_msg); } + void + publish(const SerializedMessage & serialized_msg) + { + return this->do_serialized_publish(&serialized_msg.get_rcl_serialized_message()); + } + /// Publish an instance of a LoanedMessage. /** * When publishing a loaned message, the memory for this ROS message will be deallocated diff --git a/rclcpp/include/rclcpp/serialized_message.hpp b/rclcpp/include/rclcpp/serialized_message.hpp index 13c2d1910b..4b90bf3561 100644 --- a/rclcpp/include/rclcpp/serialized_message.hpp +++ b/rclcpp/include/rclcpp/serialized_message.hpp @@ -98,6 +98,20 @@ class RCLCPP_PUBLIC_TYPE SerializedMessage */ size_t capacity() const; + /// Allocate memory in the data buffer + /** + * The data buffer of the underlying rcl_serialized_message_t will be resized. + * This might change the data layout and invalidates all pointers to the data. + */ + void reserve(size_t capacity); + + /// Release the underlying rcl_serialized_message_t + /** + * The memory (i.e. the data buffer) of the serialized message will no longer + * be managed by this instance and the memory won't be deallocated on destruction. + */ + rcl_serialized_message_t release_rcl_serialized_message(); + private: rcl_serialized_message_t serialized_message_; }; diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index a1e544c773..91343aaee0 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -250,7 +250,7 @@ class Subscription : public SubscriptionBase return message_memory_strategy_->borrow_message(); } - std::shared_ptr + std::shared_ptr create_serialized_message() override { return message_memory_strategy_->borrow_serialized_message(); @@ -299,7 +299,7 @@ class Subscription : public SubscriptionBase } void - return_serialized_message(std::shared_ptr & message) override + return_serialized_message(std::shared_ptr & message) override { message_memory_strategy_->return_serialized_message(message); } diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 7ee8c624d1..8156622299 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -33,6 +33,7 @@ #include "rclcpp/message_info.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/qos_event.hpp" +#include "rclcpp/serialized_message.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" @@ -151,7 +152,7 @@ class SubscriptionBase : public std::enable_shared_from_this */ RCLCPP_PUBLIC bool - take_serialized(rcl_serialized_message_t & message_out, rclcpp::MessageInfo & message_info_out); + take_serialized(rclcpp::SerializedMessage & message_out, rclcpp::MessageInfo & message_info_out); /// Borrow a new message. /** \return Shared pointer to the fresh message. */ @@ -164,7 +165,7 @@ class SubscriptionBase : public std::enable_shared_from_this /** \return Shared pointer to a rcl_message_serialized_t. */ RCLCPP_PUBLIC virtual - std::shared_ptr + std::shared_ptr create_serialized_message() = 0; /// Check if we need to handle the message, and execute the callback if we do. @@ -194,7 +195,7 @@ class SubscriptionBase : public std::enable_shared_from_this RCLCPP_PUBLIC virtual void - return_serialized_message(std::shared_ptr & message) = 0; + return_serialized_message(std::shared_ptr & message) = 0; RCLCPP_PUBLIC const rosidl_message_type_support_t & diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 556c42ddde..c25b47db49 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -350,8 +350,7 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription) if (subscription->is_serialized()) { // This is the case where a copy of the serialized message is taken from // the middleware via inter-process communication. - std::shared_ptr serialized_msg = - subscription->create_serialized_message(); + std::shared_ptr serialized_msg = subscription->create_serialized_message(); take_and_do_error_handling( "taking a serialized message from topic", subscription->get_topic_name(), diff --git a/rclcpp/src/rclcpp/serialized_message.cpp b/rclcpp/src/rclcpp/serialized_message.cpp index a667977cea..64e54f58f9 100644 --- a/rclcpp/src/rclcpp/serialized_message.cpp +++ b/rclcpp/src/rclcpp/serialized_message.cpp @@ -144,4 +144,20 @@ size_t SerializedMessage::capacity() const { return serialized_message_.buffer_capacity; } + +void SerializedMessage::reserve(size_t capacity) +{ + auto ret = rmw_serialized_message_resize(&serialized_message_, capacity); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } +} + +rcl_serialized_message_t SerializedMessage::release_rcl_serialized_message() +{ + auto ret = serialized_message_; + serialized_message_ = rmw_get_zero_initialized_serialized_message(); + + return ret; +} } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 2af068e32e..de1acfadb2 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -159,12 +159,12 @@ SubscriptionBase::take_type_erased(void * message_out, rclcpp::MessageInfo & mes bool SubscriptionBase::take_serialized( - rcl_serialized_message_t & message_out, + rclcpp::SerializedMessage & message_out, rclcpp::MessageInfo & message_info_out) { rcl_ret_t ret = rcl_take_serialized_message( this->get_subscription_handle().get(), - &message_out, + &message_out.get_rcl_serialized_message(), &message_info_out.get_rmw_message_info(), nullptr); if (RCL_RET_SUBSCRIPTION_TAKE_FAILED == ret) { diff --git a/rclcpp/test/test_serialized_message.cpp b/rclcpp/test/test_serialized_message.cpp index 0c8a037490..9047e22b22 100644 --- a/rclcpp/test/test_serialized_message.cpp +++ b/rclcpp/test/test_serialized_message.cpp @@ -114,6 +114,31 @@ TEST(TestSerializedMessage, various_constructors_from_rcl) { EXPECT_TRUE(nullptr != rcl_handle.buffer); } +TEST(TestSerializedMessage, release) { + const std::string content = "Hello World"; + const auto content_size = content.size() + 1; // accounting for null terminator + + rcl_serialized_message_t released_handle = rmw_get_zero_initialized_serialized_message(); + { + rclcpp::SerializedMessage serialized_msg(13); + // manually copy some content + auto & rcl_serialized_msg = serialized_msg.get_rcl_serialized_message(); + std::memcpy(rcl_serialized_msg.buffer, content.c_str(), content.size()); + rcl_serialized_msg.buffer[content.size()] = '\0'; + rcl_serialized_msg.buffer_length = content_size; + EXPECT_EQ(13u, serialized_msg.capacity()); + + released_handle = serialized_msg.release_rcl_serialized_message(); + // scope exit of serialized_msg + } + + EXPECT_TRUE(nullptr != released_handle.buffer); + EXPECT_EQ(13u, released_handle.buffer_capacity); + EXPECT_EQ(content_size, released_handle.buffer_length); + // cleanup memory manually + EXPECT_EQ(RCL_RET_OK, rmw_serialized_message_fini(&released_handle)); +} + TEST(TestSerializedMessage, serialization) { using MessageT = test_msgs::msg::BasicTypes; diff --git a/rclcpp/test/test_serialized_message_allocator.cpp b/rclcpp/test/test_serialized_message_allocator.cpp index bc177722e4..968ad0ac2d 100644 --- a/rclcpp/test/test_serialized_message_allocator.cpp +++ b/rclcpp/test/test_serialized_message_allocator.cpp @@ -29,25 +29,25 @@ TEST(TestSerializedMessageAllocator, default_allocator) { rclcpp::message_memory_strategy::MessageMemoryStrategy::create_default(); auto msg0 = mem_strategy->borrow_serialized_message(); - ASSERT_EQ(msg0->buffer_capacity, 0u); + ASSERT_EQ(msg0->capacity(), 0u); mem_strategy->return_serialized_message(msg0); auto msg100 = mem_strategy->borrow_serialized_message(100); - ASSERT_EQ(msg100->buffer_capacity, 100u); + ASSERT_EQ(msg100->capacity(), 100u); mem_strategy->return_serialized_message(msg100); auto msg200 = mem_strategy->borrow_serialized_message(); - auto ret = rmw_serialized_message_resize(msg200.get(), 200); + auto ret = rmw_serialized_message_resize(&msg200->get_rcl_serialized_message(), 200); ASSERT_EQ(RCL_RET_OK, ret); - EXPECT_EQ(0u, msg200->buffer_length); - EXPECT_EQ(200u, msg200->buffer_capacity); + EXPECT_EQ(0u, msg200->size()); + EXPECT_EQ(200u, msg200->capacity()); mem_strategy->return_serialized_message(msg200); auto msg1000 = mem_strategy->borrow_serialized_message(1000); - ASSERT_EQ(msg1000->buffer_capacity, 1000u); - ret = rmw_serialized_message_resize(msg1000.get(), 2000); + ASSERT_EQ(msg1000->capacity(), 1000u); + ret = rmw_serialized_message_resize(&msg1000->get_rcl_serialized_message(), 2000); ASSERT_EQ(RCL_RET_OK, ret); - EXPECT_EQ(2000u, msg1000->buffer_capacity); + EXPECT_EQ(2000u, msg1000->capacity()); mem_strategy->return_serialized_message(msg1000); } @@ -61,7 +61,7 @@ TEST(TestSerializedMessageAllocator, borrow_from_subscription) { [](std::shared_ptr test_msg) {(void) test_msg;}); auto msg0 = sub->create_serialized_message(); - EXPECT_EQ(0u, msg0->buffer_capacity); + EXPECT_EQ(0u, msg0->capacity()); sub->return_serialized_message(msg0); rclcpp::shutdown(); diff --git a/rclcpp/test/test_subscription.cpp b/rclcpp/test/test_subscription.cpp index 95f9b96448..a265f05e17 100644 --- a/rclcpp/test/test_subscription.cpp +++ b/rclcpp/test/test_subscription.cpp @@ -299,10 +299,10 @@ TEST_F(TestSubscription, take) { TEST_F(TestSubscription, take_serialized) { initialize(); using test_msgs::msg::Empty; - auto do_nothing = [](std::shared_ptr) {FAIL();}; + auto do_nothing = [](std::shared_ptr) {FAIL();}; { auto sub = node->create_subscription("~/test_take", 1, do_nothing); - std::shared_ptr msg = sub->create_serialized_message(); + std::shared_ptr msg = sub->create_serialized_message(); rclcpp::MessageInfo msg_info; EXPECT_FALSE(sub->take_serialized(*msg, msg_info)); } @@ -317,7 +317,7 @@ TEST_F(TestSubscription, take_serialized) { test_msgs::msg::Empty msg; pub->publish(msg); } - std::shared_ptr msg = sub->create_serialized_message(); + std::shared_ptr msg = sub->create_serialized_message(); rclcpp::MessageInfo msg_info; bool message_recieved = false; auto start = std::chrono::steady_clock::now();