diff --git a/image_transport/src/republish.cpp b/image_transport/src/republish.cpp index f7cfba61..ea8f2b30 100644 --- a/image_transport/src/republish.cpp +++ b/image_transport/src/republish.cpp @@ -43,6 +43,8 @@ using namespace std::chrono_literals; namespace image_transport { +std::mutex pub_matched_mutex; + Republisher::Republisher(const rclcpp::NodeOptions & options) : Node("image_republisher", options) { @@ -111,18 +113,30 @@ void Republisher::initialize() if (out_transport.empty()) { // Use all available transports for output - this->pub = image_transport::create_publisher( - this, out_topic, - rmw_qos_profile_default, pub_options); - // Use Publisher::publish as the subscriber callback typedef void (image_transport::Publisher::* PublishMemFn)( const sensor_msgs::msg::Image::ConstSharedPtr &) const; PublishMemFn pub_mem_fn = &image_transport::Publisher::publish; - this->sub = image_transport::create_subscription( - this, in_topic, std::bind(pub_mem_fn, &pub, std::placeholders::_1), - in_transport, rmw_qos_profile_default, sub_options); + pub_options.event_callbacks.matched_callback = + [this, in_topic, in_transport, pub_mem_fn, sub_options](rclcpp::MatchedInfo &) + { + std::scoped_lock lock(pub_matched_mutex); + if (this->pub.getNumSubscribers() == 0) { + this->sub.shutdown(); + } else if (!this->sub) { + this->sub = image_transport::create_subscription( + this, in_topic, + std::bind(pub_mem_fn, &this->pub, std::placeholders::_1), + in_transport, + rmw_qos_profile_default, + sub_options); + } + }; + + this->pub = image_transport::create_publisher( + this, out_topic, + rmw_qos_profile_default, pub_options); } else { // Use one specific transport for output // Load transport plugin @@ -132,17 +146,28 @@ void Republisher::initialize() "image_transport::PublisherPlugin"); std::string lookup_name = Plugin::getLookupName(out_transport); - instance = loader->createUniqueInstance(lookup_name); - instance->advertise(this, out_topic, rmw_qos_profile_default, pub_options); - - // Use PublisherPlugin::publish as the subscriber callback + // Use PublisherPlugin::publishPtr as the subscriber callback typedef void (Plugin::* PublishMemFn)(const sensor_msgs::msg::Image::ConstSharedPtr &) const; PublishMemFn pub_mem_fn = &Plugin::publishPtr; - this->sub = image_transport::create_subscription( - this, in_topic, - std::bind( - pub_mem_fn, - instance.get(), std::placeholders::_1), in_transport, rmw_qos_profile_default, sub_options); + + this->instance = loader->createUniqueInstance(lookup_name); + + pub_options.event_callbacks.matched_callback = + [this, in_topic, in_transport, pub_mem_fn, sub_options](rclcpp::MatchedInfo & matched_info) + { + if (matched_info.current_count == 0) { + this->sub.shutdown(); + } else if (!this->sub) { + this->sub = image_transport::create_subscription( + this, in_topic, + std::bind( + pub_mem_fn, + this->instance.get(), std::placeholders::_1), in_transport, rmw_qos_profile_default, + sub_options); + } + }; + + this->instance->advertise(this, out_topic, rmw_qos_profile_default, pub_options); } }