Skip to content

Commit

Permalink
Add on new request/responses callbacks to IPC srv/cli
Browse files Browse the repository at this point in the history
Signed-off-by: Mauro Passerino <mpasserino@irobot.com>
  • Loading branch information
Mauro Passerino committed Sep 23, 2022
1 parent ec8e490 commit abf4e71
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class ClientIntraProcess : public ClientIntraProcessBase
{
buffer_->add(std::move(response));
gc_.trigger();
invoke_on_new_response();
}

std::shared_ptr<void>
Expand Down
110 changes: 110 additions & 0 deletions rclcpp/include/rclcpp/experimental/client_intra_process_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
#ifndef RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_BASE_HPP_
#define RCLCPP__EXPERIMENTAL__CLIENT_INTRA_PROCESS_BASE_HPP_

#include <algorithm>
#include <memory>
#include <mutex>
#include <string>

#include "rmw/impl/cpp/demangle.hpp"

#include "rclcpp/context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"
Expand All @@ -35,6 +39,11 @@ class ClientIntraProcessBase : public rclcpp::Waitable
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(ClientIntraProcessBase)

enum class EntityType : std::size_t
{
Client,
};

RCLCPP_PUBLIC
ClientIntraProcessBase(
rclcpp::Context::SharedPtr context,
Expand All @@ -60,6 +69,13 @@ class ClientIntraProcessBase : public rclcpp::Waitable
std::shared_ptr<void>
take_data() = 0;

std::shared_ptr<void>
take_data_by_entity_id(size_t id) override
{
(void)id;
return take_data();
}

virtual void
execute(std::shared_ptr<void> & data) = 0;

Expand All @@ -71,9 +87,103 @@ class ClientIntraProcessBase : public rclcpp::Waitable
QoS
get_actual_qos() const;

/// Set a callback to be called when each new response arrives.
/**
* The callback receives a size_t which is the number of responses received
* since the last time this callback was called.
* Normally this is 1, but can be > 1 if responses were received before any
* callback was set.
*
* The callback also receives an int identifier argument.
* This is needed because a Waitable may be composed of several distinct entities,
* such as subscriptions, services, etc.
* The application should provide a generic callback function that will be then
* forwarded by the waitable to all of its entities.
* Before forwarding, a different value for the identifier argument will be
* bound to the function.
* This implies that the provided callback can use the identifier to behave
* differently depending on which entity triggered the waitable to become ready.
*
* Calling it again will clear any previously set callback.
*
* An exception will be thrown if the callback is not callable.
*
* This function is thread-safe.
*
* If you want more information available in the callback, like the subscription
* or other information, you may use a lambda with captures or std::bind.
*
* \param[in] callback functor to be called when a new response is received.
*/
void
set_on_ready_callback(std::function<void(size_t, int)> callback) override
{
if (!callback) {
throw std::invalid_argument(
"The callback passed to set_on_ready_callback "
"is not callable.");
}

// Note: we bind the int identifier argument to this waitable's entity types
auto new_callback =
[callback, this](size_t number_of_events) {
try {
callback(number_of_events, static_cast<int>(EntityType::Client));
} catch (const std::exception & exception) {
RCLCPP_ERROR_STREAM(
// TODO(wjwwood): get this class access to the node logger it is associated with
rclcpp::get_logger("rclcpp"),
"rclcpp::ClientIntraProcessBase@" << this <<
" caught " << rmw::impl::cpp::demangle(exception) <<
" exception in user-provided callback for the 'on ready' callback: " <<
exception.what());
} catch (...) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rclcpp"),
"rclcpp::ClientIntraProcessBase@" << this <<
" caught unhandled exception in user-provided callback " <<
"for the 'on ready' callback");
}
};

std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
on_new_response_callback_ = new_callback;

if (unread_count_ > 0) {
if (qos_profile_.history() == HistoryPolicy::KeepAll) {
on_new_response_callback_(unread_count_);
} else {
// Use qos profile depth as upper bound for unread_count_
on_new_response_callback_(std::min(unread_count_, qos_profile_.depth()));
}
unread_count_ = 0;
}
}

/// Unset the callback registered for new messages, if any.
void
clear_on_ready_callback() override
{
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
on_new_response_callback_ = nullptr;
}

protected:
std::recursive_mutex reentrant_mutex_;
rclcpp::GuardCondition gc_;
std::function<void(size_t)> on_new_response_callback_ {nullptr};
size_t unread_count_{0};

void
invoke_on_new_response()
{
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
if (on_new_response_callback_) {
on_new_response_callback_(1);
} else {
unread_count_++;
}
}

private:
std::string service_name_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class ServiceIntraProcess : public ServiceIntraProcessBase
{
buffer_->add(std::make_pair(intra_process_client_id, std::move(request)));
gc_.trigger();
invoke_on_new_request();
}

std::shared_ptr<void>
Expand Down
109 changes: 109 additions & 0 deletions rclcpp/include/rclcpp/experimental/service_intra_process_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
#ifndef RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_BASE_HPP_
#define RCLCPP__EXPERIMENTAL__SERVICE_INTRA_PROCESS_BASE_HPP_

#include <algorithm>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

#include "rcl/error_handling.h"
#include "rmw/impl/cpp/demangle.hpp"

#include "rclcpp/context.hpp"
#include "rclcpp/experimental/client_intra_process_base.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"
Expand All @@ -40,6 +43,11 @@ class ServiceIntraProcessBase : public rclcpp::Waitable
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(ServiceIntraProcessBase)

enum class EntityType : std::size_t
{
Service,
};

RCLCPP_PUBLIC
ServiceIntraProcessBase(
rclcpp::Context::SharedPtr context,
Expand All @@ -65,6 +73,13 @@ class ServiceIntraProcessBase : public rclcpp::Waitable
std::shared_ptr<void>
take_data() = 0;

std::shared_ptr<void>
take_data_by_entity_id(size_t id) override
{
(void)id;
return take_data();
}

virtual void
execute(std::shared_ptr<void> & data) = 0;

Expand All @@ -82,9 +97,103 @@ class ServiceIntraProcessBase : public rclcpp::Waitable
rclcpp::experimental::ClientIntraProcessBase::SharedPtr client,
uint64_t client_id);

/// Set a callback to be called when each new request arrives.
/**
* The callback receives a size_t which is the number of requests received
* since the last time this callback was called.
* Normally this is 1, but can be > 1 if requests were received before any
* callback was set.
*
* The callback also receives an int identifier argument.
* This is needed because a Waitable may be composed of several distinct entities,
* such as subscriptions, services, etc.
* The application should provide a generic callback function that will be then
* forwarded by the waitable to all of its entities.
* Before forwarding, a different value for the identifier argument will be
* bound to the function.
* This implies that the provided callback can use the identifier to behave
* differently depending on which entity triggered the waitable to become ready.
*
* Calling it again will clear any previously set callback.
*
* An exception will be thrown if the callback is not callable.
*
* This function is thread-safe.
*
* If you want more information available in the callback, like the subscription
* or other information, you may use a lambda with captures or std::bind.
*
* \param[in] callback functor to be called when a new request is received.
*/
void
set_on_ready_callback(std::function<void(size_t, int)> callback) override
{
if (!callback) {
throw std::invalid_argument(
"The callback passed to set_on_ready_callback "
"is not callable.");
}

// Note: we bind the int identifier argument to this waitable's entity types
auto new_callback =
[callback, this](size_t number_of_events) {
try {
callback(number_of_events, static_cast<int>(EntityType::Service));
} catch (const std::exception & exception) {
RCLCPP_ERROR_STREAM(
// TODO(wjwwood): get this class access to the node logger it is associated with
rclcpp::get_logger("rclcpp"),
"rclcpp::ServiceIntraProcessBase@" << this <<
" caught " << rmw::impl::cpp::demangle(exception) <<
" exception in user-provided callback for the 'on ready' callback: " <<
exception.what());
} catch (...) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rclcpp"),
"rclcpp::ServiceIntraProcessBase@" << this <<
" caught unhandled exception in user-provided callback " <<
"for the 'on ready' callback");
}
};

std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
on_new_request_callback_ = new_callback;

if (unread_count_ > 0) {
if (qos_profile_.history() == HistoryPolicy::KeepAll) {
on_new_request_callback_(unread_count_);
} else {
// Use qos profile depth as upper bound for unread_count_
on_new_request_callback_(std::min(unread_count_, qos_profile_.depth()));
}
unread_count_ = 0;
}
}

/// Unset the callback registered for new messages, if any.
void
clear_on_ready_callback() override
{
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
on_new_request_callback_ = nullptr;
}

protected:
std::recursive_mutex reentrant_mutex_;
rclcpp::GuardCondition gc_;
std::function<void(size_t)> on_new_request_callback_ {nullptr};
size_t unread_count_{0};

void
invoke_on_new_request()
{
std::lock_guard<std::recursive_mutex> lock(reentrant_mutex_);
if (on_new_request_callback_) {
on_new_request_callback_(1);
} else {
unread_count_++;
}
}

using ClientMap =
std::unordered_map<uint64_t, rclcpp::experimental::ClientIntraProcessBase::WeakPtr>;
Expand Down

0 comments on commit abf4e71

Please sign in to comment.