Skip to content

Commit

Permalink
Expose option to set callback groups (#274)
Browse files Browse the repository at this point in the history
Signed-off-by: Aditya Pande <adityapande@google.com>
  • Loading branch information
adityapande-1995 authored Jul 17, 2023
1 parent 4965283 commit e087073
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 9 deletions.
18 changes: 11 additions & 7 deletions image_transport/include/image_transport/image_transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ class ImageTransport
const std::string & base_topic, uint32_t queue_size,
const Subscriber::Callback & callback,
const VoidPtr & tracked_object = VoidPtr(),
const TransportHints * transport_hints = nullptr);
const TransportHints * transport_hints = nullptr,
const rclcpp::SubscriptionOptions options = rclcpp::SubscriptionOptions());

/**
* \brief Subscribe to an image topic, version for bare function.
Expand All @@ -148,12 +149,13 @@ class ImageTransport
Subscriber subscribe(
const std::string & base_topic, uint32_t queue_size,
void (* fp)(const ImageConstPtr &),
const TransportHints * transport_hints = nullptr)
const TransportHints * transport_hints = nullptr,
const rclcpp::SubscriptionOptions options = rclcpp::SubscriptionOptions())
{
return subscribe(
base_topic, queue_size,
std::function<void(const ImageConstPtr &)>(fp),
VoidPtr(), transport_hints);
VoidPtr(), transport_hints, options);
}

/**
Expand All @@ -163,11 +165,12 @@ class ImageTransport
Subscriber subscribe(
const std::string & base_topic, uint32_t queue_size,
void (T::* fp)(const ImageConstPtr &), T * obj,
const TransportHints * transport_hints = nullptr)
const TransportHints * transport_hints = nullptr,
const rclcpp::SubscriptionOptions options = rclcpp::SubscriptionOptions())
{
return subscribe(
base_topic, queue_size, std::bind(fp, obj, std::placeholders::_1),
VoidPtr(), transport_hints);
VoidPtr(), transport_hints, options);
}

/**
Expand All @@ -178,11 +181,12 @@ class ImageTransport
const std::string & base_topic, uint32_t queue_size,
void (T::* fp)(const ImageConstPtr &),
const std::shared_ptr<T> & obj,
const TransportHints * transport_hints = nullptr)
const TransportHints * transport_hints = nullptr,
const rclcpp::SubscriptionOptions options = rclcpp::SubscriptionOptions())
{
return subscribe(
base_topic, queue_size, std::bind(fp, obj.get(), std::placeholders::_1),
obj, transport_hints);
obj, transport_hints, options);
}

/*!
Expand Down
6 changes: 4 additions & 2 deletions image_transport/src/image_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,16 @@ Subscriber ImageTransport::subscribe(
const std::string & base_topic, uint32_t queue_size,
const Subscriber::Callback & callback,
const VoidPtr & tracked_object,
const TransportHints * transport_hints)
const TransportHints * transport_hints,
const rclcpp::SubscriptionOptions options)
{
(void) tracked_object;
rmw_qos_profile_t custom_qos = rmw_qos_profile_default;
custom_qos.depth = queue_size;
return create_subscription(
impl_->node_.get(), base_topic, callback,
getTransportOrDefault(transport_hints), custom_qos);
getTransportOrDefault(transport_hints), custom_qos,
options);
}

CameraPublisher ImageTransport::advertiseCamera(
Expand Down
59 changes: 59 additions & 0 deletions image_transport/test/test_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,65 @@ TEST_F(TestSubscriber, camera_sub_shutdown) {
EXPECT_EQ(node_->get_node_graph_interface()->count_subscribers("camera/camera_info"), 0u);
}

TEST_F(TestSubscriber, callback_groups) {
using namespace std::chrono_literals;

// Create a publisher node.
auto node_publisher = rclcpp::Node::make_shared("image_publisher", rclcpp::NodeOptions());
image_transport::ImageTransport it_publisher(node_publisher);
image_transport::Publisher pub = it_publisher.advertise("camera/image", 1);

auto msg = sensor_msgs::msg::Image();
auto timer = node_publisher->create_wall_timer(100ms, [&]() {pub.publish(msg);});

// Create a subscriber to read the images.
std::atomic<bool> flag_1 = false;
std::atomic<bool> flag_2 = false;
std::function<void(const sensor_msgs::msg::Image::ConstSharedPtr & msg)> fcn1 =
[&](const auto & msg) {
(void)msg;
flag_1 = true;
std::this_thread::sleep_for(1s);
};
std::function<void(const sensor_msgs::msg::Image::ConstSharedPtr & msg)> fcn2 =
[&](const auto & msg) {
(void)msg;
flag_2 = true;
std::this_thread::sleep_for(1s);
};

auto cb_group = node_->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
rclcpp::SubscriptionOptions sub_options;
sub_options.callback_group = cb_group;

image_transport::ImageTransport it(node_);

auto subscriber_1 = it.subscribe("camera/image", 1, fcn1, nullptr, nullptr, sub_options);
auto subscriber_2 = it.subscribe("camera/image", 1, fcn2, nullptr, nullptr, sub_options);

rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node_);
executor.add_node(node_publisher);
// Both callbacks should be executed and the flags should be set.
std::thread executor_thread([&]() {executor.spin();});

// The callbacks sleep for 5 seconds and mutually exclusive callbacks should be blocked.
// However, because of the the multithreaded executor and renentrant callback group,
// the flags should be set, as the callbacks should be in different threads.
auto timeout_elapsed = 0.0s;
auto sleep_duration = 0.1s;
auto timeout = 0.5s;

while (!(flag_1 && flag_2)) {
std::this_thread::sleep_for(sleep_duration);
timeout_elapsed += sleep_duration;
}
executor.cancel();
executor_thread.join();

EXPECT_LT(timeout_elapsed, timeout);
}

int main(int argc, char ** argv)
{
rclcpp::init(argc, argv);
Expand Down

0 comments on commit e087073

Please sign in to comment.