Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dnae adas/intraprocess manager support for serialized message #1076

Open
wants to merge 122 commits into
base: rolling
Choose a base branch
from

Conversation

DensoADAS
Copy link
Contributor

added support for serialized messages in intraprocess manager, allowing on demand serialization/deserialization

@DensoADAS
Copy link
Contributor Author

depends on #1075

Copy link
Member

@ivanpauno ivanpauno left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is going in the right direction, but I think it can be optimized further.

Currently, the published message is promoted or not to a shared_ptr depending on the number of subscription that requires ownership, and the number of subscriptions that do not.
See logic here:

std::vector<uint64_t> concatenated_vector(sub_ids.take_shared_subscriptions);
concatenated_vector.insert(
concatenated_vector.end(),
sub_ids.take_ownership_subscriptions.begin(),
sub_ids.take_ownership_subscriptions.end());
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
std::move(message),
concatenated_vector,
allocator);
} else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
sub_ids.take_shared_subscriptions.size() > 1)
{
// Construct a new shared pointer from the message
// for the buffers that do not require ownership
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
this->template add_shared_msg_to_buffers<MessageT>(
shared_msg, sub_ids.take_shared_subscriptions);
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
std::move(message), sub_ids.take_ownership_subscriptions, allocator);
.

This PR is partially taking an advantage of that logic, but not completly.
Imagine the following example:

  • A Publisher publishs a non-serialized message.
  • There is N subscriptions, taking a non-owned serialized message (i.e.: a shared_ptr to a serialized message).

In that case, N serialized messages unique ptrs will be created and passed to the intraprocess subscription, but only one could have been created and shared between all of them.

Sorry if my assumption is wrong, but this isn't super easy to follow 😄.

subscription->provide_intra_process_message(message);
} else if (is_serialized_publisher) {
// publisher provides a serialized message, while subscriber expects a ROS message
provide_serialized_intra_process_message(subscription_base, *message);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of the function is confusing to me.
IMO, it sounds like it's passing a serialized message along, but it's deserializing it and the providing the deserialized message to the intraprocess subscription.

{
(void)subscription;
(void)serialized_message;
// prevent call if actual message type does not match
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could a static_assert be added here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't think that these two static methods have to be introduced here. I think the static_assert could be part of the function above.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @Karsten1987 on this.

@@ -134,6 +148,20 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
return buffer_->use_take_shared_method();
}

void
provide_serialized_intra_process_message(const SerializedMessage & serialized_message)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe, provide_ros_massage_from_serialized_message would be clearer, though a bit verbose.
The opposite version: provide_serialized_message_from_ros_message could be added to avoid some duplication in intraprocess manager code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method should only be available if the message type is a serialized message.
Probably, you can add a SFINAE check above to avoid it to be called in non-serialized message specializations.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though that would not satisfy the base class interface.
Better, if the message type is not serialized, the specialization should throw an error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Why is that function called _serialized_? Could that not be part of the overload with provide_intra_process_message(const SerializedMessage & serialized_message)?

@ivanpauno
Copy link
Member

I talked offline with @Karsten1987 and I will explain a bit more about how I think this optimization should be done.

Instead of having only two vectors here, we could have four:

  • One for subscriptions that take ownership (std::unique_ptr).
  • One for subscriptions that don't take ownership. (std::shared_ptr).
  • One for serialized subscriptions that take ownership. (std::unique_ptr to serialized message).
  • One for serialized subscriptions that don't take ownership. (std::shared_ptr to serialized message).

Then the logic here can be replaced with something like this:

  • If the publisher is serialized and we have at least one subscription taking a non-serialized message, a serialized message unique_ptr is created.
  • Then, we use the same logic we're using right now:
    • If there are not subscriptions taking a unique_ptr, the original non-serialized unique_ptr is promoted to a shared_ptr and added to all the buffers (
      std::shared_ptr<MessageT> shared_msg = std::move(message);
      if (!sub_ids.take_shared_subscriptions.empty()) {
      this->template add_shared_msg_to_buffers<MessageT>(
      shared_msg, sub_ids.take_shared_subscriptions);
      ).
    • If there is at least one subscription requiring an unique_ptr, the message of the original unique_ptr is copied in a shared_ptr. The unique_ptr is used for the buffers requiring ownership of the message, the shared_ptr for the others.

The same logic is then used for serialized subscriptions:

  • If the publisher is non-serialized and we have at least one subscription taking a serialized message, a serialized message unique_ptr is created.
  • Then, the same story about promoting the unique_ptr to a shared_ptr or not depending if a subscription taking a unique_ptr to a serialized message is required or not.

@DensoADAS I'm sorry for my super late review.
I think the optimization can be done in a follow-up, as it doesn't break public API.
It will break the intraprocess manager ABI, but that class isn't publicly available.
The only place where we use it is being opaqued in a pointer, so I think it'll be safe to backport the optimization.

Copy link
Contributor

@Karsten1987 Karsten1987 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that looks okay to me - respecting @ivanpauno's comments

{
(void)subscription;
(void)serialized_message;
// prevent call if actual message type does not match
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't think that these two static methods have to be introduced here. I think the static_assert could be part of the function above.

@@ -134,6 +148,20 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
return buffer_->use_take_shared_method();
}

void
provide_serialized_intra_process_message(const SerializedMessage & serialized_message)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Why is that function called _serialized_? Could that not be part of the overload with provide_intra_process_message(const SerializedMessage & serialized_message)?

@DensoADAS DensoADAS force-pushed the dnae_adas/intraprocess_manager_support_for_serialized_message branch from 1a34341 to 87b5df9 Compare April 22, 2020 03:15
@DensoADAS
Copy link
Contributor Author

t doesn't break public API.
It will break the intraprocess manager ABI, but that class i

I have already thought about this. But in lack of time and this is more complicated I skipped this optimization for now. I will provide this later.

I also would add support for reusing deserialized messages (from subscriber side).

@DensoADAS
Copy link
Contributor Author

  • updated to changes in serialized_messages
  • rebased

@Karsten1987
Copy link
Contributor

thanks @DensoADAS for iterating with us over this. It looks like there is still a conflict in this PR.

@DensoADAS
Copy link
Contributor Author

@Karsten1987 I merged it and updated it (also the other PR)

@Karsten1987
Copy link
Contributor

You might want to rebase once again as it looks like you've got some duplicated commits on your branch.

I've just checked and your PR compiles locally on my OSX machine :)
I'd be happy if you could address the review comments on the remaining two PRs in order to advance with these as soon as possible. thanks.

@@ -97,6 +97,34 @@ class Serialization : public SerializationBase
}
};

/// Specialized serialization for rcl_serialized_message_t (because type support is not defined)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't quite understand why we need this specialization. Does the static_assert really introduce so much trouble?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alternative: you can add enable_if around the functions where it's used

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also not sure why this is needed.

Joshua Hampp and others added 9 commits April 22, 2020 17:03
* implemented interface to deserialize a given serialized message to ros message

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
 * serialized publisher uses correct method to forward content to subscriber
 * non-serialized publisher can serialize message for serialized subscriber

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
…cess_message

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
…y needed for the next PR)

Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
Signed-off-by: Joshua Hampp <j.hampp@denso-adas.de>
Signed-off-by: Dirk Thomas <dirk-thomas@users.noreply.github.com>
* Reflect changes in rclcpp API

Signed-off-by: Prajakta Gokhale <prajaktg@amazon.com>

* Revert earlier fix made in rclcpp

Signed-off-by: Prajakta Gokhale <prajaktg@amazon.com>
@DensoADAS DensoADAS force-pushed the dnae_adas/intraprocess_manager_support_for_serialized_message branch from a79acb8 to 0577f39 Compare April 22, 2020 15:17
@ivanpauno
Copy link
Member

I have already thought about this. But in lack of time and this is more complicated I skipped this optimization for now. I will provide this later.

Sounds good, thanks!

I also would add support for reusing deserialized messages (from subscriber side).

Can you explain further? I don't get what you mean.

Copy link
Member

@ivanpauno ivanpauno left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this PR required #1081 for this change to make sense?

I also left some minimal comments about the implementation.

{
(void)subscription;
(void)serialized_message;
// prevent call if actual message type does not match
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @Karsten1987 on this.

@@ -134,6 +148,20 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
return buffer_->use_take_shared_method();
}

void
provide_intra_process_message(const SerializedMessage & serialized_message)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, there's a minimal asymmetry in the implementation, that makes this extremely confusing:

  • If both subscription and publisher type matches, we use this other overload, and that's also true if both publisher/subscription are using a serialized message (the only difference is that the other signature uses an unique_ptr).
  • If the publisher is serialized and the subscription isn't, this method is used, which provides a non-serialized message from a serialized one.
  • If the publisher isn't serialized and the subscription is, the original overload is used, and serialization is handled in add_owned_msg_to_buffers/add_shared_msg_to_buffers

We should either have methods clearly named here (provide_serialized_intra_process_message_from_ros_message and provide_ros_intra_process_message_from_serialized_message), or always handle serialization/deserialization in add_owned_msg_to_buffers/add_shared_msg_to_buffers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, it makes sense to handle the serialization in the add_*msg_to_buffers or some central place.

From what I understand, the provide_intra_process_message functions should simply be overloaded with SerializedMessages and shouldn't do any further logic than message passing - pretty much exactly how the other functions work. The serialization/deserialization should be handled outside of this, i.e. in the add functions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just played around a bit more with this and honestly, I don't know what the nicest solution for this is.

I am generally in favor of handling the serialization in one place, ideally in the intra-process manager. In order to do that, we need a couple of functions in the subscription base:

This handle essentially returns the typesupport pointer which is required in order to run rmw_serialize.

const rosidl_type_support_t *
get_type_support_handle() const;

The implemenation of the intraprocess subscription can then do the SFINAE thing and either return the rosidl handle for MessageT if it's not a serialized message or return a nullptr otherwise.

The second message you'd need on the subscription base is to create/instantiate a default MessageT and return this as a shared_ptr<void> or unique_ptr<void> depending on the number of subscription.

std::shared_ptr<void> generate_message() const
{
  return std::make_shared<MessageT>();  // same for unique ptr
}

That returned pointer can then be filled by the serialization and passed back via provide_intra_process_message.

The provide_intra_process_message itself has to be overloaded with shared_ptr<void> and unique_ptr<void> whose implementation then casts these pointers back to the MessageT type and proceed as usual.

void provide_intra_process_message(const std::shared_ptr<void> message) {
  provide_intra_process_message(std::static_pointer_cast<const MessageT>(message));
}

That's quite a bit of flip-flop but doable. The alternatively, we could just invert the logic and do all the serialization inside the intraprocess subscription.


as for the SFINAE part, what I came up with which works nicely without too many empty implemented specializations is the following:

  const rosidl_message_type_support_t *
  get_type_support_handle() const override
  {
    return get_type_support_handle_impl<serialization_traits::is_serialized_message_class<MessageT>::value>();
  }

  template<bool is_serialized>
  const rosidl_message_type_support_t *
  get_type_support_handle_impl() const
  {
    return rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>();
  }

  template<>
  const rosidl_message_type_support_t *
  get_type_support_handle_impl<true>() const
  {
    return nullptr;
  }

The serialization can then be conveniently be instantiated with:

rclcpp::SerializationBase serialization(get_type_support_handle());

@@ -97,6 +97,34 @@ class Serialization : public SerializationBase
}
};

/// Specialized serialization for rcl_serialized_message_t (because type support is not defined)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also not sure why this is needed.

@DensoADAS
Copy link
Contributor Author

I have already thought about this. But in lack of time and this is more complicated I skipped this optimization for now. I will provide this later.

Sounds good, thanks!

I also would add support for reusing deserialized messages (from subscriber side).

Can you explain further? I don't get what you mean.

On publisher side the message get's serialized. On susbscriber side deserialized. Both operations only need to be done once for a message.

@ivanpauno
Copy link
Member

On publisher side the message get's serialized. On susbscriber side deserialized. Both operations only need to be done once for a message.

Oh yeah, the comment I made above handles that correctly (serialization/deserialization is only done once if needed).

@Karsten1987
Copy link
Contributor

Isn't this PR required #1081 for this change to make sense?

I would try holding off from #1081 until these changes are settled. The changes introduced in the other PR are designed to be a follow up - and given time, I would assume these changes to be landed past Foxy.

@ivanpauno
Copy link
Member

AFAIS here, the intraprocess communication will happen if the subscription is taking a SerializedMessage object, and I don't see how that will happen without #1081 merged.

@Karsten1987
Copy link
Contributor

The typetrait will return correctly as well with rcl_serialized_message_t.
But I agree, before calling the callback, I believe that get_rcl_serialized_message() has to be called to get the rcl type.

ivanpauno and others added 21 commits July 7, 2020 13:23
Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com>
Signed-off-by: Stephen Brawner <brawner@gmail.com>
Signed-off-by: Stephen Brawner <brawner@gmail.com>
Signed-off-by: Tomoya.Fujita <Tomoya.Fujita@sony.com>
* Unit tests for header-only functions/classes

Adds coverage for:
  * any_service_callback.hpp
  * any_subscription_callback.hpp
  * create_subscription.hpp
  * create_timer.hpp

Signed-off-by: Stephen Brawner <brawner@gmail.com>

* Address PR feedback

Signed-off-by: Stephen Brawner <brawner@gmail.com>
* Throw exception if rcl_timer_init fails

Signed-off-by: Stephen Brawner <brawner@gmail.com>

* Add bad-argument tests for GenericTimer

Signed-off-by: Stephen Brawner <brawner@gmail.com>

* Add comments

Signed-off-by: Stephen Brawner <brawner@gmail.com>

* Address feedback

Signed-off-by: Stephen Brawner <brawner@gmail.com>

* Address feedback

Signed-off-by: Stephen Brawner <brawner@gmail.com>
Signed-off-by: Tomoya.Fujita <Tomoya.Fujita@sony.com>
Signed-off-by: Christophe Bedard <bedard.christophe@gmail.com>
* Include original exception in ComponentManagerException

Signed-off-by: Martijn Buijs <martijn.buijs@gmail.com>

* Update rclcpp_components/src/component_manager.cpp

Co-authored-by: tomoya <Tomoya.Fujita@sony.com>
Signed-off-by: Martijn Buijs <martijn.buijs@gmail.com>

Co-authored-by: tomoya <Tomoya.Fujita@sony.com>
Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>
Signed-off-by: Dirk Thomas <dirk-thomas@users.noreply.github.com>
* Update quality level and links to doc

Signed-off-by: ahcorde <ahcorde@gmail.com>

* Added feedback

Signed-off-by: ahcorde <ahcorde@gmail.com>

* Fixed wording and links

Signed-off-by: ahcorde <ahcorde@gmail.com>

* Bump QD to level 3 and fixed links

Signed-off-by: ahcorde <ahcorde@gmail.com>

* Added missing dependency rcpputils to rclcpp_components

Signed-off-by: ahcorde <ahcorde@gmail.com>

* Added missing dependency rmw to rclcpp_lifecycle

Signed-off-by: ahcorde <ahcorde@gmail.com>

* Added feedback

Signed-off-by: ahcorde <ahcorde@gmail.com>

* changed ci_linux_coverage to nightly_linux_coverage

Signed-off-by: ahcorde <ahcorde@gmail.com>
`this->node_options_` might still be `nullptr` for a default initialized NodeOptions instance.
`use_global_arguments()` must return `this->use_global_arguments_`, in analogy to `NodeOptions::enable_rosout()`.

Signed-off-by: Johannes Meyer <johannes@intermodalics.eu>
* Fix conversion from negative Duration or Time to the respective message type and throw in Duration::to_rmw_time() if the duration is negative.
rmw_time_t cannot represent negative durations.

Constructors and assignment operators can be just defaulted.

Other changes are mainly cosmetical, to make conversions between signed
and unsigned types and between 32-bit and 64-bit types more explicit.

Signed-off-by: Johannes Meyer <johannes@intermodalics.eu>

* Add -Wconversion compiler option and fix implicit conversions that might alter the value

Signed-off-by: Johannes Meyer <johannes@intermodalics.eu>

* Fix usage of fixture class in some unit tests by using gtest macro TEST_F() instead of TEST().

Signed-off-by: Johannes Meyer <johannes@intermodalics.eu>

* Add compiler option -Wno-sign-conversion to fix build with Clang on macOS

Signed-off-by: Johannes Meyer <johannes@intermodalics.eu>
Signed-off-by: claireyywang <22240514+claireyywang@users.noreply.github.com>
Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com>
* Unit tests for node interfaces

Signed-off-by: Stephen Brawner <brawner@gmail.com>

* Address PR Feedback

Signed-off-by: Stephen Brawner <brawner@gmail.com>

* Address PR feedback

Signed-off-by: Stephen Brawner <brawner@gmail.com>

* Adjusting comment

Signed-off-by: Stephen Brawner <brawner@gmail.com>
Signed-off-by: Dirk Thomas <dirk-thomas@users.noreply.github.com>
Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>
@DensoADAS
Copy link
Contributor Author

@Karsten1987 I rebased the branch. Is anything else open? Perhaps I lost the overview a little by my spam filter.

@@ -324,8 +301,39 @@ class IntraProcessManager

struct SplittedSubscriptions
{
std::vector<uint64_t> take_shared_subscriptions;
std::vector<uint64_t> take_ownership_subscriptions;
enum
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider using class enums. With this you wouldn't need to prefix each item with Index as well as the numbering should be done by default.

Suggested change
enum
enum class Index : std::uint8_t

Comment on lines +306 to +308
IndexSharedTyped = 0, IndexSharedSerialized = 1,
IndexOwnershipTyped = 2, IndexOwnershipSerialized = 3,
IndexNum = 4
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some doxygen to these items? I feel like it's going to be rough to figure out what's going on here later on in the project.

Comment on lines +638 to +639
rcpputils::check_true(nullptr != converted_message, "Converted message is nullpointer.");
rcpputils::check_true(nullptr != serialization, "Serialization is nullpointer.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick (style only): Consider moving these two lines at the beginning of the function.

if (sub_ids.take_subscriptions[Indicies::ownership_other].size() +
sub_ids.take_subscriptions[Indicies::shared_other].size() > 0)
{
do_intra_process_publish_other_type<MessageT>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this return here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so

const std::vector<uint64_t> & take_ownership_subscriptions,
const std::vector<uint64_t> & take_shared_subscriptions)
{
// subsriber and publisher have same message types
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// subsriber and publisher have same message types
// subscriber and publisher have same message types

// So this case is equivalent to all the buffers requiring ownership

// Merge the two vectors of ids into a unique one
std::vector<uint64_t> concatenated_vector(take_shared_subscriptions);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider calling reserve here as the total amount of elements in the merged vector is known.would it also make sense to invert this here? Copying the take_ownership_subscriptions assuming that it's the bigger vector of the two.

shared_msg, sub_ids.take_shared_subscriptions);
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
std::move(message), sub_ids.take_ownership_subscriptions, allocator);
do_intra_process_publish_other_type<MessageT>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this return here? if not, shouldn't we check below as well that we have subscriptions which require "same type" to prevent doing always that work?

const std::vector<uint64_t> & take_ownership_subscriptions,
const std::vector<uint64_t> & take_shared_subscriptions)
{
// subsriber and publisher have different message types
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// subsriber and publisher have different message types
// subscriber and publisher have different message types

Comment on lines +437 to +464
if (take_ownership_subscriptions.empty()) {
// None of the buffers require ownership, so we promote the pointer
this->template add_shared_msg_to_buffers<void>(std::move(message), take_shared_subscriptions);
} else if (!take_ownership_subscriptions.empty() && // NOLINT
take_shared_subscriptions.size() <= 1)
{
// There is at maximum 1 buffer that does not require ownership.
// So we this case is equivalent to all the buffers requiring ownership

// Merge the two vector of ids into a unique one
std::vector<uint64_t> concatenated_vector(take_shared_subscriptions);
concatenated_vector.insert(
concatenated_vector.end(),
take_ownership_subscriptions.begin(),
take_ownership_subscriptions.end());

add_owned_msg_to_buffers(std::move(message), concatenated_vector);
} else if (!take_ownership_subscriptions.empty() && // NOLINT
take_shared_subscriptions.size() > 1)
{
// Construct a new shared pointer from the message
// for the buffers that do not require ownership
auto shared_msg = subscription_base->create_shared_message(message.get());

this->template add_shared_msg_to_buffers<void>(
shared_msg, take_shared_subscriptions);
add_owned_msg_to_buffers(std::move(message), take_ownership_subscriptions);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to deduplicate this logic?

* \return a shared pointer (containing rclcpp::SerializedMessage) with deleter.
*/
template<typename MessageT>
std::shared_ptr<void> convert_message(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider renaming this function to serialize_message.

@ivanpauno
Copy link
Member

@DensoADAS There seem to be conflicts again 😕

Copy link
Member

@ivanpauno ivanpauno left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to review more in detail the changes in rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp.

My main concern is the lack of testing.
@DensoADAS are you planning to add test cases?

};

template<bool is_serialized>
struct SplittedSubscriptionsIndicies
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
struct SplittedSubscriptionsIndicies
struct SplittedSubscriptionsIndices

I'm not a native English speaker, but I think the correct plural is Indices.
If that's the case, please fix it everywhere else.

* \return The message type support.
*/
virtual const rosidl_message_type_support_t *
get_type_support_handle() const = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't this return type_support_?

@@ -115,10 +129,9 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
}

void
provide_intra_process_message(ConstMessageSharedPtr message)
provide_intra_process_message(std::shared_ptr<const void> message)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need void here?

@@ -134,6 +147,24 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
return buffer_->use_take_shared_method();
}

std::shared_ptr<void>
create_shared_message(const void * message_to_copy)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would really avoid void usage if possible

template<bool is_serialized>
struct SplittedSubscriptionsIndicies
{
constexpr static auto ownership_same = SplittedSubscriptions::get_index(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

owned_same or unique_same sound better to me

Comment on lines +315 to +316
return (is_serialized ? IndexSharedTyped : IndexSharedSerialized) +
(is_shared ? IndexSharedTyped : IndexOwnershipTyped);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please, avoid the sum.
It might look like a clever way to calculate the index, but it's extremely confusing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also looks like the first line was wrong:

Suggested change
return (is_serialized ? IndexSharedTyped : IndexSharedSerialized) +
(is_shared ? IndexSharedTyped : IndexOwnershipTyped);
return (is_serialized ? IndexSharedSerialized : IndexSharedTyped) +
(is_shared ? IndexSharedTyped : IndexOwnershipTyped);

if (sub_ids.take_subscriptions[Indicies::ownership_other].size() +
sub_ids.take_subscriptions[Indicies::shared_other].size() > 0)
{
do_intra_process_publish_other_type<MessageT>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so

publisher_it->second.take_shared_subscriptions.size() +
publisher_it->second.take_ownership_subscriptions.size();
auto count = 0u;
for (auto i = 0u; i < SplittedSubscriptions::IndexNum; ++i) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use std::accumulate if you want, though there's no big benefit on using it.

@@ -23,6 +23,8 @@
#define RCLCPP_BUILDING_LIBRARY 1
#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/serialization.hpp"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to add tests verifying that the intraprocess manager is working well with intraprocess messages.

Co-authored-by: Karsten Knese <Karsten1987@users.noreply.github.com>
@clalancette clalancette changed the base branch from master to rolling June 28, 2022 14:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.