Skip to content

Commit

Permalink
Add lazy subscription to republisher (backport #325) (#327)
Browse files Browse the repository at this point in the history
Signed-off-by: Alejandro Hernández Cordero <ahcorde@gmail.com>
Co-authored-by: Błażej Sowa <bsowa123@gmail.com>
  • Loading branch information
mergify[bot] and bjsowa authored Oct 15, 2024
1 parent b17b272 commit d04d2c8
Showing 1 changed file with 41 additions and 16 deletions.
57 changes: 41 additions & 16 deletions image_transport/src/republish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ using namespace std::chrono_literals;
namespace image_transport
{

std::mutex pub_matched_mutex;

Republisher::Republisher(const rclcpp::NodeOptions & options)
: Node("image_republisher", options)
{
Expand Down Expand Up @@ -111,18 +113,30 @@ void Republisher::initialize()
if (out_transport.empty()) {
// Use all available transports for output

this->pub = image_transport::create_publisher(
this, out_topic,
rmw_qos_profile_default, pub_options);

// Use Publisher::publish as the subscriber callback
typedef void (image_transport::Publisher::* PublishMemFn)(
const sensor_msgs::msg::Image::ConstSharedPtr &) const;
PublishMemFn pub_mem_fn = &image_transport::Publisher::publish;

this->sub = image_transport::create_subscription(
this, in_topic, std::bind(pub_mem_fn, &pub, std::placeholders::_1),
in_transport, rmw_qos_profile_default, sub_options);
pub_options.event_callbacks.matched_callback =
[this, in_topic, in_transport, pub_mem_fn, sub_options](rclcpp::MatchedInfo &)
{
std::scoped_lock<std::mutex> lock(pub_matched_mutex);
if (this->pub.getNumSubscribers() == 0) {
this->sub.shutdown();
} else if (!this->sub) {
this->sub = image_transport::create_subscription(
this, in_topic,
std::bind(pub_mem_fn, &this->pub, std::placeholders::_1),
in_transport,
rmw_qos_profile_default,
sub_options);
}
};

this->pub = image_transport::create_publisher(
this, out_topic,
rmw_qos_profile_default, pub_options);
} else {
// Use one specific transport for output
// Load transport plugin
Expand All @@ -132,17 +146,28 @@ void Republisher::initialize()
"image_transport::PublisherPlugin");
std::string lookup_name = Plugin::getLookupName(out_transport);

instance = loader->createUniqueInstance(lookup_name);
instance->advertise(this, out_topic, rmw_qos_profile_default, pub_options);

// Use PublisherPlugin::publish as the subscriber callback
// Use PublisherPlugin::publishPtr as the subscriber callback
typedef void (Plugin::* PublishMemFn)(const sensor_msgs::msg::Image::ConstSharedPtr &) const;
PublishMemFn pub_mem_fn = &Plugin::publishPtr;
this->sub = image_transport::create_subscription(
this, in_topic,
std::bind(
pub_mem_fn,
instance.get(), std::placeholders::_1), in_transport, rmw_qos_profile_default, sub_options);

this->instance = loader->createUniqueInstance(lookup_name);

pub_options.event_callbacks.matched_callback =
[this, in_topic, in_transport, pub_mem_fn, sub_options](rclcpp::MatchedInfo & matched_info)
{
if (matched_info.current_count == 0) {
this->sub.shutdown();
} else if (!this->sub) {
this->sub = image_transport::create_subscription(
this, in_topic,
std::bind(
pub_mem_fn,
this->instance.get(), std::placeholders::_1), in_transport, rmw_qos_profile_default,
sub_options);
}
};

this->instance->advertise(this, out_topic, rmw_qos_profile_default, pub_options);
}
}

Expand Down

0 comments on commit d04d2c8

Please sign in to comment.