Skip to content

Commit

Permalink
Use serialized message (#1081)
Browse files Browse the repository at this point in the history
* use serialized message in callback

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* introduce resize method for serialized message

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* introduce release for serialized message

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* address review comments

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* correct typo

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* fix interface traits test

Signed-off-by: Karsten Knese <karsten@openrobotics.org>
  • Loading branch information
Karsten1987 authored Apr 23, 2020
1 parent bb91b6c commit 46cfe84
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 46 deletions.
32 changes: 7 additions & 25 deletions rclcpp/include/rclcpp/message_memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ class MessageMemoryStrategy
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>;

using SerializedMessageAllocTraits = allocator::AllocRebind<rcl_serialized_message_t, Alloc>;
using SerializedMessageAllocTraits = allocator::AllocRebind<rclcpp::SerializedMessage, Alloc>;
using SerializedMessageAlloc = typename SerializedMessageAllocTraits::allocator_type;
using SerializedMessageDeleter =
allocator::Deleter<SerializedMessageAlloc, rcl_serialized_message_t>;
allocator::Deleter<SerializedMessageAlloc, rclcpp::SerializedMessage>;

using BufferAllocTraits = allocator::AllocRebind<char, Alloc>;
using BufferAlloc = typename BufferAllocTraits::allocator_type;
Expand Down Expand Up @@ -86,31 +86,12 @@ class MessageMemoryStrategy
return std::allocate_shared<MessageT, MessageAlloc>(*message_allocator_.get());
}

virtual std::shared_ptr<rcl_serialized_message_t> borrow_serialized_message(size_t capacity)
virtual std::shared_ptr<rclcpp::SerializedMessage> borrow_serialized_message(size_t capacity)
{
auto msg = new rcl_serialized_message_t;
*msg = rmw_get_zero_initialized_serialized_message();
auto ret = rmw_serialized_message_init(msg, capacity, &rcutils_allocator_);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}

auto serialized_msg = std::shared_ptr<rcl_serialized_message_t>(
msg,
[](rmw_serialized_message_t * msg) {
auto fini_ret = rmw_serialized_message_fini(msg);
delete msg;
if (fini_ret != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"failed to destroy serialized message: %s", rcl_get_error_string().str);
}
});

return serialized_msg;
return std::make_shared<rclcpp::SerializedMessage>(capacity);
}

virtual std::shared_ptr<rcl_serialized_message_t> borrow_serialized_message()
virtual std::shared_ptr<rclcpp::SerializedMessage> borrow_serialized_message()
{
return borrow_serialized_message(default_buffer_capacity_);
}
Expand All @@ -127,7 +108,8 @@ class MessageMemoryStrategy
msg.reset();
}

virtual void return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & serialized_msg)
virtual void return_serialized_message(
std::shared_ptr<rclcpp::SerializedMessage> & serialized_msg)
{
serialized_msg.reset();
}
Expand Down
6 changes: 6 additions & 0 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ class Publisher : public PublisherBase
return this->do_serialized_publish(&serialized_msg);
}

void
publish(const SerializedMessage & serialized_msg)
{
return this->do_serialized_publish(&serialized_msg.get_rcl_serialized_message());
}

/// Publish an instance of a LoanedMessage.
/**
* When publishing a loaned message, the memory for this ROS message will be deallocated
Expand Down
14 changes: 14 additions & 0 deletions rclcpp/include/rclcpp/serialized_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,20 @@ class RCLCPP_PUBLIC_TYPE SerializedMessage
*/
size_t capacity() const;

/// Allocate memory in the data buffer
/**
* The data buffer of the underlying rcl_serialized_message_t will be resized.
* This might change the data layout and invalidates all pointers to the data.
*/
void reserve(size_t capacity);

/// Release the underlying rcl_serialized_message_t
/**
* The memory (i.e. the data buffer) of the serialized message will no longer
* be managed by this instance and the memory won't be deallocated on destruction.
*/
rcl_serialized_message_t release_rcl_serialized_message();

private:
rcl_serialized_message_t serialized_message_;
};
Expand Down
4 changes: 2 additions & 2 deletions rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class Subscription : public SubscriptionBase
return message_memory_strategy_->borrow_message();
}

std::shared_ptr<rcl_serialized_message_t>
std::shared_ptr<rclcpp::SerializedMessage>
create_serialized_message() override
{
return message_memory_strategy_->borrow_serialized_message();
Expand Down Expand Up @@ -299,7 +299,7 @@ class Subscription : public SubscriptionBase
}

void
return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & message) override
return_serialized_message(std::shared_ptr<rclcpp::SerializedMessage> & message) override
{
message_memory_strategy_->return_serialized_message(message);
}
Expand Down
7 changes: 4 additions & 3 deletions rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "rclcpp/message_info.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"

Expand Down Expand Up @@ -151,7 +152,7 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
*/
RCLCPP_PUBLIC
bool
take_serialized(rcl_serialized_message_t & message_out, rclcpp::MessageInfo & message_info_out);
take_serialized(rclcpp::SerializedMessage & message_out, rclcpp::MessageInfo & message_info_out);

/// Borrow a new message.
/** \return Shared pointer to the fresh message. */
Expand All @@ -164,7 +165,7 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
/** \return Shared pointer to a rcl_message_serialized_t. */
RCLCPP_PUBLIC
virtual
std::shared_ptr<rcl_serialized_message_t>
std::shared_ptr<rclcpp::SerializedMessage>
create_serialized_message() = 0;

/// Check if we need to handle the message, and execute the callback if we do.
Expand Down Expand Up @@ -194,7 +195,7 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
RCLCPP_PUBLIC
virtual
void
return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & message) = 0;
return_serialized_message(std::shared_ptr<rclcpp::SerializedMessage> & message) = 0;

RCLCPP_PUBLIC
const rosidl_message_type_support_t &
Expand Down
3 changes: 1 addition & 2 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,7 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
if (subscription->is_serialized()) {
// This is the case where a copy of the serialized message is taken from
// the middleware via inter-process communication.
std::shared_ptr<rcl_serialized_message_t> serialized_msg =
subscription->create_serialized_message();
std::shared_ptr<SerializedMessage> serialized_msg = subscription->create_serialized_message();
take_and_do_error_handling(
"taking a serialized message from topic",
subscription->get_topic_name(),
Expand Down
16 changes: 16 additions & 0 deletions rclcpp/src/rclcpp/serialized_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,20 @@ size_t SerializedMessage::capacity() const
{
return serialized_message_.buffer_capacity;
}

void SerializedMessage::reserve(size_t capacity)
{
auto ret = rmw_serialized_message_resize(&serialized_message_, capacity);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}

rcl_serialized_message_t SerializedMessage::release_rcl_serialized_message()
{
auto ret = serialized_message_;
serialized_message_ = rmw_get_zero_initialized_serialized_message();

return ret;
}
} // namespace rclcpp
4 changes: 2 additions & 2 deletions rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,12 @@ SubscriptionBase::take_type_erased(void * message_out, rclcpp::MessageInfo & mes

bool
SubscriptionBase::take_serialized(
rcl_serialized_message_t & message_out,
rclcpp::SerializedMessage & message_out,
rclcpp::MessageInfo & message_info_out)
{
rcl_ret_t ret = rcl_take_serialized_message(
this->get_subscription_handle().get(),
&message_out,
&message_out.get_rcl_serialized_message(),
&message_info_out.get_rmw_message_info(),
nullptr);
if (RCL_RET_SUBSCRIPTION_TAKE_FAILED == ret) {
Expand Down
25 changes: 25 additions & 0 deletions rclcpp/test/test_serialized_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,31 @@ TEST(TestSerializedMessage, various_constructors_from_rcl) {
EXPECT_TRUE(nullptr != rcl_handle.buffer);
}

TEST(TestSerializedMessage, release) {
const std::string content = "Hello World";
const auto content_size = content.size() + 1; // accounting for null terminator

rcl_serialized_message_t released_handle = rmw_get_zero_initialized_serialized_message();
{
rclcpp::SerializedMessage serialized_msg(13);
// manually copy some content
auto & rcl_serialized_msg = serialized_msg.get_rcl_serialized_message();
std::memcpy(rcl_serialized_msg.buffer, content.c_str(), content.size());
rcl_serialized_msg.buffer[content.size()] = '\0';
rcl_serialized_msg.buffer_length = content_size;
EXPECT_EQ(13u, serialized_msg.capacity());

released_handle = serialized_msg.release_rcl_serialized_message();
// scope exit of serialized_msg
}

EXPECT_TRUE(nullptr != released_handle.buffer);
EXPECT_EQ(13u, released_handle.buffer_capacity);
EXPECT_EQ(content_size, released_handle.buffer_length);
// cleanup memory manually
EXPECT_EQ(RCL_RET_OK, rmw_serialized_message_fini(&released_handle));
}

TEST(TestSerializedMessage, serialization) {
using MessageT = test_msgs::msg::BasicTypes;

Expand Down
18 changes: 9 additions & 9 deletions rclcpp/test/test_serialized_message_allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,25 @@ TEST(TestSerializedMessageAllocator, default_allocator) {
rclcpp::message_memory_strategy::MessageMemoryStrategy<DummyMessageT>::create_default();

auto msg0 = mem_strategy->borrow_serialized_message();
ASSERT_EQ(msg0->buffer_capacity, 0u);
ASSERT_EQ(msg0->capacity(), 0u);
mem_strategy->return_serialized_message(msg0);

auto msg100 = mem_strategy->borrow_serialized_message(100);
ASSERT_EQ(msg100->buffer_capacity, 100u);
ASSERT_EQ(msg100->capacity(), 100u);
mem_strategy->return_serialized_message(msg100);

auto msg200 = mem_strategy->borrow_serialized_message();
auto ret = rmw_serialized_message_resize(msg200.get(), 200);
auto ret = rmw_serialized_message_resize(&msg200->get_rcl_serialized_message(), 200);
ASSERT_EQ(RCL_RET_OK, ret);
EXPECT_EQ(0u, msg200->buffer_length);
EXPECT_EQ(200u, msg200->buffer_capacity);
EXPECT_EQ(0u, msg200->size());
EXPECT_EQ(200u, msg200->capacity());
mem_strategy->return_serialized_message(msg200);

auto msg1000 = mem_strategy->borrow_serialized_message(1000);
ASSERT_EQ(msg1000->buffer_capacity, 1000u);
ret = rmw_serialized_message_resize(msg1000.get(), 2000);
ASSERT_EQ(msg1000->capacity(), 1000u);
ret = rmw_serialized_message_resize(&msg1000->get_rcl_serialized_message(), 2000);
ASSERT_EQ(RCL_RET_OK, ret);
EXPECT_EQ(2000u, msg1000->buffer_capacity);
EXPECT_EQ(2000u, msg1000->capacity());
mem_strategy->return_serialized_message(msg1000);
}

Expand All @@ -61,7 +61,7 @@ TEST(TestSerializedMessageAllocator, borrow_from_subscription) {
[](std::shared_ptr<test_msgs::msg::Empty> test_msg) {(void) test_msg;});

auto msg0 = sub->create_serialized_message();
EXPECT_EQ(0u, msg0->buffer_capacity);
EXPECT_EQ(0u, msg0->capacity());
sub->return_serialized_message(msg0);

rclcpp::shutdown();
Expand Down
6 changes: 3 additions & 3 deletions rclcpp/test/test_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,10 @@ TEST_F(TestSubscription, take) {
TEST_F(TestSubscription, take_serialized) {
initialize();
using test_msgs::msg::Empty;
auto do_nothing = [](std::shared_ptr<const rcl_serialized_message_t>) {FAIL();};
auto do_nothing = [](std::shared_ptr<const rclcpp::SerializedMessage>) {FAIL();};
{
auto sub = node->create_subscription<test_msgs::msg::Empty>("~/test_take", 1, do_nothing);
std::shared_ptr<rcl_serialized_message_t> msg = sub->create_serialized_message();
std::shared_ptr<rclcpp::SerializedMessage> msg = sub->create_serialized_message();
rclcpp::MessageInfo msg_info;
EXPECT_FALSE(sub->take_serialized(*msg, msg_info));
}
Expand All @@ -317,7 +317,7 @@ TEST_F(TestSubscription, take_serialized) {
test_msgs::msg::Empty msg;
pub->publish(msg);
}
std::shared_ptr<rcl_serialized_message_t> msg = sub->create_serialized_message();
std::shared_ptr<rclcpp::SerializedMessage> msg = sub->create_serialized_message();
rclcpp::MessageInfo msg_info;
bool message_recieved = false;
auto start = std::chrono::steady_clock::now();
Expand Down

0 comments on commit 46cfe84

Please sign in to comment.