diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index 1fc761f491..e169b890bb 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include // NOLINT, cpplint doesn't think this is a cpp std header #include #include @@ -29,20 +30,22 @@ #include "rcl/client.h" #include "rcl/error_handling.h" +#include "rcl/event_callback.h" #include "rcl/wait.h" +#include "rclcpp/detail/cpp_callback_trampoline.hpp" #include "rclcpp/exceptions.hpp" +#include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/function_traits.hpp" +#include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_graph_interface.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/utilities.hpp" -#include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/visibility_control.hpp" -#include "rcutils/logging_macros.h" - #include "rmw/error_handling.h" +#include "rmw/impl/cpp/demangle.hpp" #include "rmw/rmw.h" namespace rclcpp @@ -215,6 +218,90 @@ class ClientBase bool exchange_in_use_by_wait_set_state(bool in_use_state); + /// Set a callback to be called when each new response is received. + /** + * The callback receives a size_t which is the number of responses received + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if responses were received before any + * callback was set. + * + * Since this callback is called from the middleware, you should aim to make + * it fast and not blocking. + * If you need to do a lot of work or wait for some other event, you should + * spin it off to another thread, otherwise you risk blocking the middleware. + * + * Calling it again will clear any previously set callback. + * + * An exception will be thrown if the callback is not callable. + * + * This function is thread-safe. + * + * If you want more information available in the callback, like the client + * or other information, you may use a lambda with captures or std::bind. + * + * \sa rmw_client_set_on_new_response_callback + * \sa rcl_client_set_on_new_response_callback + * + * \param[in] callback functor to be called when a new response is received + */ + void + set_on_new_response_callback(std::function callback) + { + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_new_response_callback " + "is not callable."); + } + + auto new_callback = + [callback, this](size_t number_of_responses) { + try { + callback(number_of_responses); + } catch (const std::exception & exception) { + RCLCPP_ERROR_STREAM( + node_logger_, + "rclcpp::ClientBase@" << this << + " caught " << rmw::impl::cpp::demangle(exception) << + " exception in user-provided callback for the 'on new response' callback: " << + exception.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + node_logger_, + "rclcpp::ClientBase@" << this << + " caught unhandled exception in user-provided callback " << + "for the 'on new response' callback"); + } + }; + + std::lock_guard lock(callback_mutex_); + + // Set it temporarily to the new callback, while we replace the old one. + // This two-step setting, prevents a gap where the old std::function has + // been replaced but the middleware hasn't been told about the new one yet. + set_on_new_response_callback( + rclcpp::detail::cpp_callback_trampoline, + static_cast(&new_callback)); + + // Store the std::function to keep it in scope, also overwrites the existing one. + on_new_response_callback_ = new_callback; + + // Set it again, now using the permanent storage. + set_on_new_response_callback( + rclcpp::detail::cpp_callback_trampoline, + static_cast(&on_new_response_callback_)); + } + + /// Unset the callback registered for new responses, if any. + void + clear_on_new_response_callback() + { + std::lock_guard lock(callback_mutex_); + if (on_new_response_callback_) { + set_on_new_response_callback(nullptr, nullptr); + on_new_response_callback_ = nullptr; + } + } + protected: RCLCPP_DISABLE_COPY(ClientBase) @@ -230,13 +317,21 @@ class ClientBase const rcl_node_t * get_rcl_node_handle() const; + RCLCPP_PUBLIC + void + set_on_new_response_callback(rcl_event_callback_t callback, const void * user_data); + rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_; std::shared_ptr node_handle_; std::shared_ptr context_; + rclcpp::Logger node_logger_; std::shared_ptr client_handle_; std::atomic in_use_by_wait_set_{false}; + + std::recursive_mutex callback_mutex_; + std::function on_new_response_callback_{nullptr}; }; template diff --git a/rclcpp/include/rclcpp/detail/cpp_callback_trampoline.hpp b/rclcpp/include/rclcpp/detail/cpp_callback_trampoline.hpp new file mode 100644 index 0000000000..be857e1b58 --- /dev/null +++ b/rclcpp/include/rclcpp/detail/cpp_callback_trampoline.hpp @@ -0,0 +1,67 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__DETAIL__CPP_CALLBACK_TRAMPOLINE_HPP_ +#define RCLCPP__DETAIL__CPP_CALLBACK_TRAMPOLINE_HPP_ + +#include + +namespace rclcpp +{ + +namespace detail +{ + +/// Trampoline pattern for wrapping std::function into C-style callbacks. +/** + * A common pattern in C is for a function to take a function pointer and a + * void pointer for "user data" which is passed to the function pointer when it + * is called from within C. + * + * It works by using the user data pointer to store a pointer to a + * std::function instance. + * So when called from C, this function will cast the user data to the right + * std::function type and call it. + * + * This should allow you to use free functions, lambdas with and without + * captures, and various kinds of std::bind instances. + * + * The interior of this function is likely to be executed within a C runtime, + * so no exceptions should be thrown at this point, and doing so results in + * undefined behavior. + * + * \tparam UserDataT Deduced type based on what is passed for user data, + * usually this type is either `void *` or `const void *`. + * \tparam Args the arguments being passed to the callback + * \tparam ReturnT the return type of this function and the callback, default void + * \param user_data the function pointer, possibly type erased + * \returns whatever the callback returns, if anything + */ +template< + typename UserDataT, + typename ... Args, + typename ReturnT = void +> +ReturnT +cpp_callback_trampoline(UserDataT user_data, Args ... args) noexcept +{ + auto & actual_callback = *reinterpret_cast *>(user_data); + return actual_callback(args ...); +} + +} // namespace detail + +} // namespace rclcpp + +#endif // RCLCPP__DETAIL__CPP_CALLBACK_TRAMPOLINE_HPP_ diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index 531e4c70e0..331c1b6da5 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -15,13 +15,16 @@ #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BASE_HPP_ +#include #include #include #include #include "rcl/wait.h" +#include "rmw/impl/cpp/demangle.hpp" #include "rclcpp/guard_condition.hpp" +#include "rclcpp/logging.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/waitable.hpp" @@ -35,6 +38,11 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable public: RCLCPP_SMART_PTR_ALIASES_ONLY(SubscriptionIntraProcessBase) + enum class EntityType + { + Subscription, + }; + RCLCPP_PUBLIC SubscriptionIntraProcessBase( rclcpp::Context::SharedPtr context, @@ -43,7 +51,8 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable : gc_(context), topic_name_(topic_name), qos_profile_(qos_profile) {} - virtual ~SubscriptionIntraProcessBase() = default; + RCLCPP_PUBLIC + virtual ~SubscriptionIntraProcessBase(); RCLCPP_PUBLIC size_t @@ -53,7 +62,17 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable void add_to_wait_set(rcl_wait_set_t * wait_set) override; - virtual bool + bool + is_ready(rcl_wait_set_t * wait_set) override = 0; + + std::shared_ptr + take_data() override = 0; + + void + execute(std::shared_ptr & data) override = 0; + + virtual + bool use_take_shared_method() const = 0; RCLCPP_PUBLIC @@ -64,13 +83,107 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable QoS get_actual_qos() const; + /// Set a callback to be called when each new message arrives. + /** + * The callback receives a size_t which is the number of messages received + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if messages were received before any + * callback was set. + * + * The callback also receives an int identifier argument. + * This is needed because a Waitable may be composed of several distinct entities, + * such as subscriptions, services, etc. + * The application should provide a generic callback function that will be then + * forwarded by the waitable to all of its entities. + * Before forwarding, a different value for the identifier argument will be + * bound to the function. + * This implies that the provided callback can use the identifier to behave + * differently depending on which entity triggered the waitable to become ready. + * + * Calling it again will clear any previously set callback. + * + * An exception will be thrown if the callback is not callable. + * + * This function is thread-safe. + * + * If you want more information available in the callback, like the subscription + * or other information, you may use a lambda with captures or std::bind. + * + * \param[in] callback functor to be called when a new message is received. + */ + void + set_on_ready_callback(std::function callback) override + { + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_ready_callback " + "is not callable."); + } + + // Note: we bind the int identifier argument to this waitable's entity types + auto new_callback = + [callback, this](size_t number_of_events) { + try { + callback(number_of_events, static_cast(EntityType::Subscription)); + } catch (const std::exception & exception) { + RCLCPP_ERROR_STREAM( + // TODO(wjwwood): get this class access to the node logger it is associated with + rclcpp::get_logger("rclcpp"), + "rclcpp::SubscriptionIntraProcessBase@" << this << + " caught " << rmw::impl::cpp::demangle(exception) << + " exception in user-provided callback for the 'on ready' callback: " << + exception.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "rclcpp::SubscriptionIntraProcessBase@" << this << + " caught unhandled exception in user-provided callback " << + "for the 'on ready' callback"); + } + }; + + std::lock_guard lock(callback_mutex_); + on_new_message_callback_ = new_callback; + + if (unread_count_ > 0) { + if (qos_profile_.history() == HistoryPolicy::KeepAll) { + on_new_message_callback_(unread_count_); + } else { + // Use qos profile depth as upper bound for unread_count_ + on_new_message_callback_(std::min(unread_count_, qos_profile_.depth())); + } + unread_count_ = 0; + } + } + + /// Unset the callback registered for new messages, if any. + void + clear_on_ready_callback() override + { + std::lock_guard lock(callback_mutex_); + on_new_message_callback_ = nullptr; + } + protected: - std::recursive_mutex reentrant_mutex_; + std::recursive_mutex callback_mutex_; + std::function on_new_message_callback_ {nullptr}; + size_t unread_count_{0}; rclcpp::GuardCondition gc_; virtual void trigger_guard_condition() = 0; + void + invoke_on_new_message() + { + std::lock_guard lock(this->callback_mutex_); + if (this->on_new_message_callback_) { + this->on_new_message_callback_(1); + } else { + this->unread_count_++; + } + } + private: std::string topic_name_; QoS qos_profile_; diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp index ecb02f501d..3c71512677 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp @@ -125,6 +125,7 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff buffer_->add_shared(convert_ros_message_to_subscribed_type_unique_ptr(*message)); trigger_guard_condition(); } + this->invoke_on_new_message(); } void @@ -137,6 +138,7 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff buffer_->add_unique(convert_ros_message_to_subscribed_type_unique_ptr(*message)); trigger_guard_condition(); } + this->invoke_on_new_message(); } void @@ -144,6 +146,7 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff { buffer_->add_shared(std::move(message)); trigger_guard_condition(); + this->invoke_on_new_message(); } void @@ -151,6 +154,7 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff { buffer_->add_unique(std::move(message)); trigger_guard_condition(); + this->invoke_on_new_message(); } bool diff --git a/rclcpp/include/rclcpp/guard_condition.hpp b/rclcpp/include/rclcpp/guard_condition.hpp index fce42f1d4b..f6f5af9586 100644 --- a/rclcpp/include/rclcpp/guard_condition.hpp +++ b/rclcpp/include/rclcpp/guard_condition.hpp @@ -98,10 +98,27 @@ class GuardCondition bool exchange_in_use_by_wait_set_state(bool in_use_state); + /// Adds the guard condition to a waitset + /** + * This function is thread-safe. + * \param[in] wait_set pointer to a wait set where to add the guard condition + */ + RCLCPP_PUBLIC + void + add_to_wait_set(rcl_wait_set_t * wait_set); + + RCLCPP_PUBLIC + void + set_on_trigger_callback(std::function callback); + protected: rclcpp::Context::SharedPtr context_; rcl_guard_condition_t rcl_guard_condition_; std::atomic in_use_by_wait_set_{false}; + std::recursive_mutex reentrant_mutex_; + std::function on_trigger_callback_{nullptr}; + size_t unread_count_{0}; + rcl_wait_set_t * wait_set_{nullptr}; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index 01bdbb585f..cdbd4bb758 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -24,6 +24,8 @@ #include #include #include +#include +#include #include #include "rcl/publisher.h" @@ -112,9 +114,10 @@ class PublisherBase : public std::enable_shared_from_this get_publisher_handle() const; /// Get all the QoS event handlers associated with this publisher. - /** \return The vector of QoS event handlers. */ + /** \return The map of QoS event handlers. */ RCLCPP_PUBLIC - const std::vector> & + const + std::unordered_map> & get_event_handlers() const; /// Get subscription count @@ -245,6 +248,71 @@ class PublisherBase : public std::enable_shared_from_this } } + /// Set a callback to be called when each new qos event instance occurs. + /** + * The callback receives a size_t which is the number of events that occurred + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if events occurred before any + * callback was set. + * + * Since this callback is called from the middleware, you should aim to make + * it fast and not blocking. + * If you need to do a lot of work or wait for some other event, you should + * spin it off to another thread, otherwise you risk blocking the middleware. + * + * Calling it again will clear any previously set callback. + * + * An exception will be thrown if the callback is not callable. + * + * This function is thread-safe. + * + * If you want more information available in the callback, like the qos event + * or other information, you may use a lambda with captures or std::bind. + * + * \sa rclcpp::QOSEventHandlerBase::set_on_ready_callback + * + * \param[in] callback functor to be called when a new event occurs + * \param[in] event_type identifier for the qos event we want to attach the callback to + */ + void + set_on_new_qos_event_callback( + std::function callback, + rcl_publisher_event_type_t event_type) + { + if (event_handlers_.count(event_type) == 0) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling set_on_new_qos_event_callback for non registered publisher event_type"); + return; + } + + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_new_qos_event_callback " + "is not callable."); + } + + // The on_ready_callback signature has an extra `int` argument used to disambiguate between + // possible different entities within a generic waitable. + // We hide that detail to users of this method. + std::function new_callback = std::bind(callback, std::placeholders::_1); + event_handlers_[event_type]->set_on_ready_callback(new_callback); + } + + /// Unset the callback registered for new qos events, if any. + void + clear_on_new_qos_event_callback(rcl_publisher_event_type_t event_type) + { + if (event_handlers_.count(event_type) == 0) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling clear_on_new_qos_event_callback for non registered event_type"); + return; + } + + event_handlers_[event_type]->clear_on_ready_callback(); + } + protected: template void @@ -258,7 +326,7 @@ class PublisherBase : public std::enable_shared_from_this rcl_publisher_event_init, publisher_handle_, event_type); - event_handlers_.emplace_back(handler); + event_handlers_.insert(std::make_pair(event_type, handler)); } RCLCPP_PUBLIC @@ -268,7 +336,8 @@ class PublisherBase : public std::enable_shared_from_this std::shared_ptr publisher_handle_; - std::vector> event_handlers_; + std::unordered_map> event_handlers_; using IntraProcessManagerWeakPtr = std::weak_ptr; diff --git a/rclcpp/include/rclcpp/qos_event.hpp b/rclcpp/include/rclcpp/qos_event.hpp index 04231e1395..59c99dda42 100644 --- a/rclcpp/include/rclcpp/qos_event.hpp +++ b/rclcpp/include/rclcpp/qos_event.hpp @@ -17,16 +17,21 @@ #include #include +#include #include #include #include "rcl/error_handling.h" +#include "rcl/event_callback.h" +#include "rmw/impl/cpp/demangle.hpp" #include "rmw/incompatible_qos_events_statuses.h" #include "rcutils/logging_macros.h" +#include "rclcpp/detail/cpp_callback_trampoline.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp/function_traits.hpp" +#include "rclcpp/logging.hpp" #include "rclcpp/waitable.hpp" namespace rclcpp @@ -84,6 +89,11 @@ class UnsupportedEventTypeException : public exceptions::RCLErrorBase, public st class QOSEventHandlerBase : public Waitable { public: + enum class EntityType + { + Event, + }; + RCLCPP_PUBLIC virtual ~QOSEventHandlerBase(); @@ -102,9 +112,111 @@ class QOSEventHandlerBase : public Waitable bool is_ready(rcl_wait_set_t * wait_set) override; + /// Set a callback to be called when each new event instance occurs. + /** + * The callback receives a size_t which is the number of events that occurred + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if events occurred before any + * callback was set. + * + * The callback also receives an int identifier argument. + * This is needed because a Waitable may be composed of several distinct entities, + * such as subscriptions, services, etc. + * The application should provide a generic callback function that will be then + * forwarded by the waitable to all of its entities. + * Before forwarding, a different value for the identifier argument will be + * bond to the function. + * This implies that the provided callback can use the identifier to behave + * differently depending on which entity triggered the waitable to become ready. + * + * Since this callback is called from the middleware, you should aim to make + * it fast and not blocking. + * If you need to do a lot of work or wait for some other event, you should + * spin it off to another thread, otherwise you risk blocking the middleware. + * + * Calling it again will clear any previously set callback. + * + * An exception will be thrown if the callback is not callable. + * + * This function is thread-safe. + * + * If you want more information available in the callback, like the qos event + * or other information, you may use a lambda with captures or std::bind. + * + * \sa rmw_event_set_callback + * \sa rcl_event_set_callback + * + * \param[in] callback functor to be called when a new event occurs + */ + void + set_on_ready_callback(std::function callback) override + { + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_ready_callback " + "is not callable."); + } + + // Note: we bind the int identifier argument to this waitable's entity types + auto new_callback = + [callback, this](size_t number_of_events) { + try { + callback(number_of_events, static_cast(EntityType::Event)); + } catch (const std::exception & exception) { + RCLCPP_ERROR_STREAM( + // TODO(wjwwood): get this class access to the node logger it is associated with + rclcpp::get_logger("rclcpp"), + "rclcpp::QOSEventHandlerBase@" << this << + " caught " << rmw::impl::cpp::demangle(exception) << + " exception in user-provided callback for the 'on ready' callback: " << + exception.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "rclcpp::QOSEventHandlerBase@" << this << + " caught unhandled exception in user-provided callback " << + "for the 'on ready' callback"); + } + }; + + std::lock_guard lock(callback_mutex_); + + // Set it temporarily to the new callback, while we replace the old one. + // This two-step setting, prevents a gap where the old std::function has + // been replaced but the middleware hasn't been told about the new one yet. + set_on_new_event_callback( + rclcpp::detail::cpp_callback_trampoline, + static_cast(&new_callback)); + + // Store the std::function to keep it in scope, also overwrites the existing one. + on_new_event_callback_ = new_callback; + + // Set it again, now using the permanent storage. + set_on_new_event_callback( + rclcpp::detail::cpp_callback_trampoline, + static_cast(&on_new_event_callback_)); + } + + /// Unset the callback registered for new events, if any. + void + clear_on_ready_callback() override + { + std::lock_guard lock(callback_mutex_); + if (on_new_event_callback_) { + set_on_new_event_callback(nullptr, nullptr); + on_new_event_callback_ = nullptr; + } + } + protected: + RCLCPP_PUBLIC + void + set_on_new_event_callback(rcl_event_callback_t callback, const void * user_data); + rcl_event_t event_handle_; size_t wait_set_event_index_; + std::recursive_mutex callback_mutex_; + std::function on_new_event_callback_{nullptr}; }; template @@ -117,9 +229,8 @@ class QOSEventHandler : public QOSEventHandlerBase InitFuncT init_func, ParentHandleT parent_handle, EventTypeEnum event_type) - : event_callback_(callback) + : parent_handle_(parent_handle), event_callback_(callback) { - parent_handle_ = parent_handle; event_handle_ = rcl_get_zero_initialized_event(); rcl_ret_t ret = init_func(&event_handle_, parent_handle.get(), event_type); if (ret != RCL_RET_OK) { diff --git a/rclcpp/include/rclcpp/service.hpp b/rclcpp/include/rclcpp/service.hpp index 78fc5e048f..47704a4c09 100644 --- a/rclcpp/include/rclcpp/service.hpp +++ b/rclcpp/include/rclcpp/service.hpp @@ -19,22 +19,28 @@ #include #include #include +#include #include #include #include "rcl/error_handling.h" +#include "rcl/event_callback.h" #include "rcl/service.h" +#include "rmw/error_handling.h" +#include "rmw/impl/cpp/demangle.hpp" +#include "rmw/rmw.h" + +#include "tracetools/tracetools.h" + #include "rclcpp/any_service_callback.hpp" +#include "rclcpp/detail/cpp_callback_trampoline.hpp" #include "rclcpp/exceptions.hpp" +#include "rclcpp/expand_topic_or_service_name.hpp" +#include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/type_support_decl.hpp" -#include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/visibility_control.hpp" -#include "rclcpp/logging.hpp" -#include "rmw/error_handling.h" -#include "rmw/rmw.h" -#include "tracetools/tracetools.h" namespace rclcpp { @@ -121,6 +127,91 @@ class ServiceBase bool exchange_in_use_by_wait_set_state(bool in_use_state); + /// Set a callback to be called when each new request is received. + /** + * The callback receives a size_t which is the number of requests received + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if requests were received before any + * callback was set. + * + * Since this callback is called from the middleware, you should aim to make + * it fast and not blocking. + * If you need to do a lot of work or wait for some other event, you should + * spin it off to another thread, otherwise you risk blocking the middleware. + * + * Calling it again will clear any previously set callback. + * + * + * An exception will be thrown if the callback is not callable. + * + * This function is thread-safe. + * + * If you want more information available in the callback, like the service + * or other information, you may use a lambda with captures or std::bind. + * + * \sa rmw_service_set_on_new_request_callback + * \sa rcl_service_set_on_new_request_callback + * + * \param[in] callback functor to be called when a new request is received + */ + void + set_on_new_request_callback(std::function callback) + { + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_new_request_callback " + "is not callable."); + } + + auto new_callback = + [callback, this](size_t number_of_requests) { + try { + callback(number_of_requests); + } catch (const std::exception & exception) { + RCLCPP_ERROR_STREAM( + node_logger_, + "rclcpp::ServiceBase@" << this << + " caught " << rmw::impl::cpp::demangle(exception) << + " exception in user-provided callback for the 'on new request' callback: " << + exception.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + node_logger_, + "rclcpp::ServiceBase@" << this << + " caught unhandled exception in user-provided callback " << + "for the 'on new request' callback"); + } + }; + + std::lock_guard lock(callback_mutex_); + + // Set it temporarily to the new callback, while we replace the old one. + // This two-step setting, prevents a gap where the old std::function has + // been replaced but the middleware hasn't been told about the new one yet. + set_on_new_request_callback( + rclcpp::detail::cpp_callback_trampoline, + static_cast(&new_callback)); + + // Store the std::function to keep it in scope, also overwrites the existing one. + on_new_request_callback_ = new_callback; + + // Set it again, now using the permanent storage. + set_on_new_request_callback( + rclcpp::detail::cpp_callback_trampoline, + static_cast(&on_new_request_callback_)); + } + + /// Unset the callback registered for new requests, if any. + void + clear_on_new_request_callback() + { + std::lock_guard lock(callback_mutex_); + if (on_new_request_callback_) { + set_on_new_request_callback(nullptr, nullptr); + on_new_request_callback_ = nullptr; + } + } + protected: RCLCPP_DISABLE_COPY(ServiceBase) @@ -132,12 +223,21 @@ class ServiceBase const rcl_node_t * get_rcl_node_handle() const; + RCLCPP_PUBLIC + void + set_on_new_request_callback(rcl_event_callback_t callback, const void * user_data); + std::shared_ptr node_handle_; std::shared_ptr service_handle_; bool owns_rcl_handle_ = true; + rclcpp::Logger node_logger_; + std::atomic in_use_by_wait_set_{false}; + + std::recursive_mutex callback_mutex_; + std::function on_new_request_callback_{nullptr}; }; template diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 2d52388e3b..69b6031405 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -197,6 +197,14 @@ class Subscription : public SubscriptionBase "intraprocess communication allowed only with volatile durability"); } + using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess< + MessageT, + SubscribedType, + SubscribedTypeAllocator, + SubscribedTypeDeleter, + ROSMessageT, + AllocatorT>; + // First create a SubscriptionIntraProcess which will be given to the intra-process manager. auto context = node_base->get_context(); subscription_intra_process_ = std::make_shared( @@ -404,15 +412,6 @@ class Subscription : public SubscriptionBase /// Component which computes and publishes topic statistics for this subscriber SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{nullptr}; - - using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess< - MessageT, - SubscribedType, - SubscribedTypeAllocator, - SubscribedTypeDeleter, - ROSMessageT, - AllocatorT>; - std::shared_ptr subscription_intra_process_; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 72ddc23be2..26650cd121 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -17,16 +17,20 @@ #include #include +#include #include #include #include #include +#include "rcl/event_callback.h" #include "rcl/subscription.h" +#include "rmw/impl/cpp/demangle.hpp" #include "rmw/rmw.h" #include "rclcpp/any_subscription_callback.hpp" +#include "rclcpp/detail/cpp_callback_trampoline.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" #include "rclcpp/macros.hpp" @@ -99,9 +103,10 @@ class SubscriptionBase : public std::enable_shared_from_this get_subscription_handle() const; /// Get all the QoS event handlers associated with this subscription. - /** \return The vector of QoS event handlers. */ + /** \return The map of QoS event handlers. */ RCLCPP_PUBLIC - const std::vector> & + const + std::unordered_map> & get_event_handlers() const; /// Get the actual QoS settings, after the defaults have been determined. @@ -283,6 +288,209 @@ class SubscriptionBase : public std::enable_shared_from_this std::vector get_network_flow_endpoints() const; + /// Set a callback to be called when each new message is received. + /** + * The callback receives a size_t which is the number of messages received + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if messages were received before any + * callback was set. + * + * Since this callback is called from the middleware, you should aim to make + * it fast and not blocking. + * If you need to do a lot of work or wait for some other event, you should + * spin it off to another thread, otherwise you risk blocking the middleware. + * + * Calling it again will clear any previously set callback. + * + * This function is thread-safe. + * + * If you want more information available in the callback, like the subscription + * or other information, you may use a lambda with captures or std::bind. + * + * \sa rmw_subscription_set_on_new_message_callback + * \sa rcl_subscription_set_on_new_message_callback + * + * \param[in] callback functor to be called when a new message is received + */ + void + set_on_new_message_callback(std::function callback) + { + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_new_message_callback " + "is not callable."); + } + + auto new_callback = + [callback, this](size_t number_of_messages) { + try { + callback(number_of_messages); + } catch (const std::exception & exception) { + RCLCPP_ERROR_STREAM( + node_logger_, + "rclcpp::SubscriptionBase@" << this << + " caught " << rmw::impl::cpp::demangle(exception) << + " exception in user-provided callback for the 'on new message' callback: " << + exception.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + node_logger_, + "rclcpp::SubscriptionBase@" << this << + " caught unhandled exception in user-provided callback " << + "for the 'on new message' callback"); + } + }; + + std::lock_guard lock(callback_mutex_); + + // Set it temporarily to the new callback, while we replace the old one. + // This two-step setting, prevents a gap where the old std::function has + // been replaced but the middleware hasn't been told about the new one yet. + set_on_new_message_callback( + rclcpp::detail::cpp_callback_trampoline, + static_cast(&new_callback)); + + // Store the std::function to keep it in scope, also overwrites the existing one. + on_new_message_callback_ = new_callback; + + // Set it again, now using the permanent storage. + set_on_new_message_callback( + rclcpp::detail::cpp_callback_trampoline, + static_cast(&on_new_message_callback_)); + } + + /// Unset the callback registered for new messages, if any. + void + clear_on_new_message_callback() + { + std::lock_guard lock(callback_mutex_); + + if (on_new_message_callback_) { + set_on_new_message_callback(nullptr, nullptr); + on_new_message_callback_ = nullptr; + } + } + + /// Set a callback to be called when each new intra-process message is received. + /** + * The callback receives a size_t which is the number of messages received + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if messages were received before any + * callback was set. + * + * Calling it again will clear any previously set callback. + * + * This function is thread-safe. + * + * If you want more information available in the callback, like the subscription + * or other information, you may use a lambda with captures or std::bind. + * + * \sa rclcpp::SubscriptionIntraProcessBase::set_on_ready_callback + * + * \param[in] callback functor to be called when a new message is received + */ + void + set_on_new_intra_process_message_callback(std::function callback) + { + if (!use_intra_process_) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling set_on_new_intra_process_message_callback for subscription with IPC disabled"); + return; + } + + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_new_intra_process_message_callback " + "is not callable."); + } + + // The on_ready_callback signature has an extra `int` argument used to disambiguate between + // possible different entities within a generic waitable. + // We hide that detail to users of this method. + std::function new_callback = std::bind(callback, std::placeholders::_1); + subscription_intra_process_->set_on_ready_callback(new_callback); + } + + /// Unset the callback registered for new intra-process messages, if any. + void + clear_on_new_intra_process_message_callback() + { + if (!use_intra_process_) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling clear_on_new_intra_process_message_callback for subscription with IPC disabled"); + return; + } + + subscription_intra_process_->clear_on_ready_callback(); + } + + /// Set a callback to be called when each new qos event instance occurs. + /** + * The callback receives a size_t which is the number of events that occurred + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if events occurred before any + * callback was set. + * + * Since this callback is called from the middleware, you should aim to make + * it fast and not blocking. + * If you need to do a lot of work or wait for some other event, you should + * spin it off to another thread, otherwise you risk blocking the middleware. + * + * Calling it again will clear any previously set callback. + * + * An exception will be thrown if the callback is not callable. + * + * This function is thread-safe. + * + * If you want more information available in the callback, like the qos event + * or other information, you may use a lambda with captures or std::bind. + * + * \sa rclcpp::QOSEventHandlerBase::set_on_ready_callback + * + * \param[in] callback functor to be called when a new event occurs + * \param[in] event_type identifier for the qos event we want to attach the callback to + */ + void + set_on_new_qos_event_callback( + std::function callback, + rcl_subscription_event_type_t event_type) + { + if (event_handlers_.count(event_type) == 0) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling set_on_new_qos_event_callback for non registered subscription event_type"); + return; + } + + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_new_qos_event_callback " + "is not callable."); + } + + // The on_ready_callback signature has an extra `int` argument used to disambiguate between + // possible different entities within a generic waitable. + // We hide that detail to users of this method. + std::function new_callback = std::bind(callback, std::placeholders::_1); + event_handlers_[event_type]->set_on_ready_callback(new_callback); + } + + /// Unset the callback registered for new qos events, if any. + void + clear_on_new_qos_event_callback(rcl_subscription_event_type_t event_type) + { + if (event_handlers_.count(event_type) == 0) { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "Calling clear_on_new_qos_event_callback for non registered event_type"); + return; + } + + event_handlers_[event_type]->clear_on_ready_callback(); + } + protected: template void @@ -297,7 +505,7 @@ class SubscriptionBase : public std::enable_shared_from_this get_subscription_handle(), event_type); qos_events_in_use_by_wait_set_.insert(std::make_pair(handler.get(), false)); - event_handlers_.emplace_back(handler); + event_handlers_.insert(std::make_pair(event_type, handler)); } RCLCPP_PUBLIC @@ -307,17 +515,24 @@ class SubscriptionBase : public std::enable_shared_from_this bool matches_any_intra_process_publishers(const rmw_gid_t * sender_gid) const; + RCLCPP_PUBLIC + void + set_on_new_message_callback(rcl_event_callback_t callback, const void * user_data); + rclcpp::node_interfaces::NodeBaseInterface * const node_base_; std::shared_ptr node_handle_; std::shared_ptr subscription_handle_; std::shared_ptr intra_process_subscription_handle_; + rclcpp::Logger node_logger_; - std::vector> event_handlers_; + std::unordered_map> event_handlers_; bool use_intra_process_; IntraProcessManagerWeakPtr weak_ipm_; uint64_t intra_process_subscription_id_; + std::shared_ptr subscription_intra_process_; private: RCLCPP_DISABLE_COPY(SubscriptionBase) @@ -329,6 +544,9 @@ class SubscriptionBase : public std::enable_shared_from_this std::atomic intra_process_subscription_waitable_in_use_by_wait_set_{false}; std::unordered_map> qos_events_in_use_by_wait_set_; + + std::recursive_mutex callback_mutex_; + std::function on_new_message_callback_{nullptr}; }; } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/wait_set_template.hpp b/rclcpp/include/rclcpp/wait_set_template.hpp index 3adee41164..3654801c91 100644 --- a/rclcpp/include/rclcpp/wait_set_template.hpp +++ b/rclcpp/include/rclcpp/wait_set_template.hpp @@ -155,7 +155,8 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli this->storage_add_subscription(std::move(local_subscription)); } if (mask.include_events) { - for (auto event : inner_subscription->get_event_handlers()) { + for (auto key_event_pair : inner_subscription->get_event_handlers()) { + auto event = key_event_pair.second; auto local_subscription = inner_subscription; bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(event.get(), true); @@ -225,7 +226,8 @@ class WaitSetTemplate final : private SynchronizationPolicy, private StoragePoli this->storage_remove_subscription(std::move(local_subscription)); } if (mask.include_events) { - for (auto event : inner_subscription->get_event_handlers()) { + for (auto key_event_pair : inner_subscription->get_event_handlers()) { + auto event = key_event_pair.second; auto local_subscription = inner_subscription; local_subscription->exchange_in_use_by_wait_set_state(event.get(), false); this->storage_remove_waitable(std::move(event)); diff --git a/rclcpp/include/rclcpp/waitable.hpp b/rclcpp/include/rclcpp/waitable.hpp index c78b0a885f..db06f70678 100644 --- a/rclcpp/include/rclcpp/waitable.hpp +++ b/rclcpp/include/rclcpp/waitable.hpp @@ -16,6 +16,7 @@ #define RCLCPP__WAITABLE_HPP_ #include +#include #include #include "rclcpp/macros.hpp" @@ -200,6 +201,45 @@ class Waitable bool exchange_in_use_by_wait_set_state(bool in_use_state); + /// Set a callback to be called whenever the waitable becomes ready. + /** + * The callback receives a size_t which is the number of times the waitable was ready + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if waitable was triggered before any + * callback was set. + * + * The callback also receives an int identifier argument. + * This is needed because a Waitable may be composed of several distinct entities, + * such as subscriptions, services, etc. + * The application should provide a generic callback function that will be then + * forwarded by the waitable to all of its entities. + * Before forwarding, a different value for the identifier argument will be + * bond to the function. + * This implies that the provided callback can use the identifier to behave + * differently depending on which entity triggered the waitable to become ready. + * + * Note: this function must be overridden with a proper implementation + * by the custom classes who inherit from rclcpp::Waitable if they want to use it. + * + * \sa rclcpp::Waitable::clear_on_ready_callback + * + * \param[in] callback functor to be called when the waitable becomes ready + */ + RCLCPP_PUBLIC + virtual + void + set_on_ready_callback(std::function callback); + + /// Unset any callback registered via set_on_ready_callback. + /** + * Note: this function must be overridden with a proper implementation + * by the custom classes who inherit from rclcpp::Waitable if they want to use it. + */ + RCLCPP_PUBLIC + virtual + void + clear_on_ready_callback(); + private: std::atomic in_use_by_wait_set_{false}; }; // class Waitable diff --git a/rclcpp/src/rclcpp/client.cpp b/rclcpp/src/rclcpp/client.cpp index 6a1d3934d4..2380b890f5 100644 --- a/rclcpp/src/rclcpp/client.cpp +++ b/rclcpp/src/rclcpp/client.cpp @@ -38,7 +38,8 @@ ClientBase::ClientBase( rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph) : node_graph_(node_graph), node_handle_(node_base->get_shared_rcl_node_handle()), - context_(node_base->get_context()) + context_(node_base->get_context()), + node_logger_(rclcpp::get_node_logger(node_handle_.get())) { std::weak_ptr weak_node_handle(node_handle_); rcl_client_t * new_rcl_client = new rcl_client_t; @@ -66,6 +67,7 @@ ClientBase::ClientBase( ClientBase::~ClientBase() { + clear_on_new_response_callback(); // Make sure the client handle is destructed as early as possible and before the node handle client_handle_.reset(); } @@ -198,3 +200,17 @@ ClientBase::exchange_in_use_by_wait_set_state(bool in_use_state) { return in_use_by_wait_set_.exchange(in_use_state); } + +void +ClientBase::set_on_new_response_callback(rcl_event_callback_t callback, const void * user_data) +{ + rcl_ret_t ret = rcl_client_set_on_new_response_callback( + client_handle_.get(), + callback, + user_data); + + if (RCL_RET_OK != ret) { + using rclcpp::exceptions::throw_from_rcl_error; + throw_from_rcl_error(ret, "failed to set the on new response callback for client"); + } +} diff --git a/rclcpp/src/rclcpp/context.cpp b/rclcpp/src/rclcpp/context.cpp index d3d123c065..bea4eeb583 100644 --- a/rclcpp/src/rclcpp/context.cpp +++ b/rclcpp/src/rclcpp/context.cpp @@ -32,8 +32,6 @@ #include "rcutils/error_handling.h" #include "rcutils/macros.h" -#include "rmw/impl/cpp/demangle.hpp" - #include "./logging_mutex.hpp" using rclcpp::Context; diff --git a/rclcpp/src/rclcpp/guard_condition.cpp b/rclcpp/src/rclcpp/guard_condition.cpp index cae0c4ce5c..ea68c78d73 100644 --- a/rclcpp/src/rclcpp/guard_condition.cpp +++ b/rclcpp/src/rclcpp/guard_condition.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "rclcpp/guard_condition.hpp" #include "rclcpp/exceptions.hpp" @@ -72,9 +74,16 @@ GuardCondition::get_rcl_guard_condition() const void GuardCondition::trigger() { - rcl_ret_t ret = rcl_trigger_guard_condition(&rcl_guard_condition_); - if (RCL_RET_OK != ret) { - rclcpp::exceptions::throw_from_rcl_error(ret); + std::lock_guard lock(reentrant_mutex_); + + if (on_trigger_callback_) { + on_trigger_callback_(1); + } else { + rcl_ret_t ret = rcl_trigger_guard_condition(&rcl_guard_condition_); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + unread_count_++; } } @@ -84,4 +93,42 @@ GuardCondition::exchange_in_use_by_wait_set_state(bool in_use_state) return in_use_by_wait_set_.exchange(in_use_state); } +void +GuardCondition::add_to_wait_set(rcl_wait_set_t * wait_set) +{ + std::lock_guard lock(reentrant_mutex_); + + if (exchange_in_use_by_wait_set_state(true)) { + if (wait_set != wait_set_) { + throw std::runtime_error("guard condition has already been added to a wait set."); + } + } else { + wait_set_ = wait_set; + } + + rcl_ret_t ret = rcl_wait_set_add_guard_condition(wait_set, &this->rcl_guard_condition_, NULL); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error( + ret, "failed to add guard condition to wait set"); + } +} + +void +GuardCondition::set_on_trigger_callback(std::function callback) +{ + std::lock_guard lock(reentrant_mutex_); + + if (callback) { + on_trigger_callback_ = callback; + + if (unread_count_) { + callback(unread_count_); + unread_count_ = 0; + } + return; + } + + on_trigger_callback_ = nullptr; +} + } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp index 284795ef6e..159409528d 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp @@ -64,7 +64,8 @@ NodeTopics::add_publisher( callback_group = node_base_->get_default_callback_group(); } - for (auto & publisher_event : publisher->get_event_handlers()) { + for (auto & key_event_pair : publisher->get_event_handlers()) { + auto publisher_event = key_event_pair.second; callback_group->add_waitable(publisher_event); } @@ -105,7 +106,8 @@ NodeTopics::add_subscription( callback_group->add_subscription(subscription); - for (auto & subscription_event : subscription->get_event_handlers()) { + for (auto & key_event_pair : subscription->get_event_handlers()) { + auto subscription_event = key_event_pair.second; callback_group->add_waitable(subscription_event); } diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index 17d72594d5..dd027f2e65 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include "rcutils/logging_macros.h" @@ -99,6 +100,11 @@ PublisherBase::PublisherBase( PublisherBase::~PublisherBase() { + for (const auto & pair : event_handlers_) { + rcl_publisher_event_type_t event_type = pair.first; + clear_on_new_qos_event_callback(event_type); + } + // must fini the events before fini-ing the publisher event_handlers_.clear(); @@ -154,7 +160,8 @@ PublisherBase::get_publisher_handle() const return publisher_handle_; } -const std::vector> & +const +std::unordered_map> & PublisherBase::get_event_handlers() const { return event_handlers_; diff --git a/rclcpp/src/rclcpp/qos_event.cpp b/rclcpp/src/rclcpp/qos_event.cpp index daf1d3e01e..8bfd5b3304 100644 --- a/rclcpp/src/rclcpp/qos_event.cpp +++ b/rclcpp/src/rclcpp/qos_event.cpp @@ -35,6 +35,10 @@ UnsupportedEventTypeException::UnsupportedEventTypeException( QOSEventHandlerBase::~QOSEventHandlerBase() { + if (on_new_event_callback_) { + clear_on_ready_callback(); + } + if (rcl_event_fini(&event_handle_) != RCL_RET_OK) { RCUTILS_LOG_ERROR_NAMED( "rclcpp", @@ -67,4 +71,20 @@ QOSEventHandlerBase::is_ready(rcl_wait_set_t * wait_set) return wait_set->events[wait_set_event_index_] == &event_handle_; } +void +QOSEventHandlerBase::set_on_new_event_callback( + rcl_event_callback_t callback, + const void * user_data) +{ + rcl_ret_t ret = rcl_event_set_callback( + &event_handle_, + callback, + user_data); + + if (RCL_RET_OK != ret) { + using rclcpp::exceptions::throw_from_rcl_error; + throw_from_rcl_error(ret, "failed to set the on new message callback for QOS Event"); + } +} + } // namespace rclcpp diff --git a/rclcpp/src/rclcpp/service.cpp b/rclcpp/src/rclcpp/service.cpp index 827062bdd3..d2f333cacf 100644 --- a/rclcpp/src/rclcpp/service.cpp +++ b/rclcpp/src/rclcpp/service.cpp @@ -28,11 +28,14 @@ using rclcpp::ServiceBase; ServiceBase::ServiceBase(std::shared_ptr node_handle) -: node_handle_(node_handle) +: node_handle_(node_handle), + node_logger_(rclcpp::get_node_logger(node_handle_.get())) {} ServiceBase::~ServiceBase() -{} +{ + clear_on_new_request_callback(); +} bool ServiceBase::take_type_erased_request(void * request_out, rmw_request_id_t & request_id_out) @@ -84,3 +87,17 @@ ServiceBase::exchange_in_use_by_wait_set_state(bool in_use_state) { return in_use_by_wait_set_.exchange(in_use_state); } + +void +ServiceBase::set_on_new_request_callback(rcl_event_callback_t callback, const void * user_data) +{ + rcl_ret_t ret = rcl_service_set_on_new_request_callback( + service_handle_.get(), + callback, + user_data); + + if (RCL_RET_OK != ret) { + using rclcpp::exceptions::throw_from_rcl_error; + throw_from_rcl_error(ret, "failed to set the on new request callback for service"); + } +} diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 472034f362..12841fe6b6 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include "rclcpp/exceptions.hpp" @@ -39,6 +40,7 @@ SubscriptionBase::SubscriptionBase( bool is_serialized) : node_base_(node_base), node_handle_(node_base_->get_shared_rcl_node_handle()), + node_logger_(rclcpp::get_node_logger(node_handle_.get())), use_intra_process_(false), intra_process_subscription_id_(0), type_support_(type_support_handle), @@ -83,6 +85,13 @@ SubscriptionBase::SubscriptionBase( SubscriptionBase::~SubscriptionBase() { + clear_on_new_message_callback(); + + for (const auto & pair : event_handlers_) { + rcl_subscription_event_type_t event_type = pair.first; + clear_on_new_qos_event_callback(event_type); + } + if (!use_intra_process_) { return; } @@ -115,7 +124,8 @@ SubscriptionBase::get_subscription_handle() const return subscription_handle_; } -const std::vector> & +const +std::unordered_map> & SubscriptionBase::get_event_handlers() const { return event_handlers_; @@ -282,7 +292,8 @@ SubscriptionBase::exchange_in_use_by_wait_set_state( if (get_intra_process_waitable().get() == pointer_to_subscription_part) { return intra_process_subscription_waitable_in_use_by_wait_set_.exchange(in_use_state); } - for (const auto & qos_event : event_handlers_) { + for (const auto & key_event_pair : event_handlers_) { + auto qos_event = key_event_pair.second; if (qos_event.get() == pointer_to_subscription_part) { return qos_events_in_use_by_wait_set_[qos_event.get()].exchange(in_use_state); } @@ -290,7 +301,8 @@ SubscriptionBase::exchange_in_use_by_wait_set_state( throw std::runtime_error("given pointer_to_subscription_part does not match any part"); } -std::vector SubscriptionBase::get_network_flow_endpoints() const +std::vector +SubscriptionBase::get_network_flow_endpoints() const { rcutils_allocator_t allocator = rcutils_get_default_allocator(); rcl_network_flow_endpoint_array_t network_flow_endpoint_array = @@ -326,3 +338,19 @@ std::vector SubscriptionBase::get_network_flow_endp return network_flow_endpoint_vector; } + +void +SubscriptionBase::set_on_new_message_callback( + rcl_event_callback_t callback, + const void * user_data) +{ + rcl_ret_t ret = rcl_subscription_set_on_new_message_callback( + subscription_handle_.get(), + callback, + user_data); + + if (RCL_RET_OK != ret) { + using rclcpp::exceptions::throw_from_rcl_error; + throw_from_rcl_error(ret, "failed to set the on new message callback for subscription"); + } +} diff --git a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp index b13123b65b..21ccb433ff 100644 --- a/rclcpp/src/rclcpp/subscription_intra_process_base.cpp +++ b/rclcpp/src/rclcpp/subscription_intra_process_base.cpp @@ -17,6 +17,11 @@ using rclcpp::experimental::SubscriptionIntraProcessBase; +SubscriptionIntraProcessBase::~SubscriptionIntraProcessBase() +{ + clear_on_ready_callback(); +} + void SubscriptionIntraProcessBase::add_to_wait_set(rcl_wait_set_t * wait_set) { diff --git a/rclcpp/src/rclcpp/waitable.cpp b/rclcpp/src/rclcpp/waitable.cpp index b76c7215e0..16ebb1b546 100644 --- a/rclcpp/src/rclcpp/waitable.cpp +++ b/rclcpp/src/rclcpp/waitable.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include + #include "rclcpp/waitable.hpp" using rclcpp::Waitable; @@ -57,3 +59,21 @@ Waitable::exchange_in_use_by_wait_set_state(bool in_use_state) { return in_use_by_wait_set_.exchange(in_use_state); } + +void +Waitable::set_on_ready_callback(std::function callback) +{ + (void)callback; + + throw std::runtime_error( + "Custom waitables should override set_on_ready_callback " + "if they want to use it."); +} + +void +Waitable::clear_on_ready_callback() +{ + throw std::runtime_error( + "Custom waitables should override clear_on_ready_callback if they " + "want to use it and make sure to call it on the waitable destructor."); +} diff --git a/rclcpp/test/rclcpp/test_client.cpp b/rclcpp/test/rclcpp/test_client.cpp index 5e2adb5e7d..5e18fd8b4a 100644 --- a/rclcpp/test/rclcpp/test_client.cpp +++ b/rclcpp/test/rclcpp/test_client.cpp @@ -27,6 +27,8 @@ #include "test_msgs/srv/empty.hpp" +using namespace std::chrono_literals; + class TestClient : public ::testing::Test { protected: @@ -340,3 +342,92 @@ TEST_F(TestClientWithServer, take_response) { rclcpp::exceptions::RCLError); } } + +/* + Testing on_new_response callbacks. + */ +TEST_F(TestClient, on_new_response_callback) { + auto client_node = std::make_shared("client_node", "ns"); + auto server_node = std::make_shared("server_node", "ns"); + + auto client = client_node->create_client("test_service"); + std::atomic server_requests_count {0}; + auto server_callback = [&server_requests_count]( + const test_msgs::srv::Empty::Request::SharedPtr, + test_msgs::srv::Empty::Response::SharedPtr) {server_requests_count++;}; + auto server = server_node->create_service("test_service", server_callback); + auto request = std::make_shared(); + + std::atomic c1 {0}; + auto increase_c1_cb = [&c1](size_t count_msgs) {c1 += count_msgs;}; + client->set_on_new_response_callback(increase_c1_cb); + + client->async_send_request(request); + auto start = std::chrono::steady_clock::now(); + while (server_requests_count == 0 && + (std::chrono::steady_clock::now() - start) < 10s) + { + rclcpp::spin_some(server_node); + } + + ASSERT_EQ(server_requests_count, 1u); + + start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c1 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + + std::atomic c2 {0}; + auto increase_c2_cb = [&c2](size_t count_msgs) {c2 += count_msgs;}; + client->set_on_new_response_callback(increase_c2_cb); + + client->async_send_request(request); + start = std::chrono::steady_clock::now(); + while (server_requests_count == 1 && + (std::chrono::steady_clock::now() - start) < 10s) + { + rclcpp::spin_some(server_node); + } + + ASSERT_EQ(server_requests_count, 2u); + + start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c1 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + + client->clear_on_new_response_callback(); + + client->async_send_request(request); + client->async_send_request(request); + client->async_send_request(request); + start = std::chrono::steady_clock::now(); + while (server_requests_count < 5 && + (std::chrono::steady_clock::now() - start) < 10s) + { + rclcpp::spin_some(server_node); + } + + ASSERT_EQ(server_requests_count, 5u); + + std::atomic c3 {0}; + auto increase_c3_cb = [&c3](size_t count_msgs) {c3 += count_msgs;}; + client->set_on_new_response_callback(increase_c3_cb); + + start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c3 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + EXPECT_EQ(c3.load(), 3u); + + std::function invalid_cb = nullptr; + EXPECT_THROW(client->set_on_new_response_callback(invalid_cb), std::invalid_argument); +} diff --git a/rclcpp/test/rclcpp/test_guard_condition.cpp b/rclcpp/test/rclcpp/test_guard_condition.cpp index 8100cf1c9b..481051ccf9 100644 --- a/rclcpp/test/rclcpp/test_guard_condition.cpp +++ b/rclcpp/test/rclcpp/test_guard_condition.cpp @@ -102,3 +102,65 @@ TEST_F(TestGuardCondition, trigger) { } } } + +/* + * Testing addition to a wait set + */ +TEST_F(TestGuardCondition, add_to_wait_set) { + { + { + auto gc = std::make_shared(); + + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_wait_set_add_guard_condition, RCL_RET_OK); + + rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); + EXPECT_NO_THROW(gc->add_to_wait_set(&wait_set)); + EXPECT_NO_THROW(gc->add_to_wait_set(&wait_set)); + + rcl_wait_set_t wait_set_2 = rcl_get_zero_initialized_wait_set(); + EXPECT_THROW(gc->add_to_wait_set(&wait_set_2), std::runtime_error); + } + + { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_wait_set_add_guard_condition, RCL_RET_ERROR); + + auto gd = std::make_shared(); + EXPECT_THROW(gd->add_to_wait_set(nullptr), rclcpp::exceptions::RCLError); + } + } +} + +/* + * Testing set on trigger callback + */ +TEST_F(TestGuardCondition, set_on_trigger_callback) { + { + auto gc = std::make_shared(); + + std::atomic c1 {0}; + auto increase_c1_cb = [&c1](size_t count_msgs) {c1 += count_msgs;}; + gc->set_on_trigger_callback(increase_c1_cb); + + EXPECT_EQ(c1.load(), 0u); + EXPECT_NO_THROW(gc->trigger()); + EXPECT_EQ(c1.load(), 1u); + + std::atomic c2 {0}; + auto increase_c2_cb = [&c2](size_t count_msgs) {c2 += count_msgs;}; + gc->set_on_trigger_callback(increase_c2_cb); + + EXPECT_NO_THROW(gc->trigger()); + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + + gc->set_on_trigger_callback(nullptr); + EXPECT_NO_THROW(gc->trigger()); + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + + gc->set_on_trigger_callback(increase_c1_cb); + EXPECT_EQ(c1.load(), 2u); + } +} diff --git a/rclcpp/test/rclcpp/test_publisher.cpp b/rclcpp/test/rclcpp/test_publisher.cpp index f844e44e6c..6671fa1176 100644 --- a/rclcpp/test/rclcpp/test_publisher.cpp +++ b/rclcpp/test/rclcpp/test_publisher.cpp @@ -514,7 +514,8 @@ TEST_F(TestPublisher, run_event_handlers) { initialize(); auto publisher = node->create_publisher("topic", 10); - for (const auto & handler : publisher->get_event_handlers()) { + for (const auto & key_event_pair : publisher->get_event_handlers()) { + auto handler = key_event_pair.second; std::shared_ptr data = handler->take_data(); handler->execute(data); } diff --git a/rclcpp/test/rclcpp/test_qos_event.cpp b/rclcpp/test/rclcpp/test_qos_event.cpp index f234591e33..221e2bdf0a 100644 --- a/rclcpp/test/rclcpp/test_qos_event.cpp +++ b/rclcpp/test/rclcpp/test_qos_event.cpp @@ -27,6 +27,8 @@ #include "../mocking_utils/patch.hpp" +using namespace std::chrono_literals; + class TestQosEvent : public ::testing::Test { protected: @@ -308,3 +310,106 @@ TEST_F(TestQosEvent, add_to_wait_set) { EXPECT_THROW(handler.add_to_wait_set(&wait_set), rclcpp::exceptions::RCLError); } } + +TEST_F(TestQosEvent, test_on_new_event_callback) +{ + auto offered_deadline = rclcpp::Duration(std::chrono::milliseconds(1)); + auto requested_deadline = rclcpp::Duration(std::chrono::milliseconds(2)); + + rclcpp::QoS qos_profile_publisher(10); + qos_profile_publisher.deadline(offered_deadline); + rclcpp::PublisherOptions pub_options; + pub_options.event_callbacks.deadline_callback = [](auto) {FAIL();}; + auto publisher = node->create_publisher( + topic_name, qos_profile_publisher, pub_options); + + rclcpp::QoS qos_profile_subscription(10); + qos_profile_subscription.deadline(requested_deadline); + rclcpp::SubscriptionOptions sub_options; + sub_options.event_callbacks.deadline_callback = [](auto) {FAIL();}; + auto subscription = node->create_subscription( + topic_name, qos_profile_subscription, message_callback, sub_options); + + std::atomic c1 {0}; + auto increase_c1_cb = [&c1](size_t count_events) {c1 += count_events;}; + publisher->set_on_new_qos_event_callback(increase_c1_cb, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED); + + { + test_msgs::msg::Empty msg; + publisher->publish(msg); + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + EXPECT_GT(c1, 1u); + + std::atomic c2 {0}; + auto increase_c2_cb = [&c2](size_t count_events) {c2 += count_events;}; + subscription->set_on_new_qos_event_callback( + increase_c2_cb, + RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED); + + EXPECT_GT(c2, 1u); +} + +TEST_F(TestQosEvent, test_invalid_on_new_event_callback) +{ + auto pub = node->create_publisher(topic_name, 10); + auto sub = node->create_subscription(topic_name, 10, message_callback); + auto dummy_cb = [](size_t count_events) {(void)count_events;}; + + EXPECT_NO_THROW( + pub->set_on_new_qos_event_callback(dummy_cb, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED)); + + EXPECT_NO_THROW( + pub->clear_on_new_qos_event_callback(RCL_PUBLISHER_OFFERED_DEADLINE_MISSED)); + + EXPECT_NO_THROW( + pub->set_on_new_qos_event_callback(dummy_cb, RCL_PUBLISHER_LIVELINESS_LOST)); + + EXPECT_NO_THROW( + pub->clear_on_new_qos_event_callback(RCL_PUBLISHER_LIVELINESS_LOST)); + + EXPECT_NO_THROW( + pub->set_on_new_qos_event_callback(dummy_cb, RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS)); + + EXPECT_NO_THROW( + pub->clear_on_new_qos_event_callback(RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS)); + + EXPECT_NO_THROW( + sub->set_on_new_qos_event_callback(dummy_cb, RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED)); + + EXPECT_NO_THROW( + sub->clear_on_new_qos_event_callback(RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED)); + + EXPECT_NO_THROW( + sub->set_on_new_qos_event_callback(dummy_cb, RCL_SUBSCRIPTION_LIVELINESS_CHANGED)); + + EXPECT_NO_THROW( + sub->clear_on_new_qos_event_callback(RCL_SUBSCRIPTION_LIVELINESS_CHANGED)); + + EXPECT_NO_THROW( + sub->set_on_new_qos_event_callback(dummy_cb, RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS)); + + EXPECT_NO_THROW( + sub->clear_on_new_qos_event_callback(RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS)); + + std::function invalid_cb; + + rclcpp::SubscriptionOptions sub_options; + sub_options.event_callbacks.deadline_callback = [](auto) {}; + sub = node->create_subscription( + topic_name, 10, message_callback, sub_options); + + EXPECT_THROW( + sub->set_on_new_qos_event_callback(invalid_cb, RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED), + std::invalid_argument); + + rclcpp::PublisherOptions pub_options; + pub_options.event_callbacks.deadline_callback = [](auto) {}; + pub = node->create_publisher(topic_name, 10, pub_options); + + EXPECT_THROW( + pub->set_on_new_qos_event_callback(invalid_cb, RCL_PUBLISHER_OFFERED_DEADLINE_MISSED), + std::invalid_argument); +} diff --git a/rclcpp/test/rclcpp/test_service.cpp b/rclcpp/test/rclcpp/test_service.cpp index 16ab31cf1b..110e913da4 100644 --- a/rclcpp/test/rclcpp/test_service.cpp +++ b/rclcpp/test/rclcpp/test_service.cpp @@ -27,6 +27,8 @@ #include "test_msgs/srv/empty.hpp" #include "test_msgs/srv/empty.h" +using namespace std::chrono_literals; + class TestService : public ::testing::Test { protected: @@ -235,3 +237,72 @@ TEST_F(TestService, send_response) { rclcpp::exceptions::RCLError); } } + +/* + Testing on_new_request callbacks. + */ +TEST_F(TestService, on_new_request_callback) { + auto server_callback = + [](const test_msgs::srv::Empty::Request::SharedPtr, + test_msgs::srv::Empty::Response::SharedPtr) {FAIL();}; + auto server = node->create_service("~/test_service", server_callback); + + std::atomic c1 {0}; + auto increase_c1_cb = [&c1](size_t count_msgs) {c1 += count_msgs;}; + server->set_on_new_request_callback(increase_c1_cb); + + auto client = node->create_client("~/test_service"); + { + auto request = std::make_shared(); + client->async_send_request(request); + } + + auto start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c1 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + + std::atomic c2 {0}; + auto increase_c2_cb = [&c2](size_t count_msgs) {c2 += count_msgs;}; + server->set_on_new_request_callback(increase_c2_cb); + + { + auto request = std::make_shared(); + client->async_send_request(request); + } + + start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c2 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + + server->clear_on_new_request_callback(); + + { + auto request = std::make_shared(); + client->async_send_request(request); + client->async_send_request(request); + client->async_send_request(request); + } + + std::atomic c3 {0}; + auto increase_c3_cb = [&c3](size_t count_msgs) {c3 += count_msgs;}; + server->set_on_new_request_callback(increase_c3_cb); + + start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c3 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + EXPECT_EQ(c3.load(), 3u); + + std::function invalid_cb = nullptr; + EXPECT_THROW(server->set_on_new_request_callback(invalid_cb), std::invalid_argument); +} diff --git a/rclcpp/test/rclcpp/test_subscription.cpp b/rclcpp/test/rclcpp/test_subscription.cpp index 5812985272..a10c5c4eab 100644 --- a/rclcpp/test/rclcpp/test_subscription.cpp +++ b/rclcpp/test/rclcpp/test_subscription.cpp @@ -440,6 +440,146 @@ TEST_F(TestSubscription, handle_loaned_message) { EXPECT_NO_THROW(sub->handle_loaned_message(&msg, message_info)); } +/* + Testing on_new_message callbacks. + */ +TEST_F(TestSubscription, on_new_message_callback) { + initialize(rclcpp::NodeOptions().use_intra_process_comms(false)); + using test_msgs::msg::Empty; + + auto do_nothing = [](std::shared_ptr) {FAIL();}; + auto sub = node->create_subscription("~/test_take", 10, do_nothing); + + std::atomic c1 {0}; + auto increase_c1_cb = [&c1](size_t count_msgs) {c1 += count_msgs;}; + sub->set_on_new_message_callback(increase_c1_cb); + + auto pub = node->create_publisher("~/test_take", 1); + { + test_msgs::msg::Empty msg; + pub->publish(msg); + } + + auto start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c1 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + + std::atomic c2 {0}; + auto increase_c2_cb = [&c2](size_t count_msgs) {c2 += count_msgs;}; + sub->set_on_new_message_callback(increase_c2_cb); + + { + test_msgs::msg::Empty msg; + pub->publish(msg); + } + + start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c2 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + + sub->clear_on_new_message_callback(); + + { + test_msgs::msg::Empty msg; + pub->publish(msg); + pub->publish(msg); + pub->publish(msg); + } + + std::atomic c3 {0}; + auto increase_c3_cb = [&c3](size_t count_msgs) {c3 += count_msgs;}; + sub->set_on_new_message_callback(increase_c3_cb); + + start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c3 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + EXPECT_EQ(c3.load(), 3u); + + std::function invalid_cb = nullptr; + EXPECT_THROW(sub->set_on_new_message_callback(invalid_cb), std::invalid_argument); +} + +/* + Testing on_new_intra_process_message callbacks. + */ +TEST_F(TestSubscription, on_new_intra_process_message_callback) { + initialize(rclcpp::NodeOptions().use_intra_process_comms(true)); + using test_msgs::msg::Empty; + + auto do_nothing = [](std::shared_ptr) {FAIL();}; + auto sub = node->create_subscription("~/test_take", 10, do_nothing); + + std::atomic c1 {0}; + auto increase_c1_cb = [&c1](size_t count_msgs) {c1 += count_msgs;}; + sub->set_on_new_intra_process_message_callback(increase_c1_cb); + + auto pub = node->create_publisher("~/test_take", 1); + { + test_msgs::msg::Empty msg; + pub->publish(msg); + } + + auto start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c1 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + + std::atomic c2 {0}; + auto increase_c2_cb = [&c2](size_t count_msgs) {c2 += count_msgs;}; + sub->set_on_new_intra_process_message_callback(increase_c2_cb); + + { + test_msgs::msg::Empty msg; + pub->publish(msg); + } + + start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c2 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + + sub->clear_on_new_intra_process_message_callback(); + + { + test_msgs::msg::Empty msg; + pub->publish(msg); + pub->publish(msg); + pub->publish(msg); + } + + std::atomic c3 {0}; + auto increase_c3_cb = [&c3](size_t count_msgs) {c3 += count_msgs;}; + sub->set_on_new_intra_process_message_callback(increase_c3_cb); + + start = std::chrono::steady_clock::now(); + do { + std::this_thread::sleep_for(100ms); + } while (c3 == 0 && std::chrono::steady_clock::now() - start < 10s); + + EXPECT_EQ(c1.load(), 1u); + EXPECT_EQ(c2.load(), 1u); + EXPECT_EQ(c3.load(), 3u); + + std::function invalid_cb = nullptr; + EXPECT_THROW(sub->set_on_new_intra_process_message_callback(invalid_cb), std::invalid_argument); +} + /* Testing subscription with intraprocess enabled and invalid QoS */ diff --git a/rclcpp/test/rclcpp/test_wait_set.cpp b/rclcpp/test/rclcpp/test_wait_set.cpp index f9d97a5759..bf7ee26ed0 100644 --- a/rclcpp/test/rclcpp/test_wait_set.cpp +++ b/rclcpp/test/rclcpp/test_wait_set.cpp @@ -257,7 +257,7 @@ TEST_F(TestWaitSet, add_guard_condition_to_two_different_wait_set) { rclcpp::PublisherOptions po; po.event_callbacks.deadline_callback = [](rclcpp::QOSDeadlineOfferedInfo &) {}; auto pub = node->create_publisher("~/test", 1, po); - auto qos_event = pub->get_event_handlers()[0]; + auto qos_event = pub->get_event_handlers().begin()->second; wait_set1.add_waitable(qos_event, pub); ASSERT_THROW( { @@ -301,7 +301,7 @@ TEST_F(TestWaitSet, add_remove_wait) { [](rclcpp::QOSDeadlineOfferedInfo &) {}; auto pub = node->create_publisher( "~/test", 1, publisher_options); - auto qos_event = pub->get_event_handlers()[0]; + auto qos_event = pub->get_event_handlers().begin()->second; // Subscription mask is required here for coverage. wait_set.add_subscription(sub, {true, true, true}); diff --git a/rclcpp_action/include/rclcpp_action/client.hpp b/rclcpp_action/include/rclcpp_action/client.hpp index a4ba5726e1..9bcddadec0 100644 --- a/rclcpp_action/include/rclcpp_action/client.hpp +++ b/rclcpp_action/include/rclcpp_action/client.hpp @@ -23,8 +23,11 @@ #include #include #include +#include #include +#include "rcl/event_callback.h" + #include "rclcpp/exceptions.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" @@ -62,6 +65,16 @@ class ClientBase : public rclcpp::Waitable RCLCPP_ACTION_PUBLIC virtual ~ClientBase(); + /// Enum to identify entities belonging to the action client + enum class EntityType + { + GoalClient, + ResultClient, + CancelClient, + FeedbackSubscription, + StatusSubscription, + }; + /// Return true if there is an action server that is ready to take goal requests. RCLCPP_ACTION_PUBLIC bool @@ -126,6 +139,39 @@ class ClientBase : public rclcpp::Waitable void execute(std::shared_ptr & data) override; + /// \internal + /// Set a callback to be called when action client entities have an event + /** + * The callback receives a size_t which is the number of messages received + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if messages were received before any + * callback was set. + * + * The callback also receives an int identifier argument, which identifies + * the action client entity which is ready. + * This implies that the provided callback can use the identifier to behave + * differently depending on which entity triggered the waitable to become ready. + * + * Calling it again will clear any previously set callback. + * + * An exception will be thrown if the callback is not callable. + * + * This function is thread-safe. + * + * If you want more information available in the callback, like the subscription + * or other information, you may use a lambda with captures or std::bind. + * + * \param[in] callback functor to be called when a new message is received. + */ + RCLCPP_ACTION_PUBLIC + void + set_on_ready_callback(std::function callback) override; + + /// Unset the callback registered for new events, if any. + RCLCPP_ACTION_PUBLIC + void + clear_on_ready_callback() override; + // End Waitables API // ----------------- @@ -244,8 +290,31 @@ class ClientBase : public rclcpp::Waitable // End API for communication between ClientBase and Client<> // --------------------------------------------------------- + /// \internal + /// Set a callback to be called when the specified entity is ready + RCLCPP_ACTION_PUBLIC + void + set_on_ready_callback( + EntityType entity_type, + rcl_event_callback_t callback, + const void * user_data); + + // Mutex to protect the callbacks storage. + std::recursive_mutex listener_mutex_; + // Storage for std::function callbacks to keep them in scope + std::unordered_map> entity_type_to_on_ready_callback_; + private: std::unique_ptr pimpl_; + + /// Set a std::function callback to be called when the specified entity is ready + RCLCPP_ACTION_PUBLIC + void + set_callback_to_entity( + EntityType entity_type, + std::function callback); + + bool on_ready_callback_set_{false}; }; /// Action Client diff --git a/rclcpp_action/include/rclcpp_action/server.hpp b/rclcpp_action/include/rclcpp_action/server.hpp index 2f79bee041..554cb1cf56 100644 --- a/rclcpp_action/include/rclcpp_action/server.hpp +++ b/rclcpp_action/include/rclcpp_action/server.hpp @@ -22,6 +22,7 @@ #include #include +#include "rcl/event_callback.h" #include "rcl_action/action_server.h" #include "rosidl_runtime_c/action_type_support_struct.h" #include "rosidl_typesupport_cpp/action_type_support.hpp" @@ -70,6 +71,14 @@ enum class CancelResponse : int8_t class ServerBase : public rclcpp::Waitable { public: + /// Enum to identify entities belonging to the action server + enum class EntityType + { + GoalService, + ResultService, + CancelService, + }; + RCLCPP_ACTION_PUBLIC virtual ~ServerBase(); @@ -128,6 +137,39 @@ class ServerBase : public rclcpp::Waitable void execute(std::shared_ptr & data) override; + /// \internal + /// Set a callback to be called when action server entities have an event + /** + * The callback receives a size_t which is the number of messages received + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if messages were received before any + * callback was set. + * + * The callback also receives an int identifier argument, which identifies + * the action server entity which is ready. + * This implies that the provided callback can use the identifier to behave + * differently depending on which entity triggered the waitable to become ready. + * + * Calling it again will clear any previously set callback. + * + * An exception will be thrown if the callback is not callable. + * + * This function is thread-safe. + * + * If you want more information available in the callback, like the subscription + * or other information, you may use a lambda with captures or std::bind. + * + * \param[in] callback functor to be called when a new message is received. + */ + RCLCPP_ACTION_PUBLIC + void + set_on_ready_callback(std::function callback) override; + + /// Unset the callback to be called whenever the waitable becomes ready. + RCLCPP_ACTION_PUBLIC + void + clear_on_ready_callback() override; + // End Waitables API // ----------------- @@ -256,6 +298,29 @@ class ServerBase : public rclcpp::Waitable /// Private implementation /// \internal std::unique_ptr pimpl_; + + /// Set a std::function callback to be called when the specified entity is ready + RCLCPP_ACTION_PUBLIC + void + set_callback_to_entity( + EntityType entity_type, + std::function callback); + +protected: + // Mutex to protect the callbacks storage. + std::recursive_mutex listener_mutex_; + // Storage for std::function callbacks to keep them in scope + std::unordered_map> entity_type_to_on_ready_callback_; + + /// Set a callback to be called when the specified entity is ready + RCLCPP_ACTION_PUBLIC + void + set_on_ready_callback( + EntityType entity_type, + rcl_event_callback_t callback, + const void * user_data); + + bool on_ready_callback_set_{false}; }; /// Action Server diff --git a/rclcpp_action/src/client.cpp b/rclcpp_action/src/client.cpp index cedab99e92..07982092d5 100644 --- a/rclcpp_action/src/client.cpp +++ b/rclcpp_action/src/client.cpp @@ -136,6 +136,7 @@ ClientBase::ClientBase( ClientBase::~ClientBase() { + clear_on_ready_callback(); } bool @@ -385,6 +386,163 @@ ClientBase::generate_goal_id() return goal_id; } +void +ClientBase::set_on_ready_callback(std::function callback) +{ + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_ready_callback " + "is not callable."); + } + + set_callback_to_entity(EntityType::GoalClient, callback); + set_callback_to_entity(EntityType::ResultClient, callback); + set_callback_to_entity(EntityType::CancelClient, callback); + set_callback_to_entity(EntityType::FeedbackSubscription, callback); + set_callback_to_entity(EntityType::StatusSubscription, callback); +} + +void +ClientBase::set_callback_to_entity( + EntityType entity_type, + std::function callback) +{ + // Note: we bind the int identifier argument to this waitable's entity types + auto new_callback = + [callback, entity_type, this](size_t number_of_events) { + try { + callback(number_of_events, static_cast(entity_type)); + } catch (const std::exception & exception) { + RCLCPP_ERROR_STREAM( + pimpl_->logger, + "rclcpp_action::ClientBase@" << this << + " caught " << rmw::impl::cpp::demangle(exception) << + " exception in user-provided callback for the 'on ready' callback: " << + exception.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + pimpl_->logger, + "rclcpp_action::ClientBase@" << this << + " caught unhandled exception in user-provided callback " << + "for the 'on ready' callback"); + } + }; + + + // Set it temporarily to the new callback, while we replace the old one. + // This two-step setting, prevents a gap where the old std::function has + // been replaced but the middleware hasn't been told about the new one yet. + set_on_ready_callback( + entity_type, + rclcpp::detail::cpp_callback_trampoline, + static_cast(&new_callback)); + + std::lock_guard lock(listener_mutex_); + // Store the std::function to keep it in scope, also overwrites the existing one. + auto it = entity_type_to_on_ready_callback_.find(entity_type); + + if (it != entity_type_to_on_ready_callback_.end()) { + it->second = new_callback; + } else { + entity_type_to_on_ready_callback_.emplace(entity_type, new_callback); + } + + // Set it again, now using the permanent storage. + it = entity_type_to_on_ready_callback_.find(entity_type); + + if (it != entity_type_to_on_ready_callback_.end()) { + auto & cb = it->second; + set_on_ready_callback( + entity_type, + rclcpp::detail::cpp_callback_trampoline, + static_cast(&cb)); + } + + on_ready_callback_set_ = true; +} + +void +ClientBase::set_on_ready_callback( + EntityType entity_type, + rcl_event_callback_t callback, + const void * user_data) +{ + rcl_ret_t ret = RCL_RET_ERROR; + + switch (entity_type) { + case EntityType::GoalClient: + { + ret = rcl_action_client_set_goal_client_callback( + pimpl_->client_handle.get(), + callback, + user_data); + break; + } + + case EntityType::ResultClient: + { + ret = rcl_action_client_set_result_client_callback( + pimpl_->client_handle.get(), + callback, + user_data); + break; + } + + case EntityType::CancelClient: + { + ret = rcl_action_client_set_cancel_client_callback( + pimpl_->client_handle.get(), + callback, + user_data); + break; + } + + case EntityType::FeedbackSubscription: + { + ret = rcl_action_client_set_feedback_subscription_callback( + pimpl_->client_handle.get(), + callback, + user_data); + break; + } + + case EntityType::StatusSubscription: + { + ret = rcl_action_client_set_status_subscription_callback( + pimpl_->client_handle.get(), + callback, + user_data); + break; + } + + default: + throw std::runtime_error("ClientBase::set_on_ready_callback: Unknown entity type."); + break; + } + + if (RCL_RET_OK != ret) { + using rclcpp::exceptions::throw_from_rcl_error; + throw_from_rcl_error(ret, "failed to set the on ready callback for action client"); + } +} + +void +ClientBase::clear_on_ready_callback() +{ + std::lock_guard lock(listener_mutex_); + + if (on_ready_callback_set_) { + set_on_ready_callback(EntityType::GoalClient, nullptr, nullptr); + set_on_ready_callback(EntityType::ResultClient, nullptr, nullptr); + set_on_ready_callback(EntityType::CancelClient, nullptr, nullptr); + set_on_ready_callback(EntityType::FeedbackSubscription, nullptr, nullptr); + set_on_ready_callback(EntityType::StatusSubscription, nullptr, nullptr); + on_ready_callback_set_ = false; + } + + entity_type_to_on_ready_callback_.clear(); +} + std::shared_ptr ClientBase::take_data() { diff --git a/rclcpp_action/src/server.cpp b/rclcpp_action/src/server.cpp index 5ec88281da..a07e203e28 100644 --- a/rclcpp_action/src/server.cpp +++ b/rclcpp_action/src/server.cpp @@ -132,6 +132,7 @@ ServerBase::ServerBase( ServerBase::~ServerBase() { + clear_on_ready_callback(); } size_t @@ -678,3 +679,138 @@ ServerBase::publish_feedback(std::shared_ptr feedback_msg) rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to publish feedback"); } } + +void +ServerBase::set_on_ready_callback(std::function callback) +{ + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_ready_callback " + "is not callable."); + } + + set_callback_to_entity(EntityType::GoalService, callback); + set_callback_to_entity(EntityType::ResultService, callback); + set_callback_to_entity(EntityType::CancelService, callback); +} + +void +ServerBase::set_callback_to_entity( + EntityType entity_type, + std::function callback) +{ + // Note: we bind the int identifier argument to this waitable's entity types + auto new_callback = + [callback, entity_type, this](size_t number_of_events) { + try { + callback(number_of_events, static_cast(entity_type)); + } catch (const std::exception & exception) { + RCLCPP_ERROR_STREAM( + pimpl_->logger_, + "rclcpp_action::ServerBase@" << this << + " caught " << rmw::impl::cpp::demangle(exception) << + " exception in user-provided callback for the 'on ready' callback: " << + exception.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + pimpl_->logger_, + "rclcpp_action::ServerBase@" << this << + " caught unhandled exception in user-provided callback " << + "for the 'on ready' callback"); + } + }; + + + // Set it temporarily to the new callback, while we replace the old one. + // This two-step setting, prevents a gap where the old std::function has + // been replaced but the middleware hasn't been told about the new one yet. + set_on_ready_callback( + entity_type, + rclcpp::detail::cpp_callback_trampoline, + static_cast(&new_callback)); + + std::lock_guard lock(listener_mutex_); + // Store the std::function to keep it in scope, also overwrites the existing one. + auto it = entity_type_to_on_ready_callback_.find(entity_type); + + if (it != entity_type_to_on_ready_callback_.end()) { + it->second = new_callback; + } else { + entity_type_to_on_ready_callback_.emplace(entity_type, new_callback); + } + + // Set it again, now using the permanent storage. + it = entity_type_to_on_ready_callback_.find(entity_type); + + if (it != entity_type_to_on_ready_callback_.end()) { + auto & cb = it->second; + set_on_ready_callback( + entity_type, + rclcpp::detail::cpp_callback_trampoline, + static_cast(&cb)); + } + + on_ready_callback_set_ = true; +} + +void +ServerBase::set_on_ready_callback( + EntityType entity_type, + rcl_event_callback_t callback, + const void * user_data) +{ + rcl_ret_t ret = RCL_RET_ERROR; + + switch (entity_type) { + case EntityType::GoalService: + { + ret = rcl_action_server_set_goal_service_callback( + pimpl_->action_server_.get(), + callback, + user_data); + break; + } + + case EntityType::ResultService: + { + ret = rcl_action_server_set_result_service_callback( + pimpl_->action_server_.get(), + callback, + user_data); + break; + } + + case EntityType::CancelService: + { + ret = rcl_action_server_set_cancel_service_callback( + pimpl_->action_server_.get(), + callback, + user_data); + break; + } + + default: + throw std::runtime_error("ServerBase::set_on_ready_callback: Unknown entity type."); + break; + } + + if (RCL_RET_OK != ret) { + using rclcpp::exceptions::throw_from_rcl_error; + throw_from_rcl_error(ret, "failed to set the on ready callback for action client"); + } +} + +void +ServerBase::clear_on_ready_callback() +{ + std::lock_guard lock(listener_mutex_); + + if (on_ready_callback_set_) { + set_on_ready_callback(EntityType::GoalService, nullptr, nullptr); + set_on_ready_callback(EntityType::ResultService, nullptr, nullptr); + set_on_ready_callback(EntityType::CancelService, nullptr, nullptr); + on_ready_callback_set_ = false; + } + + entity_type_to_on_ready_callback_.clear(); +}