Skip to content

Commit

Permalink
Add lazy subscription to republisher (#325)
Browse files Browse the repository at this point in the history
  • Loading branch information
bjsowa authored Oct 1, 2024
1 parent 3b0a1b6 commit 6cec0b6
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 16 deletions.
2 changes: 2 additions & 0 deletions image_transport/include/image_transport/republish.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#define IMAGE_TRANSPORT__REPUBLISH_HPP_

#include <memory>
#include <mutex>

#include "image_transport/image_transport.hpp"
#include "image_transport/visibility_control.hpp"
Expand All @@ -54,6 +55,7 @@ class Republisher : public rclcpp::Node
bool initialized_{false};
image_transport::Subscriber sub;
image_transport::Publisher pub;
std::mutex pub_matched_mutex;
pluginlib::UniquePtr<image_transport::PublisherPlugin> instance;
std::shared_ptr<pluginlib::ClassLoader<image_transport::PublisherPlugin>> loader;
};
Expand Down
55 changes: 39 additions & 16 deletions image_transport/src/republish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,30 @@ void Republisher::initialize()
if (out_transport.empty()) {
// Use all available transports for output

this->pub = image_transport::create_publisher(
this, out_topic,
rmw_qos_profile_default, pub_options);

// Use Publisher::publish as the subscriber callback
typedef void (image_transport::Publisher::* PublishMemFn)(
const sensor_msgs::msg::Image::ConstSharedPtr &) const;
PublishMemFn pub_mem_fn = &image_transport::Publisher::publish;

this->sub = image_transport::create_subscription(
this, in_topic, std::bind(pub_mem_fn, &pub, std::placeholders::_1),
in_transport, rmw_qos_profile_default, sub_options);
pub_options.event_callbacks.matched_callback =
[this, in_topic, in_transport, pub_mem_fn, sub_options](rclcpp::MatchedInfo &)
{
std::scoped_lock<std::mutex> lock(this->pub_matched_mutex);
if (this->pub.getNumSubscribers() == 0) {
this->sub.shutdown();
} else if (!this->sub) {
this->sub = image_transport::create_subscription(
this, in_topic,
std::bind(pub_mem_fn, &this->pub, std::placeholders::_1),
in_transport,
rmw_qos_profile_default,
sub_options);
}
};

this->pub = image_transport::create_publisher(
this, out_topic,
rmw_qos_profile_default, pub_options);
} else {
// Use one specific transport for output
// Load transport plugin
Expand All @@ -132,17 +144,28 @@ void Republisher::initialize()
"image_transport::PublisherPlugin");
std::string lookup_name = Plugin::getLookupName(out_transport);

instance = loader->createUniqueInstance(lookup_name);
instance->advertise(this, out_topic, rmw_qos_profile_default, pub_options);

// Use PublisherPlugin::publish as the subscriber callback
// Use PublisherPlugin::publishPtr as the subscriber callback
typedef void (Plugin::* PublishMemFn)(const sensor_msgs::msg::Image::ConstSharedPtr &) const;
PublishMemFn pub_mem_fn = &Plugin::publishPtr;
this->sub = image_transport::create_subscription(
this, in_topic,
std::bind(
pub_mem_fn,
instance.get(), std::placeholders::_1), in_transport, rmw_qos_profile_default, sub_options);

this->instance = loader->createUniqueInstance(lookup_name);

pub_options.event_callbacks.matched_callback =
[this, in_topic, in_transport, pub_mem_fn, sub_options](rclcpp::MatchedInfo & matched_info)
{
if (matched_info.current_count == 0) {
this->sub.shutdown();
} else if (!this->sub) {
this->sub = image_transport::create_subscription(
this, in_topic,
std::bind(
pub_mem_fn,
this->instance.get(), std::placeholders::_1), in_transport, rmw_qos_profile_default,
sub_options);
}
};

this->instance->advertise(this, out_topic, rmw_qos_profile_default, pub_options);
}
}

Expand Down

0 comments on commit 6cec0b6

Please sign in to comment.