Skip to content

Commit

Permalink
Refactor random talker example
Browse files Browse the repository at this point in the history
Signed-off-by: Carlos San Vicente <carlos.sanvicente@apex.ai>
  • Loading branch information
carlossvg committed Jun 30, 2021
1 parent 8d519be commit db0a78e
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 108 deletions.
4 changes: 4 additions & 0 deletions rclcpp/wait_set/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ ament_target_dependencies(wait_set_example rclcpp std_msgs)
add_executable(wait_set_random_order wait_set_random_order.cpp)
ament_target_dependencies(wait_set_random_order rclcpp std_msgs)

add_executable(executor_random_order executor_random_order.cpp)
ament_target_dependencies(executor_random_order rclcpp std_msgs)

add_executable(wait_set_different_rates_topics wait_set_different_rates_topics.cpp)
ament_target_dependencies(wait_set_different_rates_topics rclcpp std_msgs)

Expand All @@ -42,6 +45,7 @@ install(TARGETS
thread_safe_wait_set
wait_set_example
wait_set_random_order
executor_random_order
wait_set_different_rates_topics
wait_set_composition
DESTINATION lib/${PROJECT_NAME}
Expand Down
43 changes: 43 additions & 0 deletions rclcpp/wait_set/executor_random_order.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2021, Apex.AI Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
#include "random_listener.hpp"
#include "random_talker.hpp"

/* For this example, we will be creating a talker node with three publishers which will
* publish the topics A, B, C in random order each time. In this case the messages are handled
* using an executor. The order in which the messages are handled will depend on the message
* arrival and the type of messages available at the moment.
*/

int32_t main(const int32_t argc, char ** const argv)
{
rclcpp::init(argc, argv);

// We run the talker and the listener in different threads to simulate inter-process
// communications.
// Note the order of execution will be different if both nodes are spun in the same executor.
// This is because the executor handles the messages in the order in which the
// subscriptions were created. Using difference threads the handling order depends on the
// message arrival and type of messages available.
auto thread = std::thread([]() {rclcpp::spin(std::make_shared<RandomTalker>());});
rclcpp::spin(std::make_shared<Listener>());

rclcpp::shutdown();
thread.join();

return 0;
}
44 changes: 44 additions & 0 deletions rclcpp/wait_set/random_listener.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2021, Apex.AI Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
#include <random>

class Listener : public rclcpp::Node
{
using subscription_list = std::vector<rclcpp::Subscription<std_msgs::msg::String>::SharedPtr>;

public:
Listener()
: Node("random_listener")
{
auto print_msg = [this](std_msgs::msg::String::UniquePtr msg) {
RCLCPP_INFO(this->get_logger(), "I heard: %s", msg->data.c_str());
};
sub1_ = this->create_subscription<std_msgs::msg::String>("topicA", 10, print_msg);
sub2_ = this->create_subscription<std_msgs::msg::String>("topicB", 10, print_msg);
sub3_ = this->create_subscription<std_msgs::msg::String>("topicC", 10, print_msg);
}

subscription_list get_subscriptions() const
{
return {sub1_, sub2_, sub3_};
}

private:
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub1_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub2_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub3_;
};
67 changes: 67 additions & 0 deletions rclcpp/wait_set/random_talker.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2021, Apex.AI Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>
#include <random>

using namespace std::chrono_literals;

class RandomTalker : public rclcpp::Node
{
public:
RandomTalker()
: Node("random_talker"),
pub1_(this->create_publisher<std_msgs::msg::String>("topicA", 10)),
pub2_(this->create_publisher<std_msgs::msg::String>("topicB", 10)),
pub3_(this->create_publisher<std_msgs::msg::String>("topicC", 10)),
rand_engine_(std::chrono::system_clock::now().time_since_epoch().count())
{
publish_functions_.emplace_back(
([this]() {
std_msgs::msg::String msg;
msg.data = "A";
RCLCPP_INFO(this->get_logger(), "Publishing: %s", msg.data.c_str());
pub1_->publish(msg);
}));
publish_functions_.emplace_back(
([this]() {
std_msgs::msg::String msg;
msg.data = "B";
RCLCPP_INFO(this->get_logger(), "Publishing: %s", msg.data.c_str());
pub2_->publish(msg);
}));
publish_functions_.emplace_back(
([this]() {
std_msgs::msg::String msg;
msg.data = "C";
RCLCPP_INFO(this->get_logger(), "Publishing: %s", msg.data.c_str());
pub3_->publish(msg);
}));
auto timer_callback =
[this]() -> void {
std::shuffle(publish_functions_.begin(), publish_functions_.end(), rand_engine_);
for (const auto & f : publish_functions_) {f();}
};
timer_ = this->create_wall_timer(1s, timer_callback);
}

private:
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub1_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub2_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub3_;
std::vector<std::function<void()>> publish_functions_;
std::default_random_engine rand_engine_;
};
8 changes: 4 additions & 4 deletions rclcpp/wait_set/wait_set_composition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ int main(int argc, char * argv[])
std_msgs::msg::String msg;
rclcpp::MessageInfo msg_info;
if (listener->get_subscription()->take(msg, msg_info)) {
RCLCPP_INFO(listener->get_logger(), "I heard: '%s' (waitset)", msg.data.c_str());
RCLCPP_INFO(listener->get_logger(), "I heard: '%s' (wait-set)", msg.data.c_str());
}
}
} else if (wait_result.kind() == rclcpp::WaitResultKind::Timeout) {
RCLCPP_ERROR(listener->get_logger(), "No message received after 5s.");
} else {
RCLCPP_ERROR(listener->get_logger(), "Wait-set failed.");
if (rclcpp::ok()) {
RCLCPP_ERROR(listener->get_logger(), "Wait-set failed with timeout");
}
}
}

Expand Down
16 changes: 7 additions & 9 deletions rclcpp/wait_set/wait_set_different_rates_topics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ int32_t main(const int32_t argc, char ** const argv)
{
rclcpp::init(argc, argv);

auto node = std::make_shared<rclcpp::Node>("waitset_node");
auto node = std::make_shared<rclcpp::Node>("wait_set_listener");
auto do_nothing = [](std_msgs::msg::String::UniquePtr) {assert(false);};

auto sub1 = node->create_subscription<std_msgs::msg::String>("topicA", 10, do_nothing);
Expand All @@ -75,9 +75,7 @@ int32_t main(const int32_t argc, char ** const argv)
auto publisher_thread = std::thread([&exec]() {exec.spin();});

while (rclcpp::ok()) {
// Waiting up to 5s for a message to arrive
const auto wait_result = wait_set.wait(std::chrono::seconds(5));

const auto wait_result = wait_set.wait(3s);
if (wait_result.kind() == rclcpp::WaitResultKind::Ready) {
bool sub2_has_data = wait_result.get_wait_set().get_rcl_wait_set().subscriptions[1U];
bool sub3_has_data = wait_result.get_wait_set().get_rcl_wait_set().subscriptions[2U];
Expand All @@ -98,7 +96,7 @@ int32_t main(const int32_t argc, char ** const argv)
handled_data.append(msg1.data);
}
handled_data.append(msg2.data);
RCLCPP_INFO(node->get_logger(), "Handle topic A and B: %s", handled_data.c_str());
RCLCPP_INFO(node->get_logger(), "I heard: %s", handled_data.c_str());
} else {
RCLCPP_ERROR(node->get_logger(), "An invalid message from topic B was received.");
}
Expand All @@ -109,15 +107,15 @@ int32_t main(const int32_t argc, char ** const argv)
std_msgs::msg::String msg;
rclcpp::MessageInfo msg_info;
if (sub3->take(msg, msg_info)) {
RCLCPP_INFO(node->get_logger(), "Handle topic C: %s", msg.data.c_str());
RCLCPP_INFO(node->get_logger(), "I heard: %s", msg.data.c_str());
} else {
RCLCPP_ERROR(node->get_logger(), "An invalid message from topic C was received.");
}
}
} else if (wait_result.kind() == rclcpp::WaitResultKind::Timeout) {
RCLCPP_ERROR(node->get_logger(), "No message received after 5s.");
} else {
RCLCPP_ERROR(node->get_logger(), "Wait-set failed.");
if (rclcpp::ok()) {
RCLCPP_ERROR(node->get_logger(), "Wait-set failed with timeout");
}
}
}

Expand Down
12 changes: 5 additions & 7 deletions rclcpp/wait_set/wait_set_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ int32_t main(const int32_t argc, char ** const argv)
{
rclcpp::init(argc, argv);

auto node = std::make_shared<rclcpp::Node>("wait_set_example_node");
auto node = std::make_shared<rclcpp::Node>("wait_set_listener");
auto do_nothing = [](std_msgs::msg::String::UniquePtr) {assert(false);};

auto sub1 = node->create_subscription<std_msgs::msg::String>("topicA", 1, do_nothing);
Expand Down Expand Up @@ -74,9 +74,7 @@ int32_t main(const int32_t argc, char ** const argv)

auto num_recv = std::size_t();
while (num_recv < 3U) {
// Waiting up to 5s for a sample to arrive.
const auto wait_result = wait_set.wait(std::chrono::seconds(5));

const auto wait_result = wait_set.wait(2s);
if (wait_result.kind() == rclcpp::WaitResultKind::Ready) {
if (wait_result.get_wait_set().get_rcl_wait_set().timers[0U]) {
// we execute manually the timer callback
Expand Down Expand Up @@ -111,9 +109,9 @@ int32_t main(const int32_t argc, char ** const argv)
RCLCPP_INFO(node->get_logger(), "Number of messages already got: %zu of 3", num_recv);
}
} else if (wait_result.kind() == rclcpp::WaitResultKind::Timeout) {
RCLCPP_ERROR(node->get_logger(), "No message received after 5s.");
} else {
RCLCPP_ERROR(node->get_logger(), "Wait-set failed.");
if (rclcpp::ok()) {
RCLCPP_ERROR(node->get_logger(), "Wait-set failed with timeout");
}
}
}
RCLCPP_INFO(node->get_logger(), "Got all messages!");
Expand Down
Loading

0 comments on commit db0a78e

Please sign in to comment.