Skip to content

Commit

Permalink
Move ownership of shutdown_guard_condition to executors/graph_listener (
Browse files Browse the repository at this point in the history
ros2#1404)

Signed-off-by: Stephen Brawner <brawner@gmail.com>
  • Loading branch information
brawner authored and Alberto Soragna committed May 17, 2021
1 parent 156d12e commit f8eb22a
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 289 deletions.
66 changes: 0 additions & 66 deletions rclcpp/include/rclcpp/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,67 +243,6 @@ class Context : public std::enable_shared_from_this<Context>
void
interrupt_all_sleep_for();

/// Get a handle to the guard condition which is triggered when interrupted.
/**
* This guard condition is triggered any time interrupt_all_wait_sets() is
* called, which may be called by the user, or shutdown().
* And in turn, shutdown() may be called by the user, the destructor of this
* context, or the signal handler if installed and shutdown_on_sigint is true
* for this context.
*
* The first time that this function is called for a given wait set a new guard
* condition will be created and returned; thereafter the same guard condition
* will be returned for the same wait set.
* This mechanism is designed to ensure that the same guard condition is not
* reused across wait sets (e.g., when using multiple executors in the same
* process).
* This method will throw an exception if initialization of the guard
* condition fails.
*
* The returned guard condition needs to be released with the
* release_interrupt_guard_condition() method in order to reclaim resources.
*
* \param[in] wait_set Pointer to the rcl_wait_set_t that will be using the
* resulting guard condition.
* \return Pointer to the guard condition.
* \throws anything rclcpp::exceptions::throw_from_rcl_error can throw.
*/
RCLCPP_PUBLIC
rcl_guard_condition_t *
get_interrupt_guard_condition(rcl_wait_set_t * wait_set);

/// Release the previously allocated guard condition which is triggered when interrupted.
/**
* If you previously called get_interrupt_guard_condition() for a given wait
* set to get a interrupt guard condition, then you should call
* release_interrupt_guard_condition() when you're done, to free that
* condition.
* Will throw an exception if get_interrupt_guard_condition() wasn't
* previously called for the given wait set.
*
* After calling this, the pointer returned by get_interrupt_guard_condition()
* for the given wait_set is invalid.
*
* \param[in] wait_set Pointer to the rcl_wait_set_t that was using the
* resulting guard condition.
* \throws anything rclcpp::exceptions::throw_from_rcl_error can throw.
* \throws std::runtime_error if a nonexistent wait set is trying to release sigint guard condition.
*/
RCLCPP_PUBLIC
void
release_interrupt_guard_condition(rcl_wait_set_t * wait_set);

/// Nothrow version of release_interrupt_guard_condition(), logs to RCLCPP_ERROR instead.
RCLCPP_PUBLIC
void
release_interrupt_guard_condition(rcl_wait_set_t * wait_set, const std::nothrow_t &) noexcept;

/// Interrupt any blocking executors, or wait sets associated with this context.
RCLCPP_PUBLIC
virtual
void
interrupt_all_wait_sets();

/// Return a singleton instance for the SubContext type, constructing one if necessary.
template<typename SubContext, typename ... Args>
std::shared_ptr<SubContext>
Expand Down Expand Up @@ -363,11 +302,6 @@ class Context : public std::enable_shared_from_this<Context>
/// Mutex for protecting the global condition variable.
std::mutex interrupt_mutex_;

/// Mutex to protect sigint_guard_cond_handles_.
std::mutex interrupt_guard_cond_handles_mutex_;
/// Guard conditions for interrupting of associated wait sets on interrupt_all_wait_sets().
std::unordered_map<rcl_wait_set_t *, rcl_guard_condition_t> interrupt_guard_cond_handles_;

/// Keep shared ownership of global vector of weak contexts
std::shared_ptr<WeakContextsWrapper> weak_contexts_;
};
Expand Down
3 changes: 3 additions & 0 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "rcl/wait.h"

#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/executor_options.hpp"
#include "rclcpp/future_return_code.hpp"
#include "rclcpp/memory_strategies.hpp"
Expand Down Expand Up @@ -328,6 +329,8 @@ class Executor
/// Guard condition for signaling the rmw layer to wake up for special events.
rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();

std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;

/// Wait set for managing entities that the rmw layer waits on.
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();

Expand Down
14 changes: 8 additions & 6 deletions rclcpp/include/rclcpp/graph_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class GraphListener : public std::enable_shared_from_this<GraphListener>
{
public:
RCLCPP_PUBLIC
explicit GraphListener(std::shared_ptr<rclcpp::Context> parent_context);
explicit GraphListener(const rclcpp::Context::SharedPtr & parent_context);

RCLCPP_PUBLIC
virtual ~GraphListener();
Expand Down Expand Up @@ -161,19 +161,22 @@ class GraphListener : public std::enable_shared_from_this<GraphListener>
run_loop();

RCLCPP_PUBLIC
void init_wait_set();
void
init_wait_set();

RCLCPP_PUBLIC
void cleanup_wait_set();
void
cleanup_wait_set();

private:
RCLCPP_DISABLE_COPY(GraphListener)

/** \internal */
void
__shutdown(bool);
__shutdown();

rclcpp::Context::WeakPtr parent_context_;
std::weak_ptr<rclcpp::Context> weak_parent_context_;
std::shared_ptr<rcl_context_t> rcl_parent_context_;

std::thread listener_thread_;
bool is_started_;
Expand All @@ -185,7 +188,6 @@ class GraphListener : public std::enable_shared_from_this<GraphListener>
std::vector<rclcpp::node_interfaces::NodeGraphInterface *> node_graph_interfaces_;

rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();
rcl_guard_condition_t * shutdown_guard_condition_;
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();
};

Expand Down
70 changes: 0 additions & 70 deletions rclcpp/src/rclcpp/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ Context::shutdown(const std::string & reason)
}
// interrupt all blocking sleep_for() and all blocking executors or wait sets
this->interrupt_all_sleep_for();
this->interrupt_all_wait_sets();
// remove self from the global contexts
weak_contexts_->remove_context(this);
// shutdown logger
Expand Down Expand Up @@ -376,75 +375,6 @@ Context::interrupt_all_sleep_for()
interrupt_condition_variable_.notify_all();
}

rcl_guard_condition_t *
Context::get_interrupt_guard_condition(rcl_wait_set_t * wait_set)
{
std::lock_guard<std::mutex> lock(interrupt_guard_cond_handles_mutex_);
auto kv = interrupt_guard_cond_handles_.find(wait_set);
if (kv != interrupt_guard_cond_handles_.end()) {
return &kv->second;
} else {
rcl_guard_condition_t handle = rcl_get_zero_initialized_guard_condition();
rcl_guard_condition_options_t options = rcl_guard_condition_get_default_options();
auto ret = rcl_guard_condition_init(&handle, this->get_rcl_context().get(), options);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't initialize guard condition");
}
interrupt_guard_cond_handles_.emplace(wait_set, handle);
return &interrupt_guard_cond_handles_[wait_set];
}
}

void
Context::release_interrupt_guard_condition(rcl_wait_set_t * wait_set)
{
std::lock_guard<std::mutex> lock(interrupt_guard_cond_handles_mutex_);
auto kv = interrupt_guard_cond_handles_.find(wait_set);
if (kv != interrupt_guard_cond_handles_.end()) {
rcl_ret_t ret = rcl_guard_condition_fini(&kv->second);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to destroy sigint guard condition");
}
interrupt_guard_cond_handles_.erase(kv);
} else {
throw std::runtime_error("Tried to release sigint guard condition for nonexistent wait set");
}
}

void
Context::release_interrupt_guard_condition(
rcl_wait_set_t * wait_set,
const std::nothrow_t &) noexcept
{
try {
this->release_interrupt_guard_condition(wait_set);
} catch (const std::exception & exc) {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"caught %s exception when releasing interrupt guard condition: %s",
rmw::impl::cpp::demangle(exc).c_str(), exc.what());
} catch (...) {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"caught unknown exception when releasing interrupt guard condition");
}
}

void
Context::interrupt_all_wait_sets()
{
std::lock_guard<std::mutex> lock(interrupt_guard_cond_handles_mutex_);
for (auto & kv : interrupt_guard_cond_handles_) {
rcl_ret_t status = rcl_trigger_guard_condition(&(kv.second));
if (status != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"failed to trigger guard condition in Context::interrupt_all_wait_sets(): %s",
rcl_get_error_string().str);
}
}
}

void
Context::clean_up()
{
Expand Down
26 changes: 17 additions & 9 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "rclcpp/exceptions.hpp"
#include "rclcpp/executor.hpp"
#include "rclcpp/executors/static_single_threaded_executor.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/scope_exit.hpp"
#include "rclcpp/utilities.hpp"
Expand All @@ -37,28 +39,35 @@ using rclcpp::FutureReturnCode;

Executor::Executor(const rclcpp::ExecutorOptions & options)
: spinning(false),
shutdown_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
memory_strategy_(options.memory_strategy)
{
// Store the context for later use.
context_ = options.context;

rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options();
rcl_ret_t ret = rcl_guard_condition_init(
&interrupt_guard_condition_, options.context->get_rcl_context().get(), guard_condition_options);
&interrupt_guard_condition_, context_->get_rcl_context().get(), guard_condition_options);
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "Failed to create interrupt guard condition in Executor constructor");
}

context_->on_shutdown(
[weak_gc = std::weak_ptr<rclcpp::GuardCondition>{shutdown_guard_condition_}]() {
auto strong_gc = weak_gc.lock();
if (strong_gc) {
strong_gc->trigger();
}
});

// The number of guard conditions is always at least 2: 1 for the ctrl-c guard cond,
// and one for the executor's guard cond (interrupt_guard_condition_)

// Put the global ctrl-c guard condition in
memory_strategy_->add_guard_condition(options.context->get_interrupt_guard_condition(&wait_set_));
memory_strategy_->add_guard_condition(&shutdown_guard_condition_->get_rcl_guard_condition());

// Put the executor's guard condition in
memory_strategy_->add_guard_condition(&interrupt_guard_condition_);
rcl_allocator_t allocator = memory_strategy_->get_allocator();

// Store the context for later use.
context_ = options.context;

ret = rcl_wait_set_init(
&wait_set_,
0, 2, 0, 0, 0, 0,
Expand Down Expand Up @@ -110,8 +119,7 @@ Executor::~Executor()
rcl_reset_error();
}
// Remove and release the sigint guard condition
memory_strategy_->remove_guard_condition(context_->get_interrupt_guard_condition(&wait_set_));
context_->release_interrupt_guard_condition(&wait_set_, std::nothrow);
memory_strategy_->remove_guard_condition(&shutdown_guard_condition_->get_rcl_guard_condition());
}

void
Expand Down
Loading

0 comments on commit f8eb22a

Please sign in to comment.