Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure all Clock accesses are thread-safe #999

Merged
merged 2 commits into from
Feb 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions rclcpp/include/rclcpp/clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <functional>
#include <memory>
#include <mutex>

#include "rclcpp/macros.hpp"
#include "rclcpp/time.hpp"
Expand Down Expand Up @@ -96,6 +97,10 @@ class Clock
rcl_clock_type_t
get_clock_type() const noexcept;

RCLCPP_PUBLIC
std::mutex &
get_clock_mutex() noexcept;

// Add a callback to invoke if the jump threshold is exceeded.
/**
* These callback functions must remain valid as long as the
Expand Down Expand Up @@ -133,9 +138,10 @@ class Clock
bool before_jump,
void * user_data);

/// Internal storage backed by rcl
std::shared_ptr<rcl_clock_t> rcl_clock_;
rcl_allocator_t allocator_;
/// Private internal storage
class Impl;

std::shared_ptr<Impl> impl_;
};

} // namespace rclcpp
Expand Down
92 changes: 55 additions & 37 deletions rclcpp/src/rclcpp/clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "rclcpp/clock.hpp"

#include <memory>
#include <thread>

#include "rclcpp/exceptions.hpp"

Expand All @@ -23,6 +24,31 @@
namespace rclcpp
{

class Clock::Impl
{
public:
explicit Impl(rcl_clock_type_t clock_type)
: allocator_{rcl_get_default_allocator()}
{
rcl_ret_t ret = rcl_clock_init(clock_type, &rcl_clock_, &allocator_);
if (ret != RCL_RET_OK) {
exceptions::throw_from_rcl_error(ret, "could not get current time stamp");
}
}

~Impl()
{
rcl_ret_t ret = rcl_clock_fini(&rcl_clock_);
if (ret != RCL_RET_OK) {
RCUTILS_LOG_ERROR("Failed to fini rcl clock.");
}
}

rcl_clock_t rcl_clock_;
rcl_allocator_t allocator_;
std::mutex clock_mutex_;
};

JumpHandler::JumpHandler(
pre_callback_t pre_callback,
post_callback_t post_callback,
Expand All @@ -32,35 +58,17 @@ JumpHandler::JumpHandler(
notice_threshold(threshold)
{}

static
void
rcl_clock_deleter(rcl_clock_t * rcl_clock)
{
auto ret = rcl_clock_fini(rcl_clock);
if (ret != RCL_RET_OK) {
RCUTILS_LOG_ERROR("Failed to fini rcl clock.");
}
delete rcl_clock;
}

Clock::Clock(rcl_clock_type_t clock_type)
{
allocator_ = rcl_get_default_allocator();
rcl_clock_ = std::shared_ptr<rcl_clock_t>(new rcl_clock_t(), rcl_clock_deleter);
auto ret = rcl_clock_init(clock_type, rcl_clock_.get(), &allocator_);
if (ret != RCL_RET_OK) {
exceptions::throw_from_rcl_error(ret, "could not get current time stamp");
}
}
: impl_(new Clock::Impl(clock_type)) {}

Clock::~Clock() {}

Time
Clock::now()
{
Time now(0, 0, rcl_clock_->type);
Time now(0, 0, impl_->rcl_clock_.type);

auto ret = rcl_clock_get_now(rcl_clock_.get(), &now.rcl_time_.nanoseconds);
auto ret = rcl_clock_get_now(&impl_->rcl_clock_, &now.rcl_time_.nanoseconds);
if (ret != RCL_RET_OK) {
exceptions::throw_from_rcl_error(ret, "could not get current time stamp");
}
Expand All @@ -71,13 +79,13 @@ Clock::now()
bool
Clock::ros_time_is_active()
{
if (!rcl_clock_valid(rcl_clock_.get())) {
if (!rcl_clock_valid(&impl_->rcl_clock_)) {
RCUTILS_LOG_ERROR("ROS time not valid!");
return false;
}

bool is_enabled = false;
auto ret = rcl_is_enabled_ros_time_override(rcl_clock_.get(), &is_enabled);
auto ret = rcl_is_enabled_ros_time_override(&impl_->rcl_clock_, &is_enabled);
if (ret != RCL_RET_OK) {
exceptions::throw_from_rcl_error(
ret, "Failed to check ros_time_override_status");
Expand All @@ -88,13 +96,19 @@ Clock::ros_time_is_active()
rcl_clock_t *
Clock::get_clock_handle() noexcept
{
return rcl_clock_.get();
return &impl_->rcl_clock_;
}

rcl_clock_type_t
Clock::get_clock_type() const noexcept
{
return rcl_clock_->type;
return impl_->rcl_clock_.type;
}

std::mutex &
Clock::get_clock_mutex() noexcept
{
return impl_->clock_mutex_;
}

void
Expand Down Expand Up @@ -126,22 +140,26 @@ Clock::create_jump_callback(
throw std::bad_alloc{};
}

// Try to add the jump callback to the clock
rcl_ret_t ret = rcl_clock_add_jump_callback(
rcl_clock_.get(), threshold, Clock::on_time_jump,
handler.get());
if (RCL_RET_OK != ret) {
exceptions::throw_from_rcl_error(ret, "Failed to add time jump callback");
{
std::lock_guard<std::mutex> clock_guard(impl_->clock_mutex_);
// Try to add the jump callback to the clock
rcl_ret_t ret = rcl_clock_add_jump_callback(
&impl_->rcl_clock_, threshold, Clock::on_time_jump,
handler.get());
if (RCL_RET_OK != ret) {
exceptions::throw_from_rcl_error(ret, "Failed to add time jump callback");
}
}

std::weak_ptr<rcl_clock_t> weak_clock = rcl_clock_;
std::weak_ptr<Clock::Impl> weak_impl = impl_;
// *INDENT-OFF*
// create shared_ptr that removes the callback automatically when all copies are destructed
return JumpHandler::SharedPtr(handler.release(), [weak_clock](JumpHandler * handler) noexcept {
auto shared_clock = weak_clock.lock();
if (shared_clock) {
rcl_ret_t ret = rcl_clock_remove_jump_callback(shared_clock.get(), Clock::on_time_jump,
handler);
return JumpHandler::SharedPtr(handler.release(), [weak_impl](JumpHandler * handler) noexcept {
auto shared_impl = weak_impl.lock();
if (shared_impl) {
std::lock_guard<std::mutex> clock_guard(shared_impl->clock_mutex_);
rcl_ret_t ret = rcl_clock_remove_jump_callback(&shared_impl->rcl_clock_,
Clock::on_time_jump, handler);
if (RCL_RET_OK != ret) {
RCUTILS_LOG_ERROR("Failed to remove time jump callback");
}
Expand Down
33 changes: 20 additions & 13 deletions rclcpp/src/rclcpp/timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <chrono>
#include <string>
#include <memory>
#include <thread>

#include "rclcpp/contexts/default_context.hpp"
#include "rclcpp/exceptions.hpp"
Expand All @@ -40,11 +41,14 @@ TimerBase::TimerBase(
timer_handle_ = std::shared_ptr<rcl_timer_t>(
new rcl_timer_t, [ = ](rcl_timer_t * timer) mutable
{
if (rcl_timer_fini(timer) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Failed to clean up rcl timer handle: %s", rcl_get_error_string().str);
rcl_reset_error();
{
std::lock_guard<std::mutex> clock_guard(clock->get_clock_mutex());
if (rcl_timer_fini(timer) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Failed to clean up rcl timer handle: %s", rcl_get_error_string().str);
rcl_reset_error();
}
}
delete timer;
// Captured shared pointers by copy, reset to make sure timer is finalized before clock
Expand All @@ -55,15 +59,18 @@ TimerBase::TimerBase(
*timer_handle_.get() = rcl_get_zero_initialized_timer();

rcl_clock_t * clock_handle = clock_->get_clock_handle();
if (
rcl_timer_init(
timer_handle_.get(), clock_handle, rcl_context.get(), period.count(), nullptr,
rcl_get_default_allocator()) != RCL_RET_OK)
{
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Couldn't initialize rcl timer handle: %s\n", rcl_get_error_string().str);
rcl_reset_error();
std::lock_guard<std::mutex> clock_guard(clock_->get_clock_mutex());
if (
rcl_timer_init(
timer_handle_.get(), clock_handle, rcl_context.get(), period.count(), nullptr,
rcl_get_default_allocator()) != RCL_RET_OK)
{
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Couldn't initialize rcl timer handle: %s\n", rcl_get_error_string().str);
rcl_reset_error();
}
}
}

Expand Down