-
Notifications
You must be signed in to change notification settings - Fork 430
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement intra process communications for Pub/Sub #73
Changes from all commits
747a019
94bf5ff
5568cc2
fb4e836
aedc494
12b939c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,15 +15,16 @@ | |
#ifndef RCLCPP_RCLCPP_EXECUTOR_HPP_ | ||
#define RCLCPP_RCLCPP_EXECUTOR_HPP_ | ||
|
||
#include <iostream> | ||
|
||
#include <algorithm> | ||
#include <cassert> | ||
#include <cstdlib> | ||
#include <iostream> | ||
#include <list> | ||
#include <memory> | ||
#include <vector> | ||
|
||
#include <rcl_interfaces/msg/intra_process_message.hpp> | ||
|
||
#include <rclcpp/any_executable.hpp> | ||
#include <rclcpp/macros.hpp> | ||
#include <rclcpp/memory_strategy.hpp> | ||
|
@@ -159,6 +160,9 @@ class Executor | |
if (any_exec->subscription) { | ||
execute_subscription(any_exec->subscription); | ||
} | ||
if (any_exec->subscription_intra_process) { | ||
execute_intra_process_subscription(any_exec->subscription_intra_process); | ||
} | ||
if (any_exec->service) { | ||
execute_service(any_exec->service); | ||
} | ||
|
@@ -194,6 +198,24 @@ class Executor | |
subscription->return_message(message); | ||
} | ||
|
||
static void | ||
execute_intra_process_subscription( | ||
rclcpp::subscription::SubscriptionBase::SharedPtr & subscription) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But it's ok though, and the other functions do it this way. I'd prefer to have a different pull request fix all the functions if this is really better. |
||
{ | ||
rcl_interfaces::msg::IntraProcessMessage ipm; | ||
bool taken = false; | ||
rmw_ret_t status = rmw_take(subscription->intra_process_subscription_handle_, &ipm, &taken); | ||
if (status == RMW_RET_OK) { | ||
if (taken) { | ||
subscription->handle_intra_process_message(ipm); | ||
} | ||
} else { | ||
fprintf(stderr, | ||
"[rclcpp::error] take failed for intra process subscription on topic '%s': %s\n", | ||
subscription->get_topic_name().c_str(), rmw_get_error_string_safe()); | ||
} | ||
} | ||
|
||
static void | ||
execute_timer( | ||
rclcpp::timer::TimerBase::SharedPtr & timer) | ||
|
@@ -293,22 +315,24 @@ class Executor | |
})); | ||
} | ||
// Use the number of subscriptions to allocate memory in the handles | ||
size_t number_of_subscriptions = subs.size(); | ||
size_t max_number_of_subscriptions = subs.size() * 2; // Times two for intra-process. | ||
rmw_subscriptions_t subscriber_handles; | ||
subscriber_handles.subscriber_count = number_of_subscriptions; | ||
subscriber_handles.subscriber_count = 0; | ||
// TODO(wjwwood): Avoid redundant malloc's | ||
subscriber_handles.subscribers = | ||
memory_strategy_->borrow_handles(HandleType::subscription_handle, number_of_subscriptions); | ||
subscriber_handles.subscribers = memory_strategy_->borrow_handles( | ||
HandleType::subscription_handle, max_number_of_subscriptions); | ||
if (subscriber_handles.subscribers == NULL) { | ||
// TODO(wjwwood): Use a different error here? maybe std::bad_alloc? | ||
throw std::runtime_error("Could not malloc for subscriber pointers."); | ||
} | ||
// Then fill the SubscriberHandles with ready subscriptions | ||
size_t subscriber_handle_index = 0; | ||
for (auto & subscription : subs) { | ||
subscriber_handles.subscribers[subscriber_handle_index] = \ | ||
subscriber_handles.subscribers[subscriber_handles.subscriber_count++] = | ||
subscription->subscription_handle_->data; | ||
subscriber_handle_index += 1; | ||
if (subscription->intra_process_subscription_handle_) { | ||
subscriber_handles.subscribers[subscriber_handles.subscriber_count++] = | ||
subscription->intra_process_subscription_handle_->data; | ||
} | ||
} | ||
|
||
// Use the number of services to allocate memory in the handles | ||
|
@@ -414,7 +438,7 @@ class Executor | |
} | ||
// Add the new work to the class's list of things waiting to be executed | ||
// Starting with the subscribers | ||
for (size_t i = 0; i < number_of_subscriptions; ++i) { | ||
for (size_t i = 0; i < subscriber_handles.subscriber_count; ++i) { | ||
void * handle = subscriber_handles.subscribers[i]; | ||
if (handle) { | ||
subscriber_handles_.push_back(handle); | ||
|
@@ -463,13 +487,18 @@ class Executor | |
} | ||
for (auto & weak_subscription : group->subscription_ptrs_) { | ||
auto subscription = weak_subscription.lock(); | ||
if (subscription && subscription->subscription_handle_->data == subscriber_handle) { | ||
return subscription; | ||
if (subscription) { | ||
if (subscription->subscription_handle_->data == subscriber_handle) { | ||
return subscription; | ||
} | ||
if (subscription->intra_process_subscription_handle_->data == subscriber_handle) { | ||
return subscription; | ||
} | ||
} | ||
} | ||
} | ||
} | ||
return rclcpp::subscription::SubscriptionBase::SharedPtr(); | ||
return nullptr; | ||
} | ||
|
||
rclcpp::service::ServiceBase::SharedPtr | ||
|
@@ -653,6 +682,11 @@ class Executor | |
while (it != subscriber_handles_.end()) { | ||
auto subscription = get_subscription_by_handle(*it); | ||
if (subscription) { | ||
// Figure out if this is for intra-process or not. | ||
bool is_intra_process = false; | ||
if (subscription->intra_process_subscription_handle_) { | ||
is_intra_process = subscription->intra_process_subscription_handle_->data == *it; | ||
} | ||
// Find the group for this handle and see if it can be serviced | ||
auto group = get_group_by_subscription(subscription); | ||
if (!group) { | ||
|
@@ -668,7 +702,11 @@ class Executor | |
continue; | ||
} | ||
// Otherwise it is safe to set and return the any_exec | ||
any_exec->subscription = subscription; | ||
if (is_intra_process) { | ||
any_exec->subscription_intra_process = subscription; | ||
} else { | ||
any_exec->subscription = subscription; | ||
} | ||
any_exec->callback_group = group; | ||
any_exec->node = get_node_by_group(group); | ||
subscriber_handles_.erase(it++); | ||
|
@@ -804,7 +842,7 @@ class Executor | |
} | ||
// Check the subscriptions to see if there are any that are ready | ||
get_next_subscription(any_exec); | ||
if (any_exec->subscription) { | ||
if (any_exec->subscription || any_exec->subscription_intra_process) { | ||
return any_exec; | ||
} | ||
// Check the services to see if there are any that are ready | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the custom deleter here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because it stores it as a
void *
and I believe it is necessary to ensure that the destructor is called even if the last reference is ashared_ptr
of typevoid *
, but I don't know that for sure.