Skip to content

Commit

Permalink
Merge pull request #83 from mauropasse/mauro/add-rclcpp-guard-cond-ca…
Browse files Browse the repository at this point in the history
…llback

Mauro/add rclcpp guard cond callback
  • Loading branch information
alsora authored Feb 9, 2022
2 parents 82a1b46 + 9d0ac6c commit 8b4b7f2
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 3 deletions.
17 changes: 17 additions & 0 deletions rclcpp/include/rclcpp/guard_condition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,27 @@ class GuardCondition
bool
exchange_in_use_by_wait_set_state(bool in_use_state);

/// Adds the guard condition to a waitset
/**
* This function is thread-safe.
* \param[in] wait_set pointer to a wait set where to add the guard condition
*/
RCLCPP_PUBLIC
void
add_to_wait_set(rcl_wait_set_t * wait_set);

RCLCPP_PUBLIC
void
set_on_trigger_callback(std::function<void(size_t)> callback);

protected:
rclcpp::Context::SharedPtr context_;
rcl_guard_condition_t rcl_guard_condition_;
std::atomic<bool> in_use_by_wait_set_{false};
std::recursive_mutex reentrant_mutex_;
std::function<void(size_t)> on_trigger_callback_{nullptr};
size_t unread_count_{0};
rcl_wait_set_t * wait_set_{nullptr};
};

} // namespace rclcpp
Expand Down
53 changes: 50 additions & 3 deletions rclcpp/src/rclcpp/guard_condition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <functional>

#include "rclcpp/guard_condition.hpp"

#include "rclcpp/exceptions.hpp"
Expand Down Expand Up @@ -72,9 +74,16 @@ GuardCondition::get_rcl_guard_condition() const
void
GuardCondition::trigger()
{
rcl_ret_t ret = rcl_trigger_guard_condition(&rcl_guard_condition_);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);

if (on_trigger_callback_) {
on_trigger_callback_(1);
} else {
rcl_ret_t ret = rcl_trigger_guard_condition(&rcl_guard_condition_);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
unread_count_++;
}
}

Expand All @@ -84,4 +93,42 @@ GuardCondition::exchange_in_use_by_wait_set_state(bool in_use_state)
return in_use_by_wait_set_.exchange(in_use_state);
}

void
GuardCondition::add_to_wait_set(rcl_wait_set_t * wait_set)
{
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);

if (exchange_in_use_by_wait_set_state(true)) {
if (wait_set != wait_set_) {
throw std::runtime_error("guard condition has already been added to a wait set.");
}
} else {
wait_set_ = wait_set;
}

rcl_ret_t ret = rcl_wait_set_add_guard_condition(wait_set, &this->rcl_guard_condition_, NULL);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "failed to add guard condition to wait set");
}
}

void
GuardCondition::set_on_trigger_callback(std::function<void(size_t)> callback)
{
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);

if (callback) {
on_trigger_callback_ = callback;

if (unread_count_) {
callback(unread_count_);
unread_count_ = 0;
}
return;
}

on_trigger_callback_ = nullptr;
}

} // namespace rclcpp
62 changes: 62 additions & 0 deletions rclcpp/test/rclcpp/test_guard_condition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,65 @@ TEST_F(TestGuardCondition, trigger) {
}
}
}

/*
* Testing addition to a wait set
*/
TEST_F(TestGuardCondition, add_to_wait_set) {
{
{
auto gc = std::make_shared<rclcpp::GuardCondition>();

auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_wait_set_add_guard_condition, RCL_RET_OK);

rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
EXPECT_NO_THROW(gc->add_to_wait_set(&wait_set));
EXPECT_NO_THROW(gc->add_to_wait_set(&wait_set));

rcl_wait_set_t wait_set_2 = rcl_get_zero_initialized_wait_set();
EXPECT_THROW(gc->add_to_wait_set(&wait_set_2), std::runtime_error);
}

{
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_wait_set_add_guard_condition, RCL_RET_ERROR);

auto gd = std::make_shared<rclcpp::GuardCondition>();
EXPECT_THROW(gd->add_to_wait_set(nullptr), rclcpp::exceptions::RCLError);
}
}
}

/*
* Testing set on trigger callback
*/
TEST_F(TestGuardCondition, set_on_trigger_callback) {
{
auto gc = std::make_shared<rclcpp::GuardCondition>();

std::atomic<size_t> c1 {0};
auto increase_c1_cb = [&c1](size_t count_msgs) {c1 += count_msgs;};
gc->set_on_trigger_callback(increase_c1_cb);

EXPECT_EQ(c1.load(), 0u);
EXPECT_NO_THROW(gc->trigger());
EXPECT_EQ(c1.load(), 1u);

std::atomic<size_t> c2 {0};
auto increase_c2_cb = [&c2](size_t count_msgs) {c2 += count_msgs;};
gc->set_on_trigger_callback(increase_c2_cb);

EXPECT_NO_THROW(gc->trigger());
EXPECT_EQ(c1.load(), 1u);
EXPECT_EQ(c2.load(), 1u);

gc->set_on_trigger_callback(nullptr);
EXPECT_NO_THROW(gc->trigger());
EXPECT_EQ(c1.load(), 1u);
EXPECT_EQ(c2.load(), 1u);

gc->set_on_trigger_callback(increase_c1_cb);
EXPECT_EQ(c1.load(), 2u);
}
}

0 comments on commit 8b4b7f2

Please sign in to comment.