-
Notifications
You must be signed in to change notification settings - Fork 430
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement intra process communications for Pub/Sub #73
Conversation
I made intra process optional, with an optional argument to the node constructor. We can consider other ways to control this (maybe through the context construction) later, but this is a temporary way to control it. It currently defaults to intra process off. I think it needs to stay that way until we figure out how to avoid subscriptions receiving messages from intra process as well as from inter process. I've also written up a "theory of operation" as doxygen comments on the intra process manager class: I can move them later, but this was convenient since it was inline with the code I was describing. I also have a few questions that are unresolved as todo's which would be good if you guys look at them and comment:
Next I'm going to split a few of the changes here off into separate pr's to make them easier to review and merge. Then I'll rebase since it's not currently merge-able. I also have some new tests, but I'm still working out a set of integration tests which actually prove something about the intra process working properly. |
I will just expand on one of the questions I have about the current approach. In the case where you have something like latching (some amount of messages which subscribers should be caught up on when they late join), I think our ring buffer will handle that situation. But I currently have a check where a message is not yielded when the subscription which asks for it was not available at publish time. The reason for this is that I keep track of which subscriptions have taken the intra process message and then on the last one I return the ownership, avoiding one copy. In the case of one publisher and one intra process subscriber then this is a nice optimization because it means no copies are made. Now for latching this is obviously a problem since the subscription would not have been at the present during publishing. Therefore when latching is on we'll need to keep the message around until it is replaced by incoming data to the ring buffer, i.e. we can never allow a call to The question is should we change from:
to:
The count I'm talking about is the number of subscriptions which were present when the message was published. Also when this count reaches zero and latching is not enabled, we can return the ownership of the message rather than a copy (effectively removing it from the intra process manager's storage). |
After talking with @dirk-thomas about the previous post, I've decided to keep the existing behavior of not giving messages out to subscriptions which did not exist when the message was published. I've also decided to leave TODO's where ever things will need to be changed in order to support something like latching so that we can move forward without that for now. I've also added tests for the intra process manager and tests into the |
@jacquelinekay to review some of the open points |
5e4f3ed
to
baba234
Compare
changes how the default context is gotten and adds an option for enabling/disabling intra process comms
this involves changes in the executor, node, publisher, and subscription classes I'd like a more decoupled way to integrate this into the executor and node but I was unable to find a good way to do so.
Ok I've rebased and rewritten the commit history to something that's slightly easier to digest. |
sub_context = std::shared_ptr<SubContext>( | ||
new SubContext(std::forward<Args>(args) ...), | ||
[] (SubContext * sub_context_ptr) { | ||
delete sub_context_ptr; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the custom deleter here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because it stores it as a void *
and I believe it is necessary to ensure that the destructor is called even if the last reference is a shared_ptr
of type void *
, but I don't know that for sure.
// *INDENT-ON* | ||
} | ||
} | ||
if (store_intra_process_message_) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not else here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because I intend to remove the first conditional later.
|
||
ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp) | ||
ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp) | ||
target_include_directories(test_intra_process_manager PUBLIC "${rcl_interfaces_INCLUDE_DIRS}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This must be wrapped in a condition:
if(TARGET test_intra_process_manager)
+1, though I have to admit I haven't done a deep review. I can read the code with more detail after it's been merged. |
Implement intra process communications for Pub/Sub
* ros2GH-61 Read topic directly from message when playing and allow to play multiple topics * ros2GH-61 Add test for SqliteStorage and update old ones * ros2GH-62 Extend function to poll for any number of specified topics * ros2GH-62 Allow subscription to several topics * ros2GH-61 Obtain the topic name directly from the database - Uses a JOIN instead of mapping the topic_id to the name in code * ros2GH-61 Cache read row in result iterator This allows repeated dereferencing on same row without quering the database again. * ros2GH-62 Change demo-record to allow specifying multiple topics * ros2GH-62 Add test to write non-string topic + refactoring * ros2GH-62 Add test for subscription to multiple topics * ros2GH-62 Cleanup * ros2GH-62 Simplify test setup * ros2GH-61 Cleanup * ros2GH-61 consolidate storage integration test * ros2GH-62 Consolidate write integration tests * ros2GH-61 enhance read integration test to check multiple topics * ros2GH-62 Improve rosbag integration test * ros2GH-62: Polish rosbag2_rosbag_node_test * ros2GH-62 Fix cpplint * ros2GH-62 Fix memory leak in rosbag helper * ros2GH-62 Cleanup of subscriptions * ros2GH-62 do not use flaky timers in rosbag2_write_integration_test * ros2GH-62 Use rmw_serialize_message_t consistently in test helper classes * ros2GH-73 Use test_msgs in read_integration_test * ros2GH-26 Cleanup: fix alphabetic orderung
* add ros1_bridge dependency * update maintainer email * add git as build dependency
This is a work in progress and isn't ready for review, but feedback about the approach is ok.
Connects to ros2/ros2#59
Things left to do (in no particular order):
Open issues:
Things that I think are out of scope: