Skip to content

Commit

Permalink
Support intra-process communication: Clients & Services (ros2#1847)
Browse files Browse the repository at this point in the history
Signed-off-by: Mauro Passerino <mpasserino@irobot.com>
  • Loading branch information
Mauro Passerino committed Nov 8, 2023
1 parent 02d0ae8 commit 8f5f88a
Show file tree
Hide file tree
Showing 32 changed files with 1,676 additions and 66 deletions.
2 changes: 2 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/any_executable.cpp
src/rclcpp/callback_group.cpp
src/rclcpp/client.cpp
src/rclcpp/client_intra_process_base.cpp
src/rclcpp/clock.cpp
src/rclcpp/context.cpp
src/rclcpp/contexts/default_context.cpp
Expand Down Expand Up @@ -108,6 +109,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/serialization.cpp
src/rclcpp/serialized_message.cpp
src/rclcpp/service.cpp
src/rclcpp/service_intra_process_base.cpp
src/rclcpp/signal_handler.cpp
src/rclcpp/subscription_base.cpp
src/rclcpp/subscription_intra_process_base.cpp
Expand Down
122 changes: 115 additions & 7 deletions rclcpp/include/rclcpp/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,13 @@

#include "rclcpp/clock.hpp"
#include "rclcpp/detail/cpp_callback_trampoline.hpp"
#include "rclcpp/detail/resolve_use_intra_process.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/experimental/client_intra_process.hpp"
#include "rclcpp/experimental/intra_process_manager.hpp"
#include "rclcpp/function_traits.hpp"
#include "rclcpp/intra_process_setting.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
Expand All @@ -48,6 +52,8 @@
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"

#include "rcutils/logging_macros.h"

#include "rmw/error_handling.h"
#include "rmw/impl/cpp/demangle.hpp"
#include "rmw/rmw.h"
Expand Down Expand Up @@ -133,7 +139,7 @@ class ClientBase
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph);

RCLCPP_PUBLIC
virtual ~ClientBase() = default;
virtual ~ClientBase();

/// Take the next response for this client as a type erased pointer.
/**
Expand Down Expand Up @@ -254,6 +260,15 @@ class ClientBase
rclcpp::QoS
get_response_subscription_actual_qos() const;

/// Return the waitable for intra-process
/**
* \return the waitable sharedpointer for intra-process, or nullptr if intra-process is not setup.
* \throws std::runtime_error if the intra process manager is destroyed
*/
RCLCPP_PUBLIC
rclcpp::Waitable::SharedPtr
get_intra_process_waitable();

/// Set a callback to be called when each new response is received.
/**
* The callback receives a size_t which is the number of responses received
Expand Down Expand Up @@ -358,6 +373,19 @@ class ClientBase
void
set_on_new_response_callback(rcl_event_callback_t callback, const void * user_data);

using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::experimental::IntraProcessManager>;

/// Implementation detail.
RCLCPP_PUBLIC
void
setup_intra_process(
uint64_t intra_process_client_id,
IntraProcessManagerWeakPtr weak_ipm);

std::shared_ptr<rclcpp::experimental::ClientIntraProcessBase> client_intra_process_;
std::atomic_uint ipc_sequence_number_{1};

rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
std::shared_ptr<rcl_node_t> node_handle_;
std::shared_ptr<rclcpp::Context> context_;
Expand All @@ -373,6 +401,11 @@ class ClientBase
std::shared_ptr<rcl_client_t> client_handle_;

std::atomic<bool> in_use_by_wait_set_{false};

std::recursive_mutex ipc_mutex_;
bool use_intra_process_{false};
IntraProcessManagerWeakPtr weak_ipm_;
uint64_t intra_process_client_id_;
};

template<typename ServiceT>
Expand Down Expand Up @@ -468,12 +501,14 @@ class Client : public ClientBase
* \param[in] node_graph The node graph interface of the corresponding node.
* \param[in] service_name Name of the topic to publish to.
* \param[in] client_options options for the subscription.
* \param[in] ipc_setting Intra-process communication setting for the client.
*/
Client(
rclcpp::node_interfaces::NodeBaseInterface * node_base,
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
const std::string & service_name,
rcl_client_options_t & client_options)
rcl_client_options_t & client_options,
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
: ClientBase(node_base, node_graph),
srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle<ServiceT>())
{
Expand All @@ -496,10 +531,27 @@ class Client : public ClientBase
}
rclcpp::exceptions::throw_from_rcl_error(ret, "could not create client");
}

// Setup intra process if requested.
if (rclcpp::detail::resolve_use_intra_process(ipc_setting, *node_base)) {
create_intra_process_client();
}
}

virtual ~Client()
{
if (!use_intra_process_) {
return;
}
auto ipm = weak_ipm_.lock();
if (!ipm) {
// TODO(ivanpauno): should this raise an error?
RCLCPP_WARN(
rclcpp::get_logger("rclcpp"),
"Intra process manager died before than a client.");
return;
}
ipm->remove_client(intra_process_client_id_);
}

/// Take the next response for this client.
Expand Down Expand Up @@ -616,7 +668,7 @@ class Client : public ClientBase
Promise promise;
auto future = promise.get_future();
auto req_id = async_send_request_impl(
*request,
std::move(request),
std::move(promise));
return FutureAndRequestId(std::move(future), req_id);
}
Expand Down Expand Up @@ -651,7 +703,7 @@ class Client : public ClientBase
Promise promise;
auto shared_future = promise.get_future().share();
auto req_id = async_send_request_impl(
*request,
std::move(request),
std::make_tuple(
CallbackType{std::forward<CallbackT>(cb)},
shared_future,
Expand Down Expand Up @@ -682,7 +734,7 @@ class Client : public ClientBase
PromiseWithRequest promise;
auto shared_future = promise.get_future().share();
auto req_id = async_send_request_impl(
*request,
request,
std::make_tuple(
CallbackWithRequestType{std::forward<CallbackT>(cb)},
request,
Expand Down Expand Up @@ -824,11 +876,33 @@ class Client : public ClientBase
CallbackWithRequestTypeValueVariant>;

int64_t
async_send_request_impl(const Request & request, CallbackInfoVariant value)
async_send_request_impl(SharedRequest request, CallbackInfoVariant value)
{
std::lock_guard<std::recursive_mutex> lock(ipc_mutex_);
if (use_intra_process_) {
auto ipm = weak_ipm_.lock();
if (!ipm) {
throw std::runtime_error(
"intra process send called after destruction of intra process manager");
}
bool intra_process_server_available = ipm->service_is_available(intra_process_client_id_);

// Check if there's an intra-process server available matching this client.
// If there's not, we fall back into inter-process communication, since
// the server might be available in another process or was configured to not use IPC.
if (intra_process_server_available) {
// Send intra-process request
ipm->send_intra_process_client_request<ServiceT>(
intra_process_client_id_,
std::make_pair(std::move(request), std::move(value)));
return ipc_sequence_number_++;
}
}

// Send inter-process request
int64_t sequence_number;
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), &request, &sequence_number);
rcl_ret_t ret = rcl_send_request(get_client_handle().get(), request.get(), &sequence_number);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request");
}
Expand All @@ -854,6 +928,40 @@ class Client : public ClientBase
return value;
}

void
create_intra_process_client()
{
// Check if the QoS is compatible with intra-process.
auto qos_profile = get_response_subscription_actual_qos();

if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) {
throw std::invalid_argument(
"intraprocess communication allowed only with keep last history qos policy");
}
if (qos_profile.depth() == 0) {
throw std::invalid_argument(
"intraprocess communication is not allowed with 0 depth qos policy");
}
if (qos_profile.durability() != rclcpp::DurabilityPolicy::Volatile) {
throw std::invalid_argument(
"intraprocess communication allowed only with volatile durability");
}

// Create a ClientIntraProcess which will be given to the intra-process manager.
using ClientIntraProcessT = rclcpp::experimental::ClientIntraProcess<ServiceT>;

client_intra_process_ = std::make_shared<ClientIntraProcessT>(
context_,
this->get_service_name(),
qos_profile);

// Add it to the intra process manager.
using rclcpp::experimental::IntraProcessManager;
auto ipm = context_->get_sub_context<IntraProcessManager>();
uint64_t intra_process_client_id = ipm->add_intra_process_client(client_intra_process_);
this->setup_intra_process(intra_process_client_id, ipm);
}

RCLCPP_DISABLE_COPY(Client)

std::unordered_map<
Expand Down
6 changes: 4 additions & 2 deletions rclcpp/include/rclcpp/create_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ create_client(
std::shared_ptr<node_interfaces::NodeServicesInterface> node_services,
const std::string & service_name,
const rmw_qos_profile_t & qos_profile,
rclcpp::CallbackGroup::SharedPtr group)
rclcpp::CallbackGroup::SharedPtr group,
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
{
rcl_client_options_t options = rcl_client_get_default_options();
options.qos = qos_profile;
Expand All @@ -74,7 +75,8 @@ create_client(
node_base.get(),
node_graph,
service_name,
options);
options,
ipc_setting);

auto cli_base_ptr = std::dynamic_pointer_cast<rclcpp::ClientBase>(cli);
node_services->add_client(cli_base_ptr, group);
Expand Down
7 changes: 4 additions & 3 deletions rclcpp/include/rclcpp/create_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ create_service(
const std::string & service_name,
CallbackT && callback,
const rmw_qos_profile_t & qos_profile,
rclcpp::CallbackGroup::SharedPtr group)
rclcpp::CallbackGroup::SharedPtr group,
rclcpp::IntraProcessSetting ipc_setting = rclcpp::IntraProcessSetting::NodeDefault)
{
rclcpp::AnyServiceCallback<ServiceT> any_service_callback;
any_service_callback.set(std::forward<CallbackT>(callback));
Expand All @@ -72,8 +73,8 @@ create_service(
service_options.qos = qos_profile;

auto serv = Service<ServiceT>::make_shared(
node_base->get_shared_rcl_node_handle(),
service_name, any_service_callback, service_options);
node_base,
service_name, any_service_callback, service_options, ipc_setting);
auto serv_base_ptr = std::dynamic_pointer_cast<ServiceBase>(serv);
node_services->add_service(serv_base_ptr, group);
return serv;
Expand Down
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/detail/resolve_use_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ namespace detail
{

/// Return whether or not intra process is enabled, resolving "NodeDefault" if needed.
template<typename OptionsT, typename NodeBaseT>
template<typename NodeBaseT>
bool
resolve_use_intra_process(const OptionsT & options, const NodeBaseT & node_base)
resolve_use_intra_process(IntraProcessSetting ipc_setting, const NodeBaseT & node_base)
{
bool use_intra_process;
switch (options.use_intra_process_comm) {
switch (ipc_setting) {
case IntraProcessSetting::Enable:
use_intra_process = true;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,51 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
}
};

template<typename BufferT>
class ServiceIntraProcessBuffer : public IntraProcessBufferBase
{
public:
RCLCPP_SMART_PTR_ALIASES_ONLY(ServiceIntraProcessBuffer)

virtual ~ServiceIntraProcessBuffer() {}

explicit
ServiceIntraProcessBuffer(
std::unique_ptr<BufferImplementationBase<BufferT>> buffer_impl)
{
buffer_ = std::move(buffer_impl);
}

bool use_take_shared_method() const override
{
return false;
}

bool has_data() const override
{
return buffer_->has_data();
}

void clear() override
{
buffer_->clear();
}

void add(BufferT && msg)
{
buffer_->enqueue(std::move(msg));
}

BufferT
consume()
{
return buffer_->dequeue();
}

private:
std::unique_ptr<BufferImplementationBase<BufferT>> buffer_;
};

} // namespace buffers
} // namespace experimental
} // namespace rclcpp
Expand Down
Loading

0 comments on commit 8f5f88a

Please sign in to comment.