Skip to content

Commit

Permalink
added support for SerializedMessage in Intraprocessmanger, so:
Browse files Browse the repository at this point in the history
 * serialized publisher uses correct method to forward content to subscriber
 * non-serialized publisher can serialize message for serialized subscriber

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
  • Loading branch information
Joshua Hampp committed Apr 22, 2020
1 parent fcaea76 commit fada8a0
Showing 1 changed file with 78 additions and 19 deletions.
97 changes: 78 additions & 19 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/visibility_control.hpp"

namespace rclcpp
Expand Down Expand Up @@ -356,18 +358,37 @@ class IntraProcessManager
std::shared_ptr<const MessageT> message,
std::vector<uint64_t> subscription_ids)
{
constexpr bool is_serialized_publisher = rclcpp::is_serialized_message_class<MessageT>::value;

for (auto id : subscription_ids) {
auto subscription_it = subscriptions_.find(id);
if (subscription_it == subscriptions_.end()) {
throw std::runtime_error("subscription has unexpectedly gone out of scope");
}
auto subscription_base = subscription_it->second.subscription;

auto subscription = std::static_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcess<MessageT>
>(subscription_base);
// same message type, so it can just be passed through
if (subscription_base->is_serialized() == is_serialized_publisher) {
auto subscription = std::static_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcess<MessageT>
>(subscription_base);

subscription->provide_intra_process_message(message);
} else if (is_serialized_publisher) {
// publisher provides a serialized message, while subscriber expects a ROS message
provide_serialized_intra_process_message(subscription_base, *message);
} else {
// publisher provides a ROS message, while subscriber expects a serialized message
rclcpp::Serialization<MessageT> serialization;
auto subscription = std::static_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcess<SerializedMessage>
>(subscription_base);

auto serialized_message = std::make_unique<SerializedMessage>();
serialization.serialize_message(*message, *serialized_message);

subscription->provide_intra_process_message(message);
subscription->provide_intra_process_message(std::move(serialized_message));
}
}
}

Expand All @@ -384,33 +405,71 @@ class IntraProcessManager
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;

constexpr bool is_serialized_publisher = rclcpp::is_serialized_message_class<MessageT>::value;

for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) {
auto subscription_it = subscriptions_.find(*it);
if (subscription_it == subscriptions_.end()) {
throw std::runtime_error("subscription has unexpectedly gone out of scope");
}
auto subscription_base = subscription_it->second.subscription;

auto subscription = std::static_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcess<MessageT>
>(subscription_base);

if (std::next(it) == subscription_ids.end()) {
// If this is the last subscription, give up ownership
subscription->provide_intra_process_message(std::move(message));
// same message type, so it can just be passed through
if (subscription_base->is_serialized() == is_serialized_publisher) {
auto subscription = std::static_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcess<MessageT>
>(subscription_base);

if (std::next(it) == subscription_ids.end()) {
// If this is the last subscription, give up ownership
subscription->provide_intra_process_message(std::move(message));
} else {
// Copy the message since we have additional subscriptions to serve
MessageUniquePtr copy_message;
Deleter deleter = message.get_deleter();
auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1);
MessageAllocTraits::construct(*allocator.get(), ptr, *message);
copy_message = MessageUniquePtr(ptr, deleter);

subscription->provide_intra_process_message(std::move(copy_message));
}
} else if (is_serialized_publisher) {
// publisher provides a serialized message, while subscriber expects a ROS message
provide_serialized_intra_process_message(subscription_base, *message);
} else {
// Copy the message since we have additional subscriptions to serve
MessageUniquePtr copy_message;
Deleter deleter = message.get_deleter();
auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1);
MessageAllocTraits::construct(*allocator.get(), ptr, *message);
copy_message = MessageUniquePtr(ptr, deleter);

subscription->provide_intra_process_message(std::move(copy_message));
// publisher provides a ROS message, while subscriber expects a serialized message
rclcpp::Serialization<MessageT> serialization;
auto subscription = std::static_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcess<SerializedMessage>
>(subscription_base);

auto serialized_message = std::make_unique<SerializedMessage>();
serialization.serialize_message(*message, *serialized_message);

subscription->provide_intra_process_message(std::move(serialized_message));
}
}
}

template<typename T>
static std::enable_if_t<!rclcpp::is_serialized_message_class<T>::value>
provide_serialized_intra_process_message(
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr & subscription,
const T & serialized_message)
{
(void)subscription;
(void)serialized_message;
// prevent call if actual message type does not match
}

static void
provide_serialized_intra_process_message(
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr & subscription,
const SerializedMessage & serialized_message)
{
subscription->provide_serialized_intra_process_message(serialized_message);
}

PublisherToSubscriptionIdsMap pub_to_subs_;
SubscriptionMap subscriptions_;
PublisherMap publishers_;
Expand Down

0 comments on commit fada8a0

Please sign in to comment.