diff --git a/image_transport/include/image_transport/image_transport.hpp b/image_transport/include/image_transport/image_transport.hpp index 9de2de29..0645a762 100644 --- a/image_transport/include/image_transport/image_transport.hpp +++ b/image_transport/include/image_transport/image_transport.hpp @@ -139,7 +139,8 @@ class ImageTransport const std::string & base_topic, uint32_t queue_size, const Subscriber::Callback & callback, const VoidPtr & tracked_object = VoidPtr(), - const TransportHints * transport_hints = nullptr); + const TransportHints * transport_hints = nullptr, + const rclcpp::SubscriptionOptions options = rclcpp::SubscriptionOptions()); /** * \brief Subscribe to an image topic, version for bare function. @@ -148,12 +149,13 @@ class ImageTransport Subscriber subscribe( const std::string & base_topic, uint32_t queue_size, void (* fp)(const ImageConstPtr &), - const TransportHints * transport_hints = nullptr) + const TransportHints * transport_hints = nullptr, + const rclcpp::SubscriptionOptions options = rclcpp::SubscriptionOptions()) { return subscribe( base_topic, queue_size, std::function(fp), - VoidPtr(), transport_hints); + VoidPtr(), transport_hints, options); } /** @@ -163,11 +165,12 @@ class ImageTransport Subscriber subscribe( const std::string & base_topic, uint32_t queue_size, void (T::* fp)(const ImageConstPtr &), T * obj, - const TransportHints * transport_hints = nullptr) + const TransportHints * transport_hints = nullptr, + const rclcpp::SubscriptionOptions options = rclcpp::SubscriptionOptions()) { return subscribe( base_topic, queue_size, std::bind(fp, obj, std::placeholders::_1), - VoidPtr(), transport_hints); + VoidPtr(), transport_hints, options); } /** @@ -178,11 +181,12 @@ class ImageTransport const std::string & base_topic, uint32_t queue_size, void (T::* fp)(const ImageConstPtr &), const std::shared_ptr & obj, - const TransportHints * transport_hints = nullptr) + const TransportHints * transport_hints = nullptr, + const rclcpp::SubscriptionOptions options = rclcpp::SubscriptionOptions()) { return subscribe( base_topic, queue_size, std::bind(fp, obj.get(), std::placeholders::_1), - obj, transport_hints); + obj, transport_hints, options); } /*! diff --git a/image_transport/src/image_transport.cpp b/image_transport/src/image_transport.cpp index 85b7b9ba..d1ecfd61 100644 --- a/image_transport/src/image_transport.cpp +++ b/image_transport/src/image_transport.cpp @@ -154,14 +154,16 @@ Subscriber ImageTransport::subscribe( const std::string & base_topic, uint32_t queue_size, const Subscriber::Callback & callback, const VoidPtr & tracked_object, - const TransportHints * transport_hints) + const TransportHints * transport_hints, + const rclcpp::SubscriptionOptions options) { (void) tracked_object; rmw_qos_profile_t custom_qos = rmw_qos_profile_default; custom_qos.depth = queue_size; return create_subscription( impl_->node_.get(), base_topic, callback, - getTransportOrDefault(transport_hints), custom_qos); + getTransportOrDefault(transport_hints), custom_qos, + options); } CameraPublisher ImageTransport::advertiseCamera( diff --git a/image_transport/test/test_subscriber.cpp b/image_transport/test/test_subscriber.cpp index 787ae93b..d56e255b 100644 --- a/image_transport/test/test_subscriber.cpp +++ b/image_transport/test/test_subscriber.cpp @@ -80,6 +80,65 @@ TEST_F(TestSubscriber, camera_sub_shutdown) { EXPECT_EQ(node_->get_node_graph_interface()->count_subscribers("camera/camera_info"), 0u); } +TEST_F(TestSubscriber, callback_groups) { + using namespace std::chrono_literals; + + // Create a publisher node. + auto node_publisher = rclcpp::Node::make_shared("image_publisher", rclcpp::NodeOptions()); + image_transport::ImageTransport it_publisher(node_publisher); + image_transport::Publisher pub = it_publisher.advertise("camera/image", 1); + + auto msg = sensor_msgs::msg::Image(); + auto timer = node_publisher->create_wall_timer(100ms, [&]() {pub.publish(msg);}); + + // Create a subscriber to read the images. + std::atomic flag_1 = false; + std::atomic flag_2 = false; + std::function fcn1 = + [&](const auto & msg) { + (void)msg; + flag_1 = true; + std::this_thread::sleep_for(1s); + }; + std::function fcn2 = + [&](const auto & msg) { + (void)msg; + flag_2 = true; + std::this_thread::sleep_for(1s); + }; + + auto cb_group = node_->create_callback_group(rclcpp::CallbackGroupType::Reentrant); + rclcpp::SubscriptionOptions sub_options; + sub_options.callback_group = cb_group; + + image_transport::ImageTransport it(node_); + + auto subscriber_1 = it.subscribe("camera/image", 1, fcn1, nullptr, nullptr, sub_options); + auto subscriber_2 = it.subscribe("camera/image", 1, fcn2, nullptr, nullptr, sub_options); + + rclcpp::executors::MultiThreadedExecutor executor; + executor.add_node(node_); + executor.add_node(node_publisher); + // Both callbacks should be executed and the flags should be set. + std::thread executor_thread([&]() {executor.spin();}); + + // The callbacks sleep for 5 seconds and mutually exclusive callbacks should be blocked. + // However, because of the the multithreaded executor and renentrant callback group, + // the flags should be set, as the callbacks should be in different threads. + auto timeout_elapsed = 0.0s; + auto sleep_duration = 0.1s; + auto timeout = 0.5s; + + while (!(flag_1 && flag_2)) { + std::this_thread::sleep_for(sleep_duration); + timeout_elapsed += sleep_duration; + } + executor.cancel(); + executor_thread.join(); + + EXPECT_LT(timeout_elapsed, timeout); +} + int main(int argc, char ** argv) { rclcpp::init(argc, argv);