Skip to content

Commit

Permalink
Execute intra-process subscription callbacks in a separate thread rat…
Browse files Browse the repository at this point in the history
…her than in the executor
  • Loading branch information
Mauro authored and Soragna, Alberto committed Apr 2, 2020
1 parent 9928697 commit ea3f97c
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 13 deletions.
30 changes: 30 additions & 0 deletions rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
throw std::runtime_error("SubscriptionIntraProcess init error initializing guard condition");
}

wait_set_ = rcl_get_zero_initialized_wait_set();
ret = rcl_wait_set_init(&wait_set_, 0, 1, 0, 0, 0, 0, context->get_rcl_context().get(), rcl_get_default_allocator());

if (RCL_RET_OK != ret) {
throw std::runtime_error("SubscriptionIntraProcess init error initializing wait set");
}

TRACEPOINT(
rclcpp_subscription_callback_added,
(const void *)this,
Expand Down Expand Up @@ -134,10 +141,33 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
return buffer_->use_take_shared_method();
}

void consume_messages_task() override
{
rcl_ret_t ret;
do {
ret = rcl_wait_set_clear(&wait_set_);
ret = rcl_wait_set_add_guard_condition(&wait_set_, &gc_, NULL);

// Wait until guard condition is triggered
ret = rcl_wait(&wait_set_, -1);

// If guard condition is triggered, check buffer for data
while (is_ready(&wait_set_)) {
// Process the message
execute();
}
} while (rclcpp::ok());

ret = rcl_wait_set_fini(&wait_set_);
(void)ret;
}

private:
void
trigger_guard_condition()
{
// Publisher pushed message into the buffer.
// Notify subscription context
rcl_ret_t ret = rcl_trigger_guard_condition(&gc_);
(void)ret;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable

RCLCPP_PUBLIC
size_t
get_number_of_ready_guard_conditions() {return 1;}
get_number_of_ready_guard_conditions() {return 0;} // QUICK HACK to avoid allocating space in the executor

RCLCPP_PUBLIC
bool
Expand All @@ -70,9 +70,13 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
rmw_qos_profile_t
get_actual_qos() const;

virtual void
consume_messages_task() = 0;

protected:
std::recursive_mutex reentrant_mutex_;
rcl_guard_condition_t gc_;
rcl_wait_set_t wait_set_;

private:
virtual void
Expand Down
4 changes: 2 additions & 2 deletions rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>

/// Return the waitable for intra-process, or nullptr if intra-process is not setup.
RCLCPP_PUBLIC
rclcpp::Waitable::SharedPtr
get_intra_process_waitable() const;
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
get_intra_process_subscription() const;

protected:
template<typename EventCallbackT>
Expand Down
18 changes: 14 additions & 4 deletions rclcpp/src/rclcpp/node_interfaces/node_topics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,20 @@ NodeTopics::add_subscription(
callback_group->add_waitable(subscription_event);
}

auto intra_process_waitable = subscription->get_intra_process_waitable();
if (nullptr != intra_process_waitable) {
// Add to the callback group to be notified about intra-process msgs.
callback_group->add_waitable(intra_process_waitable);
auto intra_process_subscription = subscription->get_intra_process_subscription();

if (nullptr != intra_process_subscription) {
using rclcpp::experimental::SubscriptionIntraProcessBase;

// TODO:
// - the thread should be explicitly created by the user
// - this should be part of the intra-process executor
//Start thread which executes subscription callback whenever an intra-process message is received
std::thread subscription_thread = std::thread(
&SubscriptionIntraProcessBase::consume_messages_task,
intra_process_subscription);

subscription_thread.detach();
}

// Notify the executor that a new subscription was created using the parent Node.
Expand Down
6 changes: 3 additions & 3 deletions rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ SubscriptionBase::can_loan_messages() const
return rcl_subscription_can_loan_messages(subscription_handle_.get());
}

rclcpp::Waitable::SharedPtr
SubscriptionBase::get_intra_process_waitable() const
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
SubscriptionBase::get_intra_process_subscription() const
{
// If not using intra process, shortcut to nullptr.
if (!use_intra_process_) {
Expand All @@ -187,7 +187,7 @@ SubscriptionBase::get_intra_process_waitable() const
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"SubscriptionBase::get_intra_process_waitable() called "
"SubscriptionBase::get_intra_process_subscription() called "
"after destruction of intra process manager");
}

Expand Down
9 changes: 6 additions & 3 deletions rclcpp/src/rclcpp/subscription_intra_process_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ using rclcpp::experimental::SubscriptionIntraProcessBase;
bool
SubscriptionIntraProcessBase::add_to_wait_set(rcl_wait_set_t * wait_set)
{
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
// QUICK HACK: commented out to avoid adding this to the executor wait set
(void)wait_set;
//std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);

rcl_ret_t ret = rcl_wait_set_add_guard_condition(wait_set, &gc_, NULL);
return RCL_RET_OK == ret;
//rcl_ret_t ret = rcl_wait_set_add_guard_condition(wait_set, &gc_, NULL);
//return RCL_RET_OK == ret;
return true;
}

const char *
Expand Down

0 comments on commit ea3f97c

Please sign in to comment.