Skip to content

Commit

Permalink
Allow timers to keep up the intended rate in MultiThreadedExecutor (#…
Browse files Browse the repository at this point in the history
…1516)

Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com>
  • Loading branch information
ivanpauno authored and Karsten1987 committed Mar 2, 2021
1 parent e69a5b6 commit 9b44106
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 4 deletions.
1 change: 1 addition & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/clock.cpp
src/rclcpp/context.cpp
src/rclcpp/contexts/default_context.cpp
src/rclcpp/detail/mutex_two_priorities.cpp
src/rclcpp/detail/rmw_implementation_specific_payload.cpp
src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp
src/rclcpp/detail/rmw_implementation_specific_subscription_payload.cpp
Expand Down
75 changes: 75 additions & 0 deletions rclcpp/include/rclcpp/detail/mutex_two_priorities.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__DETAIL__MUTEX_TWO_PRIORITIES_HPP_
#define RCLCPP__DETAIL__MUTEX_TWO_PRIORITIES_HPP_

#include <mutex>

namespace rclcpp
{
namespace detail
{
/// \internal A mutex that has two locking mechanism, one with higher priority than the other.
/**
* After the current mutex owner release the lock, a thread that used the high
* priority mechanism will have priority over threads that used the low priority mechanism.
*/
class MutexTwoPriorities
{
public:
class HighPriorityLockable
{
public:
explicit HighPriorityLockable(MutexTwoPriorities & parent);

void lock();

void unlock();

private:
MutexTwoPriorities & parent_;
};

class LowPriorityLockable
{
public:
explicit LowPriorityLockable(MutexTwoPriorities & parent);

void lock();

void unlock();

private:
MutexTwoPriorities & parent_;
};

HighPriorityLockable
get_high_priority_lockable();

LowPriorityLockable
get_low_priority_lockable();

private:
// Implementation detail: the idea here is that only one low priority thread can be
// trying to take the data_ mutex while the others are excluded by the barrier_ mutex.
// All high priority threads are already waiting for the data_ mutex.
std::mutex barrier_;
std::mutex data_;
};

} // namespace detail
} // namespace rclcpp

#endif // RCLCPP__DETAIL__MUTEX_TWO_PRIORITIES_HPP_
3 changes: 2 additions & 1 deletion rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <thread>
#include <unordered_map>

#include "rclcpp/detail/mutex_two_priorities.hpp"
#include "rclcpp/executor.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategies.hpp"
Expand Down Expand Up @@ -81,7 +82,7 @@ class MultiThreadedExecutor : public rclcpp::Executor
private:
RCLCPP_DISABLE_COPY(MultiThreadedExecutor)

std::mutex wait_mutex_;
detail::MutexTwoPriorities wait_mutex_;
size_t number_of_threads_;
bool yield_before_execute_;
std::chrono::nanoseconds next_exec_timeout_;
Expand Down
75 changes: 75 additions & 0 deletions rclcpp/src/rclcpp/detail/mutex_two_priorities.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rclcpp/detail/mutex_two_priorities.hpp"

#include <mutex>

namespace rclcpp
{
namespace detail
{

using LowPriorityLockable = MutexTwoPriorities::LowPriorityLockable;
using HighPriorityLockable = MutexTwoPriorities::HighPriorityLockable;

HighPriorityLockable::HighPriorityLockable(MutexTwoPriorities & parent)
: parent_(parent)
{}

void
HighPriorityLockable::lock()
{
parent_.data_.lock();
}

void
HighPriorityLockable::unlock()
{
parent_.data_.unlock();
}

LowPriorityLockable::LowPriorityLockable(MutexTwoPriorities & parent)
: parent_(parent)
{}

void
LowPriorityLockable::lock()
{
std::unique_lock<std::mutex> barrier_guard{parent_.barrier_};
parent_.data_.lock();
barrier_guard.release();
}

void
LowPriorityLockable::unlock()
{
std::lock_guard<std::mutex> barrier_guard{parent_.barrier_, std::adopt_lock};
parent_.data_.unlock();
}

HighPriorityLockable
MutexTwoPriorities::get_high_priority_lockable()
{
return HighPriorityLockable{*this};
}

LowPriorityLockable
MutexTwoPriorities::get_low_priority_lockable()
{
return LowPriorityLockable{*this};
}

} // namespace detail
} // namespace rclcpp
10 changes: 7 additions & 3 deletions rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "rclcpp/utilities.hpp"
#include "rclcpp/scope_exit.hpp"

using rclcpp::detail::MutexTwoPriorities;
using rclcpp::executors::MultiThreadedExecutor;

MultiThreadedExecutor::MultiThreadedExecutor(
Expand Down Expand Up @@ -51,7 +52,8 @@ MultiThreadedExecutor::spin()
std::vector<std::thread> threads;
size_t thread_id = 0;
{
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
auto low_priority_wait_mutex = wait_mutex_.get_low_priority_lockable();
std::lock_guard<MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
for (; thread_id < number_of_threads_ - 1; ++thread_id) {
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id);
threads.emplace_back(func);
Expand All @@ -76,7 +78,8 @@ MultiThreadedExecutor::run(size_t)
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_exec;
{
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
auto low_priority_wait_mutex = wait_mutex_.get_low_priority_lockable();
std::lock_guard<MutexTwoPriorities::LowPriorityLockable> wait_lock(low_priority_wait_mutex);
if (!rclcpp::ok(this->context_) || !spinning.load()) {
return;
}
Expand All @@ -103,7 +106,8 @@ MultiThreadedExecutor::run(size_t)
execute_any_executable(any_exec);

if (any_exec.timer) {
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
auto high_priority_wait_mutex = wait_mutex_.get_high_priority_lockable();
std::lock_guard<MutexTwoPriorities::HighPriorityLockable> wait_lock(high_priority_wait_mutex);
auto it = scheduled_timers_.find(any_exec.timer);
if (it != scheduled_timers_.end()) {
scheduled_timers_.erase(it);
Expand Down

0 comments on commit 9b44106

Please sign in to comment.