From 6cec0b692ef1b6a6ef0aaf525bd69239c2cc3378 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Tue, 1 Oct 2024 23:04:40 +0200 Subject: [PATCH] Add lazy subscription to republisher (#325) --- .../include/image_transport/republish.hpp | 2 + image_transport/src/republish.cpp | 55 +++++++++++++------ 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/image_transport/include/image_transport/republish.hpp b/image_transport/include/image_transport/republish.hpp index 11fb7f43..9d74c0ce 100644 --- a/image_transport/include/image_transport/republish.hpp +++ b/image_transport/include/image_transport/republish.hpp @@ -30,6 +30,7 @@ #define IMAGE_TRANSPORT__REPUBLISH_HPP_ #include +#include #include "image_transport/image_transport.hpp" #include "image_transport/visibility_control.hpp" @@ -54,6 +55,7 @@ class Republisher : public rclcpp::Node bool initialized_{false}; image_transport::Subscriber sub; image_transport::Publisher pub; + std::mutex pub_matched_mutex; pluginlib::UniquePtr instance; std::shared_ptr> loader; }; diff --git a/image_transport/src/republish.cpp b/image_transport/src/republish.cpp index f7cfba61..849c5d0b 100644 --- a/image_transport/src/republish.cpp +++ b/image_transport/src/republish.cpp @@ -111,18 +111,30 @@ void Republisher::initialize() if (out_transport.empty()) { // Use all available transports for output - this->pub = image_transport::create_publisher( - this, out_topic, - rmw_qos_profile_default, pub_options); - // Use Publisher::publish as the subscriber callback typedef void (image_transport::Publisher::* PublishMemFn)( const sensor_msgs::msg::Image::ConstSharedPtr &) const; PublishMemFn pub_mem_fn = &image_transport::Publisher::publish; - this->sub = image_transport::create_subscription( - this, in_topic, std::bind(pub_mem_fn, &pub, std::placeholders::_1), - in_transport, rmw_qos_profile_default, sub_options); + pub_options.event_callbacks.matched_callback = + [this, in_topic, in_transport, pub_mem_fn, sub_options](rclcpp::MatchedInfo &) + { + std::scoped_lock lock(this->pub_matched_mutex); + if (this->pub.getNumSubscribers() == 0) { + this->sub.shutdown(); + } else if (!this->sub) { + this->sub = image_transport::create_subscription( + this, in_topic, + std::bind(pub_mem_fn, &this->pub, std::placeholders::_1), + in_transport, + rmw_qos_profile_default, + sub_options); + } + }; + + this->pub = image_transport::create_publisher( + this, out_topic, + rmw_qos_profile_default, pub_options); } else { // Use one specific transport for output // Load transport plugin @@ -132,17 +144,28 @@ void Republisher::initialize() "image_transport::PublisherPlugin"); std::string lookup_name = Plugin::getLookupName(out_transport); - instance = loader->createUniqueInstance(lookup_name); - instance->advertise(this, out_topic, rmw_qos_profile_default, pub_options); - - // Use PublisherPlugin::publish as the subscriber callback + // Use PublisherPlugin::publishPtr as the subscriber callback typedef void (Plugin::* PublishMemFn)(const sensor_msgs::msg::Image::ConstSharedPtr &) const; PublishMemFn pub_mem_fn = &Plugin::publishPtr; - this->sub = image_transport::create_subscription( - this, in_topic, - std::bind( - pub_mem_fn, - instance.get(), std::placeholders::_1), in_transport, rmw_qos_profile_default, sub_options); + + this->instance = loader->createUniqueInstance(lookup_name); + + pub_options.event_callbacks.matched_callback = + [this, in_topic, in_transport, pub_mem_fn, sub_options](rclcpp::MatchedInfo & matched_info) + { + if (matched_info.current_count == 0) { + this->sub.shutdown(); + } else if (!this->sub) { + this->sub = image_transport::create_subscription( + this, in_topic, + std::bind( + pub_mem_fn, + this->instance.get(), std::placeholders::_1), in_transport, rmw_qos_profile_default, + sub_options); + } + }; + + this->instance->advertise(this, out_topic, rmw_qos_profile_default, pub_options); } }