Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
mjcarroll committed Mar 28, 2018
1 parent 5ac980e commit ace16b7
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 65 deletions.
4 changes: 3 additions & 1 deletion rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ MultiThreadedExecutor::get_number_of_threads()
}

void
MultiThreadedExecutor::run(size_t)
MultiThreadedExecutor::run(size_t thread_id)
{
while (rclcpp::ok() && spinning.load()) {
executor::AnyExecutable::SharedPtr any_exec;
Expand All @@ -78,7 +78,9 @@ MultiThreadedExecutor::run(size_t)
any_exec = get_next_executable();
}
if (yield_before_execute_) {
fprintf(stderr, "BY: %zi\n", thread_id);
std::this_thread::yield();
fprintf(stderr, "AY: %zi\n", thread_id);
}
execute_any_executable(any_exec);
}
Expand Down
95 changes: 31 additions & 64 deletions rclcpp/test/executors/test_multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,81 +43,48 @@ class TestMultiThreadedExecutor : public::testing::Test
*/
TEST_F(TestMultiThreadedExecutor, timer_over_take) {

rclcpp::executors::MultiThreadedExecutor executor(rclcpp::executor::create_default_executor_arguments(),
true);
struct sched_param param;
param.sched_priority = 0;
if (sched_setscheduler(0, SCHED_BATCH, &param) != 0) {
perror("sched_setscheduler");
}

rclcpp::executors::MultiThreadedExecutor executor(
rclcpp::executor::create_default_executor_arguments(),
true);

ASSERT_GT(executor.get_number_of_threads(), 1);
ASSERT_GT(executor.get_number_of_threads(), 1u);

std::shared_ptr<rclcpp::Node> node =
std::make_shared<rclcpp::Node>("test_multi_threaded_executor_timer_over_take");

auto cbg = node->create_callback_group(rclcpp::callback_group::CallbackGroupType::Reentrant);

rclcpp::Clock system_clock(RCL_STEADY_TIME);
auto start = system_clock.now();

auto timer = node->create_wall_timer(100ms, [&system_clock, &start]() {
std::cerr << (system_clock.now() - start).nanoseconds() << std::endl;
});

executor.add_node(node);
executor.spin();
}

TEST_F(TestMultiThreadedExecutor, timer_starvation) {
std::string node_topic_name = "publisher";

std::shared_ptr<rclcpp::Node> node =
std::make_shared<rclcpp::Node>("test_single_threaded_executor_timer_starvation");

auto cbg = node->create_callback_group(rclcpp::callback_group::CallbackGroupType::Reentrant);

rclcpp::executors::SingleThreadedExecutor executor;

//const size_t num_messages = 5 * std::min<size_t>(executor.get_number_of_threads(), 16);
const size_t num_messages = 5;
auto pub = node->create_publisher<IntraProcessMessage>(node_topic_name, num_messages);

std::atomic_uint subscription_counter(0);
auto sub_callback = [&subscription_counter](const IntraProcessMessage::SharedPtr /*msg*/)
{
++subscription_counter;
printf("Subscription callback %u\n", subscription_counter.load());
};

auto sub = node->create_subscription<IntraProcessMessage>(node_topic_name, num_messages,
sub_callback,
cbg);

auto msg = std::make_shared<IntraProcessMessage>();
pub->publish(msg);

// wait a moment for everything to initialize
//test_rclcpp::wait_for_subscriber(node, node_topic_name);

// use atomic
std::atomic_uint timer_counter(0);

auto timer_callback =
[&executor, &pub, &msg, &timer_counter, &subscription_counter, &num_messages](
rclcpp::TimerBase & /*timer*/)
{
++timer_counter;

if (timer_counter.load() >= 10) {
executor.cancel();
const auto start = system_clock.now();
std::mutex last_mutex;
rclcpp::Time last = start;

auto timer_callback = [&executor, &system_clock, &start, &last_mutex, &last]() {
rclcpp::Time now = system_clock.now();
fprintf(stderr, "elap: %f\n", (now-start).nanoseconds()/1.0e9);

{
std::lock_guard<std::mutex> lock(last_mutex);
double diff = abs((now - last).nanoseconds()) / 1.0e9;
fprintf(stderr, "diff: %f\n", diff);
last = now;

if (diff < 0.09 || diff > 0.11) {
fprintf(stderr, "executor cancel");
executor.cancel();
ASSERT_TRUE(diff > 0.09);
ASSERT_TRUE(diff < 0.11);
}
}

printf("Timer callback%u\n", timer_counter.load());
pub->publish(msg);
rclcpp::sleep_for(15ms);
};

std::vector<rclcpp::TimerBase::SharedPtr> timers;
timers.push_back(node->create_wall_timer(10ms, timer_callback));
auto timer = node->create_wall_timer(100ms, timer_callback, cbg);
executor.add_node(node);
executor.spin();

ASSERT_EQ(10, timer_counter);
ASSERT_EQ(10, subscription_counter);
}
98 changes: 98 additions & 0 deletions rclcpp/test/executors/test_single_threaded_executor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2018 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gtest/gtest.h>

#include <chrono>
#include <string>
#include <memory>

#include "rclcpp/exceptions.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/executors.hpp"

#include "rcl_interfaces/msg/intra_process_message.hpp"

using namespace std::chrono_literals;

using rcl_interfaces::msg::IntraProcessMessage;

class TestSingleThreadedExecutor: public::testing::Test
{
protected:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}
};

TEST_F(TestSingleThreadedExecutor, timer_starvation) {
std::string node_topic_name = "publisher";

std::shared_ptr<rclcpp::Node> node =
std::make_shared<rclcpp::Node>("test_single_threaded_executor_timer_starvation");

auto cbg = node->create_callback_group(rclcpp::callback_group::CallbackGroupType::Reentrant);

rclcpp::executors::SingleThreadedExecutor executor;

//const size_t num_messages = 5 * std::min<size_t>(executor.get_number_of_threads(), 16);
const size_t num_messages = 5;
auto pub = node->create_publisher<IntraProcessMessage>(node_topic_name, num_messages);

std::atomic_uint subscription_counter(0);
auto sub_callback = [&subscription_counter](const IntraProcessMessage::SharedPtr /*msg*/)
{
++subscription_counter;
printf("Subscription callback %u\n", subscription_counter.load());
};

auto sub = node->create_subscription<IntraProcessMessage>(node_topic_name, num_messages,
sub_callback,
cbg);

auto msg = std::make_shared<IntraProcessMessage>();
pub->publish(msg);

// wait a moment for everything to initialize
//test_rclcpp::wait_for_subscriber(node, node_topic_name);

// use atomic
std::atomic_uint timer_counter(0);

auto timer_callback =
[&executor, &pub, &msg, &timer_counter, &subscription_counter, &num_messages](
rclcpp::TimerBase & /*timer*/)
{
++timer_counter;

if (timer_counter.load() >= 10) {
executor.cancel();
}

printf("Timer callback%u\n", timer_counter.load());
pub->publish(msg);
rclcpp::sleep_for(15ms);
};

std::vector<rclcpp::TimerBase::SharedPtr> timers;
timers.push_back(node->create_wall_timer(10ms, timer_callback));
executor.add_node(node);
executor.spin();

ASSERT_EQ(10, timer_counter);
ASSERT_EQ(10, subscription_counter);
}

0 comments on commit ace16b7

Please sign in to comment.