diff --git a/rclcpp/include/rclcpp/experimental/client_intra_process.hpp b/rclcpp/include/rclcpp/experimental/client_intra_process.hpp index ebd4692f2f..fb30e6ee81 100644 --- a/rclcpp/include/rclcpp/experimental/client_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/client_intra_process.hpp @@ -87,6 +87,7 @@ class ClientIntraProcess : public ClientIntraProcessBase { buffer_->add(std::move(response)); gc_.trigger(); + invoke_on_new_response(); } std::shared_ptr diff --git a/rclcpp/include/rclcpp/experimental/client_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/client_intra_process_base.hpp index 0e3454496a..5c59d8e421 100644 --- a/rclcpp/include/rclcpp/experimental/client_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/client_intra_process_base.hpp @@ -15,12 +15,16 @@ #ifndef RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_BASE_HPP_ #define RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_BASE_HPP_ +#include #include #include #include +#include "rmw/impl/cpp/demangle.hpp" + #include "rclcpp/context.hpp" #include "rclcpp/guard_condition.hpp" +#include "rclcpp/logging.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" @@ -35,6 +39,11 @@ class ClientIntraProcessBase : public rclcpp::Waitable public: RCLCPP_SMART_PTR_ALIASES_ONLY(ClientIntraProcessBase) + enum class EntityType : std::size_t + { + Client, + }; + RCLCPP_PUBLIC ClientIntraProcessBase( rclcpp::Context::SharedPtr context, @@ -60,6 +69,13 @@ class ClientIntraProcessBase : public rclcpp::Waitable std::shared_ptr take_data() = 0; + std::shared_ptr + take_data_by_entity_id(size_t id) override + { + (void)id; + return take_data(); + } + virtual void execute(std::shared_ptr & data) = 0; @@ -71,9 +87,103 @@ class ClientIntraProcessBase : public rclcpp::Waitable QoS get_actual_qos() const; + /// Set a callback to be called when each new response arrives. + /** + * The callback receives a size_t which is the number of responses received + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if responses were received before any + * callback was set. + * + * The callback also receives an int identifier argument. + * This is needed because a Waitable may be composed of several distinct entities, + * such as subscriptions, services, etc. + * The application should provide a generic callback function that will be then + * forwarded by the waitable to all of its entities. + * Before forwarding, a different value for the identifier argument will be + * bound to the function. + * This implies that the provided callback can use the identifier to behave + * differently depending on which entity triggered the waitable to become ready. + * + * Calling it again will clear any previously set callback. + * + * An exception will be thrown if the callback is not callable. + * + * This function is thread-safe. + * + * If you want more information available in the callback, like the subscription + * or other information, you may use a lambda with captures or std::bind. + * + * \param[in] callback functor to be called when a new response is received. + */ + void + set_on_ready_callback(std::function callback) override + { + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_ready_callback " + "is not callable."); + } + + // Note: we bind the int identifier argument to this waitable's entity types + auto new_callback = + [callback, this](size_t number_of_events) { + try { + callback(number_of_events, static_cast(EntityType::Client)); + } catch (const std::exception & exception) { + RCLCPP_ERROR_STREAM( + // TODO(wjwwood): get this class access to the node logger it is associated with + rclcpp::get_logger("rclcpp"), + "rclcpp::ClientIntraProcessBase@" << this << + " caught " << rmw::impl::cpp::demangle(exception) << + " exception in user-provided callback for the 'on ready' callback: " << + exception.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "rclcpp::ClientIntraProcessBase@" << this << + " caught unhandled exception in user-provided callback " << + "for the 'on ready' callback"); + } + }; + + std::lock_guard lock(reentrant_mutex_); + on_new_response_callback_ = new_callback; + + if (unread_count_ > 0) { + if (qos_profile_.history() == HistoryPolicy::KeepAll) { + on_new_response_callback_(unread_count_); + } else { + // Use qos profile depth as upper bound for unread_count_ + on_new_response_callback_(std::min(unread_count_, qos_profile_.depth())); + } + unread_count_ = 0; + } + } + + /// Unset the callback registered for new messages, if any. + void + clear_on_ready_callback() override + { + std::lock_guard lock(reentrant_mutex_); + on_new_response_callback_ = nullptr; + } + protected: std::recursive_mutex reentrant_mutex_; rclcpp::GuardCondition gc_; + std::function on_new_response_callback_ {nullptr}; + size_t unread_count_{0}; + + void + invoke_on_new_response() + { + std::lock_guard lock(reentrant_mutex_); + if (on_new_response_callback_) { + on_new_response_callback_(1); + } else { + unread_count_++; + } + } private: std::string service_name_; diff --git a/rclcpp/include/rclcpp/experimental/service_intra_process.hpp b/rclcpp/include/rclcpp/experimental/service_intra_process.hpp index 989e31b841..9a4852a6a7 100644 --- a/rclcpp/include/rclcpp/experimental/service_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/service_intra_process.hpp @@ -97,6 +97,7 @@ class ServiceIntraProcess : public ServiceIntraProcessBase { buffer_->add(std::make_pair(intra_process_client_id, std::move(request))); gc_.trigger(); + invoke_on_new_request(); } std::shared_ptr diff --git a/rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp index fe5858b704..a75a83226f 100644 --- a/rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp @@ -15,6 +15,7 @@ #ifndef RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_BASE_HPP_ #define RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_BASE_HPP_ +#include #include #include #include @@ -22,10 +23,12 @@ #include #include "rcl/error_handling.h" +#include "rmw/impl/cpp/demangle.hpp" #include "rclcpp/context.hpp" #include "rclcpp/experimental/client_intra_process_base.hpp" #include "rclcpp/guard_condition.hpp" +#include "rclcpp/logging.hpp" #include "rclcpp/qos.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/waitable.hpp" @@ -40,6 +43,11 @@ class ServiceIntraProcessBase : public rclcpp::Waitable public: RCLCPP_SMART_PTR_ALIASES_ONLY(ServiceIntraProcessBase) + enum class EntityType : std::size_t + { + Service, + }; + RCLCPP_PUBLIC ServiceIntraProcessBase( rclcpp::Context::SharedPtr context, @@ -65,6 +73,13 @@ class ServiceIntraProcessBase : public rclcpp::Waitable std::shared_ptr take_data() = 0; + std::shared_ptr + take_data_by_entity_id(size_t id) override + { + (void)id; + return take_data(); + } + virtual void execute(std::shared_ptr & data) = 0; @@ -82,9 +97,103 @@ class ServiceIntraProcessBase : public rclcpp::Waitable rclcpp::experimental::ClientIntraProcessBase::SharedPtr client, uint64_t client_id); + /// Set a callback to be called when each new request arrives. + /** + * The callback receives a size_t which is the number of requests received + * since the last time this callback was called. + * Normally this is 1, but can be > 1 if requests were received before any + * callback was set. + * + * The callback also receives an int identifier argument. + * This is needed because a Waitable may be composed of several distinct entities, + * such as subscriptions, services, etc. + * The application should provide a generic callback function that will be then + * forwarded by the waitable to all of its entities. + * Before forwarding, a different value for the identifier argument will be + * bound to the function. + * This implies that the provided callback can use the identifier to behave + * differently depending on which entity triggered the waitable to become ready. + * + * Calling it again will clear any previously set callback. + * + * An exception will be thrown if the callback is not callable. + * + * This function is thread-safe. + * + * If you want more information available in the callback, like the subscription + * or other information, you may use a lambda with captures or std::bind. + * + * \param[in] callback functor to be called when a new request is received. + */ + void + set_on_ready_callback(std::function callback) override + { + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_ready_callback " + "is not callable."); + } + + // Note: we bind the int identifier argument to this waitable's entity types + auto new_callback = + [callback, this](size_t number_of_events) { + try { + callback(number_of_events, static_cast(EntityType::Service)); + } catch (const std::exception & exception) { + RCLCPP_ERROR_STREAM( + // TODO(wjwwood): get this class access to the node logger it is associated with + rclcpp::get_logger("rclcpp"), + "rclcpp::ServiceIntraProcessBase@" << this << + " caught " << rmw::impl::cpp::demangle(exception) << + " exception in user-provided callback for the 'on ready' callback: " << + exception.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "rclcpp::ServiceIntraProcessBase@" << this << + " caught unhandled exception in user-provided callback " << + "for the 'on ready' callback"); + } + }; + + std::lock_guard lock(reentrant_mutex_); + on_new_request_callback_ = new_callback; + + if (unread_count_ > 0) { + if (qos_profile_.history() == HistoryPolicy::KeepAll) { + on_new_request_callback_(unread_count_); + } else { + // Use qos profile depth as upper bound for unread_count_ + on_new_request_callback_(std::min(unread_count_, qos_profile_.depth())); + } + unread_count_ = 0; + } + } + + /// Unset the callback registered for new messages, if any. + void + clear_on_ready_callback() override + { + std::lock_guard lock(reentrant_mutex_); + on_new_request_callback_ = nullptr; + } + protected: std::recursive_mutex reentrant_mutex_; rclcpp::GuardCondition gc_; + std::function on_new_request_callback_ {nullptr}; + size_t unread_count_{0}; + + void + invoke_on_new_request() + { + std::lock_guard lock(reentrant_mutex_); + if (on_new_request_callback_) { + on_new_request_callback_(1); + } else { + unread_count_++; + } + } using ClientMap = std::unordered_map;