Skip to content

Commit

Permalink
use a dedicated callback group for costmap plugin and filter instead …
Browse files Browse the repository at this point in the history
…of inner node

Signed-off-by: zhenpeng ge <zhenpeng.ge@qq.com>

update comment

Signed-off-by: zhenpeng ge <zhenpeng.ge@qq.com>
  • Loading branch information
gezp committed Jun 2, 2022
1 parent 7efb6ba commit 972c433
Show file tree
Hide file tree
Showing 14 changed files with 77 additions and 50 deletions.
7 changes: 5 additions & 2 deletions nav2_costmap_2d/include/nav2_costmap_2d/costmap_2d_ros.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,6 @@ class Costmap2DROS : public nav2_util::LifecycleNode
double getRobotRadius() {return robot_radius_;}

protected:
rclcpp::Node::SharedPtr client_node_;

// Publishers and subscribers
rclcpp_lifecycle::LifecyclePublisher<geometry_msgs::msg::PolygonStamped>::SharedPtr
footprint_pub_;
Expand All @@ -313,6 +311,11 @@ class Costmap2DROS : public nav2_util::LifecycleNode
rclcpp::Subscription<geometry_msgs::msg::Polygon>::SharedPtr footprint_sub_;
rclcpp::Subscription<rcl_interfaces::msg::ParameterEvent>::SharedPtr parameter_sub_;

// Dedicated callback group and executor for tf timer_interface, costmap plugins and filters
rclcpp::CallbackGroup::SharedPtr callback_group_;
rclcpp::executors::SingleThreadedExecutor::SharedPtr executor_;
std::unique_ptr<nav2_util::NodeThread> executor_thread_;

// Transform listener
std::shared_ptr<tf2_ros::Buffer> tf_buffer_;
std::shared_ptr<tf2_ros::TransformListener> tf_listener_;
Expand Down
6 changes: 2 additions & 4 deletions nav2_costmap_2d/include/nav2_costmap_2d/layer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ class Layer
std::string name,
tf2_ros::Buffer * tf,
const nav2_util::LifecycleNode::WeakPtr & node,
rclcpp::Node::SharedPtr client_node,
rclcpp::Node::SharedPtr rclcpp_node);
rclcpp::CallbackGroup::SharedPtr callback_group);
/** @brief Stop publishers. */
virtual void deactivate() {}
/** @brief Restart publishers if they've been stopped. */
Expand Down Expand Up @@ -160,8 +159,7 @@ class Layer
LayeredCostmap * layered_costmap_;
std::string name_;
tf2_ros::Buffer * tf_;
rclcpp::Node::SharedPtr client_node_;
rclcpp::Node::SharedPtr rclcpp_node_;
rclcpp::CallbackGroup::SharedPtr callback_group_;
rclcpp_lifecycle::LifecycleNode::WeakPtr node_;
rclcpp::Clock::SharedPtr clock_;
rclcpp::Logger logger_{rclcpp::get_logger("nav2_costmap_2d")};
Expand Down
3 changes: 2 additions & 1 deletion nav2_costmap_2d/include/nav2_costmap_2d/obstacle_layer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ class ObstacleLayer : public CostmapLayer
/// @brief Used to project laser scans into point clouds
laser_geometry::LaserProjection projector_;
/// @brief Used for the observation message filters
std::vector<std::shared_ptr<message_filters::SubscriberBase<>>> observation_subscribers_;
std::vector<std::shared_ptr<message_filters::SubscriberBase<rclcpp_lifecycle::LifecycleNode>>>
observation_subscribers_;
/// @brief Used to make sure that transforms are available for each sensor
std::vector<std::shared_ptr<tf2_ros::MessageFilterBase>> observation_notifiers_;
/// @brief Used to store observations from various sensors
Expand Down
11 changes: 9 additions & 2 deletions nav2_costmap_2d/plugins/costmap_filters/keepout_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,18 @@ void KeepoutFilter::initializeFilter(
}

filter_info_topic_ = filter_info_topic;

auto sub_opt = rclcpp::SubscriptionOptions();
sub_opt.callback_group = callback_group_;

// Setting new costmap filter info subscriber
RCLCPP_INFO(
logger_,
"KeepoutFilter: Subscribing to \"%s\" topic for filter info...",
filter_info_topic_.c_str());
filter_info_sub_ = node->create_subscription<nav2_msgs::msg::CostmapFilterInfo>(
filter_info_topic_, rclcpp::QoS(rclcpp::KeepLast(1)).transient_local().reliable(),
std::bind(&KeepoutFilter::filterInfoCallback, this, std::placeholders::_1));
std::bind(&KeepoutFilter::filterInfoCallback, this, std::placeholders::_1), sub_opt);

global_frame_ = layered_costmap_->getGlobalFrameID();
}
Expand Down Expand Up @@ -110,14 +114,17 @@ void KeepoutFilter::filterInfoCallback(

mask_topic_ = msg->filter_mask_topic;

auto sub_opt = rclcpp::SubscriptionOptions();
sub_opt.callback_group = callback_group_;

// Setting new filter mask subscriber
RCLCPP_INFO(
logger_,
"KeepoutFilter: Subscribing to \"%s\" topic for filter mask...",
mask_topic_.c_str());
mask_sub_ = node->create_subscription<nav_msgs::msg::OccupancyGrid>(
mask_topic_, rclcpp::QoS(rclcpp::KeepLast(1)).transient_local().reliable(),
std::bind(&KeepoutFilter::maskCallback, this, std::placeholders::_1));
std::bind(&KeepoutFilter::maskCallback, this, std::placeholders::_1), sub_opt);
}

void KeepoutFilter::maskCallback(
Expand Down
11 changes: 9 additions & 2 deletions nav2_costmap_2d/plugins/costmap_filters/speed_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,18 @@ void SpeedFilter::initializeFilter(
node->get_parameter(name_ + "." + "speed_limit_topic", speed_limit_topic);

filter_info_topic_ = filter_info_topic;

auto sub_opt = rclcpp::SubscriptionOptions();
sub_opt.callback_group = callback_group_;

// Setting new costmap filter info subscriber
RCLCPP_INFO(
logger_,
"SpeedFilter: Subscribing to \"%s\" topic for filter info...",
filter_info_topic_.c_str());
filter_info_sub_ = node->create_subscription<nav2_msgs::msg::CostmapFilterInfo>(
filter_info_topic_, rclcpp::QoS(rclcpp::KeepLast(1)).transient_local().reliable(),
std::bind(&SpeedFilter::filterInfoCallback, this, std::placeholders::_1));
std::bind(&SpeedFilter::filterInfoCallback, this, std::placeholders::_1), sub_opt);

// Get global frame required for speed limit publisher
global_frame_ = layered_costmap_->getGlobalFrameID();
Expand Down Expand Up @@ -143,14 +147,17 @@ void SpeedFilter::filterInfoCallback(

mask_topic_ = msg->filter_mask_topic;

auto sub_opt = rclcpp::SubscriptionOptions();
sub_opt.callback_group = callback_group_;

// Setting new filter mask subscriber
RCLCPP_INFO(
logger_,
"SpeedFilter: Subscribing to \"%s\" topic for filter mask...",
mask_topic_.c_str());
mask_sub_ = node->create_subscription<nav_msgs::msg::OccupancyGrid>(
mask_topic_, rclcpp::QoS(rclcpp::KeepLast(1)).transient_local().reliable(),
std::bind(&SpeedFilter::maskCallback, this, std::placeholders::_1));
std::bind(&SpeedFilter::maskCallback, this, std::placeholders::_1), sub_opt);
}

void SpeedFilter::maskCallback(
Expand Down
29 changes: 17 additions & 12 deletions nav2_costmap_2d/plugins/obstacle_layer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ void ObstacleLayer::onInitialize()

global_frame_ = layered_costmap_->getGlobalFrameID();

auto sub_opt = rclcpp::SubscriptionOptions();
sub_opt.callback_group = callback_group_;

// now we need to split the topics based on whitespace which we can use a stringstream for
std::stringstream ss(topics_string);

Expand Down Expand Up @@ -218,14 +221,15 @@ void ObstacleLayer::onInitialize()

// create a callback for the topic
if (data_type == "LaserScan") {
std::shared_ptr<message_filters::Subscriber<sensor_msgs::msg::LaserScan>> sub(
new message_filters::Subscriber<sensor_msgs::msg::LaserScan>(
rclcpp_node_, topic, custom_qos_profile));
auto sub = std::make_shared<message_filters::Subscriber<sensor_msgs::msg::LaserScan,
rclcpp_lifecycle::LifecycleNode>>(node, topic, custom_qos_profile, sub_opt);
sub->unsubscribe();

std::shared_ptr<tf2_ros::MessageFilter<sensor_msgs::msg::LaserScan>> filter(
new tf2_ros::MessageFilter<sensor_msgs::msg::LaserScan>(
*sub, *tf_, global_frame_, 50, rclcpp_node_, tf2::durationFromSec(transform_tolerance)));
auto filter = std::make_shared<tf2_ros::MessageFilter<sensor_msgs::msg::LaserScan>>(
*sub, *tf_, global_frame_, 50,
node->get_node_logging_interface(),
node->get_node_clock_interface(),
tf2::durationFromSec(transform_tolerance));

if (inf_is_valid) {
filter->registerCallback(
Expand All @@ -246,9 +250,8 @@ void ObstacleLayer::onInitialize()
observation_notifiers_.back()->setTolerance(rclcpp::Duration::from_seconds(0.05));

} else {
std::shared_ptr<message_filters::Subscriber<sensor_msgs::msg::PointCloud2>> sub(
new message_filters::Subscriber<sensor_msgs::msg::PointCloud2>(
rclcpp_node_, topic, custom_qos_profile));
auto sub = std::make_shared<message_filters::Subscriber<sensor_msgs::msg::PointCloud2,
rclcpp_lifecycle::LifecycleNode>>(node, topic, custom_qos_profile, sub_opt);
sub->unsubscribe();

if (inf_is_valid) {
Expand All @@ -257,9 +260,11 @@ void ObstacleLayer::onInitialize()
"obstacle_layer: inf_is_valid option is not applicable to PointCloud observations.");
}

std::shared_ptr<tf2_ros::MessageFilter<sensor_msgs::msg::PointCloud2>> filter(
new tf2_ros::MessageFilter<sensor_msgs::msg::PointCloud2>(
*sub, *tf_, global_frame_, 50, rclcpp_node_, tf2::durationFromSec(transform_tolerance)));
auto filter = std::make_shared<tf2_ros::MessageFilter<sensor_msgs::msg::PointCloud2>>(
*sub, *tf_, global_frame_, 50,
node->get_node_logging_interface(),
node->get_node_clock_interface(),
tf2::durationFromSec(transform_tolerance));

filter->registerCallback(
std::bind(
Expand Down
5 changes: 4 additions & 1 deletion nav2_costmap_2d/plugins/range_sensor_layer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ void RangeSensorLayer::onInitialize()
return;
}

auto sub_opt = rclcpp::SubscriptionOptions();
sub_opt.callback_group = callback_group_;

// Traverse the topic names list subscribing to all of them with the same callback method
for (auto & topic_name : topic_names) {
if (input_sensor_type == InputSensorType::VARIABLE) {
Expand All @@ -153,7 +156,7 @@ void RangeSensorLayer::onInitialize()
node->create_subscription<sensor_msgs::msg::Range>(
topic_name, rclcpp::SensorDataQoS(), std::bind(
&RangeSensorLayer::bufferIncomingRangeMsg, this,
std::placeholders::_1)));
std::placeholders::_1), sub_opt));

RCLCPP_INFO(
logger_, "RangeSensorLayer: subscribed to "
Expand Down
7 changes: 5 additions & 2 deletions nav2_costmap_2d/plugins/static_layer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,19 @@ StaticLayer::onInitialize()
throw std::runtime_error{"Failed to lock node"};
}

// use a dedicated callback group for map and map_update subscriber
auto sub_opt = rclcpp::SubscriptionOptions();
sub_opt.callback_group = callback_group_;
map_sub_ = node->create_subscription<nav_msgs::msg::OccupancyGrid>(
map_topic_, map_qos,
std::bind(&StaticLayer::incomingMap, this, std::placeholders::_1));
std::bind(&StaticLayer::incomingMap, this, std::placeholders::_1), sub_opt);

if (subscribe_to_updates_) {
RCLCPP_INFO(logger_, "Subscribing to updates");
map_update_sub_ = node->create_subscription<map_msgs::msg::OccupancyGridUpdate>(
map_topic_ + "_updates",
rclcpp::SystemDefaultsQoS(),
std::bind(&StaticLayer::incomingUpdate, this, std::placeholders::_1));
std::bind(&StaticLayer::incomingUpdate, this, std::placeholders::_1), sub_opt);
}
}

Expand Down
23 changes: 14 additions & 9 deletions nav2_costmap_2d/src/costmap_2d_ros.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Costmap2DROS::Costmap2DROS(
const std::string & name,
const std::string & parent_namespace,
const std::string & local_namespace)
: nav2_util::LifecycleNode(name, "", true,
: nav2_util::LifecycleNode(name, "", false,
// NodeOption arguments take precedence over the ones provided on the command line
// use this to make sure the node is placed on the provided namespace
// TODO(orduno) Pass a sub-node instead of creating a new node for better handling
Expand All @@ -84,9 +84,6 @@ Costmap2DROS::Costmap2DROS(
"nav2_costmap_2d::InflationLayer"}
{
RCLCPP_INFO(get_logger(), "Creating Costmap");
auto options = rclcpp::NodeOptions().arguments(
{"--ros-args", "-r", std::string("__node:=") + get_name() + "_client", "--"});
client_node_ = std::make_shared<rclcpp::Node>("_", options);

std::vector<std::string> clearable_layers{"obstacle_layer", "voxel_layer", "range_layer"};

Expand Down Expand Up @@ -129,6 +126,9 @@ Costmap2DROS::on_configure(const rclcpp_lifecycle::State & /*state*/)
RCLCPP_INFO(get_logger(), "Configuring");
getParameters();

callback_group_ = create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive, false);

// Create the costmap itself
layered_costmap_ = std::make_unique<LayeredCostmap>(
global_frame_, rolling_window_, track_unknown_space_);
Expand All @@ -140,10 +140,11 @@ Costmap2DROS::on_configure(const rclcpp_lifecycle::State & /*state*/)
}

// Create the transform-related objects
tf_buffer_ = std::make_shared<tf2_ros::Buffer>(rclcpp_node_->get_clock());
tf_buffer_ = std::make_shared<tf2_ros::Buffer>(get_clock());
auto timer_interface = std::make_shared<tf2_ros::CreateTimerROS>(
rclcpp_node_->get_node_base_interface(),
rclcpp_node_->get_node_timers_interface());
get_node_base_interface(),
get_node_timers_interface(),
callback_group_);
tf_buffer_->setCreateTimerInterface(timer_interface);
tf_listener_ = std::make_shared<tf2_ros::TransformListener>(*tf_buffer_);

Expand All @@ -157,7 +158,7 @@ Costmap2DROS::on_configure(const rclcpp_lifecycle::State & /*state*/)
// TODO(mjeronimo): instead of get(), use a shared ptr
plugin->initialize(
layered_costmap_.get(), plugin_names_[i], tf_buffer_.get(),
shared_from_this(), client_node_, rclcpp_node_);
shared_from_this(), callback_group_);

RCLCPP_INFO(get_logger(), "Initialized plugin \"%s\"", plugin_names_[i].c_str());
}
Expand All @@ -170,7 +171,7 @@ Costmap2DROS::on_configure(const rclcpp_lifecycle::State & /*state*/)

filter->initialize(
layered_costmap_.get(), filter_names_[i], tf_buffer_.get(),
shared_from_this(), client_node_, rclcpp_node_);
shared_from_this(), callback_group_);

RCLCPP_INFO(get_logger(), "Initialized costmap filter \"%s\"", filter_names_[i].c_str());
}
Expand Down Expand Up @@ -201,6 +202,9 @@ Costmap2DROS::on_configure(const rclcpp_lifecycle::State & /*state*/)
// Add cleaning service
clear_costmap_service_ = std::make_unique<ClearCostmapService>(shared_from_this(), *this);

executor_ = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
executor_->add_callback_group(callback_group_, get_node_base_interface());
executor_thread_ = std::make_unique<nav2_util::NodeThread>(executor_);
return nav2_util::CallbackReturn::SUCCESS;
}

Expand Down Expand Up @@ -284,6 +288,7 @@ Costmap2DROS::on_cleanup(const rclcpp_lifecycle::State & /*state*/)
costmap_publisher_.reset();
clear_costmap_service_.reset();

executor_thread_.reset();
return nav2_util::CallbackReturn::SUCCESS;
}

Expand Down
7 changes: 2 additions & 5 deletions nav2_costmap_2d/src/layer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,13 @@ Layer::initialize(
std::string name,
tf2_ros::Buffer * tf,
const nav2_util::LifecycleNode::WeakPtr & node,
rclcpp::Node::SharedPtr client_node,
rclcpp::Node::SharedPtr rclcpp_node)
rclcpp::CallbackGroup::SharedPtr callback_group)
{
layered_costmap_ = parent;
name_ = name;
tf_ = tf;
client_node_ = client_node;
rclcpp_node_ = rclcpp_node;
node_ = node;

callback_group_ = callback_group;
{
auto node_shared_ptr = node_.lock();
logger_ = node_shared_ptr->get_logger();
Expand Down
8 changes: 4 additions & 4 deletions nav2_costmap_2d/test/testing_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void addStaticLayer(
{
slayer = std::make_shared<nav2_costmap_2d::StaticLayer>();
layers.addPlugin(std::shared_ptr<nav2_costmap_2d::Layer>(slayer));
slayer->initialize(&layers, "static", &tf, node, nullptr, nullptr /*TODO*/);
slayer->initialize(&layers, "static", &tf, node, nullptr);
}

void addObstacleLayer(
Expand All @@ -84,7 +84,7 @@ void addObstacleLayer(
std::shared_ptr<nav2_costmap_2d::ObstacleLayer> & olayer)
{
olayer = std::make_shared<nav2_costmap_2d::ObstacleLayer>();
olayer->initialize(&layers, "obstacles", &tf, node, nullptr, nullptr /*TODO*/);
olayer->initialize(&layers, "obstacles", &tf, node, nullptr);
layers.addPlugin(std::shared_ptr<nav2_costmap_2d::Layer>(olayer));
}

Expand All @@ -94,7 +94,7 @@ void addRangeLayer(
std::shared_ptr<nav2_costmap_2d::RangeSensorLayer> & rlayer)
{
rlayer = std::make_shared<nav2_costmap_2d::RangeSensorLayer>();
rlayer->initialize(&layers, "range", &tf, node, nullptr, nullptr /*TODO*/);
rlayer->initialize(&layers, "range", &tf, node, nullptr);
layers.addPlugin(std::shared_ptr<nav2_costmap_2d::Layer>(rlayer));
}

Expand Down Expand Up @@ -133,7 +133,7 @@ void addInflationLayer(
std::shared_ptr<nav2_costmap_2d::InflationLayer> & ilayer)
{
ilayer = std::make_shared<nav2_costmap_2d::InflationLayer>();
ilayer->initialize(&layers, "inflation", &tf, node, nullptr, nullptr /*TODO*/);
ilayer->initialize(&layers, "inflation", &tf, node, nullptr);
std::shared_ptr<nav2_costmap_2d::Layer> ipointer(ilayer);
layers.addPlugin(ipointer);
}
Expand Down
4 changes: 2 additions & 2 deletions nav2_costmap_2d/test/unit/declare_parameter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ TEST(DeclareParameter, useValidParameter)
tf2_ros::Buffer tf(node->get_clock());
nav2_costmap_2d::LayeredCostmap layers("frame", false, false);

layer.initialize(&layers, "test_layer", &tf, node, nullptr, nullptr);
layer.initialize(&layers, "test_layer", &tf, node, nullptr);

layer.declareParameter("test1", rclcpp::ParameterValue("test_val1"));
try {
Expand All @@ -63,7 +63,7 @@ TEST(DeclareParameter, useInvalidParameter)
tf2_ros::Buffer tf(node->get_clock());
nav2_costmap_2d::LayeredCostmap layers("frame", false, false);

layer.initialize(&layers, "test_layer", &tf, node, nullptr, nullptr);
layer.initialize(&layers, "test_layer", &tf, node, nullptr);

layer.declareParameter("test2", rclcpp::PARAMETER_STRING);
try {
Expand Down
4 changes: 1 addition & 3 deletions nav2_costmap_2d/test/unit/keepout_filter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,7 @@ void TestNode::createKeepoutFilter(const std::string & global_frame)
rclcpp::Parameter(std::string(FILTER_NAME) + ".filter_info_topic", INFO_TOPIC));

keepout_filter_ = std::make_shared<nav2_costmap_2d::KeepoutFilter>();
keepout_filter_->initialize(
&layers, std::string(FILTER_NAME),
tf_buffer_.get(), node_, nullptr, nullptr);
keepout_filter_->initialize(&layers, std::string(FILTER_NAME), tf_buffer_.get(), node_, nullptr);
keepout_filter_->initializeFilter(INFO_TOPIC);

// Wait until mask will be received by KeepoutFilter
Expand Down
Loading

0 comments on commit 972c433

Please sign in to comment.