Skip to content

Commit

Permalink
implement intra process comms
Browse files Browse the repository at this point in the history
this involves changes in the executor, node,
publisher, and subscription classes

I'd like a more decoupled way to integrate this
into the executor and node but I was unable to
find a good way to do so.
  • Loading branch information
wjwwood committed Aug 19, 2015
1 parent fb4e836 commit aedc494
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 34 deletions.
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/any_executable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct AnyExecutable
{}
// Only one of the following pointers will be set.
rclcpp::subscription::SubscriptionBase::SharedPtr subscription;
rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process;
rclcpp::timer::TimerBase::SharedPtr timer;
rclcpp::service::ServiceBase::SharedPtr service;
rclcpp::client::ClientBase::SharedPtr client;
Expand Down
68 changes: 53 additions & 15 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
#ifndef RCLCPP_RCLCPP_EXECUTOR_HPP_
#define RCLCPP_RCLCPP_EXECUTOR_HPP_

#include <iostream>

#include <algorithm>
#include <cassert>
#include <cstdlib>
#include <iostream>
#include <list>
#include <memory>
#include <vector>

#include <rcl_interfaces/msg/intra_process_message.hpp>

#include <rclcpp/any_executable.hpp>
#include <rclcpp/macros.hpp>
#include <rclcpp/memory_strategy.hpp>
Expand Down Expand Up @@ -159,6 +160,9 @@ class Executor
if (any_exec->subscription) {
execute_subscription(any_exec->subscription);
}
if (any_exec->subscription_intra_process) {
execute_intra_process_subscription(any_exec->subscription_intra_process);
}
if (any_exec->service) {
execute_service(any_exec->service);
}
Expand Down Expand Up @@ -194,6 +198,24 @@ class Executor
subscription->return_message(message);
}

static void
execute_intra_process_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr & subscription)
{
rcl_interfaces::msg::IntraProcessMessage ipm;
bool taken = false;
rmw_ret_t status = rmw_take(subscription->intra_process_subscription_handle_, &ipm, &taken);
if (status == RMW_RET_OK) {
if (taken) {
subscription->handle_intra_process_message(ipm);
}
} else {
fprintf(stderr,
"[rclcpp::error] take failed for intra process subscription on topic '%s': %s\n",
subscription->get_topic_name().c_str(), rmw_get_error_string_safe());
}
}

static void
execute_timer(
rclcpp::timer::TimerBase::SharedPtr & timer)
Expand Down Expand Up @@ -293,22 +315,24 @@ class Executor
}));
}
// Use the number of subscriptions to allocate memory in the handles
size_t number_of_subscriptions = subs.size();
size_t max_number_of_subscriptions = subs.size() * 2; // Times two for intra-process.
rmw_subscriptions_t subscriber_handles;
subscriber_handles.subscriber_count = number_of_subscriptions;
subscriber_handles.subscriber_count = 0;
// TODO(wjwwood): Avoid redundant malloc's
subscriber_handles.subscribers =
memory_strategy_->borrow_handles(HandleType::subscription_handle, number_of_subscriptions);
subscriber_handles.subscribers = memory_strategy_->borrow_handles(
HandleType::subscription_handle, max_number_of_subscriptions);
if (subscriber_handles.subscribers == NULL) {
// TODO(wjwwood): Use a different error here? maybe std::bad_alloc?
throw std::runtime_error("Could not malloc for subscriber pointers.");
}
// Then fill the SubscriberHandles with ready subscriptions
size_t subscriber_handle_index = 0;
for (auto & subscription : subs) {
subscriber_handles.subscribers[subscriber_handle_index] = \
subscriber_handles.subscribers[subscriber_handles.subscriber_count++] =
subscription->subscription_handle_->data;
subscriber_handle_index += 1;
if (subscription->intra_process_subscription_handle_) {
subscriber_handles.subscribers[subscriber_handles.subscriber_count++] =
subscription->intra_process_subscription_handle_->data;
}
}

// Use the number of services to allocate memory in the handles
Expand Down Expand Up @@ -414,7 +438,7 @@ class Executor
}
// Add the new work to the class's list of things waiting to be executed
// Starting with the subscribers
for (size_t i = 0; i < number_of_subscriptions; ++i) {
for (size_t i = 0; i < subscriber_handles.subscriber_count; ++i) {
void * handle = subscriber_handles.subscribers[i];
if (handle) {
subscriber_handles_.push_back(handle);
Expand Down Expand Up @@ -463,13 +487,18 @@ class Executor
}
for (auto & weak_subscription : group->subscription_ptrs_) {
auto subscription = weak_subscription.lock();
if (subscription && subscription->subscription_handle_->data == subscriber_handle) {
return subscription;
if (subscription) {
if (subscription->subscription_handle_->data == subscriber_handle) {
return subscription;
}
if (subscription->intra_process_subscription_handle_->data == subscriber_handle) {
return subscription;
}
}
}
}
}
return rclcpp::subscription::SubscriptionBase::SharedPtr();
return nullptr;
}

rclcpp::service::ServiceBase::SharedPtr
Expand Down Expand Up @@ -653,6 +682,11 @@ class Executor
while (it != subscriber_handles_.end()) {
auto subscription = get_subscription_by_handle(*it);
if (subscription) {
// Figure out if this is for intra-process or not.
bool is_intra_process = false;
if (subscription->intra_process_subscription_handle_) {
is_intra_process = subscription->intra_process_subscription_handle_->data == *it;
}
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_subscription(subscription);
if (!group) {
Expand All @@ -668,7 +702,11 @@ class Executor
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->subscription = subscription;
if (is_intra_process) {
any_exec->subscription_intra_process = subscription;
} else {
any_exec->subscription = subscription;
}
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group);
subscriber_handles_.erase(it++);
Expand Down Expand Up @@ -804,7 +842,7 @@ class Executor
}
// Check the subscriptions to see if there are any that are ready
get_next_subscription(any_exec);
if (any_exec->subscription) {
if (any_exec->subscription || any_exec->subscription_intra_process) {
return any_exec;
}
// Check the services to see if there are any that are ready
Expand Down
8 changes: 8 additions & 0 deletions rclcpp/include/rclcpp/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <rcl_interfaces/msg/parameter_descriptor.hpp>
#include <rcl_interfaces/msg/parameter_event.hpp>
#include <rcl_interfaces/msg/set_parameters_result.hpp>
#include <rosidl_generator_cpp/message_type_support.hpp>

This comment has been minimized.

Copy link
@dirk-thomas

dirk-thomas Aug 28, 2015

Member

This include does not come from any specified dependency but from a rosidl_typesupport specific package which is not available to rclcpp.


#include <rclcpp/callback_group.hpp>
#include <rclcpp/client.hpp>
Expand Down Expand Up @@ -199,6 +200,8 @@ class Node
private:
RCLCPP_DISABLE_COPY(Node);

static const rosidl_message_type_support_t * ipm_ts;

bool
group_in_node(callback_group::CallbackGroup::SharedPtr & group);

Expand Down Expand Up @@ -309,6 +312,11 @@ class Node
}
};

const rosidl_message_type_support_t * Node::ipm_ts =
rosidl_generator_cpp::get_message_type_support_handle<
rcl_interfaces::msg::IntraProcessMessage
>();

} /* namespace node */
} /* namespace rclcpp */

Expand Down
79 changes: 77 additions & 2 deletions rclcpp/include/rclcpp/node_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
#include <stdexcept>
#include <string>

#include <rcl_interfaces/msg/intra_process_message.hpp>
#include <rmw/error_handling.h>
#include <rmw/rmw.h>
#include <rosidl_generator_cpp/message_type_support.hpp>
#include <rosidl_generator_cpp/service_type_support.hpp>

#include <rclcpp/contexts/default_context.hpp>
#include <rclcpp/intra_process_manager.hpp>
#include <rclcpp/parameter.hpp>

#ifndef RCLCPP_RCLCPP_NODE_HPP_
Expand Down Expand Up @@ -128,7 +130,45 @@ Node::create_publisher(
// *INDENT-ON*
}

return publisher::Publisher::make_shared(node_handle_, publisher_handle);
auto publisher = publisher::Publisher::make_shared(
node_handle_, publisher_handle, topic_name, qos_profile.depth);

if (use_intra_process_comms_) {
rmw_publisher_t * intra_process_publisher_handle = rmw_create_publisher(
node_handle_.get(), ipm_ts, (topic_name + "__intra").c_str(), qos_profile);
if (!intra_process_publisher_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create intra process publisher: ") +
rmw_get_error_string_safe());
// *INDENT-ON*
}

auto intra_process_manager =
context_->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
uint64_t intra_process_publisher_id =
intra_process_manager->add_publisher<MessageT>(publisher);
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
auto shared_publish_callback =
[weak_ipm] (uint64_t publisher_id, std::shared_ptr<void> msg) -> uint64_t
{
auto ipm = weak_ipm.lock();
if (!ipm) {
// TODO(wjwwood): should this just return silently? Or maybe return with a logged warning?
throw std::runtime_error(
"intra process publish called after destruction of intra process manager");
}
auto typed_msg = std::static_pointer_cast<const MessageT>(msg);
std::unique_ptr<MessageT> unique_msg(new MessageT(*typed_msg));
uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg);
return message_seq;
};
publisher->setup_intra_process(
intra_process_publisher_id,
shared_publish_callback,
intra_process_publisher_handle);
}
return publisher;
}

bool
Expand Down Expand Up @@ -182,10 +222,45 @@ Node::create_subscription(
callback,
msg_mem_strat);
auto sub_base_ptr = std::dynamic_pointer_cast<SubscriptionBase>(sub);
// Setup intra process.
if (use_intra_process_comms_) {
rmw_subscription_t * intra_process_subscriber_handle = rmw_create_subscription(
node_handle_.get(), ipm_ts,
(topic_name + "__intra").c_str(), qos_profile, false);
if (!subscriber_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create intra process subscription: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
auto intra_process_manager =
context_->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
uint64_t intra_process_subscription_id =
intra_process_manager->add_subscription(sub_base_ptr);
sub->setup_intra_process(
intra_process_subscription_id,
intra_process_subscriber_handle,
[weak_ipm] (
uint64_t publisher_id,
uint64_t message_sequence,
uint64_t subscription_id,
std::unique_ptr<MessageT> & message)
{
auto ipm = weak_ipm.lock();
if (!ipm) {
// TODO(wjwwood): should this just return silently? Or maybe return with a logged warning?
throw std::runtime_error(
"intra process take called after destruction of intra process manager");
}
ipm->take_intra_process_message(publisher_id, message_sequence, subscription_id, message);
});
}
// Assign to a group.
if (group) {
if (!group_in_node(group)) {
// TODO: use custom exception
throw std::runtime_error("Cannot create timer, group not in node.");
throw std::runtime_error("Cannot create subscription, group not in node.");
}
group->add_subscription(sub_base_ptr);
} else {
Expand Down
Loading

0 comments on commit aedc494

Please sign in to comment.