Skip to content

Commit

Permalink
Trigger the intraprocess guard condition with data (ros2#2164)
Browse files Browse the repository at this point in the history
If the intraprocess buffer still has data after taking, re-trigger the
guard condition to ensure that the executor will continue to service it,
even if incoming publications stop.


Signed-off-by: Michael Carroll <michael@openrobotics.org>
  • Loading branch information
mjcarroll authored and Barry-Xu-2018 committed Jan 12, 2024
1 parent 75a92e4 commit 27cd369
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ class SubscriptionIntraProcess
return nullptr;
}
}

if (this->buffer_->has_data()) {
// If there is data still to be processed, indicate to the
// executor or waitset by triggering the guard condition.
this->trigger_guard_condition();
}

return std::static_pointer_cast<void>(
std::make_shared<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(
std::pair<ConstMessageSharedPtr, MessageUniquePtr>(
Expand Down
103 changes: 103 additions & 0 deletions rclcpp/test/rclcpp/executors/test_executors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -593,3 +593,106 @@ TEST(TestExecutors, testSpinUntilFutureCompleteNodePtr) {

rclcpp::shutdown();
}

template<typename T>
class TestIntraprocessExecutors : public ::testing::Test
{
public:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}

static void TearDownTestCase()
{
rclcpp::shutdown();
}

void SetUp()
{
const auto test_info = ::testing::UnitTest::GetInstance()->current_test_info();
std::stringstream test_name;
test_name << test_info->test_case_name() << "_" << test_info->name();
node = std::make_shared<rclcpp::Node>("node", test_name.str());

callback_count = 0;

const std::string topic_name = std::string("topic_") + test_name.str();

rclcpp::PublisherOptions po;
po.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable;
publisher = node->create_publisher<test_msgs::msg::Empty>(topic_name, rclcpp::QoS(1), po);

auto callback = [this](test_msgs::msg::Empty::ConstSharedPtr) {
this->callback_count.fetch_add(1);
};

rclcpp::SubscriptionOptions so;
so.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable;
subscription =
node->create_subscription<test_msgs::msg::Empty>(
topic_name, rclcpp::QoS(kNumMessages), std::move(callback), so);
}

void TearDown()
{
publisher.reset();
subscription.reset();
node.reset();
}

const size_t kNumMessages = 100;

rclcpp::Node::SharedPtr node;
rclcpp::Publisher<test_msgs::msg::Empty>::SharedPtr publisher;
rclcpp::Subscription<test_msgs::msg::Empty>::SharedPtr subscription;
std::atomic_int callback_count;
};

TYPED_TEST_SUITE(TestIntraprocessExecutors, ExecutorTypes, ExecutorTypeNames);

TYPED_TEST(TestIntraprocessExecutors, testIntraprocessRetrigger) {
// This tests that executors will continue to service intraprocess subscriptions in the case
// that publishers aren't continuing to publish.
// This was previously broken in that intraprocess guard conditions were only triggered on
// publish and the test was added to prevent future regressions.
const size_t kNumMessages = 100;

using ExecutorType = TypeParam;
ExecutorType executor;
executor.add_node(this->node);

EXPECT_EQ(0, this->callback_count.load());
this->publisher->publish(test_msgs::msg::Empty());

// Wait for up to 5 seconds for the first message to come available.
const std::chrono::milliseconds sleep_per_loop(10);
int loops = 0;
while (1u != this->callback_count.load() && loops < 500) {
rclcpp::sleep_for(sleep_per_loop);
executor.spin_some();
loops++;
}
EXPECT_EQ(1u, this->callback_count.load());

// reset counter
this->callback_count.store(0);

for (size_t ii = 0; ii < kNumMessages; ++ii) {
this->publisher->publish(test_msgs::msg::Empty());
}

// Fire a timer every 10ms up to 5 seconds waiting for subscriptions to be read.
loops = 0;
auto timer = this->node->create_wall_timer(
std::chrono::milliseconds(10), [this, &executor, &loops, &kNumMessages]() {
loops++;
if (kNumMessages == this->callback_count.load() ||
loops == 500)
{
executor.cancel();
}
});
executor.spin();
EXPECT_EQ(kNumMessages, this->callback_count.load());
}

0 comments on commit 27cd369

Please sign in to comment.