diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 1bd484fcaf..2ad3674450 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -387,6 +387,14 @@ if(BUILD_TESTING) "rcl") target_link_libraries(test_utilities ${PROJECT_NAME}) endif() + + ament_add_gtest(test_multi_threaded_executor test/executors/test_multi_threaded_executor.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}") + if(TARGET test_multi_threaded_executor) + ament_target_dependencies(test_multi_threaded_executor + "rcl") + target_link_libraries(test_multi_threaded_executor ${PROJECT_NAME}) + endif() endif() ament_package( diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index aa350cc822..b91059447b 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -15,7 +15,9 @@ #ifndef RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_ #define RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_ +#include #include +#include #include #include @@ -34,10 +36,22 @@ class MultiThreadedExecutor : public executor::Executor public: RCLCPP_SMART_PTR_DEFINITIONS(MultiThreadedExecutor) + /// Constructor for MultiThreadedExecutor. + /** + * For the yield_before_execute option, when true std::this_thread::yield() + * will be called after acquiring work (as an AnyExecutable) and + * releasing the spinning lock, but before executing the work. + * This is useful for reproducing some bugs related to taking work more than + * once. + * + * \param args common arguments for all executors + * \param yield_before_execute if true std::this_thread::yield() is called + */ RCLCPP_PUBLIC MultiThreadedExecutor( const executor::ExecutorArgs & args = rclcpp::executor::create_default_executor_arguments(), - size_t number_of_threads = 0); + size_t number_of_threads = 0, + bool yield_before_execute = false); RCLCPP_PUBLIC virtual ~MultiThreadedExecutor(); @@ -60,6 +74,10 @@ class MultiThreadedExecutor : public executor::Executor std::mutex wait_mutex_; size_t number_of_threads_; + bool yield_before_execute_; + + std::mutex scheduled_timers_mutex_; + std::set scheduled_timers_; }; } // namespace executors diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 4752bb8bb8..f7a6b81c99 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include "rclcpp/utilities.hpp" @@ -25,8 +26,9 @@ using rclcpp::executors::MultiThreadedExecutor; MultiThreadedExecutor::MultiThreadedExecutor( const rclcpp::executor::ExecutorArgs & args, - size_t number_of_threads) -: executor::Executor(args) + size_t number_of_threads, + bool yield_before_execute) +: executor::Executor(args), yield_before_execute_(yield_before_execute) { number_of_threads_ = number_of_threads ? number_of_threads : std::thread::hardware_concurrency(); if (number_of_threads_ == 0) { @@ -78,7 +80,27 @@ MultiThreadedExecutor::run(size_t) if (!get_next_executable(any_exec)) { continue; } + if (any_exec.timer) { + // Guard against multiple threads getting the same timer. + std::lock_guard lock(scheduled_timers_mutex_); + if (scheduled_timers_.count(any_exec.timer) != 0) { + continue; + } + scheduled_timers_.insert(any_exec.timer); + } + } + if (yield_before_execute_) { + std::this_thread::yield(); } + execute_any_executable(any_exec); + + if (any_exec.timer) { + std::lock_guard lock(scheduled_timers_mutex_); + auto it = scheduled_timers_.find(any_exec.timer); + if (it != scheduled_timers_.end()) { + scheduled_timers_.erase(it); + } + } } } diff --git a/rclcpp/test/executors/test_multi_threaded_executor.cpp b/rclcpp/test/executors/test_multi_threaded_executor.cpp new file mode 100644 index 0000000000..6465b28c8a --- /dev/null +++ b/rclcpp/test/executors/test_multi_threaded_executor.cpp @@ -0,0 +1,100 @@ +// Copyright 2018 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/executors.hpp" + +#include "rcl_interfaces/msg/intra_process_message.hpp" + +using namespace std::chrono_literals; + +using rcl_interfaces::msg::IntraProcessMessage; + +class TestMultiThreadedExecutor : public ::testing::Test +{ +protected: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } +}; + +/* + Test that timers are not taken multiple times when using reentrant callback groups. + */ +TEST_F(TestMultiThreadedExecutor, timer_over_take) { +#ifdef __linux__ + // This seems to be the most effective way to force the bug to happen on Linux. + // This is unnecessary on MacOS, since the default scheduler causes it. + struct sched_param param; + param.sched_priority = 0; + if (sched_setscheduler(0, SCHED_BATCH, ¶m) != 0) { + perror("sched_setscheduler"); + } +#endif + + bool yield_before_execute = true; + + rclcpp::executors::MultiThreadedExecutor executor( + rclcpp::executor::create_default_executor_arguments(), 2u, yield_before_execute); + + ASSERT_GT(executor.get_number_of_threads(), 1u); + + std::shared_ptr node = + std::make_shared("test_multi_threaded_executor_timer_over_take"); + + auto cbg = node->create_callback_group(rclcpp::callback_group::CallbackGroupType::Reentrant); + + rclcpp::Clock system_clock(RCL_STEADY_TIME); + std::mutex last_mutex; + auto last = system_clock.now(); + + std::atomic_int timer_count {0}; + + auto timer_callback = [&timer_count, &executor, &system_clock, &last_mutex, &last]() { + const double PERIOD = 0.1f; + const double TOLERANCE = 0.01f; + + rclcpp::Time now = system_clock.now(); + timer_count++; + + if (timer_count > 5) { + executor.cancel(); + } + + { + std::lock_guard lock(last_mutex); + double diff = std::abs((now - last).nanoseconds()) / 1.0e9; + last = now; + + if (diff < PERIOD - TOLERANCE || diff > PERIOD + TOLERANCE) { + executor.cancel(); + ASSERT_TRUE(diff > PERIOD - TOLERANCE); + ASSERT_TRUE(diff < PERIOD + TOLERANCE); + } + } + }; + + auto timer = node->create_wall_timer(100ms, timer_callback, cbg); + executor.add_node(node); + executor.spin(); +}